[jira] [Commented] (FLINK-4382) Buffer rpc calls until RpcEndpoint is properly started

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422913#comment-15422913
 ] 

ASF GitHub Bot commented on FLINK-4382:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2358


> Buffer rpc calls until RpcEndpoint is properly started
> --
>
> Key: FLINK-4382
> URL: https://issues.apache.org/jira/browse/FLINK-4382
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> When creating a {{RpcEndpoint}} it starts a rpc server. The server should 
> wait to dispatch incoming rpc calls until the {{RpcEndpoint}} signals that 
> it's ready.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4382) Buffer rpc calls until RpcEndpoint is properly started

2016-08-16 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-4382.

Resolution: Fixed

Fixed via 5df27ebd73f5218a792fb53385887968c0e4ca36

> Buffer rpc calls until RpcEndpoint is properly started
> --
>
> Key: FLINK-4382
> URL: https://issues.apache.org/jira/browse/FLINK-4382
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> When creating a {{RpcEndpoint}} it starts a rpc server. The server should 
> wait to dispatch incoming rpc calls until the {{RpcEndpoint}} signals that 
> it's ready.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2354: [FLINK-4366] Enforce parallelism=1 For AllWindowed...

2016-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2354


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4366.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via ffe406570af60057fa2ae318561aa239b99bd648

> Enforce parallelism=1 For AllWindowedStream
> ---
>
> Key: FLINK-4366
> URL: https://issues.apache.org/jira/browse/FLINK-4366
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> Right now, it is possible to use {{DataStream.windowAll/timeWindowAll}} and 
> then set a different parallelism afterwards. Flink will silently accept this 
> and spawn the number of parallel operators, only one instance of those will 
> do all the processing, though, since the elements are implicitly keyed by a 
> dummy key.
> We should throw an exception if users try to set a parallelism on an 
> all-windowed stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4366.
---

> Enforce parallelism=1 For AllWindowedStream
> ---
>
> Key: FLINK-4366
> URL: https://issues.apache.org/jira/browse/FLINK-4366
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> Right now, it is possible to use {{DataStream.windowAll/timeWindowAll}} and 
> then set a different parallelism afterwards. Flink will silently accept this 
> and spawn the number of parallel operators, only one instance of those will 
> do all the processing, though, since the elements are implicitly keyed by a 
> dummy key.
> We should throw an exception if users try to set a parallelism on an 
> all-windowed stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422922#comment-15422922
 ] 

ASF GitHub Bot commented on FLINK-4366:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2354


> Enforce parallelism=1 For AllWindowedStream
> ---
>
> Key: FLINK-4366
> URL: https://issues.apache.org/jira/browse/FLINK-4366
> Project: Flink
>  Issue Type: Improvement
>Reporter: Aljoscha Krettek
>Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> Right now, it is possible to use {{DataStream.windowAll/timeWindowAll}} and 
> then set a different parallelism afterwards. Flink will silently accept this 
> and spawn the number of parallel operators, only one instance of those will 
> do all the processing, though, since the elements are implicitly keyed by a 
> dummy key.
> We should throw an exception if users try to set a parallelism on an 
> all-windowed stream.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4388) Race condition during initialization of MemorySegmentFactory

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4388.
-
   Resolution: Fixed
Fix Version/s: (was: 1.1.2)

Fixed via 6cdf06ccb4c7ac444521ca3c3ff4317a4e947e6d

> Race condition during initialization of MemorySegmentFactory
> 
>
> Key: FLINK-4388
> URL: https://issues.apache.org/jira/browse/FLINK-4388
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The check whether the factory is initialized, and the actual initialization 
> are not atomic. When starting multiple TaskManagers, this can lead to races 
> and exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-4388) Race condition during initialization of MemorySegmentFactory

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4388.
---

> Race condition during initialization of MemorySegmentFactory
> 
>
> Key: FLINK-4388
> URL: https://issues.apache.org/jira/browse/FLINK-4388
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.1.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The check whether the factory is initialized, and the actual initialization 
> are not atomic. When starting multiple TaskManagers, this can lead to races 
> and exceptions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2364: [FLINK-4293] Fix malformatted license headers

2016-08-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2364
  
Merging this...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4293) Malformatted Apache Headers

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422930#comment-15422930
 ] 

ASF GitHub Bot commented on FLINK-4293:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2364
  
Merging this...


> Malformatted Apache Headers
> ---
>
> Key: FLINK-4293
> URL: https://issues.apache.org/jira/browse/FLINK-4293
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  * 
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * 
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963589
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422931#comment-15422931
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963589
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963941
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422933#comment-15422933
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963941
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74964722
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422934#comment-15422934
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74964722
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[GitHub] flink issue #2359: [FLINK-4198] Replace org.apache.flink.streaming.api.windo...

2016-08-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2359
  
We can unfortunately not merge this change. The tools for API compatibility 
verify that this breaks the API:
```
Failed to execute goal 
com.github.siom79.japicmp:japicmp-maven-plugin:0.7.0:cmp (default) on project 
flink-streaming-java_2.10: Breaking the build because there is at least one 
binary incompatible class: org.apache.flink.streaming.api.datastream.DataStream
```

The issue has been scheduled for Flink 2.0 - let's redo once this is on the 
horizon. Could you close the pull request?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422935#comment-15422935
 ] 

ASF GitHub Bot commented on FLINK-4198:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2359
  
We can unfortunately not merge this change. The tools for API compatibility 
verify that this breaks the API:
```
Failed to execute goal 
com.github.siom79.japicmp:japicmp-maven-plugin:0.7.0:cmp (default) on project 
flink-streaming-java_2.10: Breaking the build because there is at least one 
binary incompatible class: org.apache.flink.streaming.api.datastream.DataStream
```

The issue has been scheduled for Flink 2.0 - let's redo once this is on the 
horizon. Could you close the pull request?


> Replace org.apache.flink.streaming.api.windowing.time.Time with 
> org.apache.flink.api.common.time.Time
> -
>
> Key: FLINK-4198
> URL: https://issues.apache.org/jira/browse/FLINK-4198
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
> Fix For: 2.0.0
>
>
> Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it 
> with {{org.apache.flink.api.common.time.Time}} which resides in 
> {{flink-core}}. The latter is basically the copy of the former which has been 
> moved to {{flink-core}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74965913
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422942#comment-15422942
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74965913
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422954#comment-15422954
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74966906
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   pri

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74966906
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLaunc

[jira] [Closed] (FLINK-4293) Malformatted Apache Headers

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-4293.
---

> Malformatted Apache Headers
> ---
>
> Key: FLINK-4293
> URL: https://issues.apache.org/jira/browse/FLINK-4293
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.2.0
>
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  * 
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * 
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (FLINK-4293) Malformatted Apache Headers

2016-08-16 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-4293.
-
   Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via dff986dff9af4ce23d5f98306044674d009f2649

> Malformatted Apache Headers
> ---
>
> Key: FLINK-4293
> URL: https://issues.apache.org/jira/browse/FLINK-4293
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.2.0
>
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  * 
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * 
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4293) Malformatted Apache Headers

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422960#comment-15422960
 ] 

ASF GitHub Bot commented on FLINK-4293:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2364


> Malformatted Apache Headers
> ---
>
> Key: FLINK-4293
> URL: https://issues.apache.org/jira/browse/FLINK-4293
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.0.3
>Reporter: Stephan Ewen
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.2.0
>
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  * 
>  * http://www.apache.org/licenses/LICENSE-2.0
>  * 
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422969#comment-15422969
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74967737
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   pri

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74967737
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLaunc

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74967974
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 
--

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422970#comment-15422970
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74967974
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422974#comment-15422974
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74968309
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74968309
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
+import org.apache.flink.mesos.util.MesosArtifactServer;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.mesos.util.ZooKeeperUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.process.ProcessReaper;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.runtime.webmonitor.WebMonitor;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.mesos.Protos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Option;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.InetAddress;
+import java.net.URL;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.mesos.Utils.uri;
+import static org.apache.flink.mesos.Utils.variable;
+
+/**
+ * This class is the executable entry point for the Mesos Application 
Master.
+ * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
+ * and {@link MesosFlinkResourceManager}.
+ *
+ * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
+ * allocation and failure detection.
+ */
+public class MesosApplicationMasterRunner {
+   /** Logger */
+   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
+
+   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
+* before they quit */
+   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
+
+   /** The process environment variables */
+   private static final Map ENV = System.getenv();
+
+   /** The exit code returned if the initialization of the application 
master failed */
+   private static final int INIT_ERROR_EXIT_CODE = 31;
+
+   /** The exit code returned if the process exits because a critical 
actor died */
+   private static final int ACTOR_DIED_EXIT_CODE = 32;
+
+   // 
--

[GitHub] flink pull request #2364: [FLINK-4293] Fix malformatted license headers

2016-08-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/2364


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422976#comment-15422976
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74968719
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

Thought about it but I don't see a good reason.   Why that and not for the 
various connectors and other such non-core modules?  I see Kinesis is special 
cased but for licensing reasons.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread EronWright
Github user EronWright commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74968719
  
--- Diff: flink-dist/pom.xml ---
@@ -113,8 +113,13 @@ under the License.
flink-metrics-jmx
${project.version}

+
+   
+   org.apache.flink
+   flink-mesos_2.10
+   ${project.version}
+   
--- End diff --

Thought about it but I don't see a good reason.   Why that and not for the 
various connectors and other such non-core modules?  I see Kinesis is special 
cased but for licensing reasons.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)

2016-08-16 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2315
  
A high-level comment to reviewers, I strove to follow the YARN 
implementation closely, in anticipation of a future refactoring.   The runner 
and RM are prime examples where consistency was emphasized.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422980#comment-15422980
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/2315
  
A high-level comment to reviewers, I strove to follow the YARN 
implementation closely, in anticipation of a future refactoring.   The runner 
and RM are prime examples where consistency was emphasized.


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422998#comment-15422998
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74971461
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74971461
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423014#comment-15423014
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972977
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+
+import static java.util.Objects.requireNonNull;
+
+public class MesosTaskManagerParameters {
--- End diff --

Java docs


> Integrate Flink with Apache Mesos
> -
>
> Key: FLINK-1984
> URL: https://issues.apache.org/jira/browse/FLINK-1984
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
>Reporter: Robert Metzger
>Assignee: Eron Wright 
>Priority: Minor
> Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972631
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   private ActorRef taskRouter;
+
+   private ActorRef launchCoordinator;
+
+   private ActorRef reconciliationCoordinator;
+
+   private MesosWorkerStore workerStore;
+
+   final Map workersInNew;
+   final Map workersInLau

[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423008#comment-15423008
 ] 

ASF GitHub Bot commented on FLINK-1984:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972631
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java
 ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends 
FlinkResourceManager {
+
+   /** The Mesos configuration (master and framework info) */
+   private final MesosConfiguration mesosConfig;
+
+   /** The TaskManager container parameters (like container memory size) */
+   private final MesosTaskManagerParameters taskManagerParameters;
+
+   /** Context information used to start a TaskManager Java process */
+   private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+   /** Number of failed Mesos tasks before stopping the application. -1 
means infinite. */
+   private final int maxFailedTasks;
+
+   /** Callback handler for the asynchronous Mesos scheduler */
+   private SchedulerProxy schedulerCallbackHandler;
+
+   /** Mesos scheduler driver */
+   private SchedulerDriver schedulerDriver;
+
+   private ActorRef connectionMonitor;
+
+   p

[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-16 Thread cresny
Github user cresny commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74974171
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+   /**
+* Ensure that target path terminates with a new directory to be 
created by fs. If remoteURI does not specify a new
+* directory, append local directory name.
+* @param fs
+* @param localPath
+* @param remoteURI
+* @return
+* @throws IOException
+*/
+   protected static URI checkInitialDirectory(final FileSystem fs,final 
File localPath, final URI remoteURI) throws IOException {
+   if (localPath.isDirectory()) {
+   Path remotePath = new Path(remoteURI);
+   if (fs.exists(remotePath)) {
+   return new 
Path(remotePath,localPath.getName()).toUri();
+   }
+   }
+   return remoteURI;
+   }
+
+   protected static void copyFromLocalFile(final FileSystem fs, final File 
localPath, final URI remotePath) throws Exception {
--- End diff --

It looks like that won't work as-is since `flink-hadoop-compatability` is 
not a dependency of either `flink-streaming-java` or `flink-yarn`.  Maybe those 
two are entirely disjoint? I'll look a  litter deeper.

On another note-- the overloaded `FileSystem.copyFromLocal` used by Flink 
defaults to "overwrite=false". This does not have much of an impact in HDFS, 
but it carries real costs in S3, both from a performance and financial 
standpoint, and because of this S3 writes are typically a blind overwrite. 
Given the usage here -- savepoint backups and YARN staging -- it seems 
apropiate for the general case to be an overwrite = true. For the HDFS case it 
would incur a negligible check before write (rather than an optimistic write). 
It's a very minor code change even if it may warrant a longer explanation. 
Perhaps create a new issue?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...

2016-08-16 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972977
  
--- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java
 ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+
+import static java.util.Objects.requireNonNull;
+
+public class MesosTaskManagerParameters {
--- End diff --

Java docs


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2288: Feature/s3 a fix

2016-08-16 Thread cresny
Github user cresny commented on a diff in the pull request:

https://github.com/apache/flink/pull/2288#discussion_r74982416
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
 ---
@@ -62,4 +64,34 @@ public void run() {
throw asyncException.f0;
}
}
+
+   /**
+* Ensure that target path terminates with a new directory to be 
created by fs. If remoteURI does not specify a new
+* directory, append local directory name.
+* @param fs
+* @param localPath
+* @param remoteURI
+* @return
+* @throws IOException
+*/
+   protected static URI checkInitialDirectory(final FileSystem fs,final 
File localPath, final URI remoteURI) throws IOException {
+   if (localPath.isDirectory()) {
+   Path remotePath = new Path(remoteURI);
+   if (fs.exists(remotePath)) {
+   return new 
Path(remotePath,localPath.getName()).toUri();
+   }
+   }
+   return remoteURI;
+   }
+
+   protected static void copyFromLocalFile(final FileSystem fs, final File 
localPath, final URI remotePath) throws Exception {
--- End diff --

Perhaps put it in `org.apache.flink.runtime.fs.util` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2359: [FLINK-4198] Replace org.apache.flink.streaming.api.windo...

2016-08-16 Thread kishorekgarg
Github user kishorekgarg commented on the issue:

https://github.com/apache/flink/pull/2359
  
Sure. I will close it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2359: [FLINK-4198] Replace org.apache.flink.streaming.ap...

2016-08-16 Thread kishorekgarg
Github user kishorekgarg closed the pull request at:

https://github.com/apache/flink/pull/2359


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423169#comment-15423169
 ] 

ASF GitHub Bot commented on FLINK-4198:
---

Github user kishorekgarg closed the pull request at:

https://github.com/apache/flink/pull/2359


> Replace org.apache.flink.streaming.api.windowing.time.Time with 
> org.apache.flink.api.common.time.Time
> -
>
> Key: FLINK-4198
> URL: https://issues.apache.org/jira/browse/FLINK-4198
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
> Fix For: 2.0.0
>
>
> Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it 
> with {{org.apache.flink.api.common.time.Time}} which resides in 
> {{flink-core}}. The latter is basically the copy of the former which has been 
> moved to {{flink-core}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423168#comment-15423168
 ] 

ASF GitHub Bot commented on FLINK-4198:
---

Github user kishorekgarg commented on the issue:

https://github.com/apache/flink/pull/2359
  
Sure. I will close it.


> Replace org.apache.flink.streaming.api.windowing.time.Time with 
> org.apache.flink.api.common.time.Time
> -
>
> Key: FLINK-4198
> URL: https://issues.apache.org/jira/browse/FLINK-4198
> Project: Flink
>  Issue Type: Sub-task
>Affects Versions: 1.0.0
>Reporter: Till Rohrmann
> Fix For: 2.0.0
>
>
> Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it 
> with {{org.apache.flink.api.common.time.Time}} which resides in 
> {{flink-core}}. The latter is basically the copy of the former which has been 
> moved to {{flink-core}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...

2016-08-16 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2353
  
I addressed the comments by @tillrohrmann and @wenlong88
  - turning the registration object into a reusable utility that can also 
be used for the JobMaster registration
  - I made the registration object behave more like a cancelable future. 
That way, it needs not use the TaskExecutor's main-thread-execution-context for 
its retries.

This looks actually pretty nice now. Will merge this to allow other people 
to build on top of it. Will provide tests ASAP.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4355) Implement TaskManager side of registration at ResourceManager

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423229#comment-15423229
 ] 

ASF GitHub Bot commented on FLINK-4355:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2353
  
I addressed the comments by @tillrohrmann and @wenlong88
  - turning the registration object into a reusable utility that can also 
be used for the JobMaster registration
  - I made the registration object behave more like a cancelable future. 
That way, it needs not use the TaskExecutor's main-thread-execution-context for 
its retries.

This looks actually pretty nice now. Will merge this to allow other people 
to build on top of it. Will provide tests ASAP.


> Implement TaskManager side of registration at ResourceManager
> -
>
> Key: FLINK-4355
> URL: https://issues.apache.org/jira/browse/FLINK-4355
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management
>Reporter: Zhijiang Wang
>Assignee: Stephan Ewen
>
> If the {{TaskManager}} is unregistered, it should try and register at the 
> {{ResourceManager}} leader. The registration messages are fenced via the 
> {{RmLeaderID}}.
> The ResourceManager may acknowledge the registration (or respond that the 
> TaskManager is AlreadyRegistered) or refuse the registration.
> Upon registration refusal, the TaskManager may have to kill itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4402) Wrong metrics parameter names in documentation

2016-08-16 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423241#comment-15423241
 ] 

Chesnay Schepler commented on FLINK-4402:
-

the documentation should be adjusted.

> Wrong metrics parameter names in documentation 
> ---
>
> Key: FLINK-4402
> URL: https://issues.apache.org/jira/browse/FLINK-4402
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
> Environment: all
>Reporter: RWenden
>Priority: Trivial
> Fix For: 1.1.2
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> On the page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html
> the following metrics parameters should be
> faulty: metrics.scope.tm.task , should be metrics.scope.task
> faulty: metrics.scope.tm.operator , should be metrics.scope.operator
> to make it work on Flink 1.1.1.
> But to fix this, the constants in ConfigConstants.java can also be changed to 
> fit the documentation. Either way...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4403) RPC proxy classloading should use Flink class' classloader

2016-08-16 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-4403:
---

 Summary: RPC proxy classloading should use Flink class' classloader
 Key: FLINK-4403
 URL: https://issues.apache.org/jira/browse/FLINK-4403
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
 Environment: FLIP-6 feature branch
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.2.0


The RPC service's proxies use the {{ClassLoader.getSystemClassLoader()}} for 
all reflective classloading.

In settings where Flink runs embedded, the Flink framework classes may not be 
in the System classloader, but for example in the classloader of an OSGI 
bundle. It is hence better to use the classloader of a Flink Framework class. 
In most cases, that will be the system classloader, in other cases it will be 
the classloader for the Flink code bundle: 
{{RpcService.class.getClassLoader()}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75010826
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-activemq_2.10
+   flink-connector-activemq
+
+   jar
+
+   
+   
+   5.11.1
--- End diff --

Good point.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423313#comment-15423313
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75010826
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-activemq_2.10
+   flink-connector-activemq
+
+   jar
+
+   
+   
+   5.11.1
--- End diff --

Good point.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75010803
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-activemq_2.10
+   flink-connector-activemq
+
+   jar
+
+   
+   
+   5.11.1
+   
+
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.activemq
+   activemq-client
+   ${activemq.version}
+   
+
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-tests_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test-jar
+   test
+   
+   
+   org.apache.activemq.tooling
+   activemq-junit
+   5.13.1
--- End diff --

Good point. Fixed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423312#comment-15423312
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75010803
  
--- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
@@ -0,0 +1,104 @@
+
+
+http://maven.apache.org/POM/4.0.0";
+xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+
+   4.0.0
+
+   
+   org.apache.flink
+   flink-streaming-connectors
+   1.1-SNAPSHOT
+   ..
+   
+
+   flink-connector-activemq_2.10
+   flink-connector-activemq
+
+   jar
+
+   
+   
+   5.11.1
+   
+
+   
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   provided
+   
+
+   
+   org.apache.activemq
+   activemq-client
+   ${activemq.version}
+   
+
+
+   
+   org.apache.flink
+   flink-streaming-java_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+   
+   org.apache.flink
+   flink-test-utils_2.10
+   ${project.version}
+   test
+   
+
+   
+   org.apache.flink
+   flink-tests_2.10
+   ${project.version}
+   test-jar
+   test
+   
+
+   
+   org.apache.flink
+   flink-runtime_2.10
+   ${project.version}
+   test-jar
+   test
+   
+   
+   org.apache.activemq.tooling
+   activemq-junit
+   5.13.1
--- End diff --

Good point. Fixed it.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-4380) Introduce KeyGroupAssigner and Max-Parallelism Parameter

2016-08-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-4380:
---

Assignee: Aljoscha Krettek

> Introduce KeyGroupAssigner and Max-Parallelism Parameter
> 
>
> Key: FLINK-4380
> URL: https://issues.apache.org/jira/browse/FLINK-4380
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>
> For key-group sharding we need to introduce a {{KeyGroupAssigner}} that 
> assigns key hashes to key-groups (or shards). Also, this issue is for 
> tracking the addition of a {{max-parallelism}} parameter for tracking the 
> number of key groups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4381) Refactor State to Prepare For Key-Group State Backends

2016-08-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4381:

Assignee: Stefan Richter

> Refactor State to Prepare For Key-Group State Backends
> --
>
> Key: FLINK-4381
> URL: https://issues.apache.org/jira/browse/FLINK-4381
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>
> In order to use the new {{KeyGroupAssigner}}/{{key group sharding}} the state 
> backends need no be able to deal with key groups. For this, we first need to 
> refactor the state abstractions. Specifically, this touches how key-grouped 
> state should be stored and also how state is sent to the 
> {{CheckpointCoordinator}} and how it is stored.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4380) Introduce KeyGroupAssigner and Max-Parallelism Parameter

2016-08-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4380:

Assignee: Stefan Richter  (was: Aljoscha Krettek)

> Introduce KeyGroupAssigner and Max-Parallelism Parameter
> 
>
> Key: FLINK-4380
> URL: https://issues.apache.org/jira/browse/FLINK-4380
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming
>Reporter: Aljoscha Krettek
>Assignee: Stefan Richter
>
> For key-group sharding we need to introduce a {{KeyGroupAssigner}} that 
> assigns key hashes to key-groups (or shards). Also, this issue is for 
> tracking the addition of a {{max-parallelism}} parameter for tracking the 
> number of key groups.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423315#comment-15423315
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011140
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

This does not necessarily create a queue on the ActiveMQ side. If it 
already exists it will be used.


> Streaming connector for ActiveMQ
> --

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011140
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

This does not necessarily create a queue on the ActiveMQ side. If it 
already exists it will be used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JI

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011407
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

I'll add support for topics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011452
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
+
+   // Create a MessageProducer from the Session to the Topic or
+   // Queue
+   producer = session.createProducer(destination);
+   producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
--- End diff --

Good point. Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423318#comment-15423318
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011407
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
--- End diff --

I'll add support for topics.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> 

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423319#comment-15423319
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011452
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
+
+   // Create the destination (Topic or Queue)
+   Destination destination = session.createQueue(queueName);
+
+   // Create a MessageProducer from the Session to the Topic or
+   // Queue
+   producer = session.createProducer(destination);

[jira] [Assigned] (FLINK-3761) Introduce key group state backend

2016-08-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-3761:
---

Assignee: Aljoscha Krettek  (was: Till Rohrmann)

> Introduce key group state backend
> -
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers a value, list, folding and value state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcasted to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-3761) Refactor State Backends/Make Keyed State Key-Group Aware

2016-08-16 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-3761:

Summary: Refactor State Backends/Make Keyed State Key-Group Aware  (was: 
Introduce key group state backend)

> Refactor State Backends/Make Keyed State Key-Group Aware
> 
>
> Key: FLINK-3761
> URL: https://issues.apache.org/jira/browse/FLINK-3761
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Reporter: Till Rohrmann
>Assignee: Aljoscha Krettek
>
> After an off-line discussion with [~aljoscha], we came to the conclusion that 
> it would be beneficial to reflect the differences between a keyed and a 
> non-keyed stream also in the state backends. A state backend which is used 
> for a keyed stream offers a value, list, folding and value state and has to 
> group its keys into key groups. 
> A state backend for non-keyed streams can only offer a union state to make it 
> work with dynamic scaling. A union state is a state which is broadcasted to 
> all tasks in case of a recovery. The state backends can then select what 
> information they need to recover from the whole state (formerly distributed).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011941
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts 
them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * @param  type of output messages
+ */
+public class AMQSource extends MessageAcknowledgingSourceBase
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema, RunningChecker 
runningChecker) {
+   super(String.class);
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.deserializationSchema = deserializationSchema

[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011880
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts 
them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * @param  type of output messages
+ */
+public class AMQSource extends MessageAcknowledgingSourceBase
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema, RunningChecker 
runningChecker) {
+   super(String.class);
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.deserializationSchema = deserializationSchema

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423325#comment-15423325
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011880
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts 
them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * @param  type of output messages
+ */
+public class AMQSource extends MessageAcknowledgingSourceBase
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserial

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423327#comment-15423327
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75011941
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java
 ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import 
org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Source for reading messages from an ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * This source is waiting for incoming messages from ActiveMQ and converts 
them from
+ * an array of bytes into an instance of the output type. If an incoming
+ * message is not a message with an array of bytes, this message is ignored
+ * and warning message is logged.
+ *
+ * @param  type of output messages
+ */
+public class AMQSource extends MessageAcknowledgingSourceBase
+   implements ResultTypeQueryable {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSource.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final DeserializationSchema deserializationSchema;
+   private boolean logFailuresOnly = false;
+   private RunningChecker runningChecker;
+   private transient Connection connection;
+   private transient Session session;
+   private transient MessageConsumer consumer;
+   private boolean autoAck;
+   private HashMap unaknowledgedMessages = new 
HashMap<>();
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+*/
+   public AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserializationSchema) {
+   this(connectionFactory, queueName, deserializationSchema, new 
RunningCheckerImpl());
+   }
+
+   /**
+* Create AMQSource.
+*
+* @param connectionFactory factory that will be used to create a 
connection with ActiveMQ
+* @param queueName name of an ActiveMQ queue to read from
+* @param deserializationSchema schema to deserialize incoming messages
+* @param runningChecker running checker that is used to decide if the 
source is still running
+*/
+   AMQSource(ActiveMQConnectionFactory connectionFactory, String 
queueName, DeserializationSchema deserial

[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423358#comment-15423358
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75017658
  
--- Diff: 
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 ---
@@ -245,6 +246,50 @@ public void testHistogramReporting() throws Exception {
}
}
 
+   /**
+* Tests that histograms are properly reported via the JMXReporter.
--- End diff --

meters instead of histograms


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75017658
  
--- Diff: 
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 ---
@@ -245,6 +246,50 @@ public void testHistogramReporting() throws Exception {
}
}
 
+   /**
+* Tests that histograms are properly reported via the JMXReporter.
--- End diff --

meters instead of histograms


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75018225
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 ---
@@ -261,6 +327,40 @@ public long getMin() {
}
}
 
+   public static class TestingMeter implements Meter {
--- End diff --

reuse the other TestingMeter class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423364#comment-15423364
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75018180
  
--- Diff: 
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 ---
@@ -297,4 +342,42 @@ public long getMin() {
};
}
}
+
+   static class TestingMeter implements Meter {
--- End diff --

let's put this in a separate utility test class under 
`flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util`



> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75018180
  
--- Diff: 
flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java
 ---
@@ -297,4 +342,42 @@ public long getMin() {
};
}
}
+
+   static class TestingMeter implements Meter {
--- End diff --

let's put this in a separate utility test class under 
`flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423365#comment-15423365
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/2374#discussion_r75018225
  
--- Diff: 
flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 ---
@@ -261,6 +327,40 @@ public long getMin() {
}
}
 
+   public static class TestingMeter implements Meter {
--- End diff --

reuse the other TestingMeter class


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423388#comment-15423388
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2374
  
Good work! Only found a minor issue in regards to the TestingMeter class.

There is one thing I would like to start a discussion on however. Right now 
the Meter interface is essentially a copy of the DropWizard Meter interface, 
which is a fair starting point. 

Now my question is: does this fit our use case?  For example, should we 
really mandate exponentially weighed rate (according to the javadocs), or do we 
really need the 15 minute rate?


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/2374
  
Good work! Only found a minor issue in regards to the TestingMeter class.

There is one thing I would like to start a discussion on however. Right now 
the Meter interface is essentially a copy of the DropWizard Meter interface, 
which is a fair starting point. 

Now my question is: does this fit our use case?  For example, should we 
really mandate exponentially weighed rate (according to the javadocs), or do we 
really need the 15 minute rate?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4402) Wrong metrics parameter names in documentation

2016-08-16 Thread Neelesh Srinivas Salian (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423395#comment-15423395
 ] 

Neelesh Srinivas Salian commented on FLINK-4402:


If you don't mind, shall I work on this JIRA? 

> Wrong metrics parameter names in documentation 
> ---
>
> Key: FLINK-4402
> URL: https://issues.apache.org/jira/browse/FLINK-4402
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.1.1
> Environment: all
>Reporter: RWenden
>Priority: Trivial
> Fix For: 1.1.2
>
>   Original Estimate: 4h
>  Remaining Estimate: 4h
>
> On the page 
> https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html
> the following metrics parameters should be
> faulty: metrics.scope.tm.task , should be metrics.scope.task
> faulty: metrics.scope.tm.operator , should be metrics.scope.operator
> to make it work on Flink 1.1.1.
> But to fix this, the constants in ConfigConstants.java can also be changed to 
> fit the documentation. Either way...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...

2016-08-16 Thread mushketyk
Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75024557
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
--- End diff --

As far as I understand AUTO_ACKNOWLEDGE is relevant only for consuming 
(AMQSource), but not for a Sink.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink issue #2314: [FLINK-3298] Implement ActiveMQ streaming connector

2016-08-16 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2314
  
@rmetzger I've updated the PR according to your review. Could you please 
take another look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423415#comment-15423415
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on a diff in the pull request:

https://github.com/apache/flink/pull/2314#discussion_r75024557
  
--- Diff: 
flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java
 ---
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.activemq;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+/**
+ * Sink class for writing data into ActiveMQ queue.
+ * 
+ * To create an instance of AMQSink class one should initialize and 
configure an
+ * instance of a connection factory that will be used to create a 
connection.
+ * Every input message is converted into a byte array using a serialization
+ * schema and being sent into a message queue.
+ *
+ * @param  type of input messages
+ */
+public class AMQSink extends RichSinkFunction {
+   private static final long serialVersionUID = 1L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(AMQSink.class);
+
+   private final ActiveMQConnectionFactory connectionFactory;
+   private final String queueName;
+   private final SerializationSchema serializationSchema;
+   private boolean logFailuresOnly = false;
+   private transient MessageProducer producer;
+   private transient Session session;
+   private transient Connection connection;
+
+   /**
+* Create AMQSink.
+*
+* @param connectionFactory factory for creating ActiveMQ connection
+* @param queueName name of a queue to write to
+* @param serializationSchema schema to serialize input message into a 
byte array
+*/
+   public AMQSink(ActiveMQConnectionFactory connectionFactory, String 
queueName, SerializationSchema serializationSchema) {
+   this.connectionFactory = connectionFactory;
+   this.queueName = queueName;
+   this.serializationSchema = serializationSchema;
+   }
+
+   /**
+* Defines whether the producer should fail on errors, or only log them.
+* If this is set to true, then exceptions will be only logged, if set 
to false,
+* exceptions will be eventually thrown and cause the streaming program 
to
+* fail (and enter recovery).
+*
+* @param logFailuresOnly The flag to indicate logging-only on 
exceptions.
+*/
+   public void setLogFailuresOnly(boolean logFailuresOnly) {
+   this.logFailuresOnly = logFailuresOnly;
+   }
+
+
+   @Override
+   public void open(Configuration config) throws Exception {
+   super.open(config);
+   // Create a Connection
+   connection = connectionFactory.createConnection();
+   connection.start();
+
+   // Create a Session
+   session = connection.createSession(false,
+   Session.AUTO_ACKNOWLEDGE);
--- End diff --

As far as I understand AUTO_ACKNOWLEDGE is relevant only for consuming 
(AMQSource), but not for a Sink.


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-32

[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423417#comment-15423417
 ] 

ASF GitHub Bot commented on FLINK-3298:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2314
  
@rmetzger I've updated the PR according to your review. Could you please 
take another look?


> Streaming connector for ActiveMQ
> 
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors
>Reporter: Mohit Sethi
>Assignee: Ivan Mushketyk
>Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type

2016-08-16 Thread mushketyk
Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
@zentol Thank you for reviewing my PR! I'll fix these issues as soon as I 
can.

On the metrics interface. Could you please elaborate why do you think 
exponentially weighted rate and 15 minute rate may be useless for us?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-3950) Add Meter Metric Type

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423463#comment-15423463
 ] 

ASF GitHub Bot commented on FLINK-3950:
---

Github user mushketyk commented on the issue:

https://github.com/apache/flink/pull/2374
  
@zentol Thank you for reviewing my PR! I'll fix these issues as soon as I 
can.

On the metrics interface. Could you please elaborate why do you think 
exponentially weighted rate and 15 minute rate may be useless for us?


> Add Meter Metric Type
> -
>
> Key: FLINK-3950
> URL: https://issues.apache.org/jira/browse/FLINK-3950
> Project: Flink
>  Issue Type: Sub-task
>  Components: Core
>Reporter: Stephan Ewen
>Assignee: Ivan Mushketyk
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4404) Implement Data Transfer SSL

2016-08-16 Thread Suresh Krishnappa (JIRA)
Suresh Krishnappa created FLINK-4404:


 Summary: Implement Data Transfer SSL
 Key: FLINK-4404
 URL: https://issues.apache.org/jira/browse/FLINK-4404
 Project: Flink
  Issue Type: Sub-task
Reporter: Suresh Krishnappa


This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the 
Secure Data Access design doc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-4405) Implement Blob Server SSL

2016-08-16 Thread Suresh Krishnappa (JIRA)
Suresh Krishnappa created FLINK-4405:


 Summary: Implement Blob Server SSL
 Key: FLINK-4405
 URL: https://issues.apache.org/jira/browse/FLINK-4405
 Project: Flink
  Issue Type: Sub-task
Reporter: Suresh Krishnappa


This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the 
Secure Data Access design doc.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4404) Implement Data Transfer SSL

2016-08-16 Thread Suresh Krishnappa (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suresh Krishnappa updated FLINK-4404:
-
Description: This issue is to address part T3-3 (Implement Data Transfer 
TLS/SSL) of the [Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc.  (was: This issue is to address part T3-3 (Implement Data Transfer 
TLS/SSL) of the Secure Data Access design doc.)

> Implement Data Transfer SSL
> ---
>
> Key: FLINK-4404
> URL: https://issues.apache.org/jira/browse/FLINK-4404
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Suresh Krishnappa
>  Labels: security
>
> This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the 
> [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-4405) Implement Blob Server SSL

2016-08-16 Thread Suresh Krishnappa (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suresh Krishnappa updated FLINK-4405:
-
Description: 
This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the 
[Secure Data 
Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
 design doc.


  was:
This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the 
Secure Data Access design doc.



> Implement Blob Server SSL
> -
>
> Key: FLINK-4405
> URL: https://issues.apache.org/jira/browse/FLINK-4405
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Suresh Krishnappa
>  Labels: security
>
> This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the 
> [Secure Data 
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
>  design doc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423949#comment-15423949
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r75065679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -77,7 +77,7 @@
 
/**
 * Shutdown hook thread to ensure deletion of the storage directory (or 
null if
-* the configured recovery mode does not equal{@link 
RecoveryMode#STANDALONE})
+* the configured recovery mode does not equal{@link RecoveryMode#NONE})
--- End diff --

Ok


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straight forward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2342: FLINK-4253 - Rename "recovery.mode" config key to ...

2016-08-16 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r75065676
  
--- Diff: docs/setup/config.md ---
@@ -285,7 +285,7 @@ of the JobManager, because the same ActorSystem is 
used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used 
for the cluster execution. Currently, Flink supports the 'standalone' mode 
where only a single JobManager runs and no JobManager state is checkpointed. 
The high availability mode 'zookeeper' supports the execution of multiple 
JobManagers and JobManager state checkpointing. Among the group of JobManagers, 
ZooKeeper elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for 
the cluster execution. Currently, Flink supports the 'none' mode where only a 
single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.  Previously this config was 
named 'recovery.mode' and the default config was 'standalone'.
 
 - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is 
used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is 
selected
--- End diff --

Oh yes. Will change them too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"

2016-08-16 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423948#comment-15423948
 ] 

ASF GitHub Bot commented on FLINK-4253:
---

Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r75065676
  
--- Diff: docs/setup/config.md ---
@@ -285,7 +285,7 @@ of the JobManager, because the same ActorSystem is 
used. Its not possible to use
 
 ## High Availability Mode
 
-- `recovery.mode`: (Default 'standalone') Defines the recovery mode used 
for the cluster execution. Currently, Flink supports the 'standalone' mode 
where only a single JobManager runs and no JobManager state is checkpointed. 
The high availability mode 'zookeeper' supports the execution of multiple 
JobManagers and JobManager state checkpointing. Among the group of JobManagers, 
ZooKeeper elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.
+- `high-availability`: (Default 'none') Defines the recovery mode used for 
the cluster execution. Currently, Flink supports the 'none' mode where only a 
single JobManager runs and no JobManager state is checkpointed. The high 
availability mode 'zookeeper' supports the execution of multiple JobManagers 
and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper 
elects one of them as the leader which is responsible for the cluster 
execution. In case of a JobManager failure, a standby JobManager will be 
elected as the new leader and is given the last checkpointed JobManager state. 
In order to use the 'zookeeper' mode, it is mandatory to also define the 
`recovery.zookeeper.quorum` configuration value.  Previously this config was 
named 'recovery.mode' and the default config was 'standalone'.
 
 - `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is 
used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is 
selected
--- End diff --

Oh yes. Will change them too.


> Rename "recovery.mode" config key to "high-availability"
> 
>
> Key: FLINK-4253
> URL: https://issues.apache.org/jira/browse/FLINK-4253
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ufuk Celebi
>Assignee: ramkrishna.s.vasudevan
>
> Currently, HA is configured via the following configuration keys:
> {code}
> recovery.mode: STANDALONE // No high availability (HA)
> recovery.mode: ZOOKEEPER // HA
> {code}
> This could be more straight forward by simply renaming the key to 
> {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded. We 
> already have standalone cluster mode.
> {code}
> high-availability: NONE // No HA
> high-availability: ZOOKEEPER // HA via ZooKeeper
> {code}
> The {{recovery.mode}} configuration keys would have to be deprecated before 
> completely removing them.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request #2342: FLINK-4253 - Rename "recovery.mode" config key to ...

2016-08-16 Thread ramkrish86
Github user ramkrish86 commented on a diff in the pull request:

https://github.com/apache/flink/pull/2342#discussion_r75065679
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---
@@ -77,7 +77,7 @@
 
/**
 * Shutdown hook thread to ensure deletion of the storage directory (or 
null if
-* the configured recovery mode does not equal{@link 
RecoveryMode#STANDALONE})
+* the configured recovery mode does not equal{@link RecoveryMode#NONE})
--- End diff --

Ok


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (FLINK-4406) Implement job master registration at resource manager

2016-08-16 Thread Wenlong Lyu (JIRA)
Wenlong Lyu created FLINK-4406:
--

 Summary: Implement job master registration at resource manager
 Key: FLINK-4406
 URL: https://issues.apache.org/jira/browse/FLINK-4406
 Project: Flink
  Issue Type: Sub-task
  Components: Cluster Management
Reporter: Wenlong Lyu


Job Master needs to register to Resource Manager when starting and then watches 
leadership changes of RM, and trigger re-registration.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-16 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423984#comment-15423984
 ] 

Zhijiang Wang commented on FLINK-4021:
--

Thank you for advice, it is indeed simple by 'return isStagedBuffer' directly. 

> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task sharing the same Tcp connection for shuffling data.
> If the downstream task said as "A" has no available memory segment to read 
> netty buffer from network, it will set autoread as false for the channel.
> When the task A is failed or has available segments again, the netty handler 
> will be notified to process the staging buffers first, then reset autoread as 
> true. But in some scenarios, the autoread will not be set as true any more.
> That is when processing staging buffers, first find the corresponding input 
> channel for the buffer, if the task for that input channel is failed, the 
> decodeMsg method in PartitionRequestClientHandler will return false, that 
> means setting autoread as true will not be done anymore.
> In summary,  if one task "A" sets the autoread as false because of no 
> available segments, and resulting in some staging buffers. If another task 
> "B" is failed by accident corresponding to one staging buffer. When task A 
> trys to reset autoread as true, the process can not work because of task B 
> failed.
> I have fixed this problem in our application by adding one boolean parameter 
> in decodeBufferOrEvent method to distinguish whether this method is invoke by 
> netty IO thread channel read or staged message handler task in 
> PartitionRequestClientHandler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

2016-08-16 Thread Zhijiang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423994#comment-15423994
 ] 

Zhijiang Wang commented on FLINK-4021:
--

Yeah, you are right. It is no problem in current flink failover mode. Maybe it 
is suitable for FLIP1 
'https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures'.
And we already improved the task failures by just restarting the source and 
failed ones, so noticed this issue.
I forgot to add the testing for the function, thank you for confirming it.
And what other jobs should I do next?

> Problem of setting autoread for netty channel when more tasks sharing the 
> same Tcp connection
> -
>
> Key: FLINK-4021
> URL: https://issues.apache.org/jira/browse/FLINK-4021
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.0.2
>Reporter: Zhijiang Wang
>Assignee: Zhijiang Wang
>
> More than one task sharing the same Tcp connection for shuffling data.
> If the downstream task said as "A" has no available memory segment to read 
> netty buffer from network, it will set autoread as false for the channel.
> When the task A is failed or has available segments again, the netty handler 
> will be notified to process the staging buffers first, then reset autoread as 
> true. But in some scenarios, the autoread will not be set as true any more.
> That is when processing staging buffers, first find the corresponding input 
> channel for the buffer, if the task for that input channel is failed, the 
> decodeMsg method in PartitionRequestClientHandler will return false, that 
> means setting autoread as true will not be done anymore.
> In summary,  if one task "A" sets the autoread as false because of no 
> available segments, and resulting in some staging buffers. If another task 
> "B" is failed by accident corresponding to one staging buffer. When task A 
> trys to reset autoread as true, the process can not work because of task B 
> failed.
> I have fixed this problem in our application by adding one boolean parameter 
> in decodeBufferOrEvent method to distinguish whether this method is invoke by 
> netty IO thread channel read or staged message handler task in 
> PartitionRequestClientHandler.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


<    1   2