[jira] [Commented] (FLINK-4382) Buffer rpc calls until RpcEndpoint is properly started
[ https://issues.apache.org/jira/browse/FLINK-4382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422913#comment-15422913 ]

ASF GitHub Bot commented on FLINK-4382:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/2358

> Buffer rpc calls until RpcEndpoint is properly started
> --
>
> Key: FLINK-4382
> URL: https://issues.apache.org/jira/browse/FLINK-4382
> Project: Flink
> Issue Type: Sub-task
> Components: Distributed Coordination
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> When an {{RpcEndpoint}} is created, it starts an RPC server. The server should
> wait to dispatch incoming RPC calls until the {{RpcEndpoint}} signals that
> it's ready.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Closed] (FLINK-4382) Buffer rpc calls until RpcEndpoint is properly started
[ https://issues.apache.org/jira/browse/FLINK-4382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-4382. Resolution: Fixed Fixed via 5df27ebd73f5218a792fb53385887968c0e4ca36
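For readers skimming the archive, the behaviour requested in FLINK-4382 (hold incoming calls until the endpoint signals readiness, then dispatch them) could be sketched roughly as below. This is only an illustration under stated assumptions: `BufferingEndpoint`, `dispatch`, `start` and `bufferedCalls` are hypothetical names, not Flink's `RpcEndpoint` API, and the actual fix in commit 5df27ebd may work quite differently.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch, NOT Flink's RpcEndpoint: calls are queued until start() is invoked.
class BufferingEndpoint {
    // Calls that arrived before the endpoint signalled readiness, in arrival order.
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean started = false;

    /** Runs the call immediately if the endpoint is started, otherwise buffers it. */
    synchronized void dispatch(Runnable call) {
        if (started) {
            call.run();
        } else {
            pending.add(call);
        }
    }

    /** Signals readiness and replays the buffered calls in the order they arrived. */
    synchronized void start() {
        started = true;
        Runnable call;
        while ((call = pending.poll()) != null) {
            call.run();
        }
    }

    /** Number of calls currently waiting for start(). */
    synchronized int bufferedCalls() {
        return pending.size();
    }
}
```

In this sketch both methods share one lock, so a call can never slip past the buffer while `start()` is replaying it. A real RPC server would more likely hand the calls to the endpoint's own executor instead of running them under the lock.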
[GitHub] flink pull request #2354: [FLINK-4366] Enforce parallelism=1 For AllWindowed...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2354 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream
[ https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4366.
-
Resolution: Fixed
Fix Version/s: 1.2.0

Fixed via ffe406570af60057fa2ae318561aa239b99bd648

> Enforce parallelism=1 For AllWindowedStream
> ---
>
> Key: FLINK-4366
> URL: https://issues.apache.org/jira/browse/FLINK-4366
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Jark Wu
> Fix For: 1.2.0
>
>
> Right now, it is possible to use {{DataStream.windowAll/timeWindowAll}} and
> then set a different parallelism afterwards. Flink will silently accept this
> and spawn that number of parallel operator instances, but only one of them
> will do all the processing, since the elements are implicitly keyed by a
> dummy key.
> We should throw an exception if users try to set a parallelism on an
> all-windowed stream.
[jira] [Closed] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream
[ https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4366.
[jira] [Commented] (FLINK-4366) Enforce parallelism=1 For AllWindowedStream
[ https://issues.apache.org/jira/browse/FLINK-4366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422922#comment-15422922 ] ASF GitHub Bot commented on FLINK-4366: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2354
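The check proposed in FLINK-4366 (reject any parallelism other than 1 on an all-windowed stream instead of silently accepting it) might look something like the sketch below. `NonParallelOperator` and its methods are hypothetical stand-ins chosen for illustration; they are not the classes touched by the actual fix, and the exception type is an assumption.

```java
// Hypothetical stand-in for an operator that must run with a single instance.
// NOT Flink's API; it only illustrates failing fast on an invalid parallelism.
class NonParallelOperator {
    private int parallelism = 1;

    /** Accepts only parallelism 1 and throws for every other value. */
    NonParallelOperator setParallelism(int parallelism) {
        if (parallelism != 1) {
            throw new IllegalArgumentException(
                "The parallelism of a non-parallel operator must be 1, but was " + parallelism);
        }
        this.parallelism = parallelism;
        return this;
    }

    int getParallelism() {
        return parallelism;
    }
}
```

Throwing at configuration time surfaces the mistake when the job is built, rather than leaving idle operator instances running unnoticed.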
[jira] [Resolved] (FLINK-4388) Race condition during initialization of MemorySegmentFactory
[ https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4388.
-
Resolution: Fixed
Fix Version/s: (was: 1.1.2)

Fixed via 6cdf06ccb4c7ac444521ca3c3ff4317a4e947e6d

> Race condition during initialization of MemorySegmentFactory
> 
>
> Key: FLINK-4388
> URL: https://issues.apache.org/jira/browse/FLINK-4388
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.1
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.2.0
>
>
> The check whether the factory is initialized and the actual initialization
> are not atomic. When starting multiple TaskManagers, this can lead to races
> and exceptions.
[jira] [Closed] (FLINK-4388) Race condition during initialization of MemorySegmentFactory
[ https://issues.apache.org/jira/browse/FLINK-4388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-4388.
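The race described in FLINK-4388 is a classic check-then-act problem: two threads can both see "not initialized" and both try to initialize. One common way to make the check and the initialization a single atomic step is a compare-and-set, sketched below. `OnceInitializedHolder` and `initializeIfAbsent` are hypothetical names for illustration only; the sketch does not claim to reproduce how `MemorySegmentFactory` was actually fixed in commit 6cdf06cc.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, NOT the MemorySegmentFactory API: a holder that can be
// initialized at most once, with the check and the write done atomically.
final class OnceInitializedHolder<T> {
    private final AtomicReference<T> value = new AtomicReference<>();

    /**
     * Installs the candidate only if nothing was installed before.
     * Returns true for the single caller that wins, false for all others.
     */
    boolean initializeIfAbsent(T candidate) {
        if (candidate == null) {
            throw new NullPointerException("candidate must not be null");
        }
        // compareAndSet combines "is it still null?" and "set it" into one atomic step.
        return value.compareAndSet(null, candidate);
    }

    /** Returns the installed value, or null if none was installed yet. */
    T get() {
        return value.get();
    }
}
```

Callers that lose the race get `false` back and can decide whether the already installed value is acceptable, instead of failing with an exception partway through startup.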
[GitHub] flink issue #2364: [FLINK-4293] Fix malformatted license headers
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2364 Merging this...
[jira] [Commented] (FLINK-4293) Malformatted Apache Headers
[ https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422930#comment-15422930 ]

ASF GitHub Bot commented on FLINK-4293:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/2364

Merging this...

> Malformatted Apache Headers
> ---
>
> Key: FLINK-4293
> URL: https://issues.apache.org/jira/browse/FLINK-4293
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.0.3
> Reporter: Stephan Ewen
> Assignee: Chesnay Schepler
> Priority: Minor
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
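The only difference between the two headers in FLINK-4293 is the opening marker: `/**` (Javadoc comment) versus `/*` (plain block comment). A small check for the malformed variant could be sketched as follows. `LicenseHeaderCheck` and `hasJavadocStyleHeader` are hypothetical helpers written for this illustration; they are not part of Flink's build tooling or of the pull request discussed above.

```java
// Hypothetical helper, not part of Flink: detects a license header that was
// opened with the Javadoc marker "/**" instead of the plain marker "/*".
final class LicenseHeaderCheck {
    private static final String LICENSE_MARKER = "Licensed to the Apache Software Foundation";

    /** Returns true if the file starts with a Javadoc-style comment containing the ASF license text. */
    static boolean hasJavadocStyleHeader(String source) {
        String text = source.trim();
        if (!text.startsWith("/**")) {
            return false;
        }
        // Only inspect the first comment, up to its closing marker.
        int end = text.indexOf("*/");
        return end >= 0 && text.substring(0, end).contains(LICENSE_MARKER);
    }
}
```

A check like this could be run over the source tree to list the affected files before rewriting their first line.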
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74963589

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager {
+
+    /** The Mesos configuration (master and framework info) */
+    private final MesosConfiguration mesosConfig;
+
+    /** The TaskManager container parameters (like container memory size) */
+    private final MesosTaskManagerParameters taskManagerParameters;
+
+    /** Context information used to start a TaskManager Java process */
+    private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+    /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+    private final int maxFailedTasks;
+
+    /** Callback handler for the asynchronous Mesos scheduler */
+    private SchedulerProxy schedulerCallbackHandler;
+
+    /** Mesos scheduler driver */
+    private SchedulerDriver schedulerDriver;
+
+    private ActorRef connectionMonitor;
+
+    private ActorRef taskRouter;
+
+    private ActorRef launchCoordinator;
+
+    private ActorRef reconciliationCoordinator;
+
+    private MesosWorkerStore workerStore;
+
+    final Map workersInNew;
+    final Map workersInLau
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422931#comment-15422931 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74963589 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74963941 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422933#comment-15422933 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74963941 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74964722 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422934#comment-15422934 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74964722 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + p
[GitHub] flink issue #2359: [FLINK-4198] Replace org.apache.flink.streaming.api.windo...
Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2359

Unfortunately, we cannot merge this change. The API compatibility check reports that it breaks the API:

```
Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.7.0:cmp (default) on project flink-streaming-java_2.10: Breaking the build because there is at least one binary incompatible class: org.apache.flink.streaming.api.datastream.DataStream
```

The issue has been scheduled for Flink 2.0, so let's redo this once that release is on the horizon. Could you close the pull request?
[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time
[ https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422935#comment-15422935 ]

ASF GitHub Bot commented on FLINK-4198:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/2359

Unfortunately, we cannot merge this change. The API compatibility check reports that it breaks the API:

```
Failed to execute goal com.github.siom79.japicmp:japicmp-maven-plugin:0.7.0:cmp (default) on project flink-streaming-java_2.10: Breaking the build because there is at least one binary incompatible class: org.apache.flink.streaming.api.datastream.DataStream
```

The issue has been scheduled for Flink 2.0, so let's redo this once that release is on the horizon. Could you close the pull request?

> Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time
> -----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4198
>                 URL: https://issues.apache.org/jira/browse/FLINK-4198
>             Project: Flink
>          Issue Type: Sub-task
>    Affects Versions: 1.0.0
>            Reporter: Till Rohrmann
>             Fix For: 2.0.0
>
> Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it
> with {{org.apache.flink.api.common.time.Time}}, which resides in
> {{flink-core}}. The latter is essentially a copy of the former that has been
> moved to {{flink-core}}.
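For readers wondering why swapping one Time class for a near-identical copy counts as a binary incompatible change: the JVM links a call by the method's name together with its exact parameter types, so code compiled against the old parameter type no longer finds the method. The following is only a minimal sketch of that mechanism. The classes `OldTime`, `NewTime`, and `StreamAfterChange` and the method `timeWindow` are hypothetical stand-ins invented for illustration, not Flink's actual classes, and whether this is precisely what japicmp flagged in `DataStream` is an assumption.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the Time class that callers were compiled against.
final class OldTime {
    final long millis;
    OldTime(long millis) { this.millis = millis; }
}

// Hypothetical stand-in for the copy of the class in another package/module.
final class NewTime {
    final long millis;
    NewTime(long millis) { this.millis = millis; }
}

// Hypothetical API after the change: only the parameter type was swapped.
final class StreamAfterChange {
    public String timeWindow(NewTime size) {
        return "window of " + size.millis + " ms";
    }
}

final class BinaryCompatibilityDemo {

    /** Returns true if api has a public method with exactly this name and these parameter types. */
    static boolean hasMethod(Class<?> api, String name, Class<?>... parameterTypes) {
        try {
            Method m = api.getMethod(name, parameterTypes);
            return m != null;
        } catch (NoSuchMethodException e) {
            // The lookup is by exact parameter types, much like JVM linking by descriptor.
            return false;
        }
    }

    public static void main(String[] args) {
        // Callers compiled against timeWindow(OldTime) would look for this signature.
        System.out.println(hasMethod(StreamAfterChange.class, "timeWindow", OldTime.class)); // prints false
        // Only recompiled callers see the new signature.
        System.out.println(hasMethod(StreamAfterChange.class, "timeWindow", NewTime.class)); // prints true
    }
}
```

Recompiling user code against the new type would fix the lookup, which is why such a change is usually deferred to a major release rather than shipped in a minor one.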
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74965913 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLau
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422942#comment-15422942 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74965913 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + p
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422954#comment-15422954 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74966906 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + pri
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74966906 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLaunc
[jira] [Closed] (FLINK-4293) Malformatted Apache Headers
[ https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen closed FLINK-4293.
-------------------------------

> Malformatted Apache Headers
> ---------------------------
>
>                 Key: FLINK-4293
>                 URL: https://issues.apache.org/jira/browse/FLINK-4293
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Stephan Ewen
>            Assignee: Chesnay Schepler
>            Priority: Minor
>             Fix For: 1.2.0
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
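The only difference between the two headers quoted above is the opening token: `/**` starts a Javadoc comment, while `/*` starts a plain block comment. As a rough illustration of how such a cleanup could be automated, here is a minimal sketch. The class `LicenseHeaderFixer` and its `fix` method are hypothetical names made up for this example, and nothing here is taken from the commit that actually resolved the issue.

```java
// Hypothetical helper, not part of Flink: rewrites a leading Javadoc-style
// license header ("/**") into a plain block comment ("/*").
final class LicenseHeaderFixer {

    // Text used to recognise the ASF license header inside the first comment.
    private static final String MARKER = "Licensed to the Apache Software Foundation (ASF)";

    /** Returns the source unchanged unless it starts with a Javadoc-style ASF license header. */
    static String fix(String source) {
        if (!source.startsWith("/**")) {
            return source; // already a plain comment, or no leading comment at all
        }
        int end = source.indexOf("*/");
        if (end < 0) {
            return source; // unterminated comment: leave the file alone
        }
        if (!source.substring(0, end).contains(MARKER)) {
            return source; // a genuine Javadoc comment, not the license header
        }
        // Drop the second '*' of the opening "/**".
        return "/*" + source.substring(3);
    }

    public static void main(String[] args) {
        String bad = "/**\n * " + MARKER + " under one\n */\npackage demo;\n";
        System.out.print(fix(bad));
    }
}
```

The marker check is what keeps ordinary Javadoc at the top of a file from being rewritten; a file that already uses `/*` passes through unchanged, so the helper can be run repeatedly.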
[jira] [Resolved] (FLINK-4293) Malformatted Apache Headers
[ https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stephan Ewen resolved FLINK-4293.
---------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.0

Fixed via dff986dff9af4ce23d5f98306044674d009f2649

> Malformatted Apache Headers
> ---------------------------
>
>                 Key: FLINK-4293
>                 URL: https://issues.apache.org/jira/browse/FLINK-4293
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.0.3
>            Reporter: Stephan Ewen
>            Assignee: Chesnay Schepler
>            Priority: Minor
>             Fix For: 1.2.0
>
> Several files contain this header:
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
> The correct header format should be:
> {code}
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> {code}
[jira] [Commented] (FLINK-4293) Malformatted Apache Headers
[ https://issues.apache.org/jira/browse/FLINK-4293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422960#comment-15422960 ] ASF GitHub Bot commented on FLINK-4293: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2364 > Malformatted Apache Headers > --- > > Key: FLINK-4293 > URL: https://issues.apache.org/jira/browse/FLINK-4293 > Project: Flink > Issue Type: Bug >Affects Versions: 1.0.3 >Reporter: Stephan Ewen >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.2.0 > > > Several files contain this header: > {code} > /** > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > {code} > The correct header format should be: > {code} > /* > * Licensed to the Apache Software Foundation (ASF) under one > * or more contributor license agreements. See the NOTICE file > * distributed with this work for additional information > * regarding copyright ownership. The ASF licenses this file > * to you under the Apache License, Version 2.0 (the > * "License"); you may not use this file except in compliance > * with the License. 
You may obtain a copy of the License at > * > * http://www.apache.org/licenses/LICENSE-2.0 > * > * Unless required by applicable law or agreed to in writing, software > * distributed under the License is distributed on an "AS IS" BASIS, > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > * See the License for the specific language governing permissions and > * limitations under the License. > */ > {code}
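The two headers quoted in FLINK-4293 differ only in their opening token: the malformatted one starts with `/**` (which Java tooling treats as a Javadoc comment), while the correct one starts with a plain `/*`. The following is a minimal sketch of how such headers could be detected and repaired. It is not the change made in pull request #2364; the class name `LicenseHeaderCheck`, its method names, and the marker string used to recognize the license text are assumptions made for illustration only.

```java
public final class LicenseHeaderCheck {

    // Hypothetical marker: the first words of the ASF license header quoted in the issue.
    private static final String LICENSE_MARKER = "Licensed to the Apache Software Foundation";

    private LicenseHeaderCheck() {
    }

    /**
     * Returns true if the source text begins with a Javadoc-style comment
     * that contains the ASF license text.
     */
    public static boolean hasJavadocStyleLicense(String source) {
        String trimmed = source.trim();
        if (!trimmed.startsWith("/**")) {
            return false;
        }
        int end = trimmed.indexOf("*/");
        // Only inspect the first comment, so later Javadoc in the file is ignored.
        return end >= 0 && trimmed.substring(0, end).contains(LICENSE_MARKER);
    }

    /**
     * Rewrites a leading Javadoc-style license comment so that it opens with a
     * plain block-comment token. Other input is returned unchanged.
     */
    public static String fixLicenseOpener(String source) {
        if (!hasJavadocStyleLicense(source)) {
            return source;
        }
        int start = source.indexOf("/**");
        return source.substring(0, start) + "/*" + source.substring(start + 3);
    }

    public static void main(String[] args) {
        String bad = "/**\n * Licensed to the Apache Software Foundation (ASF) under one\n */\npackage demo;\n";
        System.out.println(hasJavadocStyleLicense(bad));
        System.out.println(hasJavadocStyleLicense(fixLicenseOpener(bad)));
    }
}
```

A check of this kind only looks at the first comment in a file, so genuine Javadoc on classes and methods is left alone.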
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422969#comment-15422969 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74967737 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + pri
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74967737 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLaunc
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74967974 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + // --
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422970#comment-15422970 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74967974 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422974#comment-15422974 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74968309 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74968309 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, +* before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + // --
[GitHub] flink pull request #2364: [FLINK-4293] Fix malformatted license headers
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/2364
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422976#comment-15422976 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74968719 --- Diff: flink-dist/pom.xml --- @@ -113,8 +113,13 @@ under the License. flink-metrics-jmx ${project.version} + + + org.apache.flink + flink-mesos_2.10 + ${project.version} + --- End diff -- Thought about it, but I don't see a good reason. Why that, and not the various connectors and other such non-core modules? I see Kinesis is special-cased, but that is for licensing reasons. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74968719 --- Diff: flink-dist/pom.xml --- @@ -113,8 +113,13 @@ under the License. flink-metrics-jmx ${project.version} + + + org.apache.flink + flink-mesos_2.10 + ${project.version} + --- End diff -- Thought about it, but I don't see a good reason. Why that, and not the various connectors and other such non-core modules? I see Kinesis is special-cased, but that is for licensing reasons.
[GitHub] flink issue #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1)
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/2315 A high-level comment to reviewers: I strove to follow the YARN implementation closely, in anticipation of a future refactoring. The runner and RM are prime examples where consistency was emphasized.
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422980#comment-15422980 ] ASF GitHub Bot commented on FLINK-1984: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/2315 A high-level comment to reviewers: I strove to follow the YARN implementation closely, in anticipation of a future refactoring. The runner and RM are prime examples where consistency was emphasized. > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422998#comment-15422998 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74971461 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + p
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74971461 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLau
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423014#comment-15423014 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972977 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; + +import static java.util.Objects.requireNonNull; + +public class MesosTaskManagerParameters { --- End diff -- Java docs > Integrate Flink with Apache Mesos > - > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management >Reporter: Robert Metzger >Assignee: Eron Wright >Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972631 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + private ActorRef taskRouter; + + private ActorRef launchCoordinator; + + private ActorRef reconciliationCoordinator; + + private MesosWorkerStore workerStore; + + final Map workersInNew; + final Map workersInLau
[jira] [Commented] (FLINK-1984) Integrate Flink with Apache Mesos
[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423008#comment-15423008 ] ASF GitHub Bot commented on FLINK-1984: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972631 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java --- @@ -0,0 +1,755 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.Props; +import com.netflix.fenzo.TaskRequest; +import com.netflix.fenzo.TaskScheduler; +import com.netflix.fenzo.VirtualMachineLease; +import com.netflix.fenzo.functions.Action1; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.scheduler.ConnectionMonitor; +import org.apache.flink.mesos.scheduler.LaunchableTask; +import org.apache.flink.mesos.scheduler.LaunchCoordinator; +import org.apache.flink.mesos.scheduler.ReconciliationCoordinator; +import org.apache.flink.mesos.scheduler.SchedulerProxy; +import org.apache.flink.mesos.scheduler.TaskMonitor; +import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder; +import org.apache.flink.mesos.scheduler.Tasks; +import org.apache.flink.mesos.scheduler.messages.AcceptOffers; +import org.apache.flink.mesos.scheduler.messages.Disconnected; +import org.apache.flink.mesos.scheduler.messages.Error; +import org.apache.flink.mesos.scheduler.messages.OfferRescinded; +import org.apache.flink.mesos.scheduler.messages.ReRegistered; +import org.apache.flink.mesos.scheduler.messages.Registered; +import org.apache.flink.mesos.scheduler.messages.ResourceOffers; +import org.apache.flink.mesos.scheduler.messages.StatusUpdate; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.FlinkResourceManager; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; 
+import org.apache.mesos.Protos; +import org.apache.mesos.Protos.FrameworkInfo; +import org.apache.mesos.SchedulerDriver; +import org.slf4j.Logger; +import scala.Option; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +/** + * Flink Resource Manager for Apache Mesos. + */ +public class MesosFlinkResourceManager extends FlinkResourceManager { + + /** The Mesos configuration (master and framework info) */ + private final MesosConfiguration mesosConfig; + + /** The TaskManager container parameters (like container memory size) */ + private final MesosTaskManagerParameters taskManagerParameters; + + /** Context information used to start a TaskManager Java process */ + private final Protos.TaskInfo.Builder taskManagerLaunchContext; + + /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */ + private final int maxFailedTasks; + + /** Callback handler for the asynchronous Mesos scheduler */ + private SchedulerProxy schedulerCallbackHandler; + + /** Mesos scheduler driver */ + private SchedulerDriver schedulerDriver; + + private ActorRef connectionMonitor; + + p
[GitHub] flink pull request #2288: Feature/s3 a fix
Github user cresny commented on a diff in the pull request: https://github.com/apache/flink/pull/2288#discussion_r74974171 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java --- @@ -62,4 +64,34 @@ public void run() { throw asyncException.f0; } } + + /** +* Ensure that target path terminates with a new directory to be created by fs. If remoteURI does not specify a new +* directory, append local directory name. +* @param fs +* @param localPath +* @param remoteURI +* @return +* @throws IOException +*/ + protected static URI checkInitialDirectory(final FileSystem fs,final File localPath, final URI remoteURI) throws IOException { + if (localPath.isDirectory()) { + Path remotePath = new Path(remoteURI); + if (fs.exists(remotePath)) { + return new Path(remotePath,localPath.getName()).toUri(); + } + } + return remoteURI; + } + + protected static void copyFromLocalFile(final FileSystem fs, final File localPath, final URI remotePath) throws Exception { --- End diff -- It looks like that won't work as-is, since `flink-hadoop-compatibility` is not a dependency of either `flink-streaming-java` or `flink-yarn`. Maybe those two are entirely disjoint? I'll look a little deeper. On another note: the overloaded `FileSystem.copyFromLocal` used by Flink defaults to "overwrite=false". This does not have much of an impact in HDFS, but it carries real costs in S3, both from a performance and a financial standpoint, and because of this S3 writes are typically a blind overwrite. Given the usage here (savepoint backups and YARN staging) it seems appropriate for the general case to be overwrite = true. For the HDFS case it would incur a negligible check before write (rather than an optimistic write). It's a very minor code change, even if it may warrant a longer explanation. Perhaps create a new issue?
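To make the two points in the comment above concrete, here is a minimal sketch of the target-directory rule and of the overwrite trade-off. It is a local-filesystem analogy built on `java.nio.file` only; it does not use the Hadoop `FileSystem` API or any Flink class, and the names `CopySemanticsSketch`, `resolveTarget` and `copyFile` are hypothetical.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Local-filesystem analogy only; hypothetical names, not the Hadoop or Flink API. */
class CopySemanticsSketch {

    /**
     * Same idea as checkInitialDirectory in the diff: when a local directory is
     * copied to a target that already exists, append the local directory name
     * so the copy ends in a new directory.
     */
    static Path resolveTarget(Path localPath, Path remotePath) {
        if (Files.isDirectory(localPath) && Files.exists(remotePath)) {
            return remotePath.resolve(localPath.getFileName());
        }
        return remotePath;
    }

    /**
     * overwrite = true replaces the target without asking (a blind overwrite);
     * overwrite = false fails if the target already exists.
     */
    static void copyFile(Path source, Path target, boolean overwrite) throws IOException {
        if (overwrite) {
            Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        } else {
            // Throws FileAlreadyExistsException when the target is already present.
            Files.copy(source, target);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("copy-sketch");
        Path source = Files.write(dir.resolve("source.txt"), "new".getBytes());
        Path target = Files.write(dir.resolve("target.txt"), "old".getBytes());

        copyFile(source, target, true);
        System.out.println(new String(Files.readAllBytes(target)));

        try {
            copyFile(source, target, false);
        } catch (FileAlreadyExistsException e) {
            System.out.println("target exists, copy refused");
        }
    }
}
```

On an object store the existence check in the non-overwriting path is an extra request per file, which is the cost the comment is concerned about.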
[GitHub] flink pull request #2315: [FLINK-1984] Integrate Flink with Apache Mesos (T1...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74972977 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerParameters.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; + +import static java.util.Objects.requireNonNull; + +public class MesosTaskManagerParameters { --- End diff -- Java docs
[GitHub] flink pull request #2288: Feature/s3 a fix
Github user cresny commented on a diff in the pull request: https://github.com/apache/flink/pull/2288#discussion_r74982416 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java --- @@ -62,4 +64,34 @@ public void run() { throw asyncException.f0; } } + + /** +* Ensure that target path terminates with a new directory to be created by fs. If remoteURI does not specify a new +* directory, append local directory name. +* @param fs +* @param localPath +* @param remoteURI +* @return +* @throws IOException +*/ + protected static URI checkInitialDirectory(final FileSystem fs,final File localPath, final URI remoteURI) throws IOException { + if (localPath.isDirectory()) { + Path remotePath = new Path(remoteURI); + if (fs.exists(remotePath)) { + return new Path(remotePath,localPath.getName()).toUri(); + } + } + return remoteURI; + } + + protected static void copyFromLocalFile(final FileSystem fs, final File localPath, final URI remotePath) throws Exception { --- End diff -- Perhaps put it in `org.apache.flink.runtime.fs.util`?
[GitHub] flink issue #2359: [FLINK-4198] Replace org.apache.flink.streaming.api.windo...
Github user kishorekgarg commented on the issue: https://github.com/apache/flink/pull/2359 Sure. I will close it.
[GitHub] flink pull request #2359: [FLINK-4198] Replace org.apache.flink.streaming.ap...
Github user kishorekgarg closed the pull request at: https://github.com/apache/flink/pull/2359
[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time
[ https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423169#comment-15423169 ] ASF GitHub Bot commented on FLINK-4198: --- Github user kishorekgarg closed the pull request at: https://github.com/apache/flink/pull/2359 > Replace org.apache.flink.streaming.api.windowing.time.Time with > org.apache.flink.api.common.time.Time > - > > Key: FLINK-4198 > URL: https://issues.apache.org/jira/browse/FLINK-4198 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.0.0 >Reporter: Till Rohrmann > Fix For: 2.0.0 > > > Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it > with {{org.apache.flink.api.common.time.Time}} which resides in > {{flink-core}}. The latter is basically the copy of the former which has been > moved to {{flink-core}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4198) Replace org.apache.flink.streaming.api.windowing.time.Time with org.apache.flink.api.common.time.Time
[ https://issues.apache.org/jira/browse/FLINK-4198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423168#comment-15423168 ] ASF GitHub Bot commented on FLINK-4198: --- Github user kishorekgarg commented on the issue: https://github.com/apache/flink/pull/2359 Sure. I will close it. > Replace org.apache.flink.streaming.api.windowing.time.Time with > org.apache.flink.api.common.time.Time > - > > Key: FLINK-4198 > URL: https://issues.apache.org/jira/browse/FLINK-4198 > Project: Flink > Issue Type: Sub-task >Affects Versions: 1.0.0 >Reporter: Till Rohrmann > Fix For: 2.0.0 > > > Remove {{org.apache.flink.streaming.api.windowing.time.Time}} and replace it > with {{org.apache.flink.api.common.time.Time}} which resides in > {{flink-core}}. The latter is basically the copy of the former which has been > moved to {{flink-core}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2353: [FLINK-4355] [cluster management] Implement TaskManager s...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2353 I addressed the comments by @tillrohrmann and @wenlong88: - I turned the registration object into a reusable utility that can also be used for the JobMaster registration. - I made the registration object behave more like a cancelable future. That way, it need not use the TaskExecutor's main-thread-execution-context for its retries. This actually looks pretty nice now. I will merge this to allow other people to build on top of it, and will provide tests ASAP.
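The idea of a registration that retries on its own and behaves like a cancelable future can be sketched roughly as follows. This is not the class from the pull request: `RetryingRegistrationSketch`, its constructor parameters and the fixed retry delay are assumptions made for illustration, and the retries run on a plain `ScheduledExecutorService` rather than on any Flink execution context.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of a retrying, cancelable registration; not Flink code. */
class RetryingRegistrationSketch<T> {

    private final Supplier<CompletableFuture<T>> attempt;
    private final ScheduledExecutorService scheduler;
    private final long retryDelayMillis;

    /** Completes with the first successful attempt, or is cancelled by the caller. */
    private final CompletableFuture<T> result = new CompletableFuture<>();

    RetryingRegistrationSketch(
            Supplier<CompletableFuture<T>> attempt,
            ScheduledExecutorService scheduler,
            long retryDelayMillis) {
        this.attempt = attempt;
        this.scheduler = scheduler;
        this.retryDelayMillis = retryDelayMillis;
    }

    /** Starts the first attempt and returns the future that callers can wait on. */
    CompletableFuture<T> start() {
        tryOnce();
        return result;
    }

    /** Cancelling the result future also stops any further retries. */
    void cancel() {
        result.cancel(false);
    }

    private void tryOnce() {
        if (result.isDone()) {
            return; // already registered or cancelled
        }
        attempt.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (!result.isDone()) {
                // Retry on the scheduler, independent of the caller's thread.
                scheduler.schedule(this::tryOnce, retryDelayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger attempts = new AtomicInteger();

        // Simulated remote call: the first two attempts fail, the third succeeds.
        RetryingRegistrationSketch<String> registration = new RetryingRegistrationSketch<>(() -> {
            CompletableFuture<String> response = new CompletableFuture<>();
            if (attempts.incrementAndGet() < 3) {
                response.completeExceptionally(new RuntimeException("registration refused"));
            } else {
                response.complete("registered");
            }
            return response;
        }, scheduler, 10);

        System.out.println(registration.start().get(5, TimeUnit.SECONDS));
        scheduler.shutdown();
    }
}
```

Because the retry loop is driven by the scheduler and guarded by the state of the result future, the owner only needs to hold the future and call `cancel()` when it loses interest, for example on a leader change.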
[jira] [Commented] (FLINK-4355) Implement TaskManager side of registration at ResourceManager
[ https://issues.apache.org/jira/browse/FLINK-4355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423229#comment-15423229 ] ASF GitHub Bot commented on FLINK-4355: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2353 I addressed the comments by @tillrohrmann and @wenlong88: - I turned the registration object into a reusable utility that can also be used for the JobMaster registration. - I made the registration object behave more like a cancelable future. That way, it need not use the TaskExecutor's main-thread-execution-context for its retries. This actually looks pretty nice now. I will merge this to allow other people to build on top of it, and will provide tests ASAP. > Implement TaskManager side of registration at ResourceManager > - > > Key: FLINK-4355 > URL: https://issues.apache.org/jira/browse/FLINK-4355 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Stephan Ewen > > If the {{TaskManager}} is unregistered, it should try and register at the > {{ResourceManager}} leader. The registration messages are fenced via the > {{RmLeaderID}}. > The ResourceManager may acknowledge the registration (or respond that the > TaskManager is AlreadyRegistered) or refuse the registration. > Upon registration refusal, the TaskManager may have to kill itself. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4402) Wrong metrics parameter names in documentation
    [ https://issues.apache.org/jira/browse/FLINK-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423241#comment-15423241 ]

Chesnay Schepler commented on FLINK-4402:
-----------------------------------------

The documentation should be adjusted.

> Wrong metrics parameter names in documentation
> ----------------------------------------------
>
> Key: FLINK-4402
> URL: https://issues.apache.org/jira/browse/FLINK-4402
> Project: Flink
> Issue Type: Bug
> Components: Documentation
> Affects Versions: 1.1.1
> Environment: all
> Reporter: RWenden
> Priority: Trivial
> Fix For: 1.1.2
>
> Original Estimate: 4h
> Remaining Estimate: 4h
>
> On the page
> https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html
> the following metrics parameters are wrong:
> faulty: metrics.scope.tm.task, should be metrics.scope.task
> faulty: metrics.scope.tm.operator, should be metrics.scope.operator
> These need to be corrected to make the configuration work on Flink 1.1.1.
> Alternatively, the constants in ConfigConstants.java can be changed to
> fit the documentation. Either way...
[jira] [Created] (FLINK-4403) RPC proxy classloading should use Flink class' classloader
Stephan Ewen created FLINK-4403:
-----------------------------------

Summary: RPC proxy classloading should use Flink class' classloader
Key: FLINK-4403
URL: https://issues.apache.org/jira/browse/FLINK-4403
Project: Flink
Issue Type: Sub-task
Components: Distributed Coordination
Environment: FLIP-6 feature branch
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.2.0

The RPC service's proxies use the {{ClassLoader.getSystemClassLoader()}} for all reflective classloading.

In settings where Flink runs embedded, the Flink framework classes may not be in the System classloader, but for example in the classloader of an OSGI bundle.

It is hence better to use the classloader of a Flink Framework class. In most cases, that will be the system classloader, in other cases it will be the classloader for the Flink code bundle: {{RpcService.class.getClassLoader()}}.
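The difference the issue describes can be sketched with the JDK's dynamic proxies. This is an illustrative sketch only: the names `ProxyClassLoaderSketch`, `Gateway`, and `createProxy` are hypothetical and do not come from the Flink code base, and the issue does not say that the RPC service uses `java.lang.reflect.Proxy` in exactly this way. The sketch simply takes the classloader from a class of the framework itself instead of calling `ClassLoader.getSystemClassLoader()`.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

/** Hypothetical sketch: create a proxy with the classloader of a framework class. */
final class ProxyClassLoaderSketch {

    /** Stand-in for an RPC gateway interface (assumed for illustration). */
    public interface Gateway {
        String ping();
    }

    /**
     * Creates a proxy for the given interface. The classloader is taken from
     * {@code frameworkClass}, so the proxy also resolves in embedded settings
     * where the framework is not on the system classpath.
     */
    static <T> T createProxy(
            Class<T> gatewayInterface,
            Class<?> frameworkClass,
            InvocationHandler handler) {
        // Instead of ClassLoader.getSystemClassLoader():
        ClassLoader classLoader = frameworkClass.getClassLoader();
        Object proxy = Proxy.newProxyInstance(
                classLoader, new Class<?>[] {gatewayInterface}, handler);
        return gatewayInterface.cast(proxy);
    }
}
```

In a plain JVM application both classloaders are usually the same object, so the change only matters in embedded or bundle-based deployments such as the one mentioned in the issue.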
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75010826

    --- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
    @@ -0,0 +1,104 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-connectors</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <artifactId>flink-connector-activemq_2.10</artifactId>
    +    <name>flink-connector-activemq</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <activemq.version>5.11.1</activemq.version>
    --- End diff --

    Good point.
[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ
    [ https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423313#comment-15423313 ]

ASF GitHub Bot commented on FLINK-3298:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75010826

    --- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
    @@ -0,0 +1,104 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-connectors</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <artifactId>flink-connector-activemq_2.10</artifactId>
    +    <name>flink-connector-activemq</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <activemq.version>5.11.1</activemq.version>
    --- End diff --

    Good point.

> Streaming connector for ActiveMQ
> --------------------------------
>
> Key: FLINK-3298
> URL: https://issues.apache.org/jira/browse/FLINK-3298
> Project: Flink
> Issue Type: New Feature
> Components: Streaming Connectors
> Reporter: Mohit Sethi
> Assignee: Ivan Mushketyk
> Priority: Minor
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75010803

    --- Diff: flink-streaming-connectors/flink-connector-activemq/pom.xml ---
    @@ -0,0 +1,104 @@
    +<project xmlns="http://maven.apache.org/POM/4.0.0"
    +         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    +         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    +
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.flink</groupId>
    +        <artifactId>flink-streaming-connectors</artifactId>
    +        <version>1.1-SNAPSHOT</version>
    +        <relativePath>..</relativePath>
    +    </parent>
    +
    +    <artifactId>flink-connector-activemq_2.10</artifactId>
    +    <name>flink-connector-activemq</name>
    +
    +    <packaging>jar</packaging>
    +
    +    <properties>
    +        <activemq.version>5.11.1</activemq.version>
    +    </properties>
    +
    +    <dependencies>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.activemq</groupId>
    +            <artifactId>activemq-client</artifactId>
    +            <version>${activemq.version}</version>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-streaming-java_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-test-utils_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-tests_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.flink</groupId>
    +            <artifactId>flink-runtime_2.10</artifactId>
    +            <version>${project.version}</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +
    +        <dependency>
    +            <groupId>org.apache.activemq.tooling</groupId>
    +            <artifactId>activemq-junit</artifactId>
    +            <version>5.13.1</version>
    --- End diff --

    Good point. Fixed it.
[jira] [Assigned] (FLINK-4380) Introduce KeyGroupAssigner and Max-Parallelism Parameter
    [ https://issues.apache.org/jira/browse/FLINK-4380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek reassigned FLINK-4380:
---------------------------------------

Assignee: Aljoscha Krettek

> Introduce KeyGroupAssigner and Max-Parallelism Parameter
> --------------------------------------------------------
>
> Key: FLINK-4380
> URL: https://issues.apache.org/jira/browse/FLINK-4380
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> For key-group sharding we need to introduce a {{KeyGroupAssigner}} that
> assigns key hashes to key-groups (or shards). Also, this issue is for
> tracking the addition of a {{max-parallelism}} parameter for tracking the
> number of key groups.
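As a rough illustration of what "assigning key hashes to key-groups" could look like, here is a minimal sketch. It is not the interface proposed in the issue: the class name `KeyGroupAssignerSketch`, the method name `getKeyGroupIndex`, and the plain modulo on `hashCode()` are assumptions made for the example. The only points taken from the issue are that keys map to key groups via their hash and that the number of key groups is governed by a max-parallelism parameter.

```java
/** Hypothetical sketch: map a key to one of a fixed number of key groups. */
final class KeyGroupAssignerSketch {

    /** Number of key groups; assumed here to equal the max-parallelism setting. */
    private final int numberOfKeyGroups;

    KeyGroupAssignerSketch(int maxParallelism) {
        if (maxParallelism <= 0) {
            throw new IllegalArgumentException("maxParallelism must be positive");
        }
        this.numberOfKeyGroups = maxParallelism;
    }

    /**
     * Returns the key group for the key, in the range [0, numberOfKeyGroups).
     * floorMod keeps the result non-negative even for negative hash codes.
     */
    int getKeyGroupIndex(Object key) {
        return Math.floorMod(key.hashCode(), numberOfKeyGroups);
    }
}
```

Because the number of key groups stays fixed while the actual parallelism changes, a key always lands in the same group; only the assignment of whole groups to parallel instances would have to change on rescaling.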
[jira] [Updated] (FLINK-4381) Refactor State to Prepare For Key-Group State Backends
    [ https://issues.apache.org/jira/browse/FLINK-4381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-4381:
------------------------------------

Assignee: Stefan Richter

> Refactor State to Prepare For Key-Group State Backends
> ------------------------------------------------------
>
> Key: FLINK-4381
> URL: https://issues.apache.org/jira/browse/FLINK-4381
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
>
> In order to use the new {{KeyGroupAssigner}}/{{key group sharding}} the state
> backends need to be able to deal with key groups. For this, we first need to
> refactor the state abstractions. Specifically, this touches how key-grouped
> state should be stored and also how state is sent to the
> {{CheckpointCoordinator}} and how it is stored.
[jira] [Updated] (FLINK-4380) Introduce KeyGroupAssigner and Max-Parallelism Parameter
    [ https://issues.apache.org/jira/browse/FLINK-4380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-4380:
------------------------------------

Assignee: Stefan Richter (was: Aljoscha Krettek)

> Introduce KeyGroupAssigner and Max-Parallelism Parameter
> --------------------------------------------------------
>
> Key: FLINK-4380
> URL: https://issues.apache.org/jira/browse/FLINK-4380
> Project: Flink
> Issue Type: Sub-task
> Components: Streaming
> Reporter: Aljoscha Krettek
> Assignee: Stefan Richter
>
> For key-group sharding we need to introduce a {{KeyGroupAssigner}} that
> assigns key hashes to key-groups (or shards). Also, this issue is for
> tracking the addition of a {{max-parallelism}} parameter for tracking the
> number of key groups.
[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ
    [ https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423315#comment-15423315 ]

ASF GitHub Bot commented on FLINK-3298:
---------------------------------------

Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75011140

    --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.activemq;
    +
    +import org.apache.activemq.ActiveMQConnectionFactory;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.jms.BytesMessage;
    +import javax.jms.Connection;
    +import javax.jms.DeliveryMode;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +/**
    + * Sink class for writing data into ActiveMQ queue.
    + *
    + * To create an instance of AMQSink class one should initialize and configure an
    + * instance of a connection factory that will be used to create a connection.
    + * Every input message is converted into a byte array using a serialization
    + * schema and being sent into a message queue.
    + *
    + * @param <IN> type of input messages
    + */
    +public class AMQSink<IN> extends RichSinkFunction<IN> {
    +    private static final long serialVersionUID = 1L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
    +
    +    private final ActiveMQConnectionFactory connectionFactory;
    +    private final String queueName;
    +    private final SerializationSchema<IN> serializationSchema;
    +    private boolean logFailuresOnly = false;
    +    private transient MessageProducer producer;
    +    private transient Session session;
    +    private transient Connection connection;
    +
    +    /**
    +     * Create AMQSink.
    +     *
    +     * @param connectionFactory factory for creating ActiveMQ connection
    +     * @param queueName name of a queue to write to
    +     * @param serializationSchema schema to serialize input message into a byte array
    +     */
    +    public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
    +        this.connectionFactory = connectionFactory;
    +        this.queueName = queueName;
    +        this.serializationSchema = serializationSchema;
    +    }
    +
    +    /**
    +     * Defines whether the producer should fail on errors, or only log them.
    +     * If this is set to true, then exceptions will be only logged, if set to false,
    +     * exceptions will be eventually thrown and cause the streaming program to
    +     * fail (and enter recovery).
    +     *
    +     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +     */
    +    public void setLogFailuresOnly(boolean logFailuresOnly) {
    +        this.logFailuresOnly = logFailuresOnly;
    +    }
    +
    +
    +    @Override
    +    public void open(Configuration config) throws Exception {
    +        super.open(config);
    +        // Create a Connection
    +        connection = connectionFactory.createConnection();
    +        connection.start();
    +
    +        // Create a Session
    +        session = connection.createSession(false,
    +            Session.AUTO_ACKNOWLEDGE);
    +
    +        // Create the destination (Topic or Queue)
    +        Destination destination = session.createQueue(queueName);
    --- End diff --

    This does not necessarily create a queue on the ActiveMQ side. If it already exists it will be used.

> Streaming connector for ActiveMQ
> --
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75011407

    --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.activemq;
    +
    +import org.apache.activemq.ActiveMQConnectionFactory;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.jms.BytesMessage;
    +import javax.jms.Connection;
    +import javax.jms.DeliveryMode;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +/**
    + * Sink class for writing data into ActiveMQ queue.
    + *
    + * To create an instance of AMQSink class one should initialize and configure an
    + * instance of a connection factory that will be used to create a connection.
    + * Every input message is converted into a byte array using a serialization
    + * schema and being sent into a message queue.
    + *
    + * @param <IN> type of input messages
    + */
    +public class AMQSink<IN> extends RichSinkFunction<IN> {
    +    private static final long serialVersionUID = 1L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
    +
    +    private final ActiveMQConnectionFactory connectionFactory;
    +    private final String queueName;
    +    private final SerializationSchema<IN> serializationSchema;
    +    private boolean logFailuresOnly = false;
    +    private transient MessageProducer producer;
    +    private transient Session session;
    +    private transient Connection connection;
    +
    +    /**
    +     * Create AMQSink.
    +     *
    +     * @param connectionFactory factory for creating ActiveMQ connection
    +     * @param queueName name of a queue to write to
    +     * @param serializationSchema schema to serialize input message into a byte array
    +     */
    +    public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
    +        this.connectionFactory = connectionFactory;
    +        this.queueName = queueName;
    +        this.serializationSchema = serializationSchema;
    +    }
    +
    +    /**
    +     * Defines whether the producer should fail on errors, or only log them.
    +     * If this is set to true, then exceptions will be only logged, if set to false,
    +     * exceptions will be eventually thrown and cause the streaming program to
    +     * fail (and enter recovery).
    +     *
    +     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +     */
    +    public void setLogFailuresOnly(boolean logFailuresOnly) {
    +        this.logFailuresOnly = logFailuresOnly;
    +    }
    +
    +
    +    @Override
    +    public void open(Configuration config) throws Exception {
    +        super.open(config);
    +        // Create a Connection
    +        connection = connectionFactory.createConnection();
    +        connection.start();
    +
    +        // Create a Session
    +        session = connection.createSession(false,
    +            Session.AUTO_ACKNOWLEDGE);
    +
    +        // Create the destination (Topic or Queue)
    +        Destination destination = session.createQueue(queueName);
    --- End diff --

    I'll add support for topics.
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2314#discussion_r75011452

    --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.connectors.activemq;
    +
    +import org.apache.activemq.ActiveMQConnectionFactory;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.util.serialization.SerializationSchema;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import javax.jms.BytesMessage;
    +import javax.jms.Connection;
    +import javax.jms.DeliveryMode;
    +import javax.jms.Destination;
    +import javax.jms.JMSException;
    +import javax.jms.MessageProducer;
    +import javax.jms.Session;
    +
    +/**
    + * Sink class for writing data into ActiveMQ queue.
    + *
    + * To create an instance of AMQSink class one should initialize and configure an
    + * instance of a connection factory that will be used to create a connection.
    + * Every input message is converted into a byte array using a serialization
    + * schema and being sent into a message queue.
    + *
    + * @param <IN> type of input messages
    + */
    +public class AMQSink<IN> extends RichSinkFunction<IN> {
    +    private static final long serialVersionUID = 1L;
    +
    +    private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class);
    +
    +    private final ActiveMQConnectionFactory connectionFactory;
    +    private final String queueName;
    +    private final SerializationSchema<IN> serializationSchema;
    +    private boolean logFailuresOnly = false;
    +    private transient MessageProducer producer;
    +    private transient Session session;
    +    private transient Connection connection;
    +
    +    /**
    +     * Create AMQSink.
    +     *
    +     * @param connectionFactory factory for creating ActiveMQ connection
    +     * @param queueName name of a queue to write to
    +     * @param serializationSchema schema to serialize input message into a byte array
    +     */
    +    public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema<IN> serializationSchema) {
    +        this.connectionFactory = connectionFactory;
    +        this.queueName = queueName;
    +        this.serializationSchema = serializationSchema;
    +    }
    +
    +    /**
    +     * Defines whether the producer should fail on errors, or only log them.
    +     * If this is set to true, then exceptions will be only logged, if set to false,
    +     * exceptions will be eventually thrown and cause the streaming program to
    +     * fail (and enter recovery).
    +     *
    +     * @param logFailuresOnly The flag to indicate logging-only on exceptions.
    +     */
    +    public void setLogFailuresOnly(boolean logFailuresOnly) {
    +        this.logFailuresOnly = logFailuresOnly;
    +    }
    +
    +
    +    @Override
    +    public void open(Configuration config) throws Exception {
    +        super.open(config);
    +        // Create a Connection
    +        connection = connectionFactory.createConnection();
    +        connection.start();
    +
    +        // Create a Session
    +        session = connection.createSession(false,
    +            Session.AUTO_ACKNOWLEDGE);
    +
    +        // Create the destination (Topic or Queue)
    +        Destination destination = session.createQueue(queueName);
    +
    +        // Create a MessageProducer from the Session to the Topic or
    +        // Queue
    +        producer = session.createProducer(destination);
    +        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    --- End diff --

    Good point. Done.
[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ
[ https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423319#comment-15423319 ] ASF GitHub Bot commented on FLINK-3298: --- Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2314#discussion_r75011452 --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * Sink class for writing data into ActiveMQ queue. 
+ * + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. + * Every input message is converted into a byte array using a serialization + * schema and being sent into a message queue. + * + * @param type of input messages + */ +public class AMQSink extends RichSinkFunction { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class); + + private final ActiveMQConnectionFactory connectionFactory; + private final String queueName; + private final SerializationSchema serializationSchema; + private boolean logFailuresOnly = false; + private transient MessageProducer producer; + private transient Session session; + private transient Connection connection; + + /** +* Create AMQSink. +* +* @param connectionFactory factory for creating ActiveMQ connection +* @param queueName name of a queue to write to +* @param serializationSchema schema to serialize input message into a byte array +*/ + public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema serializationSchema) { + this.connectionFactory = connectionFactory; + this.queueName = queueName; + this.serializationSchema = serializationSchema; + } + + /** +* Defines whether the producer should fail on errors, or only log them. +* If this is set to true, then exceptions will be only logged, if set to false, +* exceptions will be eventually thrown and cause the streaming program to +* fail (and enter recovery). +* +* @param logFailuresOnly The flag to indicate logging-only on exceptions. 
+*/ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + // Create a Connection + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + Destination destination = session.createQueue(queueName); + + // Create a MessageProducer from the Session to the Topic or + // Queue + producer = session.createProducer(destination);
[jira] [Assigned] (FLINK-3761) Introduce key group state backend
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-3761: --- Assignee: Aljoscha Krettek (was: Till Rohrmann) > Introduce key group state backend > - > > Key: FLINK-3761 > URL: https://issues.apache.org/jira/browse/FLINK-3761 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Till Rohrmann >Assignee: Aljoscha Krettek > > After an offline discussion with [~aljoscha], we came to the conclusion that > it would be beneficial to reflect the differences between a keyed and a > non-keyed stream also in the state backends. A state backend which is used > for a keyed stream offers a value, list, folding and value state and has to > group its keys into key groups. > A state backend for non-keyed streams can only offer a union state to make it > work with dynamic scaling. A union state is a state that is broadcast to > all tasks in case of a recovery. The state backends can then select what > information they need to recover from the whole state (formerly distributed).
[jira] [Updated] (FLINK-3761) Refactor State Backends/Make Keyed State Key-Group Aware
[ https://issues.apache.org/jira/browse/FLINK-3761?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-3761: Summary: Refactor State Backends/Make Keyed State Key-Group Aware (was: Introduce key group state backend) > Refactor State Backends/Make Keyed State Key-Group Aware > > > Key: FLINK-3761 > URL: https://issues.apache.org/jira/browse/FLINK-3761 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing >Reporter: Till Rohrmann >Assignee: Aljoscha Krettek > > After an offline discussion with [~aljoscha], we came to the conclusion that > it would be beneficial to reflect the differences between a keyed and a > non-keyed stream also in the state backends. A state backend which is used > for a keyed stream offers a value, list, folding and value state and has to > group its keys into key groups. > A state backend for non-keyed streams can only offer a union state to make it > work with dynamic scaling. A union state is a state that is broadcast to > all tasks in case of a recovery. The state backends can then select what > information they need to recover from the whole state (formerly distributed).
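The idea of grouping keys into key groups, as described in the issue above, can be illustrated with a minimal sketch. This is not Flink's implementation: the class name `KeyGroupSketch`, the method names, and the use of `hashCode()` as the hash function are all assumptions made for illustration. The point it shows is that a key's group depends only on the key and the maximum parallelism, so rescaling only moves whole key groups between tasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: hypothetical names, not Flink's actual key-group classes. */
final class KeyGroupSketch {

    /** Maps a key to one of maxParallelism key groups via its hash code. */
    static int keyGroupFor(Object key, int maxParallelism) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel task that owns it. */
    static int taskFor(int keyGroup, int maxParallelism, int parallelism) {
        // Contiguous ranges of key groups are assigned to the same task.
        return keyGroup * parallelism / maxParallelism;
    }

    /** Lists the key groups owned by one task at a given parallelism. */
    static List<Integer> keyGroupsOf(int task, int maxParallelism, int parallelism) {
        List<Integer> groups = new ArrayList<>();
        for (int g = 0; g < maxParallelism; g++) {
            if (taskFor(g, maxParallelism, parallelism) == task) {
                groups.add(g);
            }
        }
        return groups;
    }
}
```

With 8 key groups, scaling from parallelism 2 to 4 in this sketch leaves every key in its original group; only the owner of each group changes, which is what would let a keyed state backend redistribute state group by group.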
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2314#discussion_r75011941 --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; + +/** + * Source for reading messages from an ActiveMQ queue. + * + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. + * This source is waiting for incoming messages from ActiveMQ and converts them from + * an array of bytes into an instance of the output type. If an incoming + * message is not a message with an array of bytes, this message is ignored + * and warning message is logged. 
+ * + * @param type of output messages + */ +public class AMQSource extends MessageAcknowledgingSourceBase + implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class); + + private final ActiveMQConnectionFactory connectionFactory; + private final String queueName; + private final DeserializationSchema deserializationSchema; + private boolean logFailuresOnly = false; + private RunningChecker runningChecker; + private transient Connection connection; + private transient Session session; + private transient MessageConsumer consumer; + private boolean autoAck; + private HashMap unaknowledgedMessages = new HashMap<>(); + + /** +* Create AMQSource. +* +* @param connectionFactory factory that will be used to create a connection with ActiveMQ +* @param queueName name of an ActiveMQ queue to read from +* @param deserializationSchema schema to deserialize incoming messages +*/ + public AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema deserializationSchema) { + this(connectionFactory, queueName, deserializationSchema, new RunningCheckerImpl()); + } + + /** +* Create AMQSource. +* +* @param connectionFactory factory that will be used to create a connection with ActiveMQ +* @param queueName name of an ActiveMQ queue to read from +* @param deserializationSchema schema to deserialize incoming messages +* @param runningChecker running checker that is used to decide if the source is still running +*/ + AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema deserializationSchema, RunningChecker runningChecker) { + super(String.class); + this.connectionFactory = connectionFactory; + this.queueName = queueName; + this.deserializationSchema = deserializationSchema
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2314#discussion_r75011880 --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSource.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; + +/** + * Source for reading messages from an ActiveMQ queue. + * + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. + * This source is waiting for incoming messages from ActiveMQ and converts them from + * an array of bytes into an instance of the output type. If an incoming + * message is not a message with an array of bytes, this message is ignored + * and warning message is logged. 
+ * + * @param type of output messages + */ +public class AMQSource extends MessageAcknowledgingSourceBase + implements ResultTypeQueryable { + + private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class); + + private final ActiveMQConnectionFactory connectionFactory; + private final String queueName; + private final DeserializationSchema deserializationSchema; + private boolean logFailuresOnly = false; + private RunningChecker runningChecker; + private transient Connection connection; + private transient Session session; + private transient MessageConsumer consumer; + private boolean autoAck; + private HashMap unaknowledgedMessages = new HashMap<>(); + + /** +* Create AMQSource. +* +* @param connectionFactory factory that will be used to create a connection with ActiveMQ +* @param queueName name of an ActiveMQ queue to read from +* @param deserializationSchema schema to deserialize incoming messages +*/ + public AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema deserializationSchema) { + this(connectionFactory, queueName, deserializationSchema, new RunningCheckerImpl()); + } + + /** +* Create AMQSource. +* +* @param connectionFactory factory that will be used to create a connection with ActiveMQ +* @param queueName name of an ActiveMQ queue to read from +* @param deserializationSchema schema to deserialize incoming messages +* @param runningChecker running checker that is used to decide if the source is still running +*/ + AMQSource(ActiveMQConnectionFactory connectionFactory, String queueName, DeserializationSchema deserializationSchema, RunningChecker runningChecker) { + super(String.class); + this.connectionFactory = connectionFactory; + this.queueName = queueName; + this.deserializationSchema = deserializationSchema
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423358#comment-15423358 ] ASF GitHub Bot commented on FLINK-3950: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75017658 --- Diff: flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java --- @@ -245,6 +246,50 @@ public void testHistogramReporting() throws Exception { } } + /** +* Tests that histograms are properly reported via the JMXReporter. --- End diff -- meters instead of histograms > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75017658 --- Diff: flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java --- @@ -245,6 +246,50 @@ public void testHistogramReporting() throws Exception { } } + /** +* Tests that histograms are properly reported via the JMXReporter. --- End diff -- meters instead of histograms
[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75018225 --- Diff: flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java --- @@ -261,6 +327,40 @@ public long getMin() { } } + public static class TestingMeter implements Meter { --- End diff -- reuse the other TestingMeter class
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423364#comment-15423364 ] ASF GitHub Bot commented on FLINK-3950: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75018180 --- Diff: flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java --- @@ -297,4 +342,42 @@ public long getMin() { }; } } + + static class TestingMeter implements Meter { --- End diff -- let's put this in a separate utility test class under `flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util` > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[GitHub] flink pull request #2374: [FLINK-3950] Add Meter Metric Type
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75018180 --- Diff: flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/metrics/jmx/JMXReporterTest.java --- @@ -297,4 +342,42 @@ public long getMin() { }; } } + + static class TestingMeter implements Meter { --- End diff -- let's put this in a separate utility test class under `flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util`
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423365#comment-15423365 ] ASF GitHub Bot commented on FLINK-3950: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2374#discussion_r75018225 --- Diff: flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java --- @@ -261,6 +327,40 @@ public long getMin() { } } + public static class TestingMeter implements Meter { --- End diff -- reuse the other TestingMeter class > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[jira] [Commented] (FLINK-3950) Add Meter Metric Type
[ https://issues.apache.org/jira/browse/FLINK-3950?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423388#comment-15423388 ] ASF GitHub Bot commented on FLINK-3950: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/2374 Good work! Only found a minor issue with regard to the TestingMeter class. There is one thing I would like to start a discussion on, however. Right now the Meter interface is essentially a copy of the DropWizard Meter interface, which is a fair starting point. Now my question is: does this fit our use case? For example, should we really mandate an exponentially weighted rate (according to the javadocs), or do we really need the 15 minute rate? > Add Meter Metric Type > - > > Key: FLINK-3950 > URL: https://issues.apache.org/jira/browse/FLINK-3950 > Project: Flink > Issue Type: Sub-task > Components: Core >Reporter: Stephan Ewen >Assignee: Ivan Mushketyk
[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2374 Good work! Only found a minor issue with regard to the TestingMeter class. There is one thing I would like to start a discussion on, however. Right now the Meter interface is essentially a copy of the DropWizard Meter interface, which is a fair starting point. Now my question is: does this fit our use case? For example, should we really mandate an exponentially weighted rate (according to the javadocs), or do we really need the 15 minute rate?
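To make the design question above concrete, here is a minimal sketch of what a meter could look like if it exposed a single rate and left the averaging strategy to the implementation. Everything in it is hypothetical: `SimpleMeter`, `WindowedMeter`, and the `tick()` method are names invented for this illustration and are not the interface proposed in the pull request. The implementation uses a plain average over a fixed window of one-second buckets instead of an exponentially weighted rate, and it is not thread-safe.

```java
/** Hypothetical minimal meter interface; not the one proposed in the pull request. */
interface SimpleMeter {
    void markEvent(long n);

    long getCount();

    /** Events per second, however the implementation chooses to average them. */
    double getRate();
}

/** Averages events over the most recent one-second buckets (plain mean, no weighting). */
final class WindowedMeter implements SimpleMeter {
    private final long[] buckets;
    private long count;
    private long lastCount;
    private int next;
    private int filled;

    WindowedMeter(int windowSeconds) {
        buckets = new long[windowSeconds];
    }

    @Override
    public void markEvent(long n) {
        count += n;
    }

    @Override
    public long getCount() {
        return count;
    }

    /** Assumed to be called once per second by some external scheduler (not shown). */
    void tick() {
        // Record how many events arrived since the previous tick.
        buckets[next] = count - lastCount;
        lastCount = count;
        next = (next + 1) % buckets.length;
        filled = Math.min(filled + 1, buckets.length);
    }

    @Override
    public double getRate() {
        if (filled == 0) {
            return 0.0;
        }
        long sum = 0;
        for (int i = 0; i < filled; i++) {
            sum += buckets[i];
        }
        return (double) sum / filled;
    }
}
```

An interface of this shape would not prescribe exponential weighting or a fixed set of 1, 5 and 15 minute rates; a wrapper around a DropWizard meter could still implement it by returning one of its rates.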
[jira] [Commented] (FLINK-4402) Wrong metrics parameter names in documentation
[ https://issues.apache.org/jira/browse/FLINK-4402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423395#comment-15423395 ] Neelesh Srinivas Salian commented on FLINK-4402: If you don't mind, shall I work on this JIRA? > Wrong metrics parameter names in documentation > --- > > Key: FLINK-4402 > URL: https://issues.apache.org/jira/browse/FLINK-4402 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.1 > Environment: all >Reporter: RWenden >Priority: Trivial > Fix For: 1.1.2 > > Original Estimate: 4h > Remaining Estimate: 4h > > On the page > https://ci.apache.org/projects/flink/flink-docs-master/apis/metrics.html > the following metrics parameter names are faulty and should be corrected > faulty: metrics.scope.tm.task , should be metrics.scope.task > faulty: metrics.scope.tm.operator , should be metrics.scope.operator > to make them work on Flink 1.1.1. > Alternatively, the constants in ConfigConstants.java could be changed to > fit the documentation. Either way...
[GitHub] flink pull request #2314: [FLINK-3298] Implement ActiveMQ streaming connecto...
Github user mushketyk commented on a diff in the pull request: https://github.com/apache/flink/pull/2314#discussion_r75024557 --- Diff: flink-streaming-connectors/flink-connector-activemq/src/main/java/org/apache/flink/streaming/connectors/activemq/AMQSink.java --- @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.activemq; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +/** + * Sink class for writing data into ActiveMQ queue. + * + * To create an instance of AMQSink class one should initialize and configure an + * instance of a connection factory that will be used to create a connection. 
+ * Every input message is converted into a byte array using a serialization + * schema and being sent into a message queue. + * + * @param type of input messages + */ +public class AMQSink extends RichSinkFunction { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(AMQSink.class); + + private final ActiveMQConnectionFactory connectionFactory; + private final String queueName; + private final SerializationSchema serializationSchema; + private boolean logFailuresOnly = false; + private transient MessageProducer producer; + private transient Session session; + private transient Connection connection; + + /** +* Create AMQSink. +* +* @param connectionFactory factory for creating ActiveMQ connection +* @param queueName name of a queue to write to +* @param serializationSchema schema to serialize input message into a byte array +*/ + public AMQSink(ActiveMQConnectionFactory connectionFactory, String queueName, SerializationSchema serializationSchema) { + this.connectionFactory = connectionFactory; + this.queueName = queueName; + this.serializationSchema = serializationSchema; + } + + /** +* Defines whether the producer should fail on errors, or only log them. +* If this is set to true, then exceptions will be only logged, if set to false, +* exceptions will be eventually thrown and cause the streaming program to +* fail (and enter recovery). +* +* @param logFailuresOnly The flag to indicate logging-only on exceptions. 
+*/ + public void setLogFailuresOnly(boolean logFailuresOnly) { + this.logFailuresOnly = logFailuresOnly; + } + + + @Override + public void open(Configuration config) throws Exception { + super.open(config); + // Create a Connection + connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); --- End diff -- As far as I understand AUTO_ACKNOWLEDGE is relevant only for consuming (AMQSource), but not for a Sink.
[jira] [Commented] (FLINK-3298) Streaming connector for ActiveMQ
[ https://issues.apache.org/jira/browse/FLINK-3298?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423417#comment-15423417 ] ASF GitHub Bot commented on FLINK-3298: --- Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2314 @rmetzger I've updated the PR according to your review. Could you please take another look? > Streaming connector for ActiveMQ > > > Key: FLINK-3298 > URL: https://issues.apache.org/jira/browse/FLINK-3298 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors >Reporter: Mohit Sethi >Assignee: Ivan Mushketyk >Priority: Minor
[GitHub] flink issue #2374: [FLINK-3950] Add Meter Metric Type
Github user mushketyk commented on the issue: https://github.com/apache/flink/pull/2374 @zentol Thank you for reviewing my PR! I'll fix these issues as soon as I can. Regarding the metrics interface: could you please elaborate on why you think the exponentially weighted rate and the 15-minute rate may be useless for us?
[jira] [Created] (FLINK-4404) Implement Data Transfer SSL
Suresh Krishnappa created FLINK-4404: Summary: Implement Data Transfer SSL Key: FLINK-4404 URL: https://issues.apache.org/jira/browse/FLINK-4404 Project: Flink Issue Type: Sub-task Reporter: Suresh Krishnappa This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the Secure Data Access design doc.
[jira] [Created] (FLINK-4405) Implement Blob Server SSL
Suresh Krishnappa created FLINK-4405: Summary: Implement Blob Server SSL Key: FLINK-4405 URL: https://issues.apache.org/jira/browse/FLINK-4405 Project: Flink Issue Type: Sub-task Reporter: Suresh Krishnappa This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the Secure Data Access design doc.
[jira] [Updated] (FLINK-4404) Implement Data Transfer SSL
[ https://issues.apache.org/jira/browse/FLINK-4404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Suresh Krishnappa updated FLINK-4404: - Description: This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc. (was: This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the Secure Data Access design doc.) > Implement Data Transfer SSL > --- > > Key: FLINK-4404 > URL: https://issues.apache.org/jira/browse/FLINK-4404 > Project: Flink > Issue Type: Sub-task >Reporter: Suresh Krishnappa > Labels: security > > This issue is to address part T3-3 (Implement Data Transfer TLS/SSL) of the > [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc.
[jira] [Updated] (FLINK-4405) Implement Blob Server SSL
[ https://issues.apache.org/jira/browse/FLINK-4405?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Suresh Krishnappa updated FLINK-4405: - Description: This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc. was: This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the Secure Data Access design doc. > Implement Blob Server SSL > - > > Key: FLINK-4405 > URL: https://issues.apache.org/jira/browse/FLINK-4405 > Project: Flink > Issue Type: Sub-task >Reporter: Suresh Krishnappa > Labels: security > > This issue is to address part T3-4 (Implement Blob Server TLS/SSL) of the > [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc.
[jira] [Commented] (FLINK-4253) Rename "recovery.mode" config key to "high-availability"
[ https://issues.apache.org/jira/browse/FLINK-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423949#comment-15423949 ] ASF GitHub Bot commented on FLINK-4253: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2342#discussion_r75065679 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java --- @@ -77,7 +77,7 @@ /** * Shutdown hook thread to ensure deletion of the storage directory (or null if -* the configured recovery mode does not equal{@link RecoveryMode#STANDALONE}) +* the configured recovery mode does not equal{@link RecoveryMode#NONE}) --- End diff -- Ok > Rename "recovery.mode" config key to "high-availability" > > > Key: FLINK-4253 > URL: https://issues.apache.org/jira/browse/FLINK-4253 > Project: Flink > Issue Type: Improvement >Reporter: Ufuk Celebi >Assignee: ramkrishna.s.vasudevan > > Currently, HA is configured via the following configuration keys: > {code} > recovery.mode: STANDALONE // No high availability (HA) > recovery.mode: ZOOKEEPER // HA > {code} > This could be made more straightforward by simply renaming the key to > {{high-availability}}. Furthermore, the term {{STANDALONE}} is overloaded: we > already have a standalone cluster mode. > {code} > high-availability: NONE // No HA > high-availability: ZOOKEEPER // HA via ZooKeeper > {code} > The {{recovery.mode}} configuration keys would have to be deprecated before > completely removing them.
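The deprecation path described in the issue above (prefer the new key, keep honouring the old one for a while) can be sketched roughly as follows. This is only an illustration under stated assumptions: the class HaModeLookup, its resolve method and the plain Map standing in for the configuration object are hypothetical and are not the code in the pull request.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper that resolves the HA mode from a flat key/value configuration. */
final class HaModeLookup {
    static final String NEW_KEY = "high-availability";
    static final String DEPRECATED_KEY = "recovery.mode";

    private HaModeLookup() {
    }

    /** The new key wins, the deprecated key is only a fallback, and the default is NONE. */
    static String resolve(Map<String, String> config) {
        String value = config.get(NEW_KEY);
        if (value == null) {
            // Fall back to the deprecated key so existing configurations keep working.
            value = config.get(DEPRECATED_KEY);
        }
        if (value == null) {
            return "NONE";
        }
        String mode = value.trim().toUpperCase(Locale.ROOT);
        // The old value STANDALONE means the same as the new value NONE.
        return mode.equals("STANDALONE") ? "NONE" : mode;
    }
}
```

Mapping the old STANDALONE value onto NONE in one place keeps the rest of the code free of the overloaded term the issue complains about.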
[GitHub] flink pull request #2342: FLINK-4253 - Rename "recovery.mode" config key to ...
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2342#discussion_r75065676 --- Diff: docs/setup/config.md --- @@ -285,7 +285,7 @@ of the JobManager, because the same ActorSystem is used. Its not possible to use ## High Availability Mode -- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'standalone' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value. +- `high-availability`: (Default 'none') Defines the recovery mode used for the cluster execution. Currently, Flink supports the 'none' mode where only a single JobManager runs and no JobManager state is checkpointed. The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value. Previously this config was named 'recovery.mode' and the default config was 'standalone'. 
- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected --- End diff -- Oh yes. Will change them too. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Created] (FLINK-4406) Implement job master registration at resource manager
Wenlong Lyu created FLINK-4406: -- Summary: Implement job master registration at resource manager Key: FLINK-4406 URL: https://issues.apache.org/jira/browse/FLINK-4406 Project: Flink Issue Type: Sub-task Components: Cluster Management Reporter: Wenlong Lyu The Job Master needs to register with the Resource Manager when starting, then watch for leadership changes of the RM and trigger re-registration when they occur.
[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423984#comment-15423984 ] Zhijiang Wang commented on FLINK-4021: -- Thank you for the advice; it is indeed simpler to 'return isStagedBuffer' directly. > Problem of setting autoread for netty channel when more tasks sharing the > same Tcp connection > - > > Key: FLINK-4021 > URL: https://issues.apache.org/jira/browse/FLINK-4021 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > More than one task can share the same TCP connection for shuffling data. > If a downstream task, say "A", has no available memory segment to read > a netty buffer from the network, it sets autoread to false for the channel. > When task A fails or has available segments again, the netty handler > is notified to process the staged buffers first and then reset autoread to > true. But in some scenarios, autoread is never set to true again. > That is, when processing staged buffers, the handler first finds the corresponding input > channel for each buffer; if the task for that input channel has failed, the > decodeMsg method in PartitionRequestClientHandler returns false, which > means autoread is not set back to true. > In summary, suppose task "A" sets autoread to false because it has no > available segments, which results in some staged buffers, and another task > "B", to which one of the staged buffers belongs, fails by accident. When task A > tries to reset autoread to true, this does not work because task B has > failed. > I have fixed this problem in our application by adding a boolean parameter > to the decodeBufferOrEvent method to distinguish whether the method is invoked by > the netty IO thread's channel read or by the staged message handler task in > PartitionRequestClientHandler.
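The failure mode described in the issue above, and the effect of returning isStagedBuffer for a buffer whose task has failed, can be illustrated with a small stand-alone model. This is a simplified sketch and not the actual PartitionRequestClientHandler: the class SharedConnectionModel, its string channel ids and the applyFix switch are assumptions made purely for illustration, and no real netty channel is involved.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

/** Simplified, hypothetical model of one TCP connection shared by several input channels. */
final class SharedConnectionModel {
    // Each staged buffer is represented only by the id of the input channel it belongs to.
    private final Deque<String> stagedBuffers = new ArrayDeque<>();
    private final Set<String> releasedChannels = new HashSet<>();
    private final boolean applyFix;
    private boolean autoRead = true;

    SharedConnectionModel(boolean applyFix) {
        this.applyFix = applyFix;
    }

    /** A task ran out of memory segments: the buffer is staged and reading is paused. */
    void stage(String channelId) {
        stagedBuffers.add(channelId);
        autoRead = false;
    }

    /** The task owning this input channel has failed or was cancelled. */
    void release(String channelId) {
        releasedChannels.add(channelId);
    }

    boolean isAutoRead() {
        return autoRead;
    }

    /** Returns true if the caller may continue with the next buffer. */
    private boolean decodeBufferOrEvent(String channelId, boolean isStagedBuffer) {
        if (releasedChannels.contains(channelId)) {
            // The buffer is dropped. Without the fix this reports failure, which stops the
            // drain loop; with the fix a staged buffer is reported as handled.
            return applyFix && isStagedBuffer;
        }
        return true; // Buffer handed over to its input channel.
    }

    /** Invoked once the blocked task has memory segments available again. */
    void drainStagedBuffers() {
        while (!stagedBuffers.isEmpty()) {
            if (!decodeBufferOrEvent(stagedBuffers.peek(), true)) {
                return; // Auto-read stays disabled for every task on this connection.
            }
            stagedBuffers.poll();
        }
        autoRead = true;
    }
}
```

In this model, staging one buffer for a failed task "B" ahead of a buffer for task "A" leaves auto-read disabled forever when applyFix is false, and re-enables it after the drain when applyFix is true.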
[jira] [Commented] (FLINK-4021) Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection
[ https://issues.apache.org/jira/browse/FLINK-4021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423994#comment-15423994 ] Zhijiang Wang commented on FLINK-4021: -- Yeah, you are right. It is not a problem in the current Flink failover mode. Maybe it is relevant for FLIP-1 'https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures'. We have already improved task failure handling by restarting only the source and the failed tasks, which is how we noticed this issue. I forgot to add a test for the function; thank you for confirming it. What else should I do next? > Problem of setting autoread for netty channel when more tasks sharing the > same Tcp connection > - > > Key: FLINK-4021 > URL: https://issues.apache.org/jira/browse/FLINK-4021 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination >Affects Versions: 1.0.2 >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang > > More than one task can share the same TCP connection for shuffling data. > If a downstream task, say "A", has no available memory segment to read > a netty buffer from the network, it sets autoread to false for the channel. > When task A fails or has available segments again, the netty handler > is notified to process the staged buffers first and then reset autoread to > true. But in some scenarios, autoread is never set to true again. > That is, when processing staged buffers, the handler first finds the corresponding input > channel for each buffer; if the task for that input channel has failed, the > decodeMsg method in PartitionRequestClientHandler returns false, which > means autoread is not set back to true. > In summary, suppose task "A" sets autoread to false because it has no > available segments, which results in some staged buffers, and another task > "B", to which one of the staged buffers belongs, fails by accident. When task A > tries to reset autoread to true, this does not work because task B has > failed. > I have fixed this problem in our application by adding a boolean parameter > to the decodeBufferOrEvent method to distinguish whether the method is invoked by > the netty IO thread's channel read or by the staged message handler task in > PartitionRequestClientHandler.