[jira] [Commented] (FLINK-4367) Offer separate API for watermark generation and timestamp extraction

2016-08-24 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436334#comment-15436334
 ] 

ramkrishna.s.vasudevan commented on FLINK-4367:
---

So is it better to remove the TimestampAssigner interface from the two interfaces
AssignerWithPunctuatedWatermarks and AssignerWithPeriodicWatermarks?
For 2.0 we would do this, and for the older release branches we would mark it as
@Deprecated and note that, from 2.0 on, they will no longer implement
TimestampAssigner. [~rmetzger]?

> Offer separate API for watermark generation and timestamp extraction
> 
>
> Key: FLINK-4367
> URL: https://issues.apache.org/jira/browse/FLINK-4367
> Project: Flink
>  Issue Type: Sub-task
>  Components: DataStream API
>Affects Versions: 1.1.0
>Reporter: Robert Metzger
> Fix For: 2.0.0
>
>
> Right now, the {{AssignerWithPunctuatedWatermarks}} and 
> {{AssignerWithPeriodicWatermarks}} interfaces also require implementing a 
> {{TimestampAssigner}}.
> For cases where the source emits records with timestamps, it's not necessary
> to extract timestamps again from the records; we just want to generate
> watermarks.
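
A minimal sketch of the separation being proposed, written against hypothetical interfaces (`TimestampExtractor`, `WatermarkEmitter` and `BoundedOutOfOrdernessEmitter` are illustrative names, not Flink's API). Watermark generation only consumes timestamps that are already attached to the records, and timestamp extraction becomes a separate interface that is only needed when the source does not attach timestamps.

```java
/** Hypothetical: pulls an event timestamp out of a record; only needed when the source did not attach one. */
interface TimestampExtractor<T> {
    long extractTimestamp(T record, long previousTimestamp);
}

/** Hypothetical: generates watermarks from timestamps that are already attached to the records. */
interface WatermarkEmitter<T> {
    /** Called for every record together with its already-assigned timestamp. */
    void onRecord(T record, long timestamp);

    /** Called periodically; returns the current watermark. */
    long currentWatermark();
}

/** Periodic watermarks that trail the highest timestamp seen so far by a fixed bound. */
final class BoundedOutOfOrdernessEmitter<T> implements WatermarkEmitter<T> {
    private final long maxOutOfOrderness;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedOutOfOrdernessEmitter(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public void onRecord(T record, long timestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
    }

    @Override
    public long currentWatermark() {
        // No record seen yet: report the lowest possible watermark instead of underflowing.
        return maxTimestampSeen == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestampSeen - maxOutOfOrderness;
    }
}

final class WatermarkSketchDemo {
    public static void main(String[] args) {
        // Timestamps come straight from the source, so no TimestampExtractor is involved.
        WatermarkEmitter<String> emitter = new BoundedOutOfOrdernessEmitter<>(5);
        emitter.onRecord("a", 100);
        emitter.onRecord("b", 120);
        emitter.onRecord("c", 110); // out of order, but within the bound
        System.out.println(emitter.currentWatermark()); // prints 115
    }
}
```

In this sketch an assigner that also needs extraction would simply implement both interfaces, so neither concern forces the other on implementors.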



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436242#comment-15436242
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176826
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java
 ---
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.dispatch.OnFailure;
+import akka.dispatch.OnSuccess;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * This utility class implements the basis for periodically triggering heartbeats from one component to another,
+ * for example from the ResourceManager to a TaskExecutor.
+ *
+ * @param  The type of the gateway to connect to.
+ * @param  The type of the successful heartbeat responses with payload.
+ */
+public abstract class HeartbeatScheduler {
+   /** default heartbeat interval in milliseconds */
+   private static final long INITIAL_HEARTBEAT_INTERVAL_MILLIS = 5000;
+
+   /** default heartbeat timeout in milliseconds */
+   private static final long INITIAL_HEARTBEAT_TIMEOUT_MILLIS = 200;
+
+   /** default max heartbeat timeout in milliseconds (used when a heartbeat is retried) */
+   private static final long MAX_HEARTBEAT_TIMEOUT_MILLIS = 3;
+
+   /** default delay in milliseconds before the next heartbeat attempt after an exception has occurred */
+   private static final long ERROR_HEARTBEAT_DELAY_MILLIS = 2000;
+
+   /** default max total retry time in milliseconds for one heartbeat */
+   private static final long MAX_HEARTBEAT_ATTEMPT_MILLIS = 6;
+
+   private final long heartbeatInterval;
+
+   private final long heartbeatTimeout;
+
+   private final long maxHeartbeatTimeout;
+
+   private final long delayOnError;
+
+   private final long maxAttemptTime;
+
+   /** target gateway that receives the heartbeat and returns the heartbeat response */
+   protected final Gateway targetGateway;
+
+   /** the target address */
+   private final String targetAddress;
+
+   /** the target gateway name */
+   private final String targetName;
+
+   private final RpcService rpcService;
+
+   private final UUID leaderID;
+
+   private final Logger log;
+
+   private volatile boolean closed;
+
+   /**
+* @param rpcService    rpcService
+* @param leaderID      leader session ID of the component that sends the heartbeat
+* @param targetGateway target gateway that receives and responds to the heartbeat
+* @param targetAddress target gateway address
+* @param targetName    target name
+* @param log           log
+*/
+   public HeartbeatScheduler(RpcService rpcService, UUID leaderID, Gateway targetGateway,
+   String targetAddress, String targetName, Logger log) {
+   this(rpcService, leaderID, targetGateway, targetAddress, targetName, log, INITIAL_HEARTBEAT_INTERVAL_MILLIS,
+   INITIAL_HEARTBEAT_TIMEOUT_MILLIS, MAX_HEARTBEAT_TIMEOUT_MILLIS, ERROR_HEARTBEAT_DELAY_MILLIS, MAX_HEARTBEAT_ATTEMPT_MILLIS);
+   }
+
+   /**
+* @param rpcService  rpcService
+* @param leaderID   
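
The core of such a scheduler is a loop that fires a heartbeat every heartbeatInterval until the closed flag is set. The sketch below assumes a plain `ScheduledExecutorService` in place of the `RpcService` and a `Runnable` stand-in for the RPC call to the target gateway; `PeriodicHeartbeatTrigger` is a hypothetical name, and timeouts, backoff and delayOnError are deliberately left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: runs a heartbeat action every intervalMillis until close() is called. */
final class PeriodicHeartbeatTrigger implements AutoCloseable {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final ScheduledFuture<?> task;
    // Plays the role of the 'volatile boolean closed' field in the diff.
    private volatile boolean closed;

    PeriodicHeartbeatTrigger(Runnable sendHeartbeat, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.task = executor.scheduleWithFixedDelay(() -> {
            if (closed) {
                return;
            }
            try {
                sendHeartbeat.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs; a real scheduler
                // would log it here and apply its error delay before the next attempt.
            }
        }, 0, intervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        closed = true;
        task.cancel(false);
        executor.shutdown();
    }
}

final class HeartbeatTriggerDemo {
    public static void main(String[] args) throws InterruptedException {
        try (PeriodicHeartbeatTrigger trigger =
                new PeriodicHeartbeatTrigger(() -> System.out.println("heartbeat"), 100)) {
            Thread.sleep(350); // roughly four heartbeats, depending on scheduling
        }
    }
}
```

scheduleWithFixedDelay measures the interval from the end of one heartbeat to the start of the next, so a slow target cannot cause heartbeats to pile up.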

[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176826
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatScheduler.java
 ---
@@ -0,0 +1,286 @@
+
+   /**
+* @param rpcService    rpcService
+* @param leaderID      leader session ID of the component that sends the heartbeat
+* @param targetGateway target gateway that receives and responds to the heartbeat
+* @param targetAddress target gateway address
+* @param 

[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436237#comment-15436237
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176673
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 ---
@@ -0,0 +1,102 @@
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering granting and revoking leadership.
+ */
+public class ResourceManagerHATest {
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
+
+   actorSystem.shutdown();
+   actorSystem.awaitTermination();
+   }
+
+   @Test
+   public void testGrantAndRevokeLeadership() throws Exception {
+   TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+   HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
+   when(highAvailabilityServices.getResourceManagerLeaderElectionService()).thenReturn(leaderElectionService);
+   final ResourceManager resourceManager = new ResourceManager(akkaRpcService, highAvailabilityServices);
+   resourceManager.start();
+   // before leadership is granted, the resourceManager's leaderId is null
+   Assert.assertNull(resourceManager.getLeaderSessionID());
+   final UUID leaderId = UUID.randomUUID();
+   leaderElectionService.isLeader(leaderId);
+   // after leadership is granted, the resourceManager's leaderId is set
+   Assert.assertEquals(getLatestLeaderId(resourceManager), leaderId);
--- End diff --

I don't think so. ResourceManager.getLeaderSessionID has to run in the main
thread of the ResourceManager, so that it sees the new leader session ID after
the ResourceManager has been granted or revoked leadership.
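
The reasoning can be illustrated with a single-threaded executor standing in for the ResourceManager's main thread. Leadership changes are queued on that thread, so a read that is also queued there observes every change submitted before it, whereas a read from the test thread may run before the grant has been applied. `MainThreadComponent` and its methods are hypothetical, and the test's getLatestLeaderId helper is not shown in the diff, so this only sketches the general pattern.

```java
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch: all state is owned by one "main thread"; other threads go through it. */
final class MainThreadComponent implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Only read or written on mainThread, so no volatile or locking is needed.
    private UUID leaderSessionId;

    /** Leadership changes are applied asynchronously on the main thread. */
    void grantLeadership(UUID newLeaderId) {
        mainThread.execute(() -> leaderSessionId = newLeaderId);
    }

    void revokeLeadership() {
        mainThread.execute(() -> leaderSessionId = null);
    }

    /** Reads on the main thread, so it observes every change submitted before this call. */
    UUID getLeaderIdFromMainThread(long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<UUID> result = mainThread.submit(() -> leaderSessionId);
        return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}

final class MainThreadDemo {
    public static void main(String[] args) throws Exception {
        try (MainThreadComponent rm = new MainThreadComponent()) {
            UUID leaderId = UUID.randomUUID();
            rm.grantLeadership(leaderId);
            // Queued behind grantLeadership on the same thread, so it sees the new ID.
            System.out.println(leaderId.equals(rm.getLeaderIdFromMainThread(1000))); // prints true
        }
    }
}
```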


> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> 

[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176673
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 ---
@@ -0,0 +1,102 @@
--- End diff --

I don't think so. ResourceManager.getLeaderSessionID has to run in the main
thread of the ResourceManager, so that it sees the new leader session ID after
the ResourceManager has been granted or revoked leadership.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436225#comment-15436225
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176200
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 ---
@@ -0,0 +1,102 @@
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * ResourceManager HA test, covering granting and revoking leadership.
+ */
+public class ResourceManagerHATest {
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
+
+   actorSystem.shutdown();
+   actorSystem.awaitTermination();
+   }
+
+   @Test
+   public void testGrantAndRevokeLeadership() throws Exception {
+   TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
+   HighAvailabilityServices highAvailabilityServices = mock(HighAvailabilityServices.class);
--- End diff --

good


> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager
> and the TaskExecutors:
> 1. Register TaskExecutors: register heartbeat targets. If the heartbeat
> response of a target is not reported in time, mark the target as failed and
> notify the ResourceManager.
> 2. Trigger heartbeats: the ResourceManager periodically triggers a heartbeat
> to each TaskExecutor. The TaskExecutor reports its slot allocation in the
> heartbeat response, and the ResourceManager syncs its own view of the slot
> allocation with that response.
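
The two responsibilities described above could be sketched as follows. This is an illustration under simplifying assumptions, not the HeartbeatManager from the pull request: targets are identified by strings, the current time is passed in explicitly so the logic stays deterministic, and the slot report is reduced to a count of allocated slots per TaskExecutor. `HeartbeatManagerSketch` and all of its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch of the two duties: track registered targets, and sync slot allocation from responses. */
final class HeartbeatManagerSketch {
    private final long heartbeatTimeoutMillis;
    private final Consumer<String> onTargetFailed; // e.g. notifies the ResourceManager
    private final Map<String, Long> lastResponseMillis = new HashMap<>();
    // Hypothetical stand-in for the ResourceManager's view: target -> number of allocated slots.
    private final Map<String, Integer> allocatedSlots = new HashMap<>();

    HeartbeatManagerSketch(long heartbeatTimeoutMillis, Consumer<String> onTargetFailed) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        this.onTargetFailed = onTargetFailed;
    }

    /** 1. Registers a TaskExecutor as a heartbeat target. */
    void registerTarget(String targetId, long nowMillis) {
        lastResponseMillis.put(targetId, nowMillis);
    }

    /** 2. Handles a heartbeat response carrying the target's slot allocation. */
    void onHeartbeatResponse(String targetId, int reportedAllocatedSlots, long nowMillis) {
        if (lastResponseMillis.containsKey(targetId)) {
            lastResponseMillis.put(targetId, nowMillis);
            allocatedSlots.put(targetId, reportedAllocatedSlots); // sync with the report
        }
    }

    /** Called on each heartbeat round: fails and unregisters targets that did not respond in time. */
    void checkTimeouts(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastResponseMillis.entrySet()) {
            if (nowMillis - entry.getValue() > heartbeatTimeoutMillis) {
                failed.add(entry.getKey());
            }
        }
        for (String targetId : failed) {
            lastResponseMillis.remove(targetId);
            allocatedSlots.remove(targetId);
            onTargetFailed.accept(targetId);
        }
    }

    Integer allocatedSlotsOf(String targetId) {
        return allocatedSlots.get(targetId);
    }

    boolean isRegistered(String targetId) {
        return lastResponseMillis.containsKey(targetId);
    }
}
```

Collecting failed targets before removing them avoids modifying the map while iterating over it.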





[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176210
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java
 ---
@@ -0,0 +1,248 @@
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for HeartbeatScheduler, covering the successful case, the timeout case (retry with a backoff timeout) and the failure case (retry).
+ */
+public class HeartbeatSchedulerTest extends TestLogger {
+   private static final long INITIAL_INTERVAL = 200;
+   private static final long INITIAL_TIMEOUT = 20;
+   private static final long MAX_TIMEOUT = 150;
+   private static final long DELAY_ON_ERROR = 100;
+   private static final long MAX_ATTEMPT_TIME = 300;
+
+
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
--- End diff --

Cool
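
The test constants (INITIAL_TIMEOUT = 20, MAX_TIMEOUT = 150, MAX_ATTEMPT_TIME = 300) describe a retry schedule in which the per-attempt timeout backs off from an initial value up to a maximum, within an overall time budget for one heartbeat. The diff does not show the backoff rule, so the sketch below assumes the timeout doubles on each retry and is capped at the maximum, and that no new attempt starts if it could overrun the budget. `HeartbeatBackoff.attemptTimeouts` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;

final class HeartbeatBackoff {
    private HeartbeatBackoff() {}

    /**
     * Per-attempt timeouts for one heartbeat, assuming each retry doubles the previous
     * timeout (capped at maxTimeout) and no attempt starts if the total could exceed maxAttemptTime.
     */
    static List<Long> attemptTimeouts(long initialTimeout, long maxTimeout, long maxAttemptTime) {
        if (initialTimeout <= 0 || maxTimeout < initialTimeout) {
            throw new IllegalArgumentException("need 0 < initialTimeout <= maxTimeout");
        }
        List<Long> timeouts = new ArrayList<>();
        long elapsed = 0;
        long timeout = initialTimeout;
        while (elapsed + timeout <= maxAttemptTime) {
            timeouts.add(timeout);
            elapsed += timeout;
            timeout = Math.min(timeout * 2, maxTimeout);
        }
        return timeouts;
    }
}

final class HeartbeatBackoffDemo {
    public static void main(String[] args) {
        System.out.println(HeartbeatBackoff.attemptTimeouts(20, 150, 300)); // prints [20, 40, 80, 150]
    }
}
```

With the test's values this yields four attempts with timeouts of 20, 40, 80 and 150 ms (290 ms in total); a fifth 150 ms attempt would exceed the 300 ms budget.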




[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176200
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 ---
@@ -0,0 +1,102 @@
--- End diff --

good




[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436226#comment-15436226
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76176210
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry)
+ */
+public class HeartbeatSchedulerTest extends TestLogger {
+   private static final long INITIAL_INTERVAL = 200;
+   private static final long INITIAL_TIMEOUT = 20;
+   private static final long MAX_TIMEOUT = 150;
+   private static final long DELAY_ON_ERROR = 100;
+   private static final long MAX_ATTEMPT_TIME = 300;
+
+
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
--- End diff --

Cool


> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager 
> and the TaskExecutors:
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response of a target is not 
> reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeats
> Periodically trigger heartbeats from the ResourceManager to each TaskExecutor.
> The TaskExecutor reports its slot allocation in the heartbeat response, and 
> the ResourceManager syncs its own slot allocation with that response.
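The registration and failure-detection bookkeeping described above can be sketched as follows. This is a minimal, single-threaded sketch, not the HeartbeatManager from PR #2410: HeartbeatMonitorSketch and FailureListener are hypothetical names, the listener stands in for "notify the ResourceManager", and the current time is passed in explicitly so the logic can be tested without a real clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical callback standing in for "notify the ResourceManager". */
interface FailureListener {
    void targetFailed(String targetId);
}

/** Hypothetical sender-side bookkeeping; not the PR's HeartbeatManager. */
class HeartbeatMonitorSketch {
    private final long heartbeatTimeoutMillis;
    private final FailureListener listener;
    // targetId -> timestamp (ms) of the last heartbeat response
    private final Map<String, Long> lastResponse = new HashMap<>();

    HeartbeatMonitorSketch(long heartbeatTimeoutMillis, FailureListener listener) {
        this.heartbeatTimeoutMillis = heartbeatTimeoutMillis;
        this.listener = listener;
    }

    /** Registers a TaskExecutor as a heartbeat target. */
    void register(String targetId, long nowMillis) {
        lastResponse.put(targetId, nowMillis);
    }

    /** Records a response; a real manager would also sync the slot report here. */
    void onResponse(String targetId, long nowMillis) {
        if (lastResponse.containsKey(targetId)) {
            lastResponse.put(targetId, nowMillis);
        }
    }

    /** Called periodically: marks overdue targets as failed, unregisters and reports them. */
    List<String> checkTimeouts(long nowMillis) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastResponse.entrySet()) {
            if (nowMillis - entry.getValue() > heartbeatTimeoutMillis) {
                failed.add(entry.getKey());
            }
        }
        // remove after iterating to avoid a ConcurrentModificationException
        for (String id : failed) {
            lastResponse.remove(id);
            listener.targetFailed(id);
        }
        return failed;
    }
}
```

A target that stops answering is reported exactly once and then dropped; re-registering it would be up to the caller.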





[jira] [Commented] (FLINK-3322) MemoryManager creates too much GC pressure with iterative jobs

2016-08-24 Thread ramkrishna.s.vasudevan (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436212#comment-15436212
 ] 

ramkrishna.s.vasudevan commented on FLINK-3322:
---

I can work on this and come up with a design doc for making iterative jobs 
smart enough not to request new memory segments every time. [~StephanEwen]?

> MemoryManager creates too much GC pressure with iterative jobs
> --
>
> Key: FLINK-3322
> URL: https://issues.apache.org/jira/browse/FLINK-3322
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Affects Versions: 1.0.0
>Reporter: Gabor Gevay
>Priority: Critical
> Fix For: 1.0.0
>
>
> When taskmanager.memory.preallocate is false (the default), released memory 
> segments are not added to a pool, but the GC is expected to take care of 
> them. This puts too much pressure on the GC with iterative jobs, where the 
> operators reallocate all memory at every superstep.
> See the following discussion on the mailing list:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Memory-manager-behavior-in-iterative-jobs-tt10066.html
> Reproducing the issue:
> https://github.com/ggevay/flink/tree/MemoryManager-crazy-gc
> The class to start is malom.Solver. If you increase the memory given to the 
> JVM from 1 to 50 GB, performance gradually degrades by more than a factor of 10. 
> (On the first run, it spends a few minutes generating lookup tables in /tmp.) 
> (I think the slowdown might also depend somewhat on 
> taskmanager.memory.fraction, because more unused non-managed memory results 
> in rarer GCs.)
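To illustrate why pooling released segments relieves the GC, here is a minimal sketch of a free-list pool. It is not the MemoryManager API: SegmentPool is a hypothetical class and a plain byte[] stands in for a memory segment. An operator that releases and re-requests all of its memory at every superstep keeps getting the same arrays back instead of leaving garbage behind.

```java
import java.util.ArrayDeque;

/** Hypothetical pool; a byte[] stands in for a memory segment. */
class SegmentPool {
    private final int segmentSize;
    private final ArrayDeque<byte[]> free = new ArrayDeque<>();
    private int allocatedFresh; // number of arrays ever created by this pool

    SegmentPool(int segmentSize) {
        this.segmentSize = segmentSize;
    }

    /** Reuses a released segment if one is available, otherwise allocates a new one. */
    byte[] request() {
        byte[] segment = free.poll();
        if (segment == null) {
            allocatedFresh++;
            segment = new byte[segmentSize];
        }
        return segment; // note: recycled arrays are not zeroed
    }

    /** Returns a segment to the pool instead of dropping it for the GC. */
    void release(byte[] segment) {
        if (segment.length != segmentSize) {
            throw new IllegalArgumentException("segment does not belong to this pool");
        }
        free.push(segment);
    }

    int allocatedFresh() {
        return allocatedFresh;
    }
}
```

With this scheme, ten supersteps that each request and release four segments allocate only four arrays in total. The trade-off is that pooled segments stay reachable, so the heap they occupy is never handed back to the GC.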





[GitHub] flink issue #2416: FLINK-4480: Incorrect link to elastic.co in documentation

2016-08-24 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2416
  
Hi @smarthi,
Thank you for picking this up. The changes LGTM :) Also tested the 2.3.5 ES 
version bump.
Good catches on the Guava and broken example also!





[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436175#comment-15436175
 ] 

ASF GitHub Bot commented on FLINK-4480:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/2416
  
Hi @smarthi,
Thank you for picking this up. The changes LGTM :) Also tested the 2.3.5 ES 
version bump.
Good catches on the Guava and broken example also!



> Incorrect link to elastic.co in documentation
> -
>
> Key: FLINK-4480
> URL: https://issues.apache.org/jira/browse/FLINK-4480
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Trivial
>
> The link URL of the entry "Elasticsearch 2x (sink)" on the connector's 
> documentation page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
>  is pointing to http://elastic.com but should point to http://elastic.co





[jira] [Assigned] (FLINK-4339) Implement Slot Pool Core

2016-08-24 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young reassigned FLINK-4339:
-

Assignee: Kurt Young

> Implement Slot Pool Core
> 
>
> Key: FLINK-4339
> URL: https://issues.apache.org/jira/browse/FLINK-4339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
>Assignee: Kurt Young
> Fix For: 1.2.0
>
>
> Implements the core slot structures and behavior of the {{SlotPool}}:
>   - pool of available slots
>   - request slots and respond if a slot is available in the pool
>   - return / deallocate slots
> Detailed design: 
> https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/
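The three behaviors listed above can be sketched as a single-threaded pool. SimpleSlotPool and its string slot IDs are hypothetical; the actual SlotPool design is in the linked document and will differ (for example, a real pool would queue requests that cannot be served immediately).

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical slot pool; slots are plain string IDs. */
class SimpleSlotPool {
    private final ArrayDeque<String> available = new ArrayDeque<>();
    private final Set<String> allocated = new HashSet<>();

    /** Adds a slot offered to the pool, e.g. by a TaskExecutor. */
    void offer(String slotId) {
        if (!allocated.contains(slotId) && !available.contains(slotId)) {
            available.add(slotId);
        }
    }

    /** Requests a slot: responds immediately if one is available in the pool. */
    Optional<String> request() {
        String slot = available.poll();
        if (slot == null) {
            return Optional.empty(); // a real pool would queue the request
        }
        allocated.add(slot);
        return Optional.of(slot);
    }

    /** Returns an allocated slot so it can be handed out again. */
    void returnSlot(String slotId) {
        if (allocated.remove(slotId)) {
            available.add(slotId);
        }
    }

    /** Removes a slot from the pool entirely, whether allocated or available. */
    void deallocate(String slotId) {
        allocated.remove(slotId);
        available.remove(slotId);
    }

    int availableCount() {
        return available.size();
    }
}
```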





[jira] [Updated] (FLINK-4339) Implement Slot Pool Core

2016-08-24 Thread Kurt Young (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kurt Young updated FLINK-4339:
--
Description: 
Implements the core slot structures and behavior of the {{SlotPool}}:
  - pool of available slots
  - request slots and respond if a slot is available in the pool
  - return / deallocate slots

Detailed design: 
https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/

  was:
Implements the core slot structures and behavior of the {{SlotPool}}:
  - pool of available slots
  - request slots and respond if a slot is available in the pool
  - return / deallocate slots


> Implement Slot Pool Core
> 
>
> Key: FLINK-4339
> URL: https://issues.apache.org/jira/browse/FLINK-4339
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Affects Versions: 1.1.0
>Reporter: Stephan Ewen
> Fix For: 1.2.0
>
>
> Implements the core slot structures and behavior of the {{SlotPool}}:
>   - pool of available slots
>   - request slots and respond if a slot is available in the pool
>   - return / deallocate slots
> Detailed design: 
> https://docs.google.com/document/d/1y4D-0KGiMNDFYOLRkJy-C04nl8fwJNdm9hoUfxce6zY/





[jira] [Commented] (FLINK-4478) Implement heartbeat logic

2016-08-24 Thread zhangjing (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436167#comment-15436167
 ] 

zhangjing commented on FLINK-4478:
--

[~till.rohrmann], I agree that we should first define what it should look like. 
I'll try to give my opinions on your questions. Here are my thoughts; what's 
your advice?
1. Exponential backoff strategy.

In fact, it is not a pure exponential backoff, because the timeout is capped, as 
in 'Math.min(2 * timeoutMillis, maxHeartbeatTimeout)'. Maybe we could use 
maxHeartbeatTimeout to reduce the risk of waiting twice as long as configured 
before being notified about a heartbeat failure.
Alternatively, we could use a constant retry period instead of a backoff strategy.
2. Should every heartbeat connection be responsible for triggering itself, or 
should the heartbeat manager be responsible for that?
A heartbeat scheduler does not trigger itself; it relies on the outside world 
(here, the HeartbeatManager) to call its start method.

3. Is the heartbeat receiving end an independent RpcEndpoint? How does the 
payload delivery work? Does the sending side ask for the result (future), or 
does the receiving side answer via a tell message to the heartbeat manager?
On the sending side, the receiving end is a gateway that can be obtained from 
its address, and the sending side asks the receiver for the heartbeat payload.
4. How does the receiving end monitor the sender, so that it can mark the 
sending end as dead if heartbeat requests are not delivered?
I think this could be independent of the heartbeat manager on the sending side: 
it should run on the receiving end, while the heartbeat scheduler runs on the 
sending side.
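The capped doubling from point 1 can be sketched as follows. nextTimeout mirrors the quoted expression; the class name and the attemptTimeouts loop (keep retrying while the summed timeouts fit into a total attempt budget) are illustrative assumptions, not code from the PR, and how the PR actually uses its maximum attempt time may differ. The sample numbers 20, 150 and 300 are borrowed from INITIAL_TIMEOUT, MAX_TIMEOUT and MAX_ATTEMPT_TIME in the HeartbeatSchedulerTest diff purely as example values.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper illustrating capped doubling of the heartbeat timeout. */
class HeartbeatTimeouts {

    /** Capped doubling: Math.min(2 * timeoutMillis, maxHeartbeatTimeout). */
    static long nextTimeout(long timeoutMillis, long maxHeartbeatTimeout) {
        return Math.min(2 * timeoutMillis, maxHeartbeatTimeout);
    }

    /** Timeouts of successive attempts, as long as their sum stays within maxAttemptTime. */
    static List<Long> attemptTimeouts(long initialTimeout, long maxTimeout, long maxAttemptTime) {
        if (initialTimeout <= 0) {
            throw new IllegalArgumentException("initialTimeout must be positive");
        }
        List<Long> timeouts = new ArrayList<>();
        long timeout = initialTimeout;
        long elapsed = 0;
        while (elapsed + timeout <= maxAttemptTime) {
            timeouts.add(timeout);
            elapsed += timeout;
            timeout = nextTimeout(timeout, maxTimeout);
        }
        return timeouts;
    }
}
```

With (20, 150, 300) the attempts use timeouts 20, 40, 80 and 150: the fourth would have been 160 without the cap. A constant retry period corresponds to passing maxTimeout equal to initialTimeout.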



> Implement heartbeat logic
> -
>
> Key: FLINK-4478
> URL: https://issues.apache.org/jira/browse/FLINK-4478
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.1.0
>Reporter: Till Rohrmann
> Fix For: 1.2.0
>
>
> Parent issue to track the development of the heartbeat logic (sender and 
> receiver) for the new Flip-6 refactoring.





[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436164#comment-15436164
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2410
  
Hi Till, thanks a lot for the review and the good advice. I agree that we 
should first define what it should look like, and I'll try to give my opinions 
on your questions.
1. Exponential backoff strategy.
In fact, it is not a pure exponential backoff, because the timeout is capped, 
as in 'Math.min(2 * timeoutMillis, maxHeartbeatTimeout)'. Maybe we could use 
maxHeartbeatTimeout to reduce the risk of waiting twice as long as configured 
before being notified about a heartbeat failure.
Alternatively, we could use a constant retry period instead of a backoff 
strategy.
2. Should every heartbeat connection be responsible for triggering itself, 
or should the heartbeat manager be responsible for that?
A heartbeat scheduler does not trigger itself; it relies on the outside 
world (here, the HeartbeatManager) to call its start method.
3. Is the heartbeat receiving end an independent RpcEndpoint? How does the 
payload delivery work? Does the sending side ask for the result (future), or 
does the receiving side answer via a tell message to the heartbeat manager?
On the sending side, the receiving end is a gateway that can be obtained 
from its address, and the sending side asks the receiver for the heartbeat 
payload.
4. How does the receiving end monitor the sender, so that it can mark the 
sending end as dead if heartbeat requests are not delivered?
I think this could be independent of the heartbeat manager on the sending 
side: it should run on the receiving end, while the heartbeat scheduler runs 
on the sending side.

What's your advice?
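The "ask" variant from point 3 (the sender asks the receiver's gateway for the payload and gets a future back) can be sketched with Java's CompletableFuture standing in for the Scala futures used in the PR. PayloadGateway and HeartbeatAsk.askForPayload are hypothetical names. A timeout or a failed future is reported as an empty result, i.e. "no heartbeat in time", so the caller can retry or mark the target as failed.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical receiver-side gateway: answers a heartbeat request with a payload. */
interface PayloadGateway {
    CompletableFuture<String> triggerHeartbeat(UUID leaderSessionId);
}

class HeartbeatAsk {
    /** Asks the receiver for its payload; empty means "no heartbeat response in time". */
    static Optional<String> askForPayload(PayloadGateway gateway, UUID leaderSessionId, long timeoutMillis) {
        CompletableFuture<String> future = gateway.triggerHeartbeat(leaderSessionId);
        try {
            return Optional.ofNullable(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException | ExecutionException e) {
            future.cancel(true); // no effect if the future has already completed
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return Optional.empty();
        }
    }
}
```

Blocking on get() keeps the sketch short; an actor-based implementation would rather attach a callback to the future than block a thread.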


> Heartbeat Manager between ResourceManager and TaskExecutor
> --
>
> Key: FLINK-4449
> URL: https://issues.apache.org/jira/browse/FLINK-4449
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: zhangjing
>Assignee: zhangjing
>
> HeartbeatManager is responsible for the heartbeat between the ResourceManager 
> and the TaskExecutors:
> 1. Register TaskExecutors
> Register heartbeat targets. If the heartbeat response of a target is not 
> reported in time, mark the target as failed and notify the ResourceManager.
> 2. Trigger heartbeats
> Periodically trigger heartbeats from the ResourceManager to each TaskExecutor.
> The TaskExecutor reports its slot allocation in the heartbeat response, and 
> the ResourceManager syncs its own slot allocation with that response.





[GitHub] flink issue #2410: [FLINK-4449] [cluster management] Heartbeat Manager betwe...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on the issue:

https://github.com/apache/flink/pull/2410
  
Hi Till, thanks a lot for the review and the good advice. I agree that we 
should first define what it should look like, and I'll try to give my opinions 
on your questions.
1. Exponential backoff strategy.
In fact, it is not a pure exponential backoff, because the timeout is capped, 
as in 'Math.min(2 * timeoutMillis, maxHeartbeatTimeout)'. Maybe we could use 
maxHeartbeatTimeout to reduce the risk of waiting twice as long as configured 
before being notified about a heartbeat failure.
Alternatively, we could use a constant retry period instead of a backoff 
strategy.
2. Should every heartbeat connection be responsible for triggering itself, 
or should the heartbeat manager be responsible for that?
A heartbeat scheduler does not trigger itself; it relies on the outside 
world (here, the HeartbeatManager) to call its start method.
3. Is the heartbeat receiving end an independent RpcEndpoint? How does the 
payload delivery work? Does the sending side ask for the result (future), or 
does the receiving side answer via a tell message to the heartbeat manager?
On the sending side, the receiving end is a gateway that can be obtained 
from its address, and the sending side asks the receiver for the heartbeat 
payload.
4. How does the receiving end monitor the sender, so that it can mark the 
sending end as dead if heartbeat requests are not delivered?
I think this could be independent of the heartbeat manager on the sending 
side: it should run on the receiving end, while the heartbeat scheduler runs 
on the sending side.

What's your advice?




[jira] [Commented] (FLINK-4449) Heartbeat Manager between ResourceManager and TaskExecutor

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15436094#comment-15436094
 ] 

ASF GitHub Bot commented on FLINK-4449:
---

Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76168206
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry)
+ */
+public class HeartbeatSchedulerTest extends TestLogger {
+   private static final long INITIAL_INTERVAL = 200;
+   private static final long INITIAL_TIMEOUT = 20;
+   private static final long MAX_TIMEOUT = 150;
+   private static final long DELAY_ON_ERROR = 100;
+   private static final long MAX_ATTEMPT_TIME = 300;
+
+
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
+
+   actorSystem.shutdown();
+   actorSystem.awaitTermination();
+   }
+
+
+   /**
+* test for receiving a regular heartbeat response in time, and checking the heartbeatResponse is properly delivered
+* @throws Exception
+*/
+   @Test
+   public void testHeartbeatSuccessful() throws Exception {
+   HeartbeatSender sender = mock(HeartbeatSender.class);
+   UUID leaderSessionID = UUID.randomUUID();
+
+   HeartbeatReceiverGateway targetGateway = mock(HeartbeatReceiverGateway.class);
+
+   String response = "ok";
+   when(targetGateway.triggerHeartbeat(any(UUID.class), any(FiniteDuration.class))).thenReturn(
+   Futures.successful(response)
+   );
+
+   TestingHeartbeatScheduler heartbeatScheduler = new TestingHeartbeatScheduler(sender, akkaRpcService, leaderSessionID,
+   targetGateway, "taskExecutor-test-address", "testTargetGateway", log, INITIAL_INTERVAL, INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, MAX_ATTEMPT_TIME);
+   heartbeatScheduler.start();
+
+   // verify heartbeat successful and syncHeartbeatResponse is invoked 

[GitHub] flink pull request #2410: [FLINK-4449] [cluster management] Heartbeat Manage...

2016-08-24 Thread beyond1920
Github user beyond1920 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2410#discussion_r76168206
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/heartbeat/HeartbeatSchedulerTest.java
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.heartbeat;
+
+import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * test for HeartbeatScheduler, including successful case, timeout case(retry on backoff timeout), failure case(retry)
+ */
+public class HeartbeatSchedulerTest extends TestLogger {
+   private static final long INITIAL_INTERVAL = 200;
+   private static final long INITIAL_TIMEOUT = 20;
+   private static final long MAX_TIMEOUT = 150;
+   private static final long DELAY_ON_ERROR = 100;
+   private static final long MAX_ATTEMPT_TIME = 300;
+
+
+   private static ActorSystem actorSystem;
+   private static AkkaRpcService akkaRpcService;
+
+   private static final Timeout timeout = new Timeout(1, TimeUnit.MILLISECONDS);
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   actorSystem = AkkaUtils.createDefaultActorSystem();
+
+   akkaRpcService = new AkkaRpcService(actorSystem, timeout);
+   }
+
+   @AfterClass
+   public static void teardown() throws Exception {
+   akkaRpcService.stopService();
+
+   actorSystem.shutdown();
+   actorSystem.awaitTermination();
+   }
+
+
+   /**
+* test for receiving a regular heartbeat response in time, and checking the heartbeatResponse is properly delivered
+* @throws Exception
+*/
+   @Test
+   public void testHeartbeatSuccessful() throws Exception {
+   HeartbeatSender sender = mock(HeartbeatSender.class);
+   UUID leaderSessionID = UUID.randomUUID();
+
+   HeartbeatReceiverGateway targetGateway = mock(HeartbeatReceiverGateway.class);
+
+   String response = "ok";
+   when(targetGateway.triggerHeartbeat(any(UUID.class), any(FiniteDuration.class))).thenReturn(
+   Futures.successful(response)
+   );
+
+   TestingHeartbeatScheduler heartbeatScheduler = new TestingHeartbeatScheduler(sender, akkaRpcService, leaderSessionID,
+   targetGateway, "taskExecutor-test-address", "testTargetGateway", log, INITIAL_INTERVAL, INITIAL_TIMEOUT, MAX_TIMEOUT, DELAY_ON_ERROR, MAX_ATTEMPT_TIME);
+   heartbeatScheduler.start();
+
+   // verify heartbeat successful and syncHeartbeatResponse is invoked for sync heartbeat response
+   verify(sender, timeout(5000)).syncHeartbeatResponse(eq(response));
+   // verify heartbeat trigger is still on
+   Assert.assertFalse(heartbeatScheduler.isClosed());
+ 

[jira] [Commented] (FLINK-4483) Complete test coverage integration

2016-08-24 Thread Pavel Fadeev (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4483?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435930#comment-15435930
 ] 

Pavel Fadeev commented on FLINK-4483:
-

Team,
could someone please assign this ticket to me?

> Complete test coverage integration
> --
>
> Key: FLINK-4483
> URL: https://issues.apache.org/jira/browse/FLINK-4483
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Pavel Fadeev
>
> Now that INFRA-12458 is implemented, we need to move on and:
> - restore code coverage reporting for the Maven build
> - report it continuously to coveralls.io. Outdated reports can be found 
> [here|https://coveralls.io/github/apache/flink] 
> It would be good to:
> - avoid extra test runs just for reporting
> - minimize the build-time impact of the agent/instrumentation/reporting
> - provide Java/Scala code coverage metrics there 





[jira] [Created] (FLINK-4483) Complete test coverage integration

2016-08-24 Thread Pavel Fadeev (JIRA)
Pavel Fadeev created FLINK-4483:
---

 Summary: Complete test coverage integration
 Key: FLINK-4483
 URL: https://issues.apache.org/jira/browse/FLINK-4483
 Project: Flink
  Issue Type: Improvement
  Components: Build System
Reporter: Pavel Fadeev


Now that INFRA-12458 is implemented, we need to move on and:
- restore code coverage reporting for the Maven build
- report it continuously to coveralls.io. Outdated reports can be found 
[here|https://coveralls.io/github/apache/flink] 

It would be good to:
- avoid extra test runs just for reporting
- minimize the build-time impact of the agent/instrumentation/reporting
- provide Java/Scala code coverage metrics there 





[jira] [Created] (FLINK-4482) numUnsuccessfulCheckpointsTriggers is accessed without holding triggerLock

2016-08-24 Thread Ted Yu (JIRA)
Ted Yu created FLINK-4482:
-

 Summary: numUnsuccessfulCheckpointsTriggers is accessed without 
holding triggerLock
 Key: FLINK-4482
 URL: https://issues.apache.org/jira/browse/FLINK-4482
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu
Priority: Minor


In CheckpointCoordinator#stopCheckpointScheduler() :
{code}
synchronized (lock) {
...
  numUnsuccessfulCheckpointsTriggers = 0;
{code}
triggerLock is not held during the above operation.
See the comment on triggerLock earlier in triggerCheckpoint():
{code}
// we lock with a special lock to make sure that trigger requests do not overtake each other.
// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'
// may issue blocking operations. Using a different lock than the coordinator-wide lock,
// we avoid blocking the processing of 'acknowledge/decline' messages during that time.
synchronized (triggerLock) {
{code}
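One way to address this is to take triggerLock before lock when resetting the counter. The sketch below is a heavily reduced, hypothetical class, not the actual CheckpointCoordinator, and it assumes that the trigger path acquires triggerLock before lock, so using the same order in stopCheckpointScheduler() does not introduce a lock-ordering deadlock. If the counter is only ever read and written in isolation, an AtomicInteger would be a simpler alternative.

```java
/** Hypothetical reduction of the two-lock pattern; not Flink's CheckpointCoordinator. */
class CoordinatorSketch {
    private final Object lock = new Object();        // coordinator-wide lock
    private final Object triggerLock = new Object(); // serializes trigger requests

    private int numUnsuccessfulCheckpointsTriggers; // guarded by triggerLock
    private boolean periodicScheduling = true;      // guarded by lock

    /** Trigger path: records a failed trigger attempt while holding triggerLock. */
    void onTriggerFailure() {
        synchronized (triggerLock) {
            numUnsuccessfulCheckpointsTriggers++;
        }
    }

    /** Takes triggerLock first, then lock (assumed to match the trigger path's order). */
    void stopCheckpointScheduler() {
        synchronized (triggerLock) {
            synchronized (lock) {
                periodicScheduling = false;
                numUnsuccessfulCheckpointsTriggers = 0;
            }
        }
    }

    int unsuccessfulTriggers() {
        synchronized (triggerLock) {
            return numUnsuccessfulCheckpointsTriggers;
        }
    }

    boolean isPeriodicScheduling() {
        synchronized (lock) {
            return periodicScheduling;
        }
    }
}
```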





[GitHub] flink issue #2383: [FLINK-4418] [client] Improve resilience when InetAddress...

2016-08-24 Thread rehevkor5
Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/2383
  
@StephanEwen do you want me to take a look & see what straightforward 
refactoring I can make, and squeeze that into this PR? I can't tell if that was 
a hint or not :)




[jira] [Commented] (FLINK-4418) ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if InetAddress.getLocalHost throws exception

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435824#comment-15435824
 ] 

ASF GitHub Bot commented on FLINK-4418:
---

Github user rehevkor5 commented on the issue:

https://github.com/apache/flink/pull/2383
  
@StephanEwen do you want me to take a look & see what straightforward 
refactoring I can make, and squeeze that into this PR? I can't tell if that was 
a hint or not :)


> ClusterClient/ConnectionUtils#findConnectingAddress fails immediately if 
> InetAddress.getLocalHost throws exception
> --
>
> Key: FLINK-4418
> URL: https://issues.apache.org/jira/browse/FLINK-4418
> Project: Flink
>  Issue Type: Bug
>  Components: Client
>Affects Versions: 1.1.0
>Reporter: Shannon Carey
>Assignee: Shannon Carey
>
> When attempting to connect to a cluster with a ClusterClient, if the 
> machine's hostname cannot be resolved to an IP address, an exception is thrown 
> and the connection fails.
> This is the case if, for example, the hostname is not present in /etc/hosts 
> and mapped to a local IP.
> The exception is below. I suggest that findAddressUsingStrategy() should 
> catch java.net.UnknownHostException thrown by InetAddress.getLocalHost() and 
> return null, allowing alternative strategies to be attempted by 
> findConnectingAddress(). I will open a PR to this effect. Ideally this could 
> be included in both 1.2 and 1.1.2.
> In the stack trace below, "ip-10-2-64-47" is the internal host name of an AWS 
> EC2 instance.
> {code}
> 21:11:35 org.apache.flink.client.program.ProgramInvocationException: Failed to retrieve the JobManager gateway.
> 21:11:35  at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:430)
> 21:11:35  at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:90)
> 21:11:35  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> 21:11:35  at org.apache.flink.client.program.DetachedEnvironment.finalizeExecute(DetachedEnvironment.java:75)
> 21:11:35  at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:334)
> 21:11:35  at com.expedia.www.flink.job.scheduler.FlinkJobSubmitter.get(FlinkJobSubmitter.java:81)
> 21:11:35  at com.expedia.www.flink.job.scheduler.streaming.StreamingJobManager.run(StreamingJobManager.java:105)
> 21:11:35  at com.expedia.www.flink.job.scheduler.JobScheduler.runStreamingApp(JobScheduler.java:69)
> 21:11:35  at com.expedia.www.flink.job.scheduler.JobScheduler.main(JobScheduler.java:34)
> 21:11:35 Caused by: java.lang.RuntimeException: Failed to resolve JobManager address at /10.2.89.80:43126
> 21:11:35  at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:189)
> 21:11:35  at org.apache.flink.client.program.ClusterClient.getJobManagerGateway(ClusterClient.java:649)
> 21:11:35  at org.apache.flink.client.program.ClusterClient.runDetached(ClusterClient.java:428)
> 21:11:35  ... 8 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: ip-10-2-64-47: unknown error
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1505)
> 21:11:35  at org.apache.flink.runtime.net.ConnectionUtils.findAddressUsingStrategy(ConnectionUtils.java:232)
> 21:11:35  at org.apache.flink.runtime.net.ConnectionUtils.findConnectingAddress(ConnectionUtils.java:123)
> 21:11:35  at org.apache.flink.client.program.ClusterClient$LazyActorSystemLoader.get(ClusterClient.java:187)
> 21:11:35  ... 10 more
> 21:11:35 Caused by: java.net.UnknownHostException: ip-10-2-64-47: unknown error
> 21:11:35  at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
> 21:11:35  at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
> 21:11:35  at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
> 21:11:35  at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
> 21:11:35  ... 13 more
> {code}
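The root cause at the bottom of the trace is `InetAddress.getLocalHost()` throwing `UnknownHostException` because the EC2 host name does not resolve locally. Below is a minimal sketch of a more tolerant lookup. It assumes the goal is to fall back instead of aborting when the local host name cannot be resolved. `LocalAddressFallback` and `resolveLocalAddress` are hypothetical names, and this is not the code of the PR announced above.

```java
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.Enumeration;

public class LocalAddressFallback {

    /**
     * Hypothetical helper: returns the local host address. If the local host
     * name cannot be resolved, it falls back to the first non-loopback,
     * non-link-local address of an active interface, then to loopback.
     */
    public static InetAddress resolveLocalAddress() {
        try {
            return InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            // Host name not resolvable (e.g. missing /etc/hosts entry): scan interfaces below.
        }
        try {
            Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
            if (nics != null) {
                for (NetworkInterface nic : Collections.list(nics)) {
                    if (!nic.isUp() || nic.isLoopback()) {
                        continue;
                    }
                    for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                        if (!addr.isLoopbackAddress() && !addr.isLinkLocalAddress()) {
                            return addr;
                        }
                    }
                }
            }
        } catch (SocketException e) {
            // Interface enumeration failed: use the loopback address below.
        }
        // Last resort: the loopback address always exists.
        return InetAddress.getLoopbackAddress();
    }

    public static void main(String[] args) {
        System.out.println(resolveLocalAddress());
    }
}
```

Whether a non-loopback interface address is actually reachable from the JobManager depends on the network setup. The sketch only avoids the hard failure.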



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435814#comment-15435814
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2109
  
@mxm Thank you for the review. I've updated the PR accordingly.


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435813#comment-15435813
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148927
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+   @Test
+   public void defaultConstructorCreateMatchAllFilter() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter();
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void matchAllFilesByDefault() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.emptyList(),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesNotInIncludePatterns() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesIfMatchesExclude() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
--- End diff --

Yes, this is supported. Added a test for this.
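The tests above define the contract:

- `filterPath` returns `true` for a path that should be skipped.
- An empty include list accepts everything.
- A path is skipped if it matches no include pattern.

Below is a minimal sketch of that contract on top of the JDK's glob `PathMatcher`. It also assumes that any matching exclude pattern skips the path. `IncludeExcludeFilter` is a hypothetical stand-in, not the `GlobFilePathFilter` under review.

```java
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class IncludeExcludeFilter {
    private final List<PathMatcher> includes = new ArrayList<>();
    private final List<PathMatcher> excludes = new ArrayList<>();

    public IncludeExcludeFilter(List<String> includePatterns, List<String> excludePatterns) {
        FileSystem fs = FileSystems.getDefault();
        for (String p : includePatterns) {
            includes.add(fs.getPathMatcher("glob:" + p));
        }
        for (String p : excludePatterns) {
            excludes.add(fs.getPathMatcher("glob:" + p));
        }
    }

    /** Returns true if the path should be filtered out (skipped). */
    public boolean filterPath(String path) {
        Path p = Paths.get(path);
        // An empty include list means "include everything".
        boolean included = includes.isEmpty();
        for (PathMatcher m : includes) {
            if (m.matches(p)) {
                included = true;
                break;
            }
        }
        if (!included) {
            return true; // not covered by any include pattern
        }
        for (PathMatcher m : excludes) {
            if (m.matches(p)) {
                return true; // explicitly excluded
            }
        }
        return false;
    }

    public static void main(String[] args) {
        IncludeExcludeFilter f = new IncludeExcludeFilter(
                Collections.singletonList("dir/*"),
                Collections.singletonList("dir/*.bin"));
        System.out.println(f.filterPath("dir/file.txt"));  // false: kept
        System.out.println(f.filterPath("dir1/file.txt")); // true: not included
        System.out.println(f.filterPath("dir/data.bin"));  // true: excluded
    }
}
```

Checking includes before excludes lets an exclude pattern carve files out of a broader include, which is the combination the test above appears to exercise.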


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[GitHub] flink issue #2109: [FLINK-3677] FileInputFormat: Allow to specify include/ex...

2016-08-24 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2109
  
@mxm Thank you for the review. I've updated the PR accordingly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148927
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/GlobFilePathFilterTest.java
 ---
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class GlobFilePathFilterTest {
+   @Test
+   public void defaultConstructorCreateMatchAllFilter() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter();
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void matchAllFilesByDefault() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.emptyList(),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesNotInIncludePatterns() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
+   Collections.emptyList());
+
+   assertFalse(matcher.filterPath(new Path("dir/file.txt")));
+   assertTrue(matcher.filterPath(new Path("dir1/file.txt")));
+   }
+
+   @Test
+   public void excludeFilesIfMatchesExclude() {
+   GlobFilePathFilter matcher = new GlobFilePathFilter(
+   Collections.singletonList("dir/*"),
--- End diff --

Yes, this is supported. Added a test for this.




[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435810#comment-15435810
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148802
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File child1 = temporaryFolder.newFile("dataFile1.txt");
+   File child2 = temporaryFolder.newFile("another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+
+   final DummyFileInputFormat format = new DummyFileInputFormat();
+   format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+   format.configure(configuration);
+   format.setFilesFilter(new GlobFilePathFilter(
+   Collections.singletonList("**"),
--- End diff --

`*` means any file (within a single directory level)
`**` means any file in any subdirectory

Added a comment and test for that.
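The difference between `*` and `**` can be checked against the JDK's glob `PathMatcher` (`FileSystems.getDefault().getPathMatcher("glob:...")`). The check below assumes the filter under review delegates to it and runs on a Unix-like default file system. `*` stops at directory separators, while `**` crosses them.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobSemantics {

    /** True if the glob matches the given relative path (Unix-style separators assumed). */
    static boolean matches(String glob, String path) {
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return m.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // '*' matches within a single path segment only
        System.out.println(matches("*", "file.txt"));          // true
        System.out.println(matches("*", "dir/file.txt"));      // false
        // '**' also crosses directory boundaries
        System.out.println(matches("**", "dir/file.txt"));     // true
        System.out.println(matches("**", "dir/sub/file.txt")); // true
    }
}
```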


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148802
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 ---
@@ -314,43 +299,95 @@ public void testIgnoredUnderscoreFiles() {
}
 
@Test
+   public void testExcludeFiles() {
+   try {
+   final String contents = "CONTENTS";
+
+   // create some accepted, some ignored files
+
+   File child1 = temporaryFolder.newFile("dataFile1.txt");
+   File child2 = temporaryFolder.newFile("another_file.bin");
+
+   File[] files = { child1, child2 };
+
+   createTempFiles(contents.getBytes(), files);
+
+   // test that only the valid files are accepted
+
+   Configuration configuration = new Configuration();
+
+   final DummyFileInputFormat format = new DummyFileInputFormat();
+   format.setFilePath(temporaryFolder.getRoot().toURI().toString());
+   format.configure(configuration);
+   format.setFilesFilter(new GlobFilePathFilter(
+   Collections.singletonList("**"),
--- End diff --

`*` means any file (within a single directory level)
`**` means any file in any subdirectory

Added a comment and test for that.




[GitHub] flink issue #2416: FLINK-4480: Incorrect link to elastic.co in documentation

2016-08-24 Thread smarthi
Github user smarthi commented on the issue:

https://github.com/apache/flink/pull/2416
  
1. Changed the ES2 version to 2.3.5
2. Fixed broken ElasticsearchExample.java
3. Removed use of Guava API.




[jira] [Commented] (FLINK-3677) FileInputFormat: Allow to specify include/exclude file name patterns

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3677?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435807#comment-15435807
 ] 

ASF GitHub Bot commented on FLINK-3677:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148725
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+   @Parameters
+   public static Collection data() {
+   return Arrays.asList(new Object[][] {
+   {"file.txt",false},
+   {".file.txt",   true},
+   {"_file.txt",   true},
+   {"_COPYING_",   true},
+   {"dir/.file.txt",   true},
+   {"dir/_file.txt",   true},
+   {"dir/_COPYING_",   true},
--- End diff --

No problems. I've extracted a constant.
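The parameterized cases above describe the default filter. A path is skipped when its last segment starts with `.` or `_`. Names containing the `_COPYING_` marker are also skipped, which is assumed here to target files that are still being copied. Below is a minimal sketch that reproduces that table. `DefaultNameFilter` and `COPYING_MARKER` are hypothetical, and the real default filter may differ in detail.

```java
public class DefaultNameFilter {

    // Marker assumed to identify files that are still being copied.
    static final String COPYING_MARKER = "_COPYING_";

    /** Returns true if the path should be skipped, judged by its last path segment only. */
    static boolean filterPath(String path) {
        // lastIndexOf returns -1 when there is no '/', so substring(0) keeps the whole string.
        String name = path.substring(path.lastIndexOf('/') + 1);
        return name.startsWith(".") || name.startsWith("_") || name.contains(COPYING_MARKER);
    }

    public static void main(String[] args) {
        String[] cases = {"file.txt", ".file.txt", "_file.txt", "_COPYING_",
                          "dir/.file.txt", "dir/_file.txt", "dir/_COPYING_"};
        for (String c : cases) {
            System.out.println(c + " -> " + filterPath(c));
        }
    }
}
```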


> FileInputFormat: Allow to specify include/exclude file name patterns
> 
>
> Key: FLINK-3677
> URL: https://issues.apache.org/jira/browse/FLINK-3677
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Maximilian Michels
>Assignee: Ivan Mushketyk
>Priority: Minor
>  Labels: starter
>
> It would be nice to be able to specify a regular expression to filter files.





[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435808#comment-15435808
 ] 

ASF GitHub Bot commented on FLINK-4480:
---

Github user smarthi commented on the issue:

https://github.com/apache/flink/pull/2416
  
1. Changed the ES2 version to 2.3.5
2. Fixed broken ElasticsearchExample.java
3. Removed use of Guava API.


> Incorrect link to elastic.co in documentation
> -
>
> Key: FLINK-4480
> URL: https://issues.apache.org/jira/browse/FLINK-4480
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Trivial
>
> The link URL of the entry "Elasticsearch 2x (sink)" on the connector's 
> documentation page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
>  is pointing to http://elastic.com but should point to http://elastic.co





[jira] [Commented] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435804#comment-15435804
 ] 

ASF GitHub Bot commented on FLINK-4480:
---

GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/2416

FLINK-4480: Incorrect link to elastic.co in documentation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the 
[How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink FLINK-4480

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2416


commit ea53aa1e715a8d862282cfbaff2243431455c0ae
Author: smarthi 
Date:   2016-08-24T22:07:23Z

FLINK-4480: Incorrect link to elastic.co in documentation




> Incorrect link to elastic.co in documentation
> -
>
> Key: FLINK-4480
> URL: https://issues.apache.org/jira/browse/FLINK-4480
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Trivial
>
> The link URL of the entry "Elasticsearch 2x (sink)" on the connector's 
> documentation page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
>  is pointing to http://elastic.com but should point to http://elastic.co





[GitHub] flink pull request #2109: [FLINK-3677] FileInputFormat: Allow to specify inc...

2016-08-24 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2109#discussion_r76148725
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/DefaultFilterTest.java 
---
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.flink.core.fs.Path;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class DefaultFilterTest {
+   @Parameters
+   public static Collection data() {
+   return Arrays.asList(new Object[][] {
+   {"file.txt",false},
+   {".file.txt",   true},
+   {"_file.txt",   true},
+   {"_COPYING_",   true},
+   {"dir/.file.txt",   true},
+   {"dir/_file.txt",   true},
+   {"dir/_COPYING_",   true},
--- End diff --

No problems. I've extracted a constant.




[GitHub] flink pull request #2416: FLINK-4480: Incorrect link to elastic.co in docume...

2016-08-24 Thread smarthi
GitHub user smarthi opened a pull request:

https://github.com/apache/flink/pull/2416

FLINK-4480: Incorrect link to elastic.co in documentation

Thanks for contributing to Apache Flink. Before you open your pull request, 
please take the following checklist into consideration.
If your changes take all of the items into account, feel free to open your 
pull request. For more information and/or questions please refer to the 
[How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
In addition to going through the list, please provide a meaningful 
description of your changes.

- [X] General
  - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
  - The pull request addresses only one issue
  - Each commit in the PR has a meaningful commit message (including the JIRA id)

- [X] Documentation
  - Documentation has been added for new functionality
  - Old documentation affected by the pull request has been updated
  - JavaDoc for public methods has been added

- [X] Tests & Build
  - Functionality added by the pull request is covered by tests
  - `mvn clean verify` has been executed successfully locally or a Travis build has passed



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/smarthi/flink FLINK-4480

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2416


commit ea53aa1e715a8d862282cfbaff2243431455c0ae
Author: smarthi 
Date:   2016-08-24T22:07:23Z

FLINK-4480: Incorrect link to elastic.co in documentation






[jira] [Closed] (FLINK-4453) Scala code example in Window documentation shows Java

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4453.
---

> Scala code example in Window documentation shows Java
> -
>
> Key: FLINK-4453
> URL: https://issues.apache.org/jira/browse/FLINK-4453
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The first code example in the section "WindowFunction - The Generic Case" of 
> the window documentation of the 1.2 SNAPSHOT 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case)
>  shows Java code in the Scala tab.





[jira] [Resolved] (FLINK-4453) Scala code example in Window documentation shows Java

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4453.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via 42f65e4b93ef7f71b6252bc9c664bee727fd4278

> Scala code example in Window documentation shows Java
> -
>
> Key: FLINK-4453
> URL: https://issues.apache.org/jira/browse/FLINK-4453
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Trivial
> Fix For: 1.2.0
>
>
> The first code example in the section "WindowFunction - The Generic Case" of 
> the window documentation of the 1.2 SNAPSHOT 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case)
>  shows Java code in the Scala tab.





[jira] [Closed] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4437.
---

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp'
> in close proximity before lastTriggeredCheckpoint is updated, the two threads may have
> an inconsistent view of "lastTriggeredCheckpoint", and updates to fields correlated with
> "lastTriggeredCheckpoint" may be lost.
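A common way to close this check-then-act window is to perform the interval check and the update of `lastTriggeredCheckpoint` (and any correlated fields) under the same lock. Below is a minimal sketch of that pattern. `CheckpointTriggerGuard`, `tryTrigger` and `triggeredCount` are hypothetical names, and this is not the change that was eventually merged.

```java
public class CheckpointTriggerGuard {

    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;

    // Both fields are guarded by 'lock': read and written only while holding it.
    private long lastTriggeredCheckpoint = Long.MIN_VALUE;
    private long triggeredCount; // stands in for any field correlated with lastTriggeredCheckpoint

    public CheckpointTriggerGuard(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Atomically checks the minimum pause and, if it has passed, records the trigger. */
    public boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // minimum pause not yet elapsed
            }
            // Check and update happen in one critical section, so no second thread
            // can pass the check based on a stale lastTriggeredCheckpoint.
            lastTriggeredCheckpoint = timestamp;
            triggeredCount++;
            return true;
        }
    }

    public long getTriggeredCount() {
        synchronized (lock) {
            return triggeredCount;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        CheckpointTriggerGuard guard = new CheckpointTriggerGuard(1000);
        Runnable attempt = () -> guard.tryTrigger(5000);
        Thread t1 = new Thread(attempt);
        Thread t2 = new Thread(attempt);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        System.out.println(guard.getTriggeredCount()); // 1: only one thread passes the check
    }
}
```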





[jira] [Resolved] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4437.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via 4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e

> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
> Fix For: 1.2.0
>
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp'
> in close proximity before lastTriggeredCheckpoint is updated, the two threads may have
> an inconsistent view of "lastTriggeredCheckpoint", and updates to fields correlated with
> "lastTriggeredCheckpoint" may be lost.





[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435730#comment-15435730
 ] 

ASF GitHub Bot commented on FLINK-4437:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2409
  
@tedyu No problem, happens to everyone.

I actually took your code and extended it a bit. Merged in 
4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e


> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp'
> in close proximity before lastTriggeredCheckpoint is updated, the two threads may have
> an inconsistent view of "lastTriggeredCheckpoint", and updates to fields correlated with
> "lastTriggeredCheckpoint" may be lost.





[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...

2016-08-24 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2409
  
@tedyu No problem, happens to everyone.

I actually took your code and extended it a bit. Merged in 
4da40bcb9ea01cb0c5e6fd0d7472dc09397f648e




[jira] [Closed] (FLINK-4417) Checkpoints should be subsumed by CheckpointID, not by timestamp

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4417.
---

> Checkpoints should be subsumed by CheckpointID, not by timestamp
> 
>
> Key: FLINK-4417
> URL: https://issues.apache.org/jira/browse/FLINK-4417
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.2.0
>
>
> Since system clocks cannot be expected to be stable/monotonic, the 
> subsumption logic in the checkpoint coordinator should not decide which 
> checkpoint is "newer" based on the system time.
> The checkpoint ID is guaranteed to be strictly monotonically increasing, so it is 
> a better measure to decide which checkpoint is newer.
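Below is a minimal sketch of subsumption keyed on the checkpoint ID rather than the wall-clock timestamp. When a checkpoint completes, every retained checkpoint with a smaller ID is dropped, and a late checkpoint with a smaller ID than one already retained is itself subsumed. `CheckpointRetention` is a hypothetical class that retains only the latest checkpoint. It is not Flink's checkpoint store.

```java
import java.util.ArrayList;
import java.util.List;

public class CheckpointRetention {

    /** Minimal checkpoint record: a monotonically increasing id plus a wall-clock timestamp. */
    public static final class Checkpoint {
        public final long id;
        public final long timestamp;

        public Checkpoint(long id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "chk-" + id + "@" + timestamp;
        }
    }

    private final List<Checkpoint> retained = new ArrayList<>();

    /** Adds a completed checkpoint; "newer" is decided by id only, never by timestamp. */
    public void addCompleted(Checkpoint completed) {
        for (Checkpoint c : retained) {
            if (c.id > completed.id) {
                return; // a newer checkpoint is already retained: this one is subsumed
            }
        }
        retained.removeIf(c -> c.id < completed.id);
        retained.add(completed);
    }

    public List<Checkpoint> getRetained() {
        return new ArrayList<>(retained);
    }

    public static void main(String[] args) {
        CheckpointRetention store = new CheckpointRetention();
        store.addCompleted(new Checkpoint(1, 1_000));
        // The clock jumped backwards: checkpoint 2 has an earlier timestamp but is still newer.
        store.addCompleted(new Checkpoint(2, 500));
        System.out.println(store.getRetained()); // [chk-2@500]
    }
}
```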





[jira] [Resolved] (FLINK-4417) Checkpoints should be subsumed by CheckpointID, not by timestamp

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4417.
-
Resolution: Fixed

Fixed via 4e9d1775b5514c87981c78d55323cc2b17361867

Thank you for the contribution!

> Checkpoints should be subsumed by CheckpointID, not by timestamp
> 
>
> Key: FLINK-4417
> URL: https://issues.apache.org/jira/browse/FLINK-4417
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.2.0
>
>
> Since system clocks cannot be expected to be stable/monotonic, the 
> subsumption logic in the checkpoint coordinator should not decide which 
> checkpoint is "newer" based on the system time.
> The checkpoint ID is guaranteed to be strictly monotonically increasing, so it is 
> a better measure to decide which checkpoint is newer.





[jira] [Assigned] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-24 Thread Suneel Marthi (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suneel Marthi reassigned FLINK-4480:


Assignee: Suneel Marthi

> Incorrect link to elastic.co in documentation
> -
>
> Key: FLINK-4480
> URL: https://issues.apache.org/jira/browse/FLINK-4480
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0, 1.1.1
>Reporter: Fabian Hueske
>Assignee: Suneel Marthi
>Priority: Trivial
>
> The link URL of the entry "Elasticsearch 2x (sink)" on the connectors
> documentation page
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
> points to http://elastic.com but should point to http://elastic.co





[jira] [Closed] (FLINK-4457) Make the ExecutionGraph independent of Akka

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4457.
---

> Make the ExecutionGraph independent of Akka
> ---
>
> Key: FLINK-4457
> URL: https://issues.apache.org/jira/browse/FLINK-4457
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Currently, the {{ExecutionGraph}} depends strongly on Akka, as it requires an
> {{ActorSystem}} to create the {{CheckpointCoordinatorDeActivator}}.
> Furthermore, it allows {{ActorGateways}} to register for job status and
> execution updates.
> To improve modularization and abstraction, I propose introducing proper
> listener interfaces. This would also allow us to get rid of the
> {{CheckpointCoordinatorDeActivator}} by simply implementing this interface.
> Furthermore, it will pave the way for the upcoming FLIP-6 refactoring, as it
> offers a better abstraction.
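A minimal Java sketch of the proposed decoupling follows. It assumes hypothetical `JobState`, `JobStatusListener`, `SimpleExecutionGraph` and `CheckpointActivator` types, which are illustrative only and not the interfaces merged for this issue. The graph notifies plain callback objects, so no `ActorSystem` or `ActorGateway` appears in its API, and the checkpoint activator becomes just another listener.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical job states for illustration only.
enum JobState { CREATED, RUNNING, FINISHED, FAILED }

// A plain callback interface: no Akka types in its signature.
interface JobStatusListener {
    void jobStatusChanged(String jobId, JobState newState, long timestamp);
}

// Simplified stand-in for an execution graph that notifies listeners directly.
final class SimpleExecutionGraph {
    private final String jobId;
    private final List<JobStatusListener> listeners = new CopyOnWriteArrayList<>();
    private JobState state = JobState.CREATED;

    SimpleExecutionGraph(String jobId) {
        this.jobId = jobId;
    }

    void registerJobStatusListener(JobStatusListener listener) {
        listeners.add(listener);
    }

    void transitionTo(JobState newState) {
        state = newState;
        long now = System.currentTimeMillis();
        for (JobStatusListener listener : listeners) {
            listener.jobStatusChanged(jobId, newState, now);
        }
    }

    JobState getState() {
        return state;
    }
}

// The checkpoint (de)activation logic expressed as an ordinary listener.
final class CheckpointActivator implements JobStatusListener {
    boolean periodicCheckpointsRunning = false;

    @Override
    public void jobStatusChanged(String jobId, JobState newState, long timestamp) {
        // Run periodic checkpoints only while the job is RUNNING.
        periodicCheckpointsRunning = (newState == JobState.RUNNING);
    }
}
```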





[GitHub] flink pull request #2411: [FLINK-4453] [docs] Scala code example in Window d...

2016-08-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2411


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4417) Checkpoints should be subsumed by CheckpointID not, by timestamp

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435701#comment-15435701
 ] 

ASF GitHub Bot commented on FLINK-4417:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2407


> Checkpoints should be subsumed by CheckpointID not, by timestamp
> 
>
> Key: FLINK-4417
> URL: https://issues.apache.org/jira/browse/FLINK-4417
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.1.2
>Reporter: Stephan Ewen
>Assignee: ramkrishna.s.vasudevan
> Fix For: 1.2.0
>
>
> Since system clocks cannot be expected to be stable or monotonic, the
> subsumption logic in the checkpoint coordinator should not decide which
> checkpoint is "newer" based on the system time.
> The checkpoint ID is guaranteed to be strictly monotonically increasing, which
> makes it a better measure of which checkpoint is newer.





[jira] [Resolved] (FLINK-4457) Make the ExecutionGraph independent of Akka

2016-08-24 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4457.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via 635c869326cc77e4199e4d8ee597aed69ed16cd2

> Make the ExecutionGraph independent of Akka
> ---
>
> Key: FLINK-4457
> URL: https://issues.apache.org/jira/browse/FLINK-4457
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> Currently, the {{ExecutionGraph}} depends strongly on Akka, as it requires an
> {{ActorSystem}} to create the {{CheckpointCoordinatorDeActivator}}.
> Furthermore, it allows {{ActorGateways}} to register for job status and
> execution updates.
> To improve modularization and abstraction, I propose introducing proper
> listener interfaces. This would also allow us to get rid of the
> {{CheckpointCoordinatorDeActivator}} by simply implementing this interface.
> Furthermore, it will pave the way for the upcoming FLIP-6 refactoring, as it
> offers a better abstraction.





[jira] [Commented] (FLINK-4453) Scala code example in Window documentation shows Java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435702#comment-15435702
 ] 

ASF GitHub Bot commented on FLINK-4453:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2411


> Scala code example in Window documentation shows Java
> -
>
> Key: FLINK-4453
> URL: https://issues.apache.org/jira/browse/FLINK-4453
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.2.0
>Reporter: Fabian Hueske
>Assignee: Jark Wu
>Priority: Trivial
>
> The first code example in the section "WindowFunction - The Generic Case" of 
> the window documentation of the 1.2 SNAPSHOT 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html#windowfunction---the-generic-case)
>  shows Java code in the Scala tab.





[GitHub] flink pull request #2407: FLINK-4417 Checkpoints should be subsumed by Check...

2016-08-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2407




[jira] [Commented] (FLINK-3850) Add forward field annotations to DataSet operators generated by the Table API

2016-08-24 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435668#comment-15435668
 ] 

Fabian Hueske commented on FLINK-3850:
--

Sorry for the very long delay [~ram_krish]. I was gone for a while.
You can certainly work on this. I would recommend first reading the
documentation about semantic annotations. They can significantly improve
performance if used correctly, but if they are applied incorrectly, the result
might be invalid. So one has to be careful ;-)

> Add forward field annotations to DataSet operators generated by the Table API
> -
>
> Key: FLINK-3850
> URL: https://issues.apache.org/jira/browse/FLINK-3850
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>
> The DataSet API features semantic annotations [1] that tell the optimizer which
> input fields an operator copies. This information is valuable for the
> optimizer because it can infer that certain physical properties such as
> partitioning or sorting are not destroyed by user functions and thus generate
> more efficient execution plans.
> The Table API is built on top of the DataSet API and generates DataSet
> programs and code for user-defined functions. Hence, it knows exactly which
> fields are modified and which are not. We should use this information to
> automatically generate forward field annotations and attach them to the
> operators. This can significantly improve the performance of certain
> jobs.
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/index.html#semantic-annotations
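Why forwarded fields matter can be shown with a small, framework-free Java sketch. It assumes a hypothetical two-field `Rec` record that is hash-partitioned on its first field. In the DataSet API the equivalent promise is declared with the `@ForwardedFields` annotation (for example `"f0"`). If a function copies the key field unchanged, every record stays in its partition, so no re-partitioning is needed. If a function alters the key while claiming to forward it, that guarantee (and the result) breaks.

```java
import java.util.function.UnaryOperator;

// Hypothetical two-field record (key, value) standing in for a Tuple2.
final class Rec {
    final String key;
    final long value;

    Rec(String key, long value) {
        this.key = key;
        this.value = value;
    }
}

final class ForwardedFieldsDemo {
    // Hash partitioning on field 0 (the key).
    static int partitionOf(Rec r, int numPartitions) {
        return Math.floorMod(r.key.hashCode(), numPartitions);
    }

    // A user function that copies field 0 unchanged and only rewrites field 1:
    // exactly the property a forward field annotation on "f0" declares.
    static final UnaryOperator<Rec> DOUBLE_VALUE = r -> new Rec(r.key, r.value * 2);

    // True if applying fn leaves every record in its original partition,
    // i.e. the existing partitioning could be reused after the function.
    static boolean preservesPartitioning(Rec[] input, UnaryOperator<Rec> fn, int numPartitions) {
        for (Rec r : input) {
            if (partitionOf(r, numPartitions) != partitionOf(fn.apply(r), numPartitions)) {
                return false;
            }
        }
        return true;
    }
}
```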





[jira] [Commented] (FLINK-3874) Add a Kafka TableSink with JSON serialization

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435630#comment-15435630
 ] 

ASF GitHub Bot commented on FLINK-3874:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Thank you for your review @fhueske. I've updated it. Could you please take 
a look at it?


> Add a Kafka TableSink with JSON serialization
> -
>
> Key: FLINK-3874
> URL: https://issues.apache.org/jira/browse/FLINK-3874
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Ivan Mushketyk
>Priority: Minor
>
> Add a TableSink that writes JSON serialized data to Kafka.





[GitHub] flink issue #2244: [FLINK-3874] Add a Kafka TableSink with JSON serializatio...

2016-08-24 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2244
  
Thank you for your review @fhueske. I've updated it. Could you please take 
a look at it?





[jira] [Created] (FLINK-4481) Maximum results for pairwise algorithms

2016-08-24 Thread Greg Hogan (JIRA)
Greg Hogan created FLINK-4481:
-

 Summary: Maximum results for pairwise algorithms
 Key: FLINK-4481
 URL: https://issues.apache.org/jira/browse/FLINK-4481
 Project: Flink
  Issue Type: New Feature
  Components: Gelly
Affects Versions: 1.2.0
Reporter: Greg Hogan
Assignee: Greg Hogan
Priority: Minor


Return the per-vertex maximum scores for algorithms that return pairwise results
({{AdamicAdar}}, {{JaccardIndex}}). The number of pairwise scores can be
>> O(edges), but the number of maximum scores is O(vertices). It can also be
most useful to know which vertices a vertex is most similar to. This
implementation is very efficient because it uses the hash-combine.
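A minimal Java sketch of the per-vertex maximum follows. It assumes a hypothetical `PairScore` record, not Gelly's result types. Each pairwise score is merged into a hash map keyed by vertex as soon as it arrives, so the map never holds more than one entry per vertex (O(vertices) rather than O(pairs)). This only mimics the idea of a hash-based combine inside a single JVM and is not the Gelly implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical pairwise result: similarity score between `source` and `target`.
final class PairScore {
    final long source;
    final long target;
    final double score;

    PairScore(long source, long target, double score) {
        this.source = source;
        this.target = target;
        this.score = score;
    }
}

final class MaximumScores {
    // Keeps, for each source vertex, only its highest-scoring pair.
    // On ties the pair seen first is kept.
    static Map<Long, PairScore> perVertexMax(Iterable<PairScore> pairs) {
        Map<Long, PairScore> best = new HashMap<>();
        for (PairScore p : pairs) {
            best.merge(p.source, p, (current, candidate) ->
                    candidate.score > current.score ? candidate : current);
        }
        return best;
    }
}
```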





[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435522#comment-15435522
 ] 

ASF GitHub Bot commented on FLINK-3899:
---

Github user danielblazevski commented on the issue:

https://github.com/apache/flink/pull/2368
  
Pushed the Scala example


> Document window processing with Reduce/FoldFunction + WindowFunction
> 
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access to the Window object, e.g., to obtain the
> window's start and end time.
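The combination can be illustrated with a small, framework-free Java sketch. It assumes hypothetical `SensorReading` and `EagerWindow` classes and is not Flink's windowing API. The fold function updates a single accumulator as each element arrives, and a window-function step later combines that one value with window metadata such as the end timestamp.

```java
import java.util.function.BiFunction;

// Hypothetical sensor record; field names are assumptions for this sketch.
final class SensorReading {
    final String sensorId;
    final long timestamp;

    SensorReading(String sensorId, long timestamp) {
        this.sensorId = sensorId;
        this.timestamp = timestamp;
    }
}

// Simplified model of one window for one key.
final class EagerWindow<ACC> {
    final long start;
    final long end;
    private ACC acc; // only the running aggregate is stored, never the elements
    private final BiFunction<ACC, SensorReading, ACC> fold;

    EagerWindow(long start, long end, ACC initial, BiFunction<ACC, SensorReading, ACC> fold) {
        this.start = start;
        this.end = end;
        this.acc = initial;
        this.fold = fold;
    }

    // Plays the role of the FoldFunction: applied eagerly to each element.
    void add(SensorReading r) {
        acc = fold.apply(acc, r);
    }

    // Plays the role of the WindowFunction: sees the single pre-aggregated
    // value plus window metadata (here, the end timestamp).
    String fire(String key) {
        return key + "," + end + "," + acc;
    }
}
```

With a counting fold this yields a count per window together with the window end time. With `Long.MIN_VALUE` as the initial value and `Math.max` as the fold, it yields the latest event timestamp in the window.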





[GitHub] flink issue #2368: [FLINK-3899] Document window processing with Reduce/FoldF...

2016-08-24 Thread danielblazevski
Github user danielblazevski commented on the issue:

https://github.com/apache/flink/pull/2368
  
Pushed the Scala example




[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435489#comment-15435489
 ] 

ASF GitHub Bot commented on FLINK-3899:
---

Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76120063
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp, 
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream> input = ...;
+DataStream input = ...;
 
 // for folding incremental computation
 input
 .keyBy()
 .window()
-.apply(, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction {
+
+public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Ah, lol, `Int` vs `Integer`...


> Document window processing with Reduce/FoldFunction + WindowFunction
> 
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access to the Window object, e.g., to obtain the
> window's start and end time.





[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

2016-08-24 Thread danielblazevski
Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76120063
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp, 
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream> input = ...;
+DataStream input = ...;
 
 // for folding incremental computation
 input
 .keyBy()
 .window()
-.apply(, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction {
+
+public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Ah, lol, `Int` vs `Integer`...




[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435452#comment-15435452
 ] 

ASF GitHub Bot commented on FLINK-3899:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76115561
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp, 
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream> input = ...;
+DataStream input = ...;
 
 // for folding incremental computation
 input
 .keyBy()
 .window()
-.apply(, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction {
+
+public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Thanks for the update. 
The following Scala code does not show an error in my IntelliJ:

```
val readings: DataStream[SensorReading] = ???

val result: DataStream[(String, Long, Int)] = readings
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1), Time.seconds(10))
  .apply(
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => {
      ("", 0L, acc._3 + 1)
    },
    (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)],
     out: Collector[(String, Long, Int)]) => {
      val cnt = cnts.iterator.next()
      out.collect((k, w.getEnd, cnt._3))
    }
  )
```

Thanks, Fabian


> Document window processing with Reduce/FoldFunction + WindowFunction
> 
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This
> combination allows for eager window aggregation (only a single element is
> kept in the window) and access to the Window object, e.g., to obtain the
> window's start and end time.





[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

2016-08-24 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76115561
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp, 
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream> input = ...;
+DataStream input = ...;
 
 // for folding incremental computation
 input
 .keyBy()
 .window()
-.apply(, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction {
+
+public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Thanks for the update. 
The following Scala code does not show an error in my IntelliJ:

```
val readings: DataStream[SensorReading] = ???

val result: DataStream[(String, Long, Int)] = readings
  .keyBy(_.sensorId)
  .timeWindow(Time.minutes(1), Time.seconds(10))
  .apply(
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => {
      ("", 0L, acc._3 + 1)
    },
    (k: String, w: TimeWindow, cnts: Iterable[(String, Long, Int)],
     out: Collector[(String, Long, Int)]) => {
      val cnt = cnts.iterator.next()
      out.collect((k, w.getEnd, cnt._3))
    }
  )
```

Thanks, Fabian




[jira] [Updated] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-08-24 Thread Gabor Gevay (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Gevay updated FLINK-3999:
---
Assignee: Neelesh Srinivas Salian

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Assignee: Neelesh Srinivas Salian
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage:
> when the operator stops normally, it is no longer running, but the
> {{running}} flag is still true, since the flag is only changed when
> cancelling.
> It should be renamed, and its value inverted.
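A minimal Java sketch of the proposed naming follows. It uses a hypothetical `SketchDriver` class, not one of Flink's actual drivers. The flag is only flipped by `cancel()`, so after a normal run it is still `false`, which now reads correctly.

```java
// Hypothetical driver loop illustrating a `canceled` flag instead of `running`.
final class SketchDriver {
    // volatile: cancel() is typically called from a different thread than run().
    private volatile boolean canceled = false;
    private int processed = 0;

    void run(int[] input) {
        for (int ignored : input) {
            if (canceled) {
                return; // stop early only when explicitly cancelled
            }
            processed += 1;
        }
        // Normal termination: `canceled` is still false, which is accurate.
    }

    void cancel() {
        canceled = true;
    }

    boolean isCanceled() {
        return canceled;
    }

    int processedCount() {
        return processed;
    }
}
```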





[jira] [Commented] (FLINK-3999) Rename the `running` flag in the drivers to `canceled`

2016-08-24 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3999?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435446#comment-15435446
 ] 

Gabor Gevay commented on FLINK-3999:


Yes, thank you for working on it!

> Rename the `running` flag in the drivers to `canceled`
> --
>
> Key: FLINK-3999
> URL: https://issues.apache.org/jira/browse/FLINK-3999
> Project: Flink
>  Issue Type: Bug
>  Components: Local Runtime
>Reporter: Gabor Gevay
>Priority: Trivial
>
> The name of the {{running}} flag in the drivers doesn't reflect its usage:
> when the operator stops normally, it is no longer running, but the
> {{running}} flag is still true, since the flag is only changed when
> cancelling.
> It should be renamed, and its value inverted.





[jira] [Commented] (FLINK-4407) Implement the trigger DSL

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435357#comment-15435357
 ] 

ASF GitHub Bot commented on FLINK-4407:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2415

[FLINK-4407] Implement the trigger DSL

This PR implements the Trigger DSL as presented in
[FLIP-9](https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL)
with the addition of Repeat.Once and Repeat.Forever.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink window_dsl

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2415






> Implement the trigger DSL
> -
>
> Key: FLINK-4407
> URL: https://issues.apache.org/jira/browse/FLINK-4407
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>
> This issue refers to the implementation of the trigger DSL.
> The specification of the DSL has an open FLIP here:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL
> and is currently under discussion on the dev@ mailing list here:
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-9-Trigger-DSL-td13065.html





[GitHub] flink pull request #2415: [FLINK-4407] Implement the trigger DSL

2016-08-24 Thread kl0u
GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/2415

[FLINK-4407] Implement the trigger DSL

This PR implements the Trigger DSL as presented in
[FLIP-9](https://cwiki.apache.org/confluence/display/FLINK/FLIP-9%3A+Trigger+DSL)
with the addition of Repeat.Once and Repeat.Forever.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink window_dsl

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2415








[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...

2016-08-24 Thread vijikarthi
Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
> It seems like the privileged port issues can be circumvented by setting
> conf.getBoolean("dfs.datanode.require.secure.ports", false)?

It is not supported yet (ref: https://github.com/apache/hadoop/blob/branch-2.3.0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L106).
The trunk code (https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L115)
also has similar logic.




[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435356#comment-15435356
 ] 

ASF GitHub Bot commented on FLINK-3929:
---

Github user vijikarthi commented on the issue:

https://github.com/apache/flink/pull/2275
  
> It seems like the privileged port issues can be circumvented by setting
> conf.getBoolean("dfs.datanode.require.secure.ports", false)?

It is not supported yet (ref: https://github.com/apache/hadoop/blob/branch-2.3.0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L106).
The trunk code (https://github.com/apache/hadoop/blob/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/SecureDataNodeStarter.java#L115)
also has similar logic.


> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
>  Issue Type: New Feature
>Reporter: Eron Wright 
>Assignee: Vijay Srinivasaraghavan
>  Labels: kerberos, security
>   Original Estimate: 672h
>  Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc._
> Add support for a keytab credential to be associated with the Flink cluster, 
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>  





[jira] [Commented] (FLINK-3899) Document window processing with Reduce/FoldFunction + WindowFunction

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435353#comment-15435353
 ] 

ASF GitHub Bot commented on FLINK-3899:
---

Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76102678
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp, 
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream> input = ...;
+DataStream input = ...;
 
 // for folding incremental computation
 input
 .keyBy()
 .window()
-.apply(, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction {
+
+public void apply(String key, TimeWindow window, Iterable timestamps, Collector out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Made the changes in the Java version and added the comments. I had some
issues with the Scala version; see the screenshots. The only change is really
the type of the `Iterable` in the `WindowFunction`, which IntelliJ was
saying has to be `SensorReading`, which is not ideal. I removed the
Scala version for now.

https://cloud.githubusercontent.com/assets/10012612/17940967/4a025738-69ff-11e6-9354-31c2ead563d4.png

https://cloud.githubusercontent.com/assets/10012612/17940972/4dd5db28-69ff-11e6-8c6a-11b1900796ad.png
  


> Document window processing with Reduce/FoldFunction + WindowFunction
> 
>
> Key: FLINK-3899
> URL: https://issues.apache.org/jira/browse/FLINK-3899
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Streaming
>Affects Versions: 1.1.0
>Reporter: Fabian Hueske
>
> The streaming documentation does not describe how windows can be processed 
> with FoldFunction or ReduceFunction and a subsequent WindowFunction. This 
> combination allows for eager window aggregation (only a single element is 
> kept in the window) and access to the Window object, e.g., to obtain 
> the window's start and end time.
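The following sketch illustrates the eager-aggregation idea described in the issue. It is a plain-Java simulation with no Flink dependency, not Flink's API or runtime. The class and method names (`EagerFoldWindowSketch`, `fireWindow`, `applyWindow`) are hypothetical and only mirror `MyFoldFunction`/`MyWindowFunction` from the diff above. Each arriving reading is folded into a single accumulator, and when the window fires, the window function receives a one-element iterable together with the window bounds.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical plain-Java simulation of fold + window function (not Flink code). */
public class EagerFoldWindowSketch {

    /** Hypothetical sensor reading carrying an event timestamp. */
    static final class SensorReading {
        final String sensorId;
        final long timestamp;

        SensorReading(String sensorId, long timestamp) {
            this.sensorId = sensorId;
            this.timestamp = timestamp;
        }
    }

    /** Fold step: keep only the largest timestamp seen so far. */
    static long fold(long acc, SensorReading s) {
        return Math.max(acc, s.timestamp);
    }

    /** Window step: receives exactly one pre-aggregated value plus the window bounds. */
    static String applyWindow(String key, long windowStart, long windowEnd, Iterable<Long> folded) {
        long latest = folded.iterator().next();
        return key + " [" + windowStart + ", " + windowEnd + ") latest=" + latest;
    }

    /** Simulates one key's window: fold each element on arrival, then fire once. */
    static String fireWindow(String key, long windowStart, long windowEnd, List<SensorReading> arrivals) {
        long acc = Long.MIN_VALUE; // initial value, as in apply(Long.MIN_VALUE, ...)
        for (SensorReading r : arrivals) {
            acc = fold(acc, r);    // only 'acc' would have to be kept as window state
        }
        return applyWindow(key, windowStart, windowEnd, Collections.singletonList(acc));
    }

    public static void main(String[] args) {
        List<SensorReading> readings = Arrays.asList(
                new SensorReading("s1", 1_000L),
                new SensorReading("s1", 4_500L),
                new SensorReading("s1", 3_000L));
        System.out.println(fireWindow("s1", 0L, 5_000L, readings)); // s1 [0, 5000) latest=4500
    }
}
```

The design point is that window state never holds more than the accumulator. The window function still gets access to the window metadata (here simply the start and end values).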





[GitHub] flink pull request #2368: [FLINK-3899] Document window processing with Reduc...

2016-08-24 Thread danielblazevski
Github user danielblazevski commented on a diff in the pull request:

https://github.com/apache/flink/pull/2368#discussion_r76102678
  
--- Diff: docs/apis/streaming/windows.md ---
@@ -459,42 +459,106 @@ ready for processing. This allows to get the benefit of incremental window compu
 the additional meta information that writing a `WindowFunction` provides.
 
 This is an example that shows how incremental aggregation functions can be combined with
-a `WindowFunction`.
+a `WindowFunction`.  The `FoldFunction`/`WindowFunction` example shows how to extract the
+ending event-time of a window of sensor readings that contain a timestamp,
+and the `ReduceFunction`/`WindowFunctions` example shows how to do eager window
+aggregation (only a single element is kept in the window).
 
 
 
 {% highlight java %}
-DataStream<Tuple2<String, Long>> input = ...;
+DataStream<SensorReading> input = ...;
 
 // for folding incremental computation
 input
 .keyBy(<key selector>)
 .window(<window assigner>)
-.apply(<initial value>, new MyFoldFunction(), new MyWindowFunction());
+.apply(Long.MIN_VALUE, new MyFoldFunction(), new MyWindowFunction());
+
+/* ... */
+
+private static  class myFoldFunction implements FoldFunction<SensorReading, Long> {
+
+public Long fold(Long acc, SensorReading s) {
+return Math.max(acc, s.timestamp());
+}
+}
+
+private static class MyWindowFunction implements WindowFunction<Long, Long, String, TimeWindow> {
+
+public void apply(String key, TimeWindow window, Iterable<Long> timestamps, Collector<Long> out) {
+out.collect(timestamps.iterator().next());
--- End diff --

Made the changes in the Java version and added the comments. I had some
issues with the Scala version; see the screenshots. The only change is really
the type of the `Iterable` in the `WindowFunction`, which IntelliJ was
saying has to be `SensorReading`, which is not ideal. I removed the
Scala version for now.

https://cloud.githubusercontent.com/assets/10012612/17940967/4a025738-69ff-11e6-9354-31c2ead563d4.png

https://cloud.githubusercontent.com/assets/10012612/17940972/4dd5db28-69ff-11e6-8c6a-11b1900796ad.png
  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2414: [FLINK-4341] Let idle consumer subtasks emit max v...

2016-08-24 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2414

[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail 
on resharding

This is a short-term fix, until the min-watermark service for the 
JobManager described in the JIRA discussion is available.

The way this fix works is that we let idle subtasks that initially don't 
get assigned shards emit a `Long.MAX_VALUE` watermark. Also, we _only fail hard 
if an idle subtask_ is assigned new shards when resharding happens, to avoid 
messing up the watermarks. So, if no subtask is initially idle on 
startup (i.e., when the total number of shards > consumer parallelism), the Kinesis 
consumer can still transparently handle resharding as before, without failing.
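The following sketch shows the short-term rule described above. It is hypothetical and is not the Kinesis connector's actual code. In particular, the round-robin shard assignment (shard `i` goes to subtask `i % parallelism`) and every name in it (`IdleSubtaskWatermarkSketch`, `watermarkToEmit`, `onNewShardsAssigned`) are assumptions made for illustration. An initially idle subtask emits `Long.MAX_VALUE` so that it never holds back the downstream minimum watermark. If resharding later hands it shards, it fails instead of emitting data behind a watermark it cannot take back.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the FLINK-4341 short-term behaviour (not connector code). */
public class IdleSubtaskWatermarkSketch {

    private final boolean initiallyIdle;

    public IdleSubtaskWatermarkSketch(int subtaskIndex, int parallelism, int initialShardCount) {
        this.initiallyIdle = assignedShards(subtaskIndex, parallelism, initialShardCount).isEmpty();
    }

    /** Assumed round-robin assignment: shard i goes to subtask i % parallelism. */
    static List<Integer> assignedShards(int subtaskIndex, int parallelism, int shardCount) {
        List<Integer> shards = new ArrayList<>();
        for (int shard = 0; shard < shardCount; shard++) {
            if (shard % parallelism == subtaskIndex) {
                shards.add(shard);
            }
        }
        return shards;
    }

    public boolean isInitiallyIdle() {
        return initiallyIdle;
    }

    /** Idle subtasks emit Long.MAX_VALUE; busy subtasks pass their own watermark through. */
    public long watermarkToEmit(long ownWatermark) {
        return initiallyIdle ? Long.MAX_VALUE : ownWatermark;
    }

    /** Fail hard if a subtask that already emitted Long.MAX_VALUE receives shards. */
    public void onNewShardsAssigned(int newShardCount) {
        if (initiallyIdle && newShardCount > 0) {
            throw new IllegalStateException(
                    "Initially idle subtask was assigned shards after resharding");
        }
    }

    public static void main(String[] args) {
        // 2 shards, parallelism 4: under the assumed assignment, subtasks 2 and 3 get no shards.
        for (int i = 0; i < 4; i++) {
            IdleSubtaskWatermarkSketch s = new IdleSubtaskWatermarkSketch(i, 4, 2);
            System.out.println("subtask " + i + " idle=" + s.isInitiallyIdle()
                    + " watermark=" + s.watermarkToEmit(1000L));
        }
    }
}
```

When the shard count is at least the parallelism, no subtask is idle under this assignment, so `onNewShardsAssigned` never throws. That matches the "transparent resharding still works" case described above.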

I've tested exactly-once with our manual tests (with and without resharding), 
and the fix works nicely, still retaining the exactly-once guarantee despite the 
non-transparency. However, I'm a bit unsure how to test whether the unbounded 
state with window operators is also fixed by this change, so that still 
needs to be clarified.

R: @rmetzger and @aljoscha for review. Thanks in advance!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4341

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2414.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2414


commit bc8e50d99be745300f7418c58e9d30abc5469ba3
Author: Gordon Tai 
Date:   2016-08-24T08:38:06Z

[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail 
on resharding

This no longer allows the Kinesis consumer to transparently handle 
resharding.
This is a short-term workaround until we have a min-watermark notification 
service available in the JobManager.






[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435319#comment-15435319
 ] 

ASF GitHub Bot commented on FLINK-4437:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2409
  
I ran the test suite with the patch, and it failed here:
```
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 201.106 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase
The JobManager should handle gracefully failing task manager with slot sharing(org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase)  Time elapsed: 200.43 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Futures timed out after [20 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.ready(package.scala:86)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:455)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:439)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:330)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269)
	at org.apache.flink.runtime.testingUtils.TestingUtils$.startTestingCluster(TestingUtils.scala:86)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TaskManagerFailsWithSlotSharingITCase.scala:73)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase.withFixture(TaskManagerFailsWithSlotSharingITCase.scala:38)
```
Doesn't seem to be related to the patch.


> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp'
> in close proximity before lastTriggeredCheckpoint is updated,
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint"
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.
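One common remedy for this kind of check-then-act race is to perform the min-pause check and the update of the timestamp under a single lock. The issue does not say whether the eventual fix takes this shape. The class below, `MinPauseGuard`, is a hypothetical, simplified stand-in for the check in `CheckpointCoordinator#triggerCheckpoint()`, not Flink code. Because the comparison and the assignment happen atomically, two threads can no longer both pass the check before either records its trigger time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical simplified min-pause check; check and update happen under one lock. */
public class MinPauseGuard {

    private final Object lock = new Object();
    private final long minPauseBetweenCheckpoints;

    // Both fields are guarded by 'lock'.
    private long lastTriggeredCheckpoint;
    private boolean hasTriggered;

    public MinPauseGuard(long minPauseBetweenCheckpoints) {
        this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
    }

    /** Returns true and records the trigger time if the minimum pause has elapsed. */
    public boolean tryTrigger(long timestamp) {
        synchronized (lock) {
            if (hasTriggered && lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) {
                return false; // minimum interval between checkpoints has not passed yet
            }
            lastTriggeredCheckpoint = timestamp;
            hasTriggered = true;
            return true;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MinPauseGuard guard = new MinPauseGuard(1000L);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger accepted = new AtomicInteger();
        Thread[] workers = new Thread[8];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await(); // release all threads at once to provoke the race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (guard.tryTrigger(5000L)) {
                    accepted.incrementAndGet();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            t.join();
        }
        System.out.println("accepted = " + accepted.get()); // exactly one thread passes
    }
}
```

Holding the lock also covers any other fields that must change together with `lastTriggeredCheckpoint`, which is the "lost updates" concern in the issue.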





[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...

2016-08-24 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2409
  
I ran the test suite with the patch, and it failed here:
```
Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 201.106 sec <<< FAILURE! - in org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase
The JobManager should handle gracefully failing task manager with slot sharing(org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase)  Time elapsed: 200.43 sec  <<< ERROR!
java.util.concurrent.TimeoutException: Futures timed out after [20 milliseconds]
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
	at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
	at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
	at scala.concurrent.Await$.ready(package.scala:86)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:455)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.waitForTaskManagersToBeRegistered(FlinkMiniCluster.scala:439)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:330)
	at org.apache.flink.runtime.minicluster.FlinkMiniCluster.start(FlinkMiniCluster.scala:269)
	at org.apache.flink.runtime.testingUtils.TestingUtils$.startTestingCluster(TestingUtils.scala:86)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(TaskManagerFailsWithSlotSharingITCase.scala:73)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(TaskManagerFailsWithSlotSharingITCase.scala:53)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
	at org.apache.flink.runtime.jobmanager.TaskManagerFailsWithSlotSharingITCase.withFixture(TaskManagerFailsWithSlotSharingITCase.scala:38)
```
Doesn't seem to be related to the patch.




[jira] [Commented] (FLINK-4341) Kinesis connector does not emit maximum watermark properly

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435309#comment-15435309
 ] 

ASF GitHub Bot commented on FLINK-4341:
---

GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/2414

[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail 
on resharding

This is a short-term fix, until the min-watermark service for the 
JobManager described in the JIRA discussion is available.

The way this fix works is that we let idle subtasks that initially don't 
get assigned shards emit a `Long.MAX_VALUE` watermark. Also, we _only fail hard 
if an idle subtask_ is assigned new shards when resharding happens, to avoid 
messing up the watermarks. So, if no subtask is initially idle on 
startup (i.e., when the total number of shards > consumer parallelism), the Kinesis 
consumer can still transparently handle resharding as before, without failing.

I've tested exactly-once with our manual tests (with and without resharding), 
and the fix works nicely, still retaining the exactly-once guarantee despite the 
non-transparency. However, I'm a bit unsure how to test whether the unbounded 
state with window operators is also fixed by this change, so that still 
needs to be clarified.

R: @rmetzger and @aljoscha for review. Thanks in advance!

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-4341

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/2414.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2414


commit bc8e50d99be745300f7418c58e9d30abc5469ba3
Author: Gordon Tai 
Date:   2016-08-24T08:38:06Z

[FLINK-4341] Let idle consumer subtasks emit max value watermarks and fail 
on resharding

This no longer allows the Kinesis consumer to transparently handle 
resharding.
This is a short-term workaround until we have a min-watermark notification 
service available in the JobManager.




> Kinesis connector does not emit maximum watermark properly
> --
>
> Key: FLINK-4341
> URL: https://issues.apache.org/jira/browse/FLINK-4341
> Project: Flink
>  Issue Type: Bug
>  Components: Streaming Connectors
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Scott Kidder
>Assignee: Robert Metzger
>Priority: Blocker
> Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task 
> parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I 
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as 
> expected.  This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read 
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots 
> each, providing a total of 4 slots.  When running the application with a 
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) 
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use 
> an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint 
> states were growing unbounded when running with a parallelism of 4, 
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception 
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum 
> 

[jira] [Commented] (FLINK-4035) Bump Kafka producer in Kafka sink to Kafka 0.10.0.0

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435284#comment-15435284
 ] 

ASF GitHub Bot commented on FLINK-4035:
---

Github user eliaslevy commented on the issue:

https://github.com/apache/flink/pull/2369
  
@rmetzger that's the one. NP.  I realize breaking it up makes things 
easier.  I just thought I'd mention it.


> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.0.3
>Reporter: Elias Levy
>Assignee: Robert Metzger
>Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.  
> Published messages now include timestamps and compressed messages now include 
> relative offsets.  As it is now, brokers must decompress publisher compressed 
> messages, assign offsets to them, and recompress them, which is wasteful and 
> makes it less likely that compression will be used at all.
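To show why relative offsets remove the broker-side recompression, here is a hypothetical sketch of the offset arithmetic. It assumes the message-format-v1 scheme as we understand it from Kafka's KIP-31: the producer numbers the inner messages of a compressed set 0..n-1, and the broker only stamps the wrapper with the absolute offset of the last inner message. The class and method names are illustrative and are not Kafka client APIs.

```java
import java.util.Arrays;

/** Hypothetical sketch of relative-offset arithmetic for a compressed message set. */
public class RelativeOffsetSketch {

    /** Producer side: inner messages of one compressed set get relative offsets 0..n-1. */
    static long[] assignRelativeOffsets(int innerCount) {
        long[] rel = new long[innerCount];
        for (int i = 0; i < innerCount; i++) {
            rel[i] = i;
        }
        return rel;
    }

    /** Broker side: only the wrapper is touched; it gets the absolute offset of the last inner message. */
    static long wrapperOffset(long nextOffsetInLog, int innerCount) {
        return nextOffsetInLog + innerCount - 1;
    }

    /** Consumer side: absolute offsets are derived, so the broker never had to recompress. */
    static long[] absoluteOffsets(long wrapperOffset, long[] relativeOffsets) {
        long lastRelative = relativeOffsets[relativeOffsets.length - 1];
        long[] abs = new long[relativeOffsets.length];
        for (int i = 0; i < relativeOffsets.length; i++) {
            abs[i] = wrapperOffset - lastRelative + relativeOffsets[i];
        }
        return abs;
    }

    public static void main(String[] args) {
        long[] rel = assignRelativeOffsets(3);
        long wrapper = wrapperOffset(100L, 3); // 102
        System.out.println(Arrays.toString(absoluteOffsets(wrapper, rel))); // [100, 101, 102]
    }
}
```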





[GitHub] flink issue #2369: [FLINK-4035] Add a streaming connector for Apache Kafka 0...

2016-08-24 Thread eliaslevy
Github user eliaslevy commented on the issue:

https://github.com/apache/flink/pull/2369
  
@rmetzger that's the one. NP.  I realize breaking it up makes things 
easier.  I just thought I'd mention it.




[jira] [Commented] (FLINK-4437) Lock evasion around lastTriggeredCheckpoint may lead to lost updates to related fields

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4437?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435172#comment-15435172
 ] 

ASF GitHub Bot commented on FLINK-4437:
---

Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2409
  
```
Executing Maven:  -B -f 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/pom.xml 
-Dmaven.repo.local=/home/jenkins/jenkins-slave/maven-repositories/1 clean 
install -Dflink.forkCount=1C
```
I don't see flink-github-ci in the source tree.


> Lock evasion around lastTriggeredCheckpoint may lead to lost updates to 
> related fields
> --
>
> Key: FLINK-4437
> URL: https://issues.apache.org/jira/browse/FLINK-4437
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>
> In CheckpointCoordinator#triggerCheckpoint():
> {code}
> // make sure the minimum interval between checkpoints has passed
> if (lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp) 
> {
> {code}
> If two threads evaluate 'lastTriggeredCheckpoint + minPauseBetweenCheckpoints > timestamp'
> in close proximity before lastTriggeredCheckpoint is updated,
> the two threads may have an inconsistent view of "lastTriggeredCheckpoint"
> and updates to fields correlated with "lastTriggeredCheckpoint" may be lost.





[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435171#comment-15435171
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2400
  
Thank you @wangzhijiang999 for the changes. I've made a couple more 
requests, after which we can merge the PR.


> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and startup 
> of all components in Java instead of Scala.





[GitHub] flink issue #2409: FLINK-4437 Lock evasion around lastTriggeredCheckpoint ma...

2016-08-24 Thread tedyu
Github user tedyu commented on the issue:

https://github.com/apache/flink/pull/2409
  
```
Executing Maven:  -B -f 
/home/jenkins/jenkins-slave/workspace/flink-github-ci/pom.xml 
-Dmaven.repo.local=/home/jenkins/jenkins-slave/maven-repositories/1 clean 
install -Dflink.forkCount=1C
```
I don't see flink-github-ci in the source tree.




[GitHub] flink issue #2400: [FLINK-4363] Implement TaskManager basic startup of all c...

2016-08-24 Thread mxm
Github user mxm commented on the issue:

https://github.com/apache/flink/pull/2400
  
Thank you @wangzhijiang999 for the changes. I've made a couple more 
requests, after which we can merge the PR.




[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435164#comment-15435164
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76085144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-   resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
--- End diff --

Same goes for the start method.


> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and startup 
> of all components in Java instead of Scala.





[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435163#comment-15435163
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76085101
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-   resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
--- End diff --

Why are these essential RPC methods at the bottom? They should be moved to 
the top of the class.


> Implement TaskManager basic startup of all components in java
> -
>
> Key: FLINK-4363
> URL: https://issues.apache.org/jira/browse/FLINK-4363
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> Similar to the current {{TaskManager}}, but implement the initialization and startup 
> of all components in Java instead of Scala.





[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76085144
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-   resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
--- End diff --

Same goes for the start method.




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76085101
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -94,12 +729,11 @@ public void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLe
if (newLeaderAddress != null) {
// the resource manager switched to a new leader
log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
-   resourceManagerConnection.getResourceManagerAddress(), newLeaderAddress);
--- End diff --

Why are these essential RPC methods at the bottom? They should be moved to 
the top of the class.




[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084947
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+   if (taskManagerHostname != null) {
+   LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+   } else {
+   
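
The javadoc in this diff describes the selection strategy: use an explicitly configured hostname if there is one, otherwise probe candidate interfaces and keep the first one from which the JobManager is reachable. A minimal Java sketch of that rule follows. InterfaceSelectionSketch, selectHostname and tcpProbe are hypothetical names, not part of the pull request or of Flink, and the real implementation may enumerate, order and retry candidates differently.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

/** Hypothetical sketch of the selection strategy described in the javadoc; not the PR's code. */
final class InterfaceSelectionSketch {

    /**
     * Returns the configured hostname if one is set; otherwise the first candidate
     * address for which the probe reports that the JobManager is reachable.
     */
    static Optional<String> selectHostname(
            String configuredHostname,
            List<String> candidateAddresses,
            Predicate<String> canReachJobManager) {
        if (configuredHostname != null) {
            // An explicitly configured hostname/interface is used as-is, without probing.
            return Optional.of(configuredHostname);
        }
        for (String candidate : candidateAddresses) {
            // Keep the first candidate whose connection attempt succeeds.
            if (canReachJobManager.test(candidate)) {
                return Optional.of(candidate);
            }
        }
        // No candidate worked; a caller would have to retry or fail here.
        return Optional.empty();
    }

    /**
     * One possible probe (an assumption, not Flink's method): bind a socket to the
     * candidate local address and try to open a TCP connection to the JobManager.
     */
    static boolean tcpProbe(String localAddress, InetSocketAddress jobManager, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.bind(new InetSocketAddress(InetAddress.getByName(localAddress), 0));
            socket.connect(jobManager, timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```

A caller could pass `c -> tcpProbe(c, jobManagerAddress, 1000)` as the probe and collect candidates from `java.net.NetworkInterface.getNetworkInterfaces()`. Keeping the probe as a parameter lets the selection rule itself be tested without a network.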

[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435161#comment-15435161
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084947
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 

[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435159#comment-15435159
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084797
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084797
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+   if (taskManagerHostname != null) {
+   LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+   } else {
+   

[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435158#comment-15435158
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084721
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+   if (taskManagerHostname != null) {
+   LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+   } else {
+   

[jira] [Commented] (FLINK-4363) Implement TaskManager basic startup of all components in java

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435153#comment-15435153
 ] 

ASF GitHub Bot commented on FLINK-4363:
---

Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084398
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, 

[GitHub] flink pull request #2400: [FLINK-4363] Implement TaskManager basic startup o...

2016-08-24 Thread mxm
Github user mxm commented on a diff in the pull request:

https://github.com/apache/flink/pull/2400#discussion_r76084398
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutor.java
 ---
@@ -36,27 +82,617 @@
  */
 public class TaskExecutor extends RpcEndpoint {
 
-   /** The unique resource ID of this TaskExecutor */
+   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutor.class);
+
+   /** The unique resource ID of this TaskManager */
private final ResourceID resourceID;
 
/** The access to the leader election and metadata storage services */
private final HighAvailabilityServices haServices;
 
-   // - resource manager 
+   /** The task manager configuration */
+   private final TaskExecutorConfiguration taskExecutorConfig;
+
+   /** The connection information of the task manager */
+   private final InstanceConnectionInfo connectionInfo;
+
+   /** The I/O manager component in the task manager */
+   private final IOManager ioManager;
+
+   /** The memory manager component in the task manager */
+   private final MemoryManager memoryManager;
+
+   /** The network component in the task manager */
+   private final NetworkEnvironment networkEnvironment;
+
+   /** The number of slots in the task manager, should be 1 for YARN */
+   private final int numberOfSlots;
 
+   // - resource manager 
private TaskExecutorToResourceManagerConnection resourceManagerConnection;
 
// 

 
public TaskExecutor(
+   TaskExecutorConfiguration taskExecutorConfig,
+   ResourceID resourceID,
+   InstanceConnectionInfo connectionInfo,
+   MemoryManager memoryManager,
+   IOManager ioManager,
+   NetworkEnvironment networkEnvironment,
+   int numberOfSlots,
RpcService rpcService,
-   HighAvailabilityServices haServices,
-   ResourceID resourceID) {
+   HighAvailabilityServices haServices) {
 
super(rpcService);
 
-   this.haServices = checkNotNull(haServices);
+   this.taskExecutorConfig = checkNotNull(taskExecutorConfig);
this.resourceID = checkNotNull(resourceID);
+   this.connectionInfo = checkNotNull(connectionInfo);
+   this.memoryManager = checkNotNull(memoryManager);
+   this.ioManager = checkNotNull(ioManager);
+   this.networkEnvironment = checkNotNull(networkEnvironment);
+   this.numberOfSlots = checkNotNull(numberOfSlots);
+   this.haServices = checkNotNull(haServices);
+   }
+
+   /**
+* Starts and runs the TaskManager.
+* 
+* This method first tries to select the network interface to use for the TaskManager
+* communication. The network interface is used both for the actor communication
+* (coordination) as well as for the data exchange between task managers. Unless
+* the hostname/interface is explicitly configured in the configuration, this
+* method will try out various interfaces and methods to connect to the JobManager
+* and select the one where the connection attempt is successful.
+* 
+* After selecting the network interface, this method brings up an actor system
+* for the TaskManager and its actors, starts the TaskManager's services
+* (library cache, shuffle network stack, ...), and starts the TaskManager itself.
+*
+* @param configuration The configuration for the TaskManager.
+* @param resourceID    The id of the resource which the task manager will run on.
+*/
+   public static void selectNetworkInterfaceAndRunTaskManager(
+   Configuration configuration,
+   ResourceID resourceID) throws Exception {
+
+   InetSocketAddress taskManagerAddress = selectNetworkInterfaceAndPort(configuration);
+
+   runTaskManager(taskManagerAddress.getHostName(), resourceID, taskManagerAddress.getPort(), configuration);
+   }
+
+   private static InetSocketAddress selectNetworkInterfaceAndPort(Configuration configuration)
+   throws Exception {
+   String taskManagerHostname = configuration.getString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null);
+   if (taskManagerHostname != null) {
+   LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname);
+   } else {
+   

[jira] [Created] (FLINK-4480) Incorrect link to elastic.co in documentation

2016-08-24 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-4480:


 Summary: Incorrect link to elastic.co in documentation
 Key: FLINK-4480
 URL: https://issues.apache.org/jira/browse/FLINK-4480
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.2.0, 1.1.1
Reporter: Fabian Hueske
Priority: Trivial


The link URL of the entry "Elasticsearch 2x (sink)" on the connectors documentation page
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/index.html
points to http://elastic.com but should point to http://elastic.co.





[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435082#comment-15435082
 ] 

ASF GitHub Bot commented on FLINK-4451:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2405


> Throw exception when remote connection cannot be resolved
> -
>
> Key: FLINK-4451
> URL: https://issues.apache.org/jira/browse/FLINK-4451
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{RpcService}} implementation should throw an exception (returned in the 
> future) if {{RpcService.connect(address, type)}} cannot connect to the remote 
> {{RpcEndpoint}}.
> At the moment, the {{AkkaRpcService}} does not check whether the 
> {{IdentifyActor}} message contains a valid {{ActorRef}} and therefore throws 
> a {{NullPointerException}}.
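
The fix requested in this issue amounts to failing the returned future instead of dereferencing a missing reference. A minimal Java sketch of that pattern follows, using java.util.concurrent.CompletableFuture for illustration. RpcConnectSketch, IdentifyReply and toConnection are hypothetical stand-ins (IdentifyReply plays the role of the identify response, with a null reference meaning that no endpoint answered), and the local RpcConnectionException only borrows its name from the title of pull request #2405; this is not the AkkaRpcService code.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch, not Flink's AkkaRpcService: fail the connect future
 *  when the identify reply carries no endpoint reference. */
final class RpcConnectSketch {

    /** Stand-in for the exception type named in the PR title. */
    static final class RpcConnectionException extends Exception {
        RpcConnectionException(String message) {
            super(message);
        }
    }

    /** Stand-in for an identify reply; a null reference means nothing answered at the address. */
    static final class IdentifyReply {
        final Object endpointRef;

        IdentifyReply(Object endpointRef) {
            this.endpointRef = endpointRef;
        }
    }

    /** Completes with the endpoint reference, or fails the future instead of causing an NPE later. */
    static CompletableFuture<Object> toConnection(String address, IdentifyReply reply) {
        CompletableFuture<Object> result = new CompletableFuture<>();
        if (reply.endpointRef == null) {
            // Report the failure through the returned future, as the issue asks.
            result.completeExceptionally(new RpcConnectionException(
                    "Could not connect to rpc endpoint under address " + address + "."));
        } else {
            result.complete(reply.endpointRef);
        }
        return result;
    }
}
```

Callers then observe the failure where they consume the future (for example via `join()`, which wraps it in a `CompletionException`) rather than through an unrelated `NullPointerException`.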





[jira] [Closed] (FLINK-4451) Throw exception when remote connection cannot be resolved

2016-08-24 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4451.

Resolution: Fixed

Fixed via affa548cada5d9c25c32eb1f39986bbbd3e55f21






[GitHub] flink pull request #2405: [FLINK-4451] [rpc] Throw RpcConnectionException wh...

2016-08-24 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2405


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website

2016-08-24 Thread Robert Metzger (JIRA)
Robert Metzger created FLINK-4479:
-

 Summary: Replace trademark (tm) with registered trademark (R) sign 
on Flink website
 Key: FLINK-4479
 URL: https://issues.apache.org/jira/browse/FLINK-4479
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: Robert Metzger
Assignee: Robert Metzger


Flink is now a registered trademark, so we should reflect that on our website.





[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435075#comment-15435075
 ] 

ASF GitHub Bot commented on FLINK-4451:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2405
  
Failing test cases are unrelated. Will be merging this PR.







[GitHub] flink issue #2405: [FLINK-4451] [rpc] Throw RpcConnectionException when rpc ...

2016-08-24 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/2405
  
Failing test cases are unrelated. Will be merging this PR.




[jira] [Commented] (FLINK-1460) Typo fixes

2016-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15435069#comment-15435069
 ] 

ASF GitHub Bot commented on FLINK-1460:
---

Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/346
  

[![Coverage 
Status](https://coveralls.io/builds/7588381/badge)](https://coveralls.io/builds/7588381)

Changes Unknown when pulling **acf52748f7ed9fe64d9773acbab5774b308e47eb on 
coderxiang:typo** into ** on apache:master**.



> Typo fixes
> --
>
> Key: FLINK-1460
> URL: https://issues.apache.org/jira/browse/FLINK-1460
> Project: Flink
>  Issue Type: Improvement
>Reporter: Shuo Xiang
>Priority: Minor
> Fix For: 0.9
>
>
> Fix some typos. Also fix some inconsistent uses of *partition operator* and 
> *partitioning operator* in the codebase.





[GitHub] flink issue #346: [FLINK-1460] fix typos

2016-08-24 Thread coveralls
Github user coveralls commented on the issue:

https://github.com/apache/flink/pull/346
  

[![Coverage 
Status](https://coveralls.io/builds/7588381/badge)](https://coveralls.io/builds/7588381)

Changes Unknown when pulling **acf52748f7ed9fe64d9773acbab5774b308e47eb on 
coderxiang:typo** into ** on apache:master**.




