[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1665
  
Oh, I missed it. Will update. Thanks @priyank5485 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [DISCUSS] storm-jms Code Donation

2016-09-02 Thread S G
I would love to see it part of storm/external and can volunteer to provide
any help if required during the migration.

-SG

On Fri, Sep 2, 2016 at 12:59 PM, P. Taylor Goetz wrote:

> I’d like to start a discussion around adding storm-jms as an external
> module. I’ve gotten a few requests to do so and the only thing that’s held
> me back is tracking down contributors and having them submit ICLAs.
>
> The code can be found here:
>
> https://github.com/ptgoetz/storm-jms
>
> If there’s support for adding it, I’ll track down contributors and handle
> the IP clearance process.
>
> -Taylor
>


[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on the issue:

https://github.com/apache/storm/pull/1642
  
I was able to review most of the code.  I skimmed the tests and didn't see 
anything strange. 

@revans2 knows that I will not be able to log on again for a while. I want
to leave a note for others: my comments were either minor or things we
already discussed, so my unresolved review comments should not hold up a
merge if this change is otherwise ready before I am back.




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77421416
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77421346
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
--- End diff --

OK.



[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77421324
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77421128
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+public class RunAsUserContainerLauncher extends ContainerLauncher {
+private final Map _conf;
+private final String _supervisorId;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+public RunAsUserContainerLauncher(Map conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+}
+
+@Override
+public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+Container container = new RunAsUserContainer(port, assignment, _conf, _supervisorId, state,
+_resourceIsolationManager, false);
+container.setup();
+container.launch();
+return container;
+}
+
+@Override
+public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(port, assignment, _conf, _supervisorId, state,
+_resourceIsolationManager, true);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
+
+@Override
+public Killable recoverContainer(String workerId) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(workerId, _conf, _supervisorId,
+_resourceIsolationManager);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
--- End diff --

Yes, I botched my comment: I wanted to suggest that
recoverContainer(port, assignment, state) could just call
recoverContainer(workerId), but the two have different return types
(Container and Killable). So we should leave it as is.
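
To illustrate the return-type point, here is a minimal sketch. It assumes, as the quoted diff suggests, that Container can be returned where a Killable is expected. The nested Killable and Container types, the simplified two-argument overload, the port 6700, and the worker id are hypothetical stand-ins, not the real Storm classes or values.

```java
import java.io.IOException;

// A sketch only: Killable and Container here are simplified stand-ins,
// not the real classes from the pull request.
class RecoverSketch {
    interface Killable {
        void kill() throws IOException;
    }

    static class Container implements Killable {
        final String workerId;
        final int port; // -1 when recovered from a worker id alone

        Container(String workerId, int port) {
            this.workerId = workerId;
            this.port = port;
        }

        @Override
        public void kill() {
            // nothing to clean up in this sketch
        }
    }

    // Analogue of recoverContainer(workerId): the caller only gets a Killable.
    static Killable recoverContainer(String workerId) {
        return new Container(workerId, -1);
    }

    // Analogue of recoverContainer(port, assignment, state): the caller needs a Container.
    static Container recoverContainer(int port, String workerId) {
        // return recoverContainer(workerId);
        // The line above would not compile: a Killable is not assignable to
        // Container without a downcast, so this overload builds its own instance.
        return new Container(workerId, port);
    }

    public static void main(String[] args) throws IOException {
        Killable k = recoverContainer("worker-1");
        Container c = recoverContainer(6700, "worker-1");
        k.kill();
        System.out.println(c.workerId + " on port " + c.port);
    }
}
```

Delegating would need an explicit cast from Killable back to Container, which is arguably worse than the small duplication the two methods have now.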




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77419710
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -353,25 +350,21 @@ public LocalState nimbusTopoHistoryStateImpl(Map conf) throws IOException {
 }
 
 // we use this "weird" wrapper pattern temporarily for mocking in clojure test
-public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
+public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
 return _instance.readSupervisorStormConfImpl(conf, stormId);
 }
 
-public Map readSupervisorStormConfImpl(Map conf, String stormId) throws IOException {
+public Map readSupervisorStormConfImpl(Map conf, String stormId) throws IOException {
 String stormRoot = supervisorStormDistRoot(conf, stormId);
 String confPath = supervisorStormConfPath(stormRoot);
 return readSupervisorStormConfGivenPath(conf, confPath);
 }
 
 // we use this "weird" wrapper pattern temporarily for mocking in clojure test
-public static StormTopology readSupervisorTopology(Map conf, String stormId) throws IOException {
-return _instance.readSupervisorTopologyImpl(conf, stormId);
-}
-
-public StormTopology readSupervisorTopologyImpl(Map conf, String stormId) throws IOException {
+public static StormTopology readSupervisorTopology(Map conf, String stormId, AdvancedFSOps ops) throws IOException {
--- End diff --

Agreed. The comment describes a pattern that this change removes.
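
For readers unfamiliar with the wrapper pattern that the leftover comment refers to, the sketch below shows its general shape: a static method forwards to an instance method on a replaceable singleton, so a test can swap in a stub. The class name ConfigUtilsSketch, the setInstance helper, the readConfPath methods, and the returned path are assumptions made for illustration; only the `_instance` delegation itself is taken from the quoted diff.

```java
// A sketch of the static-wrapper-over-replaceable-instance pattern.
// All names other than _instance are hypothetical.
class ConfigUtilsSketch {
    // The instance every static wrapper forwards to; tests may replace it.
    private static ConfigUtilsSketch _instance = new ConfigUtilsSketch();

    // Hypothetical helper: installs a replacement and returns the previous
    // instance so the caller can restore it afterwards.
    static ConfigUtilsSketch setInstance(ConfigUtilsSketch replacement) {
        ConfigUtilsSketch old = _instance;
        _instance = replacement;
        return old;
    }

    // Static wrapper: production code calls this.
    static String readConfPath(String stormId) {
        return _instance.readConfPathImpl(stormId);
    }

    // Instance method: the part a test overrides.
    String readConfPathImpl(String stormId) {
        return "/storm/dist/" + stormId + "/conf";
    }

    public static void main(String[] args) {
        System.out.println(readConfPath("topo-1"));

        // Swap in a stub, as a test would, then restore the original.
        ConfigUtilsSketch old = setInstance(new ConfigUtilsSketch() {
            @Override
            String readConfPathImpl(String stormId) {
                return "mocked";
            }
        });
        try {
            System.out.println(readConfPath("topo-1"));
        } finally {
            setInstance(old);
        }
    }
}
```

Once a method takes its collaborator as a parameter, as the new readSupervisorTopology does with AdvancedFSOps, the static-to-instance indirection is no longer needed for that method, which is why the comment above it is stale.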




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418874
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, 
recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", 
_topologyId);
+return null;
+}
+}
+boolean deleteAll = true;
+String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418692
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, 
_topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, 
recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", 
_topologyId);
+return null;
+}
+}
+boolean deleteAll = true;
+String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418682
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java ---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = 
LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+return null;
+}
+}
+boolean deleteAll = true;
+String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418598
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+return null;
+}
+}
+boolean deleteAll = true;
+String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418561
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+return null;
+}
+}
--- End diff --

Let's make a helper method for this `if`, since we do the same thing for 
the 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418422
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
+return null;
+}
+
+}
+
+private static final Logger LOG = LoggerFactory.getLogger(AsyncLocalizer.class);
+
+private final Localizer _localizer;
+private final ExecutorService _execService;
+private final boolean _isLocalMode;
+private final Map _conf;
+private final Map _basicPending;
+private final Map _blobPending;
+private final AdvancedFSOps _fsOps;
+
+private class DownloadBaseBlobsDistributed implements Callable {
+private final String _topologyId;
+
+public DownloadBaseBlobsDistributed(String topologyId) {
+this._topologyId = topologyId;
+}
+
+@Override
+public Void call() throws Exception {
+String stormroot = ConfigUtils.supervisorStormDistRoot(_conf, _topologyId);
+File sr = new File(stormroot);
+if (sr.exists()) {
+if (!_fsOps.supportsAtomicDirectoryMove()) {
+LOG.warn("{} may have partially downloaded blobs, recovering", _topologyId);
+Utils.forceDelete(stormroot);
+} else {
+LOG.warn("{} already downloaded blobs, skipping", _topologyId);
+return null;
+}
+}
+boolean deleteAll = true;
+String tmproot = ConfigUtils.supervisorTmpDir(_conf) + 

[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on the issue:

https://github.com/apache/storm/pull/1642
  
@HeartSaVioR Yes, it looks like I need to think through recovery and any
races with the AsyncLocalizer a bit more.  I'll try to reproduce your error.




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418240
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
+ */
+public class AsyncLocalizer implements ILocalizer, Shutdownable {
+/**
+ * A future that has already completed.
+ */
+private static class AllDoneFuture implements Future {
+
+@Override
+public boolean cancel(boolean mayInterruptIfRunning) {
+return false;
+}
+
+@Override
+public boolean isCancelled() {
+return false;
+}
+
+@Override
+public boolean isDone() {
+return true;
+}
+
+@Override
+public Void get() {
+return null;
+}
+
+@Override
+public Void get(long timeout, TimeUnit unit) {
--- End diff --

My IDE is warning me that the TimeUnit param needs a `@NotNull` annotation 
or else the requirement gets overridden here, which is probably not intended.
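
For reference, a minimal sketch of an already-completed future that checks the
`TimeUnit` argument explicitly. It assumes the type parameter is `Void` (the
quoted diff does not show it) and uses `Objects.requireNonNull` in place of an
annotation, since the thread does not say which `@NotNull` the IDE expects.
`AllDoneFutureSketch` and `done()` are hypothetical names used only for
illustration. On Java 8 and later, `CompletableFuture.completedFuture(null)`
may be a simpler way to get the same behaviour.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class AllDoneFutureSketch {
    /** A future that has already completed; the Void type parameter is assumed. */
    public static class AllDoneFuture implements Future<Void> {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) { return false; }

        @Override
        public boolean isCancelled() { return false; }

        @Override
        public boolean isDone() { return true; }

        @Override
        public Void get() { return null; }

        @Override
        public Void get(long timeout, TimeUnit unit) {
            // Explicit check instead of an IDE-specific @NotNull annotation.
            Objects.requireNonNull(unit, "unit");
            return null;
        }
    }

    /** Hypothetical factory, only here to make the sketch easy to call. */
    public static AllDoneFuture done() { return new AllDoneFuture(); }

    public static void main(String[] args) throws Exception {
        Future<Void> handRolled = done();
        // JDK alternative (Java 8+): a future completed with a null result.
        Future<Void> builtIn = CompletableFuture.completedFuture(null);
        System.out.println(handRolled.isDone() + " " + builtIn.isDone());
    }
}
```

Either way the caller never blocks, which is all the already-done case needs.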




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77418129
  
--- Diff: storm-core/src/jvm/org/apache/storm/localizer/AsyncLocalizer.java 
---
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.localizer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.daemon.Shutdownable;
+import org.apache.storm.daemon.supervisor.AdvancedFSOps;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * This is a wrapper around the Localizer class that provides the desired
+ * async interface to Slot.
+ * TODO once we have replaced the original supervisor merge this with
+ * Localizer and optimize them
--- End diff --

Yes, we want to do this, but we should track it with a JIRA issue rather than a TODO comment.




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77417627
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+public class RunAsUserContainerLauncher extends ContainerLauncher {
+private final Map _conf;
+private final String _supervisorId;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+public RunAsUserContainerLauncher(Map conf, String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+}
+
+@Override
+public Container launchContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+Container container = new RunAsUserContainer(port, assignment, _conf, _supervisorId, state,
+_resourceIsolationManager, false);
+container.setup();
+container.launch();
+return container;
+}
+
+@Override
+public Container recoverContainer(int port, LocalAssignment assignment, LocalState state) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(port, assignment, _conf, _supervisorId, state,
+_resourceIsolationManager, true);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
+
+@Override
+public Killable recoverContainer(String workerId) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(workerId, _conf, _supervisorId,
+_resourceIsolationManager);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
--- End diff --

But then the ContainerRecoveryException will not be turned into a null.  We 
can change that everywhere, but for now it is an implementation detail that is 
hidden.
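
To illustrate the point, here is a rough sketch of how the exception-to-null
mapping could stay hidden in one place while the two `recoverContainer`
overloads share it. Everything in it is a stand-in: `RecoverySketch`,
`Recovery` and `recoverOrNull` are hypothetical names, and the nested
`ContainerRecoveryException` is assumed to be unchecked; none of this is taken
from the pull request.

```java
public class RecoverySketch {
    /** Stand-in for Storm's ContainerRecoveryException; assumed unchecked. */
    public static class ContainerRecoveryException extends RuntimeException {
        public ContainerRecoveryException(String message) { super(message); }
    }

    /** Hypothetical callback that builds a container from recovered state. */
    public interface Recovery<T> {
        T recover();
    }

    /** Runs the recovery and maps a recovery failure to null. */
    public static <T> T recoverOrNull(Recovery<T> recovery) {
        try {
            return recovery.recover();
        } catch (ContainerRecoveryException e) {
            // Could not recover; callers treat null as "nothing to recover".
            return null;
        }
    }

    public static void main(String[] args) {
        String ok = recoverOrNull(() -> "worker-1");
        String failed = recoverOrNull(() -> {
            throw new ContainerRecoveryException("no saved state");
        });
        System.out.println(ok + " " + failed);
    }
}
```

With a helper like this, callers keep seeing `null` on a failed recovery, so
the exception remains an implementation detail of the launcher.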




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77417198
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference<Map<Long, LocalAssignment>> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77417265
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -159,7 +162,7 @@ public static void addBlobReferences(Localizer localizer, String stormId, Map co
 }
 }
 
-public static Set<String> readDownLoadedStormIds(Map conf) throws IOException {
+public static Set<String> readDownLoadedStormIds(Map<String, Object> conf) throws IOException {
 Set<String> stormIds = new HashSet<>();
 String path = ConfigUtils.supervisorStormDistRoot(conf);
 Collection<String> rets = Utils.readDirContents(path);
--- End diff --

I agree that it could be improved. Let's create a separate jira issue to 
track it. I think it is out-of-scope here.




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77416935
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference<Map<Long, LocalAssignment>> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference<Map<String, VersionedData<Assignment>>>(new HashMap<String, VersionedData<Assignment>>());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77416395
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77416200
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77415816
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
--- End diff --

I tried that, and for some reason Java doesn't like it, because I am 
initializing it with a HashMap, which is a Map, but not exactly a Map. I wasn't 
totally sure why Java didn't like it.
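
A possible explanation, offered as a sketch and not as a diagnosis of the actual PR: Java generics are invariant, so an AtomicReference of a HashMap type is not an AtomicReference of the corresponding Map type, even though a HashMap is a Map. On compilers older than Java 8, the diamond operator may infer the type argument from the constructor argument alone, which would produce the HashMap form and fail against a Map-typed field. The class name and the String/Integer type parameters below are placeholders, not the types used in ReadClusterState.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;

public class InvarianceSketch {
    // Hypothetical stand-in for a field such as assignmentVersions;
    // String and Integer are placeholder type parameters.
    static AtomicReference<Map<String, Integer>> makeRef() {
        // Spelling out the type argument makes the reference hold a Map,
        // and the HashMap is accepted as a value of that type.
        return new AtomicReference<Map<String, Integer>>(new HashMap<String, Integer>());
    }

    public static void main(String[] args) {
        AtomicReference<Map<String, Integer>> ref = makeRef();

        // Does NOT compile: AtomicReference<HashMap<..>> is not a subtype of
        // AtomicReference<Map<..>>, because generic types are invariant.
        // AtomicReference<Map<String, Integer>> bad =
        //         new AtomicReference<HashMap<String, Integer>>(new HashMap<String, Integer>());

        ref.get().put("topology", 1);
        // Because the reference is typed to Map, any Map implementation can be stored later.
        ref.set(new TreeMap<String, Integer>(ref.get()));
        System.out.println(ref.get());
    }
}
```

Writing the explicit type argument on the constructor keeps the code compiling on older compilers as well as newer ones.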



[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77415710
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+private static final Logger LOG = 
LoggerFactory.getLogger(ReadClusterState.class);
+
+private final Map superConf;
+private final IStormClusterState stormClusterState;
+private final EventManager syncSupEventManager;
+private final AtomicReference> 
assignmentVersions;
+private final Map slots = new HashMap<>();
+private final AtomicInteger readRetry = new AtomicInteger(0);
+private final String assignmentId;
+private final ISupervisor iSuper;
+private final ILocalizer localizer;
+private final ContainerLauncher launcher;
+private final String host;
+private final LocalState localState;
+private final IStormClusterState clusterState;
+private final AtomicReference> 
cachedAssignments;
+
+public ReadClusterState(Supervisor supervisor) throws Exception {
+this(supervisor.getConf(), supervisor.getStormClusterState(), 
supervisor.getEventManger(),
+supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+supervisor.getLocalState(), 
supervisor.getStormClusterState(),
+supervisor.getCurrAssignment(), 
supervisor.getSharedContext());
+}
+
+public ReadClusterState(Map superConf, 
IStormClusterState stormClusterState,
+EventManager syncSupEventManager, String assignmentId, 
ISupervisor iSuper,
+ILocalizer localizer, String host, LocalState localState,
+IStormClusterState clusterState, AtomicReference> cachedAssignments,
+IContext sharedContext) throws Exception{
+this.superConf = superConf;
+this.stormClusterState = stormClusterState;
+this.syncSupEventManager = syncSupEventManager;
+this.assignmentVersions = new AtomicReference>(new HashMap());
+this.assignmentId = assignmentId;
+this.iSuper = iSuper;
+this.localizer = localizer;
+this.host = host;
+this.localState = localState;
+this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77415373
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Represents a container that a worker will run in.
+ */
+public abstract class Container implements Killable {
+private static final Logger LOG = 
LoggerFactory.getLogger(BasicContainer.class);
+protected final Map _conf;
+protected final Map _topoConf;
+protected String _workerId;
+protected final String _topologyId;
+protected final String _supervisorId;
+protected final int _port;
+protected final LocalAssignment _assignment;
+protected final AdvancedFSOps _ops;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+//Exposed for testing
+protected Container(AdvancedFSOps ops, int port, LocalAssignment 
assignment,
+Map conf, Map topoConf, String 
supervisorId, 
+ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+assert((assignment == null && port <= 0) ||
+(assignment != null && port > 0));
+assert(conf != null);
+assert(ops != null);
+assert(supervisorId != null);
+
+_port = port;
+_ops = ops;
+_assignment = assignment;
+if (assignment != null) {
+_topologyId = assignment.get_topology_id();
+} else {
+_topologyId = null;
+}
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+if (topoConf == null) {
+_topoConf = readTopoConf();
+} else {
+_topoConf = topoConf;
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName() + " topo:" + _topologyId + 
" worker:" + _workerId;
+}
+
+protected Map readTopoConf() throws IOException {
+assert(_topologyId != null);
+return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+}
+
+protected Container(int port, LocalAssignment assignment, Map conf, 
+String supervisorId, ResourceIsolationInterface 
resourceIsolationManager) throws IOException {
+this(AdvancedFSOps.make(conf), port, assignment, conf, null, 
supervisorId, resourceIsolationManager);
+}
+
+/**
+ * Constructor to use when trying to recover a container from just the 
worker ID.
+ * @param workerId the id of the worker
+ * @param conf the config of the supervisor
+ * @param supervisorId the id of the 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77415016
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1990,31 +1995,26 @@ protected void forceDeleteImpl(String path) throws IOException {
 }
 
 /**
- * Creates a symbolic link to the target
+ * Creates a symbolic link to the target and force the creation if the target already exists
  * @param dir the parent directory of the link
  * @param targetDir the parent directory of the link's target
- * @param filename the file name of the link
  * @param targetFilename the file name of the links target
+ * @param filename the file name of the link
  * @throws IOException
  */
-public static void createSymlink(String dir, String targetDir,
-String filename, String targetFilename) throws IOException {
+public static void forceCreateSymlink(String dir, String targetDir,
--- End diff --

Deleting it.  I added it, then made a better version in fsops, so I am 
removing it.
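
For readers unfamiliar with the idea in the quoted javadoc, here is a minimal sketch of "force" symlink creation using java.nio.file. It is not the Storm implementation (neither the removed Utils method nor the fsops version). The class name is hypothetical, and the parameter order is an assumption that follows the javadoc order in the diff, since the quoted signature is cut off.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ForceSymlinkSketch {
    /**
     * Creates dir/filename as a symbolic link to targetDir/targetFilename,
     * replacing any existing entry at the link location.
     */
    public static Path forceCreateSymlink(String dir, String targetDir,
            String targetFilename, String filename) throws IOException {
        Path link = Paths.get(dir, filename).toAbsolutePath();
        Path target = Paths.get(targetDir, targetFilename).toAbsolutePath();
        // createSymbolicLink fails if the link path already exists, so remove it first.
        // deleteIfExists removes the link itself, not the file it points to.
        Files.deleteIfExists(link);
        return Files.createSymbolicLink(link, target);
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempDirectory("symlink-sketch");
        Files.createFile(tmp.resolve("target.txt"));
        forceCreateSymlink(tmp.toString(), tmp.toString(), "target.txt", "link");
        // Without the delete step, this second call would throw FileAlreadyExistsException.
        Path link = forceCreateSymlink(tmp.toString(), tmp.toString(), "target.txt", "link");
        System.out.println(Files.isSymbolicLink(link));
    }
}
```

The delete-then-create sequence is not atomic, and deleteIfExists throws if the existing entry is a non-empty directory, so a production version would need to decide how to handle both cases.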




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77414257
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java 
---
@@ -0,0 +1,459 @@
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+public class BasicContainerTest {
+public static class CommandRun {
+final List cmd;
+final Map env;
+final File pwd;
+
+public CommandRun(List cmd, Map env, File 
pwd) {
+this.cmd = cmd;
+this.env = env;
+this.pwd = pwd;
+}
+}
+
+public static class MockBasicContainer extends BasicContainer {
+public final List profileCmds = new ArrayList<>();
+public final List workerCmds = new ArrayList<>();
+
+public MockBasicContainer(int port, LocalAssignment assignment, 
Map conf,
+String supervisorId, LocalState localState, 
ResourceIsolationInterface resourceIsolationManager,
+boolean recover) throws IOException {
+super(port, assignment, conf, supervisorId, localState, 
resourceIsolationManager, recover);
+}
+
+public MockBasicContainer(AdvancedFSOps ops, int port, 
LocalAssignment assignment,
+Map conf, Map topoConf, 
String supervisorId, 
+ResourceIsolationInterface resourceIsolationManager, 
LocalState localState,
+String profileCmd) throws IOException {
+super(ops, port, assignment, conf, topoConf, supervisorId, 
resourceIsolationManager, localState, profileCmd);
+}
+
+@Override
+protected Map readTopoConf() throws IOException {
+return new HashMap<>();
+}
+
+@Override
+public void createNewWorkerId() {
+super.createNewWorkerId();
+}
+
+@Override
+public List substituteChildopts(Object value, int 
memOnheap) {
+return super.substituteChildopts(value, memOnheap);
+}
+   
+@Override
+protected boolean runProfilingCommand(List command, 
Map env, String logPrefix,
+File targetDir) throws IOException, InterruptedException {
+profileCmds.add(new CommandRun(command, env, targetDir));
+return true;
+}
+
+@Override
+protected void launchWorkerProcess(List command, 
Map env, String logPrefix,
+ExitCodeCallback processExitCallback, File targetDir) 
throws IOException {
+workerCmds.add(new CommandRun(command, env, targetDir));
+}
+
+@Override
+protected String javaCmd(String cmd) {
+//avoid system dependent things
+return cmd;
+}
+
+@Override
+protected List frameworkClasspath() {
+//We are not really running anything so make this
+// simple to check for
+return Arrays.asList("FRAMEWORK_CP");
+}
+
+@Override
+protected String javaLibraryPath(String stormRoot, Map conf) {
+return "JLP";
+}
+}
+
+@Test
+public void testCreateNewWorkerId() throws Exception {
+final String topoId = "test_topology";
+final int port = 8080;
+LocalAssignment la = new LocalAssignment();
+la.set_topology_id(topoId);
+
+AdvancedFSOps ops = mock(AdvancedFSOps.class);
+
+LocalState ls = mock(LocalState.class);
+
+MockBasicContainer mc = new MockBasicContainer(ops, port, la, new 
HashMap

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77413129
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java 
---
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+
+static enum MachineState {
+EMPTY,
+RUNNING,
+WAITING_FOR_WORKER_START,
+KILL_AND_RELAUNCH,
+KILL,
+WAITING_FOR_BASIC_LOCALIZATION,
+WAITING_FOR_BLOB_LOCALIZATION;
+};
+
+static class StaticState {
+public final ILocalizer localizer;
+public final long hbTimeoutMs;
+public final long firstHbTimeoutMs;
+public final long killSleepMs;
+public final long monitorFreqMs;
+public final ContainerLauncher containerLauncher;
+public final int port;
+public final String host;
+public final ISupervisor iSupervisor;
+public final LocalState localState;
+
+StaticState(ILocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
+long killSleepMs, long monitorFreqMs,
+ContainerLauncher containerLauncher, String host, int port,
+ISupervisor iSupervisor, LocalState localState) {
+this.localizer = localizer;
+this.hbTimeoutMs = hbTimeoutMs;
+this.firstHbTimeoutMs = firstHbTimeoutMs;
+this.containerLauncher = containerLauncher;
+this.killSleepMs = killSleepMs;
+this.monitorFreqMs = monitorFreqMs;
+this.host = host;
+this.port = port;
+this.iSupervisor = iSupervisor;
+this.localState = localState;
+}
+}
+
+static class DynamicState {
+public final MachineState state;
+public final LocalAssignment newAssignment;
+public final LocalAssignment currentAssignment;
+public final Container container;
+public final LocalAssignment pendingLocalization;
+public final Future pendingDownload;
+public final Set profileActions;
+public final Set pendingStopProfileActions;
+
+/**
+ * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
+ */
+public final long startTime;
+
+public DynamicState(final LocalAssignment currentAssignment, 
Container container, final LocalAssignment newAssignment) {
+this.currentAssignment = currentAssignment;
+this.container = container;
+if ((currentAssignment == null) ^ (container == null)) {

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77411245
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/ConfigUtils.java ---
@@ -353,25 +350,21 @@ public LocalState nimbusTopoHistoryStateImpl(Map conf) throws IOException {
 }
 
 // we use this "weird" wrapper pattern temporarily for mocking in clojure test
-public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
+public static Map readSupervisorStormConf(Map conf, String stormId) throws IOException {
 return _instance.readSupervisorStormConfImpl(conf, stormId);
 }
 
-public Map readSupervisorStormConfImpl(Map conf, String stormId) throws IOException {
+public Map readSupervisorStormConfImpl(Map conf, String stormId) throws IOException {
 String stormRoot = supervisorStormDistRoot(conf, stormId);
 String confPath = supervisorStormConfPath(stormRoot);
 return readSupervisorStormConfGivenPath(conf, confPath);
 }
 
 // we use this "weird" wrapper pattern temporarily for mocking in clojure test
-public static StormTopology readSupervisorTopology(Map conf, String stormId) throws IOException {
-return _instance.readSupervisorTopologyImpl(conf, stormId);
-}
-
-public StormTopology readSupervisorTopologyImpl(Map conf, String stormId) throws IOException {
+public static StormTopology readSupervisorTopology(Map conf, String stormId, AdvancedFSOps ops) throws IOException {
--- End diff --

I put in the AdvancedFSOps to handle the mocking, but now I am not so sure 
about that.  It is too low level for a lot of what we want to do, so I think I 
will put the wrapper back in, and just do the mocking at the higher level.
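
For readers following along, the wrapper pattern that the diff comments call "weird" can be sketched roughly as below. This is only an illustration, not the ConfigUtils code: ConfReader, readConf, readConfImpl, setInstance and readWithStub are hypothetical names, and the only thing taken from the diff is the idea of a static method delegating to a swappable _instance so that a test can substitute the implementation.

```java
import java.util.HashMap;
import java.util.Map;

public class WrapperMockSketch {
    /** Hypothetical stand-in for a utility class; not the real ConfigUtils. */
    public static class ConfReader {
        // Swappable singleton that the static entry point delegates to.
        private static ConfReader _instance = new ConfReader();

        /** Installs a replacement and returns the previous instance so a test can restore it. */
        public static ConfReader setInstance(ConfReader replacement) {
            ConfReader old = _instance;
            _instance = replacement;
            return old;
        }

        // Static facade that production callers would use.
        public static Map<String, Object> readConf(String topologyId) {
            return _instance.readConfImpl(topologyId);
        }

        // Overridable implementation; a real one would read from disk.
        protected Map<String, Object> readConfImpl(String topologyId) {
            throw new IllegalStateException("no conf on disk for " + topologyId);
        }
    }

    /** Calls readConf with a stub installed, then restores the original instance. */
    public static Map<String, Object> readWithStub(String topologyId) {
        ConfReader stub = new ConfReader() {
            @Override
            protected Map<String, Object> readConfImpl(String id) {
                Map<String, Object> conf = new HashMap<>();
                conf.put("topology.id", id);
                return conf;
            }
        };
        ConfReader old = ConfReader.setInstance(stub);
        try {
            return ConfReader.readConf(topologyId);
        } finally {
            ConfReader.setInstance(old);
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithStub("topo-1"));
    }
}
```

Mocking at this level replaces a whole "read the conf" operation in one place, whereas mocking a file-system helper would mean stubbing each low-level call the operation happens to make.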




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread revans2
Github user revans2 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77407551
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---
@@ -0,0 +1,493 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Represents a container that a worker will run in.
+ */
+public abstract class Container implements Killable {
+private static final Logger LOG = LoggerFactory.getLogger(BasicContainer.class);
+protected final Map _conf;
+protected final Map _topoConf;
+protected String _workerId;
+protected final String _topologyId;
+protected final String _supervisorId;
+protected final int _port;
+protected final LocalAssignment _assignment;
+protected final AdvancedFSOps _ops;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+//Exposed for testing
+protected Container(AdvancedFSOps ops, int port, LocalAssignment assignment,
+Map conf, Map topoConf, String supervisorId,
+ResourceIsolationInterface resourceIsolationManager) throws IOException {
+assert((assignment == null && port <= 0) ||
+(assignment != null && port > 0));
+assert(conf != null);
+assert(ops != null);
+assert(supervisorId != null);
+
+_port = port;
+_ops = ops;
+_assignment = assignment;
+if (assignment != null) {
+_topologyId = assignment.get_topology_id();
+} else {
+_topologyId = null;
+}
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+if (topoConf == null) {
+_topoConf = readTopoConf();
+} else {
+_topoConf = topoConf;
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName() + " topo:" + _topologyId + " worker:" + _workerId;
+}
+
+protected Map readTopoConf() throws IOException {
+assert(_topologyId != null);
+return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+}
+
+protected Container(int port, LocalAssignment assignment, Map conf,
+String supervisorId, ResourceIsolationInterface resourceIsolationManager) throws IOException {
+this(AdvancedFSOps.make(conf), port, assignment, conf, null, supervisorId, resourceIsolationManager);
+}
+
+/**
+ * Constructor to use when trying to recover a container from just the worker ID.
+ * @param workerId the id of the worker
+ * @param conf the config of the supervisor
+ * @param supervisorId the id of the 

[DISCUSS] storm-jms Code Donation

2016-09-02 Thread P. Taylor Goetz
I’d like to start a discussion around adding storm-jms as an external module. 
I’ve gotten a few requests to do so and the only thing that’s held me back is 
tracking down contributors and having them submit ICLAs.

The code can be found here:

https://github.com/ptgoetz/storm-jms 

If there’s support for adding it, I’ll track down contributors and handle the 
IP clearance process.

-Taylor




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77403662
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+
+static enum MachineState {
+EMPTY,
+RUNNING,
+WAITING_FOR_WORKER_START,
+KILL_AND_RELAUNCH,
+KILL,
+WAITING_FOR_BASIC_LOCALIZATION,
+WAITING_FOR_BLOB_LOCALIZATION;
+};
+
+static class StaticState {
+public final ILocalizer localizer;
+public final long hbTimeoutMs;
+public final long firstHbTimeoutMs;
+public final long killSleepMs;
+public final long monitorFreqMs;
+public final ContainerLauncher containerLauncher;
+public final int port;
+public final String host;
+public final ISupervisor iSupervisor;
+public final LocalState localState;
+
+StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+long killSleepMs, long monitorFreqMs,
+ContainerLauncher containerLauncher, String host, int port,
+ISupervisor iSupervisor, LocalState localState) {
+this.localizer = localizer;
+this.hbTimeoutMs = hbTimeoutMs;
+this.firstHbTimeoutMs = firstHbTimeoutMs;
+this.containerLauncher = containerLauncher;
+this.killSleepMs = killSleepMs;
+this.monitorFreqMs = monitorFreqMs;
+this.host = host;
+this.port = port;
+this.iSupervisor = iSupervisor;
+this.localState = localState;
+}
+}
+
+static class DynamicState {
+public final MachineState state;
+public final LocalAssignment newAssignment;
+public final LocalAssignment currentAssignment;
+public final Container container;
+public final LocalAssignment pendingLocalization;
+public final Future pendingDownload;
+public final Set profileActions;
+public final Set pendingStopProfileActions;
+
+/**
+ * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into.
+ */
+public final long startTime;
+
+public DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment) {
+this.currentAssignment = currentAssignment;
+this.container = container;
+if ((currentAssignment == null) ^ (container == null)) {

[VOTE] Release Apache Storm 0.10.2 (rc2)

2016-09-02 Thread P. Taylor Goetz
This is a call to vote on releasing Apache Storm 0.10.2 (rc2)

Full list of changes in this release:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=CHANGELOG.md;hb=d8e4a3b59e6a97165ff769f11c9d4d2d3ecbb9dc

The tag/commit to be voted upon is v0.10.2:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=tree;h=d51a27fae2a9549c3ccbbf23b4ad81a0c50a3a65;hb=d8e4a3b59e6a97165ff769f11c9d4d2d3ecbb9dc

The source archive being voted upon can be found here:

https://dist.apache.org/repos/dist/dev/storm/apache-storm-0.10.2-rc2/apache-storm-0.10.2-src.tar.gz

Other release files, signatures and digests can be found here:

https://dist.apache.org/repos/dist/dev/storm/apache-storm-0.10.2-rc2/

The release artifacts are signed with the following key:

https://git-wip-us.apache.org/repos/asf?p=storm.git;a=blob_plain;f=KEYS;hb=22b832708295fa2c15c4f3c70ac0d2bc6fded4bd

The Nexus staging repository for this release is:

https://repository.apache.org/content/repositories/orgapachestorm-1042

Please vote on releasing this package as Apache Storm 0.10.2.

When voting, please list the actions taken to verify the release.

This vote will be open for at least 72 hours.

[ ] +1 Release this package as Apache Storm 0.10.2
[ ]  0 No opinion
[ ] -1 Do not release this package because...

Thanks to everyone who contributed to this release.

-Taylor
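
One of the checks a voter might list is comparing a published digest against one computed locally. The following is a minimal sketch under stated assumptions: the digest is published as hex text, the class and method names (DigestCheckSketch, hexDigest, matches) are made up for illustration, and the algorithm to pass in depends on which digest files actually accompany the release. It does not cover checking the OpenPGP signature against the KEYS file, which is a separate step.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class DigestCheckSketch {
    /** Streams the input through the named digest and returns lowercase hex. */
    public static String hexDigest(InputStream in, String algorithm) {
        try {
            MessageDigest md = MessageDigest.getInstance(algorithm);
            byte[] buf = new byte[8192];
            int n;
            while ((n = in.read(buf)) != -1) {
                md.update(buf, 0, n);
            }
            StringBuilder hex = new StringBuilder();
            for (byte b : md.digest()) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("unknown digest algorithm: " + algorithm, e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Compares ignoring case and whitespace. Assumes the published value is
     * bare hex; a file that also carries a filename prefix needs trimming first.
     */
    public static boolean matches(String computed, String published) {
        return computed.equalsIgnoreCase(published.replaceAll("\\s+", ""));
    }

    public static void main(String[] args) {
        // With a real artifact this would be a FileInputStream over the
        // downloaded archive; an in-memory input keeps the sketch runnable.
        InputStream in = new ByteArrayInputStream("abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(hexDigest(in, "SHA-256"));
    }
}
```

Streaming through a fixed buffer keeps memory use flat regardless of archive size, which is why the sketch avoids reading the whole file into a byte array.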




[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread priyank5485
Github user priyank5485 commented on the issue:

https://github.com/apache/storm/pull/1665
  
@vesense Not sure if you saw my comment about exiting with a code of 1 in 
getOldConsumerOffsetsFromZk. Can you check that comment and address it?




Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Douglas Shore
We have benefited greatly from being downstream from SQE in powering our
data driven solutions.

I am excited to see this repo grow in breadth and depth.


On Fri, Sep 2, 2016 at 11:16 AM, Kamil Sindi  wrote:

> Our data science efforts rely on SQE to power our recommendations engine. I
> am also excited to contribute to it especially as we continue to implement
> predictive models at larger scales.
>
> On Fri, Sep 2, 2016 at 10:57 AM, Sahil Shah  wrote:
>
> > I would like to throw my support behind SQE. Having worked with it in a
> > production environment, I have seen the many benefits in testing new
> > topologies and quickly understanding what a topology is doing. As our
> data
> > needs have grown, we have only increased our reliance on SQE and it
> stands
> > the test repeatedly. I am excited at the opportunity to contribute to
> this
> > wonderful open source community.
> >
> > On Fri, Sep 2, 2016 at 10:31 AM, Alex Halter  wrote:
> >
> > > I too want to voice my support for SQE and our commitment to the
> > initiative
> > > going forward. We've been working on adapting Storm to our needs for
> most
> > > of two years. It was thoughtfully designed and supports our production
> > > needs. We have a long list of features we want to build out and we'd
> love
> > > to work with the community.
> > >
> > >
> > > On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg 
> > > wrote:
> > >
> > > > I am one of the developers who has been working on SQE for past 1.5
> > > years.
> > > > Over time, we have made it more stable and production ready.
> > > >
> > > > As of now, one can easily scale SQE for more production data with
> easy
> > > > config changes and re-deploy, aggregate across different dimensions
> by
> > > > writing json like sql, write to different state stores and most
> > > > importantly, address new feature requirements really quick.(Since
> it's
> > > just
> > > > writing a sql like json file and sqe handles everything for you ! )
> > > >
> > > > I think SQE can really help companies who want to setup a production
> > > ready
> > > > and well tested framework within weeks (instead of months) for large
> > > scale
> > > > event stream processing and with minimum risks and limited resources.
> > We
> > > > are actively working on SQE to make it more awesome and are committed
> > to
> > > > make the experience of developing a highly scalable and fault
> tolerant
> > > > stream processing framework more seamless and less stressful 
> > > >
> > > > On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
> > > >
> > > > > Hi, Storm Dev!
> > > > >
> > > > > I wanted to chime in to show support for SQE and show how committed
> > we
> > > > are
> > > > > to SQE. *StormSQL looks awesome and has some real potential! *
> > > > >
> > > > > We use SQE in production. It has been tested, code reviewed, load
> > > tested,
> > > > > maintained, and processing an average of 8 million tuples per
> minute
> > or
> > > > > more for over a year now. The investment into this code base has
> been
> > > > > significant.
> > > > >
> > > > > Please take a look at the code itself. The production quality code
> is
> > > > ready
> > > > > to go. Developers with no experience with Storm or even streaming
> > > > > successfully launch robust topologies using SQE.  Our productivity
> in
> > > > this
> > > > > area went up by orders of magnitude.
> > > > >
> > > > > Based on this experience we realized the value of querying storm,
> and
> > > we
> > > > > decided to give that value back to the storm community.
> > > > >
> > > > > Our data pipelines and real-time processing are very important to
> the
> > > > > success of JW Player. SQE has been a foundation for that. We will
> > > > continue
> > > > > to invest into this technology for years to come. Unfortunately we
> > > > wouldn't
> > > > > be able to adopt StormSQL as is until it has been put through the
> > > > crucible
> > > > > of production level usage and has had the same rigor applied. It
> > seems
> > > > much
> > > > > of the development has been over the last couple of weeks.
> > > > >
> > > > > *Quick Gap Analysis (Not Exhaustive)*
> > > > > *States*
> > > > >   - SQE supports Redis and MongoDB as states in addition to Kafka.
> > > (Soon
> > > > > adding a Test/Monitor State)
> > > > >   - SQE supports non-static field names for Redis state
> > > > >   - Storm SQL supports Kafka
> > > > >   - SQE supports replay filtering for Kafka
> > > > >
> > > > > *Aggregations*
> > > > >   - SQE supports stateful, exactly-once aggregations for states
> that
> > > > > support it
> > > > >   - Storm SQL supports aggregations within each micro batch
> > > > >
> > > > > *SQL*
> > > > >   - StormSQL supports SQL
> > > > >  - SQE supports SQL "like" JSON
> > > > >
> > > > > *Scaling*
> > > > >   - SQE has a mechanism for controlling parallelism or scaling
> > > > >   - Could not find 

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread P. Taylor Goetz
Thanks to all the JW Player folks who joined this thread, and welcome to the 
Storm community! It’s good to see that the code donation comes with a community 
ready and willing to help out.

I’m +1 for accepting the code donation.

-Taylor


> On Sep 2, 2016, at 12:07 PM, Abhishek Agarwal  wrote:
> 
> +1 for the merge. Apart from code contributions, helping users run and
> troubleshoot SQE is equally important and  it seems that quite a number of
> folks are ready to help.

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Abhishek Agarwal
+1 for the merge. Apart from code contributions, helping users run and
troubleshoot SQE is equally important and  it seems that quite a number of
folks are ready to help.

On Fri, Sep 2, 2016 at 8:46 PM, Kamil Sindi  wrote:

> Our data science efforts rely on SQE to power our recommendations engine. I
> am also excited to contribute to it especially as we continue to implement
> predictive models at larger scales.
>
> On Fri, Sep 2, 2016 at 10:57 AM, Sahil Shah  wrote:
>
> > I would like to throw my support behind SQE. Having worked with it in a
> > production environment, I have seen the many benefits in testing new
> > topologies and quickly understanding what a topology is doing. As our
> data
> > needs have grown, we have only increased our reliance on SQE and it
> stands
> > the test repeatedly. I am excited at the opportunity to contribute to
> this
> > wonderful open source community.
> >
> > On Fri, Sep 2, 2016 at 10:31 AM, Alex Halter  wrote:
> >
> > > I too want to voice my support for SQE and our commitment to the
> > initiative
> > > going forward. We've been working on adapting Storm to our needs for
> most
> > > of two years. It was thoughtfully designed and supports our production
> > > needs. We have a long list of features we want to build out and we'd
> love
> > > to work with the community.
> > >
> > >
> > > On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg 
> > > wrote:
> > >
> > > > I am one of the developers who has been working on SQE for past 1.5
> > > years.
> > > > Over time, we have made it more stable and production ready.
> > > >
> > > > As of now, one can easily scale SQE for more production data with
> easy
> > > > config changes and re-deploy, aggregate across different dimensions
> by
> > > > writing json like sql, write to different state stores and most
> > > > importantly, address new feature requirements really quick.(Since
> it's
> > > just
> > > > writing a sql like json file and sqe handles everything for you ! )
> > > >
> > > > I think SQE can really help companies who want to setup a production
> > > ready
> > > > and well tested framework within weeks (instead of months) for large
> > > scale
> > > > event stream processing and with minimum risks and limited resources.
> > We
> > > > are actively working on SQE to make it more awesome and are committed
> > to
> > > > make the experience of developing a highly scalable and fault
> tolerant
> > > > stream processing framework more seamless and less stressful 
> > > >
> > > > On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
> > > >
> > > > > Hi, Storm Dev!
> > > > >
> > > > > I wanted to chime in to show support for SQE and show how committed
> > we
> > > > are
> > > > > to SQE. *StormSQL looks awesome and has some real potential! *
> > > > >
> > > > > We use SQE in production. It has been tested, code reviewed, load
> > > tested,
> > > > > maintained, and processing an average of 8 million tuples per
> minute
> > or
> > > > > more for over a year now. The investment into this code base has
> been
> > > > > significant.
> > > > >
> > > > > Please take a look at the code itself. The production quality code
> is
> > > > ready
> > > > > to go. Developers with no experience with Storm or even streaming
> > > > > successfully launch robust topologies using SQE.  Our productivity
> in
> > > > this
> > > > > area went up by orders of magnitude.
> > > > >
> > > > > Based on this experience we realized the value of querying storm,
> and
> > > we
> > > > > decided to give that value back to the storm community.
> > > > >
> > > > > Our data pipelines and real-time processing are very important to
> the
> > > > > success of JW Player. SQE has been a foundation for that. We will
> > > > continue
> > > > > to invest into this technology for years to come. Unfortunately we
> > > > wouldn't
> > > > > be able to adopt StormSQL as is until it has been put through the
> > > > crucible
> > > > > of production level usage and has had the same rigor applied. It
> > seems
> > > > much
> > > > > of the development has been over the last couple of weeks.
> > > > >
> > > > > *Quick Gap Analysis (Not Exhaustive)*
> > > > > *States*
> > > > >   - SQE supports Redis and MongoDB as states in addition to Kafka.
> > > (Soon
> > > > > adding a Test/Monitor State)
> > > > >   - SQE supports non-static field names for Redis state
> > > > >   - Storm SQL supports Kafka
> > > > >   - SQE supports replay filtering for Kafka
> > > > >
> > > > > *Aggregations*
> > > > >   - SQE supports stateful, exactly-once aggregations for states
> that
> > > > > support it
> > > > >   - Storm SQL supports aggregations within each micro batch
> > > > >
> > > > > *SQL*
> > > > >   - StormSQL supports SQL
> > > > >  - SQE supports SQL "like" JSON
> > > > >
> > > > > *Scaling*
> > > > >   - SQE has a mechanism for controlling parallelism or scaling
> > > > >   - Could 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77369914
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+
+static enum MachineState {
+EMPTY,
+RUNNING,
+WAITING_FOR_WORKER_START,
+KILL_AND_RELAUNCH,
+KILL,
+WAITING_FOR_BASIC_LOCALIZATION,
+WAITING_FOR_BLOB_LOCALIZATION;
+};
+
+static class StaticState {
+public final ILocalizer localizer;
+public final long hbTimeoutMs;
+public final long firstHbTimeoutMs;
+public final long killSleepMs;
+public final long monitorFreqMs;
+public final ContainerLauncher containerLauncher;
+public final int port;
+public final String host;
+public final ISupervisor iSupervisor;
+public final LocalState localState;
+
+StaticState(ILocalizer localizer, long hbTimeoutMs, long firstHbTimeoutMs,
+long killSleepMs, long monitorFreqMs,
+ContainerLauncher containerLauncher, String host, int port,
+ISupervisor iSupervisor, LocalState localState) {
+this.localizer = localizer;
+this.hbTimeoutMs = hbTimeoutMs;
+this.firstHbTimeoutMs = firstHbTimeoutMs;
+this.containerLauncher = containerLauncher;
+this.killSleepMs = killSleepMs;
+this.monitorFreqMs = monitorFreqMs;
+this.host = host;
+this.port = port;
+this.iSupervisor = iSupervisor;
+this.localState = localState;
+}
+}
+
+static class DynamicState {
+public final MachineState state;
+public final LocalAssignment newAssignment;
+public final LocalAssignment currentAssignment;
+public final Container container;
+public final LocalAssignment pendingLocalization;
+public final Future pendingDownload;
+public final Set profileActions;
+public final Set pendingStopProfileActions;
+
+/**
+ * The last time that WAITING_FOR_WORKER_START, KILL, or KILL_AND_RELAUNCH were entered into.
+ */
+public final long startTime;
+
+public DynamicState(final LocalAssignment currentAssignment, Container container, final LocalAssignment newAssignment) {
+this.currentAssignment = currentAssignment;
+this.container = container;
+if ((currentAssignment == null) ^ (container == null)) {

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77369714
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java ---
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+
+static enum MachineState {
+EMPTY,
+RUNNING,
+WAITING_FOR_WORKER_START,
+KILL_AND_RELAUNCH,
+KILL,
+WAITING_FOR_BASIC_LOCALIZATION,
+WAITING_FOR_BLOB_LOCALIZATION;
+};
+
+static class StaticState {
+public final ILocalizer localizer;
+public final long hbTimeoutMs;
+public final long firstHbTimeoutMs;
+public final long killSleepMs;
+public final long monitorFreqMs;
+public final ContainerLauncher containerLauncher;
+public final int port;
+public final String host;
+public final ISupervisor iSupervisor;
+public final LocalState localState;
+
+StaticState(ILocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
+long killSleepMs, long monitorFreqMs,
+ContainerLauncher containerLauncher, String host, int port,
+ISupervisor iSupervisor, LocalState localState) {
+this.localizer = localizer;
+this.hbTimeoutMs = hbTimeoutMs;
+this.firstHbTimeoutMs = firstHbTimeoutMs;
+this.containerLauncher = containerLauncher;
+this.killSleepMs = killSleepMs;
+this.monitorFreqMs = monitorFreqMs;
+this.host = host;
+this.port = port;
+this.iSupervisor = iSupervisor;
+this.localState = localState;
+}
+}
+
+static class DynamicState {
+public final MachineState state;
+public final LocalAssignment newAssignment;
+public final LocalAssignment currentAssignment;
+public final Container container;
+public final LocalAssignment pendingLocalization;
+public final Future pendingDownload;
+public final Set profileActions;
+public final Set pendingStopProfileActions;
+
+/**
+ * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
+ */
+public final long startTime;
+
+public DynamicState(final LocalAssignment currentAssignment, 
Container container, final LocalAssignment newAssignment) {
+this.currentAssignment = currentAssignment;
+this.container = container;
+if ((currentAssignment == null) ^ (container == null)) {

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77367840
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/Slot.java 
---
@@ -0,0 +1,766 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Slot extends Thread implements AutoCloseable {
+private static final Logger LOG = LoggerFactory.getLogger(Slot.class);
+
+static enum MachineState {
+EMPTY,
+RUNNING,
+WAITING_FOR_WORKER_START,
+KILL_AND_RELAUNCH,
+KILL,
+WAITING_FOR_BASIC_LOCALIZATION,
+WAITING_FOR_BLOB_LOCALIZATION;
+};
+
+static class StaticState {
+public final ILocalizer localizer;
+public final long hbTimeoutMs;
+public final long firstHbTimeoutMs;
+public final long killSleepMs;
+public final long monitorFreqMs;
+public final ContainerLauncher containerLauncher;
+public final int port;
+public final String host;
+public final ISupervisor iSupervisor;
+public final LocalState localState;
+
+StaticState(ILocalizer localizer, long hbTimeoutMs, long 
firstHbTimeoutMs,
+long killSleepMs, long monitorFreqMs,
+ContainerLauncher containerLauncher, String host, int port,
+ISupervisor iSupervisor, LocalState localState) {
+this.localizer = localizer;
+this.hbTimeoutMs = hbTimeoutMs;
+this.firstHbTimeoutMs = firstHbTimeoutMs;
+this.containerLauncher = containerLauncher;
+this.killSleepMs = killSleepMs;
+this.monitorFreqMs = monitorFreqMs;
+this.host = host;
+this.port = port;
+this.iSupervisor = iSupervisor;
+this.localState = localState;
+}
+}
+
+static class DynamicState {
+public final MachineState state;
+public final LocalAssignment newAssignment;
+public final LocalAssignment currentAssignment;
+public final Container container;
+public final LocalAssignment pendingLocalization;
+public final Future pendingDownload;
+public final Set profileActions;
+public final Set pendingStopProfileActions;
+
+/**
+ * The last time that WAITING_FOR_WORKER_START, KILL, or 
KILL_AND_RELAUNCH were entered into.
+ */
+public final long startTime;
+
+public DynamicState(final LocalAssignment currentAssignment, 
Container container, final LocalAssignment newAssignment) {
+this.currentAssignment = currentAssignment;
+this.container = container;
+if ((currentAssignment == null) ^ (container == null)) {

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77365678
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainerLauncher.java
 ---
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+
+public class RunAsUserContainerLauncher extends ContainerLauncher {
+private final Map _conf;
+private final String _supervisorId;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+public RunAsUserContainerLauncher(Map conf, String 
supervisorId, ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+}
+
+@Override
+public Container launchContainer(int port, LocalAssignment assignment, 
LocalState state) throws IOException {
+Container container = new RunAsUserContainer(port, assignment, 
_conf, _supervisorId, state,
+_resourceIsolationManager, false);
+container.setup();
+container.launch();
+return container;
+}
+
+@Override
+public Container recoverContainer(int port, LocalAssignment 
assignment, LocalState state) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(port, assignment, _conf, 
_supervisorId, state, 
+_resourceIsolationManager, true);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
+
+@Override
+public Killable recoverContainer(String workerId) throws IOException {
+Container container = null;
+try {
+container = new RunAsUserContainer(workerId, _conf, 
_supervisorId, 
+_resourceIsolationManager);
+} catch (ContainerRecoveryException e) {
+// We could not recover return null
+}
+return container;
+}
--- End diff --

Could just `return new RunAsUserContainer(w, c, s, r);`
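One reading of this suggestion is to return the new container straight from the `try` block and drop the nullable local. The sketch below is only an illustration under stated assumptions: `RecoverSketch`, `StubContainer`, and `RecoveryException` are hypothetical stand-ins for `RunAsUserContainer` and `ContainerRecoveryException` (assumed here to be unchecked), and the stub `Killable` interface has only the one method the sketch needs.

```java
public class RecoverSketch {
    // Hypothetical stand-in for Storm's Killable; only what the sketch needs.
    interface Killable {
        String workerId();
    }

    // Hypothetical stand-in for ContainerRecoveryException; assumed unchecked here.
    static class RecoveryException extends RuntimeException {
        RecoveryException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for RunAsUserContainer: recovery fails for a blank worker id.
    static class StubContainer implements Killable {
        private final String workerId;

        StubContainer(String workerId) {
            if (workerId == null || workerId.isEmpty()) {
                throw new RecoveryException("nothing to recover");
            }
            this.workerId = workerId;
        }

        @Override
        public String workerId() {
            return workerId;
        }
    }

    // Returns the recovered container directly, or null when recovery fails.
    static Killable recoverContainer(String workerId) {
        try {
            return new StubContainer(workerId);
        } catch (RecoveryException e) {
            return null; // could not recover
        }
    }

    public static void main(String[] args) {
        System.out.println(recoverContainer("worker-1").workerId());
        System.out.println(recoverContainer(""));
    }
}
```

Keeping the `catch` preserves the existing contract that a failed recovery yields `null`. Dropping it as well would let the exception propagate to the caller, which is a behavior change rather than a cleanup.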




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77364277
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java 
---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunAsUserContainer extends BasicContainer {
+private static final Logger LOG = 
LoggerFactory.getLogger(RunAsUserContainer.class);
+
+public RunAsUserContainer(int port, LocalAssignment assignment, 
Map conf, String supervisorId,
+LocalState state, ResourceIsolationInterface 
resourceIsolationManager, boolean recover) throws IOException {
+super(port, assignment, conf, supervisorId, state, 
resourceIsolationManager, recover);
+if (Utils.isOnWindows()) {
+throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
+}
+}
+
+public RunAsUserContainer(String workerId, Map conf, 
String supervisorId,
+ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+super(workerId, conf, supervisorId, resourceIsolationManager);
+if (Utils.isOnWindows()) {
+throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
+}
+}
+
+private void signal(long pid, int signal) throws IOException {
+List commands = Arrays.asList("signal", 
String.valueOf(pid), String.valueOf(signal));
+String user = getWorkerUser();
+String logPrefix = "kill -"+signal+" " + pid;
+SupervisorUtils.processLauncherAndWait(_conf, user, commands, 
null, logPrefix);
+}
+
+@Override
+protected void kill(long pid) throws IOException {
+signal(pid, 15);
+}
+
+@Override
+protected void forceKill(long pid) throws IOException {
+signal(pid, 9);
+}
+
+@Override
+protected boolean runProfilingCommand(List command, 
Map env, String logPrefix, File targetDir) throws IOException, 
InterruptedException {
+String user = this.getWorkerUser();
+String td = targetDir.getAbsolutePath();
+LOG.info("Running as user:{} command:{}", user, command);
+String containerFile = Utils.containerFilePath(td);
+if (Utils.checkFileExists(containerFile)) {
+SupervisorUtils.rmrAsUser(_conf, containerFile, containerFile);
+}
+String scriptFile = Utils.scriptFilePath(td);
+if (Utils.checkFileExists(scriptFile)) {
+SupervisorUtils.rmrAsUser(_conf, scriptFile, scriptFile);
+}
+String script = Utils.writeScript(td, command, env);
+List args = Arrays.asList("profiler", td, script);
+int ret = SupervisorUtils.processLauncherAndWait(_conf, user, 
args, env, logPrefix);
+return ret == 0;
+}
+
+@Override
+public List updateCommandForIsolation(List command) {
+//With run as user we do the isolation before switching users, so 
don't modify the original
+// command line
+return command;
+}   
--- End diff --

@revans2, let's move updateCommandForIsolation into launchWorkerProcess, 
as we discussed. It will clean up the code.



Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Kamil Sindi
Our data science efforts rely on SQE to power our recommendations engine. I
am also excited to contribute to it, especially as we continue to implement
predictive models at larger scales.

On Fri, Sep 2, 2016 at 10:57 AM, Sahil Shah  wrote:

> I would like to throw my support behind SQE. Having worked with it in a
> production environment, I have seen the many benefits in testing new
> topologies and quickly understanding what a topology is doing. As our data
> needs have grown, we have only increased our reliance on SQE and it stands
> the test repeatedly. I am excited at the opportunity to contribute to this
> wonderful open source community.
>
> On Fri, Sep 2, 2016 at 10:31 AM, Alex Halter  wrote:
>
> > I too want to voice my support for SQE and our commitment to the
> initiative
> > going forward. We've been working on adapting Storm to our needs for most
> > of two years. It was thoughtfully designed and supports our production
> > needs. We have a long list of features we want to build out and we'd love
> > to work with the community.
> >
> >
> > On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg 
> > wrote:
> >
> > > I am one of the developers who has been working on SQE for past 1.5
> > years.
> > > Over time, we have made it more stable and production ready.
> > >
> > > As of now, one can easily scale SQE for more production data with easy
> > > config changes and re-deploy, aggregate across different dimensions by
> > > writing json like sql, write to different state stores and most
> > > importantly, address new feature requirements really quick.(Since it's
> > just
> > > writing a sql like json file and sqe handles everything for you ! )
> > >
> > > I think SQE can really help companies who want to setup a production
> > ready
> > > and well tested framework within weeks (instead of months) for large
> > scale
> > > event stream processing and with minimum risks and limited resources.
> We
> > > are actively working on SQE to make it more awesome and are committed
> to
> > > make the experience of developing a highly scalable and fault tolerant
> > > stream processing framework more seamless and less stressful 
> > >
> > > On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
> > >
> > > > Hi, Storm Dev!
> > > >
> > > > I wanted to chime in to show support for SQE and show how committed
> we
> > > are
> > > > to SQE. *StormSQL looks awesome and has some real potential! *
> > > >
> > > > We use SQE in production. It has been tested, code reviewed, load
> > tested,
> > > > maintained, and processing an average of 8 million tuples per minute
> or
> > > > more for over a year now. The investment into this code base has been
> > > > significant.
> > > >
> > > > Please take a look at the code itself. The production quality code is
> > > ready
> > > > to go. Developers with no experience with Storm or even streaming
> > > > successfully launch robust topologies using SQE.  Our productivity in
> > > this
> > > > area went up by orders of magnitude.
> > > >
> > > > Based on this experience we realized the value of querying storm, and
> > we
> > > > decided to give that value back to the storm community.
> > > >
> > > > Our data pipelines and real-time processing are very important to the
> > > > success of JW Player. SQE has been a foundation for that. We will
> > > continue
> > > > to invest into this technology for years to come. Unfortunately we
> > > wouldn't
> > > > be able to adopt StormSQL as is until it has been put through the
> > > crucible
> > > > of production level usage and has had the same rigor applied. It
> seems
> > > much
> > > > of the development has been over the last couple of weeks.
> > > >
> > > > *Quick Gap Analysis (Not Exhaustive)*
> > > > *States*
> > > >   - SQE supports Redis and MongoDB as states in addition to Kafka.
> > (Soon
> > > > adding a Test/Monitor State)
> > > >   - SQE supports non-static field names for Redis state
> > > >   - Storm SQL supports Kafka
> > > >   - SQE supports replay filtering for Kafka
> > > >
> > > > *Aggregations*
> > > >   - SQE supports stateful, exactly-once aggregations for states that
> > > > support it
> > > >   - Storm SQL supports aggregations within each micro batch
> > > >
> > > > *SQL*
> > > >   - StormSQL supports SQL
> > > >  - SQE supports SQL "like" JSON
> > > >
> > > > *Scaling*
> > > >   - SQE has a mechanism for controlling parallelism or scaling
> > > >   - Could not find parallelism or scaling controls within StormSQL
> (May
> > > > need to look harder)
> > > >
> > > > *Support for SQE*
> > > > So far the SQE / JW Player developers have been watching this thread
> > > > without knowing if we should chime in. I call upon the devs at JW to
> > > chime
> > > > in because we are dedicated to the success of this SQL in Storm.
> > > >
> > > > (Noticed I said "chime" three times in this email... well now four
> > times)
> > > >
> > > > Thanks for 

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Priyanka Singh
I would also like to voice my strong support for SQE. This SQL-like query
engine empowers our streaming infrastructure and makes it extremely easy to
write production code. Being fairly new to Storm, I could pick up and add
SQE queries fairly quickly and enrich our real-time products and services.

I'm very excited to contribute to this project!

On Fri, Sep 2, 2016 at 10:31 AM, Alex Halter  wrote:

> I too want to voice my support for SQE and our commitment to the initiative
> going forward. We've been working on adapting Storm to our needs for most
> of two years. It was thoughtfully designed and supports our production
> needs. We have a long list of features we want to build out and we'd love
> to work with the community.
>
>
> On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg 
> wrote:
>
> > I am one of the developers who has been working on SQE for past 1.5
> years.
> > Over time, we have made it more stable and production ready.
> >
> > As of now, one can easily scale SQE for more production data with easy
> > config changes and re-deploy, aggregate across different dimensions by
> > writing json like sql, write to different state stores and most
> > importantly, address new feature requirements really quick.(Since it's
> just
> > writing a sql like json file and sqe handles everything for you ! )
> >
> > I think SQE can really help companies who want to setup a production
> ready
> > and well tested framework within weeks (instead of months) for large
> scale
> > event stream processing and with minimum risks and limited resources. We
> > are actively working on SQE to make it more awesome and are committed to
> > make the experience of developing a highly scalable and fault tolerant
> > stream processing framework more seamless and less stressful 
> >
> > On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
> >
> > > Hi, Storm Dev!
> > >
> > > I wanted to chime in to show support for SQE and show how committed we
> > are
> > > to SQE. *StormSQL looks awesome and has some real potential! *
> > >
> > > We use SQE in production. It has been tested, code reviewed, load
> tested,
> > > maintained, and processing an average of 8 million tuples per minute or
> > > more for over a year now. The investment into this code base has been
> > > significant.
> > >
> > > Please take a look at the code itself. The production quality code is
> > ready
> > > to go. Developers with no experience with Storm or even streaming
> > > successfully launch robust topologies using SQE.  Our productivity in
> > this
> > > area went up by orders of magnitude.
> > >
> > > Based on this experience we realized the value of querying storm, and
> we
> > > decided to give that value back to the storm community.
> > >
> > > Our data pipelines and real-time processing are very important to the
> > > success of JW Player. SQE has been a foundation for that. We will
> > continue
> > > to invest into this technology for years to come. Unfortunately we
> > wouldn't
> > > be able to adopt StormSQL as is until it has been put through the
> > crucible
> > > of production level usage and has had the same rigor applied. It seems
> > much
> > > of the development has been over the last couple of weeks.
> > >
> > > *Quick Gap Analysis (Not Exhaustive)*
> > > *States*
> > >   - SQE supports Redis and MongoDB as states in addition to Kafka.
> (Soon
> > > adding a Test/Monitor State)
> > >   - SQE supports non-static field names for Redis state
> > >   - Storm SQL supports Kafka
> > >   - SQE supports replay filtering for Kafka
> > >
> > > *Aggregations*
> > >   - SQE supports stateful, exactly-once aggregations for states that
> > > support it
> > >   - Storm SQL supports aggregations within each micro batch
> > >
> > > *SQL*
> > >   - StormSQL supports SQL
> > >  - SQE supports SQL "like" JSON
> > >
> > > *Scaling*
> > >   - SQE has a mechanism for controlling parallelism or scaling
> > >   - Could not find parallelism or scaling controls within StormSQL (May
> > > need to look harder)
> > >
> > > *Support for SQE*
> > > So far the SQE / JW Player developers have been watching this thread
> > > without knowing if we should chime in. I call upon the devs at JW to
> > chime
> > > in because we are dedicated to the success of this SQL in Storm.
> > >
> > > (Noticed I said "chime" three times in this email... well now four
> times)
> > >
> > > Thanks for reading,
> > >
> > > Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER
> > >
> > > O: 212.244.0140 <212.244.0140%20x999>  |  M: 215.920.1331
> > >
> > > 2 Park Avenue, 10th Floor North, New York NY 10016
> > >
> > > jwplayer.com  |  @jwplayer 
> > >
> > > On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim 
> wrote:
> > >
> > > > Hi Morrigan,
> > > >
> > > > Thanks for joining discussion. I thought we need to hear your goal to
> > > > donate SQE code, and opinion for how to apply SQE to Storm SQL 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77361882
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java 
---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunAsUserContainer extends BasicContainer {
+private static final Logger LOG = 
LoggerFactory.getLogger(RunAsUserContainer.class);
+
+public RunAsUserContainer(int port, LocalAssignment assignment, 
Map conf, String supervisorId,
+LocalState state, ResourceIsolationInterface 
resourceIsolationManager, boolean recover) throws IOException {
+super(port, assignment, conf, supervisorId, state, 
resourceIsolationManager, recover);
+if (Utils.isOnWindows()) {
+throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
+}
+}
+
+public RunAsUserContainer(String workerId, Map conf, 
String supervisorId,
+ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+super(workerId, conf, supervisorId, resourceIsolationManager);
+if (Utils.isOnWindows()) {
+throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
--- End diff --

This should throw UnsupportedOperationException. In fact, we could add a 
`Utils#unsupportedOnWindows` method and just call it as we would a validator.
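A minimal sketch of what such a validator might look like follows. The class `WindowsGuard` and both method signatures are hypothetical and not part of Storm. The OS name is passed in only so the example is deterministic on any platform; a real helper would presumably take no OS argument and consult `Utils.isOnWindows()` instead.

```java
import java.util.Locale;

public class WindowsGuard {
    // Hypothetical stand-in for Utils.isOnWindows(), driven by an explicit OS name.
    static boolean isOnWindows(String osName) {
        return osName != null && osName.toLowerCase(Locale.ROOT).startsWith("windows");
    }

    // Hypothetical validator in the spirit of the suggested Utils#unsupportedOnWindows:
    // returns normally on other platforms, throws on Windows.
    static void unsupportedOnWindows(String osName, String feature) {
        if (isOnWindows(osName)) {
            throw new UnsupportedOperationException(feature + " is not supported on Windows yet");
        }
    }

    public static void main(String[] args) {
        String feature = "Running workers as different users";

        // Passes silently on a non-Windows platform.
        unsupportedOnWindows("Linux", feature);
        System.out.println("Linux: allowed");

        // Fails fast on Windows with a more specific exception type than RuntimeException.
        try {
            unsupportedOnWindows("Windows 10", feature);
        } catch (UnsupportedOperationException e) {
            System.out.println("Windows 10: " + e.getMessage());
        }
    }
}
```

With a helper like this, each `RunAsUserContainer` constructor could replace its `if`/`throw` pair with a single call after `super(...)`.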




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77361623
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/RunAsUserContainer.java 
---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RunAsUserContainer extends BasicContainer {
+private static final Logger LOG = 
LoggerFactory.getLogger(RunAsUserContainer.class);
+
+public RunAsUserContainer(int port, LocalAssignment assignment, 
Map conf, String supervisorId,
+LocalState state, ResourceIsolationInterface 
resourceIsolationManager, boolean recover) throws IOException {
+super(port, assignment, conf, supervisorId, state, 
resourceIsolationManager, recover);
+if (Utils.isOnWindows()) {
+throw new RuntimeException("ERROR: Windows doesn't support 
running workers as different users yet");
--- End diff --

This should throw UnsupportedOperationException instead of RuntimeException.




Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Sahil Shah
I would like to throw my support behind SQE. Having worked with it in a
production environment, I have seen the many benefits in testing new
topologies and quickly understanding what a topology is doing. As our data
needs have grown, we have only increased our reliance on SQE and it stands
the test repeatedly. I am excited at the opportunity to contribute to this
wonderful open source community.

On Fri, Sep 2, 2016 at 10:31 AM, Alex Halter  wrote:

> I too want to voice my support for SQE and our commitment to the initiative
> going forward. We've been working on adapting Storm to our needs for most
> of two years. It was thoughtfully designed and supports our production
> needs. We have a long list of features we want to build out and we'd love
> to work with the community.
>
>
> On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg 
> wrote:
>
> > I am one of the developers who has been working on SQE for past 1.5
> years.
> > Over time, we have made it more stable and production ready.
> >
> > As of now, one can easily scale SQE for more production data with easy
> > config changes and re-deploy, aggregate across different dimensions by
> > writing json like sql, write to different state stores and most
> > importantly, address new feature requirements really quick.(Since it's
> just
> > writing a sql like json file and sqe handles everything for you ! )
> >
> > I think SQE can really help companies who want to setup a production
> ready
> > and well tested framework within weeks (instead of months) for large
> scale
> > event stream processing and with minimum risks and limited resources. We
> > are actively working on SQE to make it more awesome and are committed to
> > make the experience of developing a highly scalable and fault tolerant
> > stream processing framework more seamless and less stressful 
> >
> > On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
> >
> > > Hi, Storm Dev!
> > >
> > > I wanted to chime in to show support for SQE and show how committed we
> > > are to SQE. *StormSQL looks awesome and has some real potential!*
> > >
> > > We use SQE in production. It has been tested, code reviewed, load tested,
> > > and maintained, and it has been processing an average of 8 million tuples
> > > per minute or more for over a year now. The investment into this code
> > > base has been significant.
> > >
> > > Please take a look at the code itself. The production-quality code is
> > > ready to go. Developers with no experience with Storm or even streaming
> > > successfully launch robust topologies using SQE. Our productivity in this
> > > area went up by orders of magnitude.
> > >
> > > Based on this experience we realized the value of querying Storm, and we
> > > decided to give that value back to the Storm community.
> > >
> > > Our data pipelines and real-time processing are very important to the
> > > success of JW Player. SQE has been a foundation for that. We will
> > > continue to invest in this technology for years to come. Unfortunately,
> > > we wouldn't be able to adopt StormSQL as is until it has been put through
> > > the crucible of production-level usage and has had the same rigor
> > > applied. It seems much of the development has been over the last couple
> > > of weeks.
> > >
> > > *Quick Gap Analysis (Not Exhaustive)*
> > > *States*
> > >   - SQE supports Redis and MongoDB as states in addition to Kafka.
> > > (Soon adding a Test/Monitor State)
> > >   - SQE supports non-static field names for Redis state
> > >   - Storm SQL supports Kafka
> > >   - SQE supports replay filtering for Kafka
> > >
> > > *Aggregations*
> > >   - SQE supports stateful, exactly-once aggregations for states that
> > > support it
> > >   - Storm SQL supports aggregations within each micro batch
> > >
> > > *SQL*
> > >   - StormSQL supports SQL
> > >   - SQE supports SQL "like" JSON
> > >
> > > *Scaling*
> > >   - SQE has a mechanism for controlling parallelism or scaling
> > >   - Could not find parallelism or scaling controls within StormSQL (May
> > > need to look harder)
> > >
> > > *Support for SQE*
> > > So far the SQE / JW Player developers have been watching this thread
> > > without knowing if we should chime in. I call upon the devs at JW to
> > > chime in because we are dedicated to the success of this SQL in Storm.
> > >
> > > (Noticed I said "chime" three times in this email... well now four times)
> > >
> > > Thanks for reading,
> > >
> > > Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER
> > >
> > > O: 212.244.0140  |  M: 215.920.1331
> > >
> > > 2 Park Avenue, 10th Floor North, New York NY 10016
> > >
> > > jwplayer.com  |  @jwplayer 
> > >
> > > On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim wrote:
> > >
> > > > Hi Morrigan,
> > > >
> > > > Thanks for joining discussion. I thought we need to hear your goal to
> > > > donate SQE 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77357693
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.cluster.VersionedData;
+import org.apache.storm.daemon.supervisor.Slot.MachineState;
+import org.apache.storm.daemon.supervisor.Slot.TopoProfileAction;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.ILocalizer;
+import org.apache.storm.messaging.IContext;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ReadClusterState implements Runnable, AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);
+
+    private final Map superConf;
+    private final IStormClusterState stormClusterState;
+    private final EventManager syncSupEventManager;
+    private final AtomicReference> assignmentVersions;
+    private final Map slots = new HashMap<>();
+    private final AtomicInteger readRetry = new AtomicInteger(0);
+    private final String assignmentId;
+    private final ISupervisor iSuper;
+    private final ILocalizer localizer;
+    private final ContainerLauncher launcher;
+    private final String host;
+    private final LocalState localState;
+    private final IStormClusterState clusterState;
+    private final AtomicReference> cachedAssignments;
+
+    public ReadClusterState(Supervisor supervisor) throws Exception {
+        this(supervisor.getConf(), supervisor.getStormClusterState(), supervisor.getEventManger(),
+                supervisor.getAssignmentId(), supervisor.getiSupervisor(),
+                supervisor.getAsyncLocalizer(), supervisor.getHostName(),
+                supervisor.getLocalState(), supervisor.getStormClusterState(),
+                supervisor.getCurrAssignment(), supervisor.getSharedContext());
+    }
+
+    public ReadClusterState(Map superConf, IStormClusterState stormClusterState,
+            EventManager syncSupEventManager, String assignmentId, ISupervisor iSuper,
+            ILocalizer localizer, String host, LocalState localState,
+            IStormClusterState clusterState, AtomicReference> cachedAssignments,
+            IContext sharedContext) throws Exception{
+        this.superConf = superConf;
+        this.stormClusterState = stormClusterState;
+        this.syncSupEventManager = syncSupEventManager;
+        this.assignmentVersions = new AtomicReference>(new HashMap());
+        this.assignmentId = assignmentId;
+        this.iSuper = iSuper;
+        this.localizer = localizer;
+        this.host = host;
+        this.localState = localState;
+        this.clusterState = clusterState;
+

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77356337
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77355840
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77355233
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Alex Halter
I too want to voice my support for SQE and our commitment to the initiative
going forward. We've been working on adapting Storm to our needs for most
of two years. It was thoughtfully designed and supports our production
needs. We have a long list of features we want to build out and we'd love
to work with the community.


On Fri, Sep 2, 2016 at 10:19 AM, Rohit Garg  wrote:

> I am one of the developers who has been working on SQE for the past 1.5
> years. Over time, we have made it more stable and production ready.
>
> As of now, one can easily scale SQE for more production data with easy
> config changes and a re-deploy, aggregate across different dimensions by
> writing JSON-like SQL, write to different state stores and, most
> importantly, address new feature requirements really quickly (since it's
> just writing a SQL-like JSON file and SQE handles everything for you!).
>
> I think SQE can really help companies who want to set up a production-ready
> and well-tested framework within weeks (instead of months) for large-scale
> event stream processing with minimal risk and limited resources. We
> are actively working on SQE to make it more awesome and are committed to
> making the experience of developing a highly scalable and fault-tolerant
> stream processing framework more seamless and less stressful.
>
> On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
>
> > Hi, Storm Dev!
> >
> > I wanted to chime in to show support for SQE and show how committed we
> > are to SQE. *StormSQL looks awesome and has some real potential!*
> >
> > We use SQE in production. It has been tested, code reviewed, load tested,
> > and maintained, and it has been processing an average of 8 million tuples
> > per minute or more for over a year now. The investment into this code
> > base has been significant.
> >
> > Please take a look at the code itself. The production-quality code is
> > ready to go. Developers with no experience with Storm or even streaming
> > successfully launch robust topologies using SQE. Our productivity in this
> > area went up by orders of magnitude.
> >
> > Based on this experience we realized the value of querying Storm, and we
> > decided to give that value back to the Storm community.
> >
> > Our data pipelines and real-time processing are very important to the
> > success of JW Player. SQE has been a foundation for that. We will continue
> > to invest in this technology for years to come. Unfortunately, we wouldn't
> > be able to adopt StormSQL as is until it has been put through the crucible
> > of production-level usage and has had the same rigor applied. It seems
> > much of the development has been over the last couple of weeks.
> >
> > *Quick Gap Analysis (Not Exhaustive)*
> > *States*
> >   - SQE supports Redis and MongoDB as states in addition to Kafka. (Soon
> > adding a Test/Monitor State)
> >   - SQE supports non-static field names for Redis state
> >   - Storm SQL supports Kafka
> >   - SQE supports replay filtering for Kafka
> >
> > *Aggregations*
> >   - SQE supports stateful, exactly-once aggregations for states that
> > support it
> >   - Storm SQL supports aggregations within each micro batch
> >
> > *SQL*
> >   - StormSQL supports SQL
> >   - SQE supports SQL "like" JSON
> >
> > *Scaling*
> >   - SQE has a mechanism for controlling parallelism or scaling
> >   - Could not find parallelism or scaling controls within StormSQL (May
> > need to look harder)
> >
> > *Support for SQE*
> > So far the SQE / JW Player developers have been watching this thread
> > without knowing if we should chime in. I call upon the devs at JW to
> > chime in because we are dedicated to the success of this SQL in Storm.
> >
> > (Noticed I said "chime" three times in this email... well now four times)
> >
> > Thanks for reading,
> >
> > Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER
> >
> > O: 212.244.0140  |  M: 215.920.1331
> >
> > 2 Park Avenue, 10th Floor North, New York NY 10016
> >
> > jwplayer.com  |  @jwplayer 
> >
> > On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim  wrote:
> >
> > > Hi Morrigan,
> > >
> > > Thanks for joining the discussion. I thought we needed to hear your goal
> > > in donating the SQE code, and your opinion on how to apply SQE to Storm
> > > SQL and work on further improvements.
> > >
> > > Not sure when you took a look at the feature set of Storm SQL, but if you
> > > haven't recently, you may want to do that.
> > > I started working on improving Storm SQL several weeks ago, and many
> > > things have been addressed in recent weeks.
> > >
> > > * STORM-1435: You can easily launch the Storm SQL runner without worrying
> > > about dependencies for Storm SQL core and runtime. It wasn't easy to run
> > > before STORM-2016 was introduced.
> > > * Refactored Storm SQL code 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77353254
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/ReadClusterState.java ---

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Donato Borrello
It's really cool to see this discussion about Storm SQE - it's a project
we put a lot of love and sweat into!

You can put me on the list of people who are interested in contributing as
part of the Storm community. I've been excited about working with Storm ever
since reading Nathan Marz's book, Big Data.


On Fri, Sep 2, 2016 at 10:07 AM, Kelvin Shek  wrote:

> Being relatively new to Storm before I came to jwplayer, SQE has made it
> extremely easy for me to pick up and hit the ground running.  The code is
> robust, and I look forward to contributing to the project to ensure its
> continued success.
>
>
>
> On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:
>
> > Hi, Storm Dev!
> >
> > I wanted to chime in to show support for SQE and show how committed we
> > are to SQE. *StormSQL looks awesome and has some real potential!*
> >
> > We use SQE in production. It has been tested, code reviewed, load tested,
> > and maintained, and it has been processing an average of 8 million tuples
> > per minute or more for over a year now. The investment into this code
> > base has been significant.
> >
> > Please take a look at the code itself. The production-quality code is
> > ready to go. Developers with no experience with Storm or even streaming
> > successfully launch robust topologies using SQE. Our productivity in this
> > area went up by orders of magnitude.
> >
> > Based on this experience we realized the value of querying Storm, and we
> > decided to give that value back to the Storm community.
> >
> > Our data pipelines and real-time processing are very important to the
> > success of JW Player. SQE has been a foundation for that. We will continue
> > to invest in this technology for years to come. Unfortunately, we wouldn't
> > be able to adopt StormSQL as is until it has been put through the crucible
> > of production-level usage and has had the same rigor applied. It seems
> > much of the development has been over the last couple of weeks.
> >
> > *Quick Gap Analysis (Not Exhaustive)*
> > *States*
> >   - SQE supports Redis and MongoDB as states in addition to Kafka. (Soon
> > adding a Test/Monitor State)
> >   - SQE supports non-static field names for Redis state
> >   - Storm SQL supports Kafka
> >   - SQE supports replay filtering for Kafka
> >
> > *Aggregations*
> >   - SQE supports stateful, exactly-once aggregations for states that
> > support it
> >   - Storm SQL supports aggregations within each micro batch
> >
> > *SQL*
> >   - StormSQL supports SQL
> >   - SQE supports SQL "like" JSON
> >
> > *Scaling*
> >   - SQE has a mechanism for controlling parallelism or scaling
> >   - Could not find parallelism or scaling controls within StormSQL (May
> > need to look harder)
> >
> > *Support for SQE*
> > So far the SQE / JW Player developers have been watching this thread
> > without knowing if we should chime in. I call upon the devs at JW to
> > chime in because we are dedicated to the success of this SQL in Storm.
> >
> > (Noticed I said "chime" three times in this email... well now four times)
> >
> > Thanks for reading,
> >
> > Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER
> >
> > O: 212.244.0140  |  M: 215.920.1331
> >
> > 2 Park Avenue, 10th Floor North, New York NY 10016
> >
> > jwplayer.com  |  @jwplayer 
> >
> > On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim  wrote:
> >
> > > Hi Morrigan,
> > >
> > > Thanks for joining the discussion. I thought we needed to hear your goal
> > > in donating the SQE code, and your opinion on how to apply SQE to Storm
> > > SQL and work on further improvements.
> > >
> > > Not sure when you took a look at the feature set of Storm SQL, but if you
> > > haven't recently, you may want to do that.
> > > I started working on improving Storm SQL several weeks ago, and many
> > > things have been addressed in recent weeks.
> > >
> > > * STORM-1435: You can easily launch the Storm SQL runner without worrying
> > > about dependencies for Storm SQL core and runtime. It wasn't easy to run
> > > before STORM-2016 was introduced.
> > > * Refactored Storm SQL code for Trident to fit to Trident operations.
> > Storm
> > > SQL parsed SQL and generated topology code but it was not easy to know
> > how
> > > topology code is generated, and also hard to determine how Trident
> > > optimizations are applied.
> > > * STORM-1434 ,
> > > STORM-2050
> > > : Addressed GROUP BY
> > > with
> > > UDAF (User Defined Aggregate Function) on Trident mode. Storm SQL
> already
> > > supported UDF on Trident mode.
> > > * STORM-2057 : JOIN
> > > (inner, left outer, right outer, full outer) feature is now on
> reviewing.
> > > Note that only 

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Kelvin Shek
I was relatively new to Storm before I came to JW Player, and SQE made it
extremely easy for me to pick it up and hit the ground running.  The code is
robust, and I look forward to contributing to the project to ensure its
continued success.



On Fri, Sep 2, 2016 at 9:49 AM, Lee Morris  wrote:

> Hi, Storm Dev!
>
> I wanted to chime in to show support for SQE and show how committed we are
> to SQE. *StormSQL looks awesome and has some real potential! *
>
> We use SQE in production. It has been tested, code reviewed, load tested,
> maintained, and processing an average of 8 million tuples per minute or
> more for over a year now. The investment into this code base has been
> significant.
>
> Please take a look at the code itself. The production quality code is ready
> to go. Developers with no experience with Storm or even streaming
> successfully launch robust topologies using SQE.  Our productivity in this
> area went up by orders of magnitude.
>
> Based on this experience we realized the value of querying storm, and we
> decided to give that value back to the storm community.
>
> Our data pipelines and real-time processing are very important to the
> success of JW Player. SQE has been a foundation for that. We will continue
> to invest into this technology for years to come. Unfortunately we wouldn't
> be able to adopt StormSQL as is until it has been put through the crucible
> of production level usage and has had the same rigor applied. It seems much
> of the development has been over the last couple of weeks.
>
> *Quick Gap Analysis (Not Exhaustive)*
> *States*
>   - SQE supports Redis and MongoDB as states in addition to Kafka. (Soon
> adding a Test/Monitor State)
>   - SQE supports non-static field names for Redis state
>   - Storm SQL supports Kafka
>   - SQE supports replay filtering for Kafka
>
> *Aggregations*
>   - SQE supports stateful, exactly-once aggregations for states that
> support it
>   - Storm SQL supports aggregations within each micro batch
>
> *SQL*
>   - StormSQL supports SQL
>  - SQE supports SQL "like" JSON
>
> *Scaling*
>   - SQE has a mechanism for controlling parallelism or scaling
>   - Could not find parallelism or scaling controls within StormSQL (May
> need to look harder)
>
> *Support for SQE*
> So far the SQE / JW Player developers have been watching this thread
> without knowing if we should chime in. I call upon the devs at JW to chime
> in because we are dedicated to the success of this SQL in Storm.
>
> (Noticed I said "chime" three times in this email... well now four times)
>
> Thanks for reading,
>
> Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER
>
> O: 212.244.0140 <212.244.0140%20x999>  |  M: 215.920.1331
>
> 2 Park Avenue, 10th Floor North, New York NY 10016
>
> jwplayer.com  |  @jwplayer 
>
> On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim  wrote:
>
> > Hi Morrigan,
> >
> > Thanks for joining discussion. I thought we need to hear your goal to
> > donate SQE code, and opinion for how to apply SQE to Storm SQL and
> working
> > on further improvements.
> >
> > Not sure when you took a look at the feature set of Storm SQL, but if you
> > haven't recently, you may want to do that.
> > I started working on improving Storm SQL several weeks ago, and many
> things
> > are addressed in recent weeks.
> >
> > * STORM-1435 : You can
> > easily launch Storm SQL runner without concerning dependencies for Storm
> > SQL core and runtime. It wasn't easy to run before STORM-2016
> >  is introduced.
> > * Refactored Storm SQL code for Trident to fit to Trident operations.
> Storm
> > SQL parsed SQL and generated topology code but it was not easy to know
> how
> > topology code is generated, and also hard to determine how Trident
> > optimizations are applied.
> > * STORM-1434 ,
> > STORM-2050
> > : Addressed GROUP BY
> > with
> > UDAF (User Defined Aggregate Function) on Trident mode. Storm SQL already
> > supported UDF on Trident mode.
> > * STORM-2057 : JOIN
> > (inner, left outer, right outer, full outer) feature is now on reviewing.
> > Note that only equi-join is supported.
> >
> > The changes are not included to official release yet, but I expect Storm
> > 1.1.0 will include them which are worth to try out for early adopters.
> >
> > You can also refer STORM-1433
> >  for current phase of
> > Storm SQL. Might need to have another phases (epics) for resolving other
> > issues as well.
> >
> > I only had a look at SQE wiki so don't know the detailed features of SQE,
> > but my feeling is that recent changes fills the gap between SQE and Storm
> > SQL, and even addressing some TODOs of SQE. We might need to cross 

[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread d2r
Github user d2r commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77350298
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/Container.java ---
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.Writer;
+import java.lang.ProcessBuilder.Redirect;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Represents a container that a worker will run in.
+ */
+public abstract class Container implements Killable {
+private static final Logger LOG = 
LoggerFactory.getLogger(BasicContainer.class);
+protected final Map _conf;
+protected final Map _topoConf;
+protected String _workerId;
+protected final String _topologyId;
+protected final String _supervisorId;
+protected final int _port;
+protected final LocalAssignment _assignment;
+protected final AdvancedFSOps _ops;
+protected final ResourceIsolationInterface _resourceIsolationManager;
+
+//Exposed for testing
+protected Container(AdvancedFSOps ops, int port, LocalAssignment 
assignment,
+Map conf, Map topoConf, String 
supervisorId, 
+ResourceIsolationInterface resourceIsolationManager) throws 
IOException {
+assert((assignment == null && port <= 0) ||
+(assignment != null && port > 0));
+assert(conf != null);
+assert(ops != null);
+assert(supervisorId != null);
+
+_port = port;
+_ops = ops;
+_assignment = assignment;
+if (assignment != null) {
+_topologyId = assignment.get_topology_id();
+} else {
+_topologyId = null;
+}
+_conf = conf;
+_supervisorId = supervisorId;
+_resourceIsolationManager = resourceIsolationManager;
+if (topoConf == null) {
+_topoConf = readTopoConf();
+} else {
+_topoConf = topoConf;
+}
+}
+
+@Override
+public String toString() {
+return this.getClass().getSimpleName() + " topo:" + _topologyId + 
" worker:" + _workerId;
+}
+
+protected Map readTopoConf() throws IOException {
+assert(_topologyId != null);
+return ConfigUtils.readSupervisorStormConf(_conf, _topologyId);
+}
+
+protected Container(int port, LocalAssignment assignment, Map conf, 
+String supervisorId, ResourceIsolationInterface 
resourceIsolationManager) throws IOException {
+this(AdvancedFSOps.make(conf), port, assignment, conf, null, 
supervisorId, resourceIsolationManager);
+}
+
+/**
+ * Constructor to use when trying to recover a container from just the 
worker ID.
+ * @param workerId the id of the worker
+ * @param conf the config of the supervisor
+ * @param supervisorId the id of the 

Re: [DISCUSS] Accept JW Player SQE Code Donation

2016-09-02 Thread Lee Morris
Hi, Storm Dev!

I wanted to chime in to show support for SQE and show how committed we are
to SQE. *StormSQL looks awesome and has some real potential! *

We use SQE in production. It has been tested, code reviewed, load tested,
and maintained, and it has been processing an average of 8 million tuples
per minute or more for over a year now. The investment into this code base
has been significant.

Please take a look at the code itself. The production quality code is ready
to go. Developers with no experience with Storm or even streaming
successfully launch robust topologies using SQE.  Our productivity in this
area went up by orders of magnitude.

Based on this experience we realized the value of querying storm, and we
decided to give that value back to the storm community.

Our data pipelines and real-time processing are very important to the
success of JW Player. SQE has been a foundation for that. We will continue
to invest into this technology for years to come. Unfortunately we wouldn't
be able to adopt StormSQL as is until it has been put through the crucible
of production level usage and has had the same rigor applied. It seems much
of the development has been over the last couple of weeks.

*Quick Gap Analysis (Not Exhaustive)*
*States*
  - SQE supports Redis and MongoDB as states in addition to Kafka. (Soon
adding a Test/Monitor State)
  - SQE supports non-static field names for Redis state
  - Storm SQL supports Kafka
  - SQE supports replay filtering for Kafka

*Aggregations*
  - SQE supports stateful, exactly-once aggregations for states that
support it
  - Storm SQL supports aggregations within each micro batch

*SQL*
  - StormSQL supports SQL
 - SQE supports SQL "like" JSON

*Scaling*
  - SQE has a mechanism for controlling parallelism or scaling
  - Could not find parallelism or scaling controls within StormSQL (May
need to look harder)

*Support for SQE*
So far the SQE / JW Player developers have been watching this thread
without knowing if we should chime in. I call upon the devs at JW to chime
in because we are dedicated to the success of SQL in Storm.

(Noticed I said "chime" three times in this email... well now four times)

Thanks for reading,

Lee Morris, Sr Principal Engineer, Data  |  JWPLAYER

O: 212.244.0140  |  M: 215.920.1331

2 Park Avenue, 10th Floor North, New York NY 10016

jwplayer.com  |  @jwplayer 

On Tue, Aug 30, 2016 at 5:46 PM, Jungtaek Lim  wrote:

> Hi Morrigan,
>
> Thanks for joining the discussion. I thought we needed to hear your goal in
> donating the SQE code, and your opinion on how to apply SQE to Storm SQL and
> work on further improvements.
>
> Not sure when you took a look at the feature set of Storm SQL, but if you
> haven't recently, you may want to do that.
> I started working on improving Storm SQL several weeks ago, and many things
> have been addressed in recent weeks.
>
> * STORM-1435: You can easily launch the Storm SQL runner without worrying
> about dependencies for Storm SQL core and runtime. It wasn't easy to run
> before STORM-2016 was introduced.
> * Refactored the Storm SQL code for Trident to fit Trident operations. Storm
> SQL parsed SQL and generated topology code, but it was not easy to know how
> the topology code was generated, and it was hard to determine how Trident
> optimizations were applied.
> * STORM-1434, STORM-2050: Addressed GROUP BY with UDAF (User Defined
> Aggregate Function) in Trident mode. Storm SQL already supported UDF in
> Trident mode.
> * STORM-2057: The JOIN (inner, left outer, right outer, full outer) feature
> is now under review. Note that only equi-join is supported.
>
> The changes are not included in an official release yet, but I expect Storm
> 1.1.0 will include them, and they are worth trying out for early adopters.
>
> You can also refer to STORM-1433 for the current phase of Storm SQL. We
> might need further phases (epics) to resolve other issues as well.
>
> I have only looked at the SQE wiki, so I don't know the detailed features of
> SQE, but my feeling is that recent changes fill the gap between SQE and Storm
> SQL, and even address some TODOs of SQE. We might need to cross-check the
> feature set of each project to make the pros and cons of each clear.
>
> Btw, while Storm SQL has been implementing its missing features, the
> difficult part for Storm SQL is SQL optimization. There seem to be lots of
> SQL optimizations (like filter pushdown), but I'm not an expert on that and
> it apparently needs a deeper understanding of Calcite. Other parts also need
> contributors, but we strongly need contributors in this area.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> On Wed, Aug 31, 2016, AM 

[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1665
  
@HeartSaVioR OK, have a good time.




[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1665
  
I'd like to wait for @priyank5485 and @abellina to finish reviewing. I'm 
now reviewing and testing Supervisor V2, and it could take some time.
FYI: I'm on vacation next week and will be traveling for several days 
(not too long, but I won't be available during that time).




Re: Storm SQL Phase 3 created

2016-09-02 Thread Jungtaek Lim
Yeah that would be nice. Thanks for the suggestion Xin.

- Jungtaek Lim (HeartSaVioR)

On Fri, Sep 2, 2016 at 6:47 PM, Xin Wang wrote:

> how about move storm-sql-kafka to a new directory like storm-sql-external
> which contains various storm-sql data sources?
>
> 2016-09-02 10:31 GMT+08:00 Jungtaek Lim :
>
> > Hi devs,
> >
> > I just created epic issue for Storm SQL phase 3 which tracks efforts for
> > adding available data sources for Storm SQL.
> > https://issues.apache.org/jira/browse/STORM-2075
> >
> > Currently Storm SQL only supports Apache Kafka as producer (input table)
> > and consumer (output table) which is not sufficient for industry use
> cases.
> > Since we support various external modules as spout and bolt, and also
> > Trident state, we can expand them to data sources of Storm SQL.
> > The implementation of storm-sql-kafka
> > <
> https://github.com/apache/storm/tree/master/external/sql/storm-sql-kafka>
> > is simple enough so adding other modules as data source should be simple,
> > too.
> >
> > There're still some other works in Phase II
> >  to make Storm SQL
> > production ready, so any contributions on Storm SQL would be really
> > appreciated.
> >
> > Please let me know if you have any questions or suggestions.
> >
> > Thanks,
> > Jungtaek Lim (HeartSaVioR)
> >
>


[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1642
  
I found another issue: 
When I rebalance 3 workers into 1 worker, all workers are killed first 
(expected), and AsyncLocalizer clears out the topology code since all workers 
are killed. 
But Supervisor doesn't download the topology code again while starting the new 
worker, so things go wrong and the worker and/or supervisor are killed.

This seems to be a kind of race condition (Slot and AsyncLocalizer), and I 
saw two scenarios: 

1. The worker can be launched, but the topology directory is removed after 
launching, so the worker crashes. Slot tries to relaunch the worker but throws 
IllegalStateException because the topology directory is gone, and the 
supervisor is also killed.

2. Supervisor is killed even before launching the worker.

After this, Supervisor will consistently be killed unless the supervisor 
directory is cleared out, the same as in the comment above.
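
A minimal sketch of one way to rule out this kind of race, assuming cleanup is
driven by a per-topology reference count guarded by a single lock. The names
below (`TopologyCodeCacheSketch`, `acquire`, `release`) are hypothetical and
are not taken from Slot or AsyncLocalizer; the download and delete steps are
placeholders that only bump counters.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not the Supervisor V2 implementation.
final class TopologyCodeCacheSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();
    int downloads = 0;  // counts simulated downloads of topology code
    int deletions = 0;  // counts simulated deletions of topology code

    /** Called before a worker for the topology is launched. */
    synchronized void acquire(String topologyId) {
        int count = refCounts.getOrDefault(topologyId, 0);
        if (count == 0) {
            // First user (again): the code must be (re)downloaded here,
            // under the same lock that release() uses for cleanup.
            downloads++;
        }
        refCounts.put(topologyId, count + 1);
    }

    /** Called after a worker for the topology has been killed. */
    synchronized void release(String topologyId) {
        Integer count = refCounts.get(topologyId);
        if (count == null) {
            throw new IllegalStateException("No reference held for " + topologyId);
        }
        if (count == 1) {
            // Last user is gone: only now is it safe to clear the code.
            refCounts.remove(topologyId);
            deletions++;
        } else {
            refCounts.put(topologyId, count - 1);
        }
    }
}
```

With this shape, a rebalance that kills all three workers and then starts one
triggers exactly one deletion followed by one fresh download, because both
steps are serialized on the same monitor.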




[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1642
  
With my patch (symlink issue and NPE issue) I can see workers launched and 
killed by Supervisor V2. (remote)

Tested:
- kill worker process with -9
- rebalance with different worker count
- jstack dump, heap dump, restart worker




[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1642
  
Found another issue: 
The constructor of BasicContainer (in fact the constructor of Container) throws 
FileNotFoundException (FNF) when the topology dist files are deleted. So even 
though Slot is creating the container in recover mode, Supervisor is killed 
instead of ContainerRecoveryException being thrown. Supervisor will 
consistently be killed unless we remove the assignment for it, i.e. kill the 
topology.

This is a weird case but it can happen, and I guess that's why 
ContainerRecoveryException exists.
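
A minimal sketch of the behavior described above, assuming the recovery path
should turn a missing topology file into a recovery failure rather than
letting the FileNotFoundException escape. Everything here is a hypothetical
stand-in: `RecoverySketch`, `TopoConfReader`, and the nested
`ContainerRecoveryException` only mirror the names used in the review and are
not the classes from the pull request.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Map;

// Hypothetical illustration only; not the code under review.
final class RecoverySketch {
    private RecoverySketch() {}

    /** Stand-in for the recovery exception mentioned in the review. */
    static class ContainerRecoveryException extends RuntimeException {
        ContainerRecoveryException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical loader for a topology's configuration. */
    interface TopoConfReader {
        Map<String, Object> read(String topologyId) throws IOException;
    }

    /**
     * Reads the topology conf while recovering a container. A missing file
     * is reported as a recovery failure so the caller can clean up the slot;
     * other I/O errors are passed through unchanged.
     */
    static Map<String, Object> readTopoConfForRecovery(TopoConfReader reader, String topologyId)
            throws IOException {
        try {
            return reader.read(topologyId);
        } catch (FileNotFoundException e) {
            throw new ContainerRecoveryException(
                "Topology files for " + topologyId + " are gone; cannot recover", e);
        }
    }
}
```

The design choice in this sketch is that only the "files are gone" case is
translated, so a caller in recover mode can discard the container while
genuine I/O failures still surface as errors.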




[GitHub] storm issue #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on the issue:

https://github.com/apache/storm/pull/1642
  
I merged this locally, ran some tests, and saw:

1. The integration test fails on my dev machine.

- I'm using OSX 10.11, Java 1.8.0_66
- I got a wrong IP address from the integration test. I'm behind an IP-sharing 
router, but the assigned IP is not even the same as the external IP.
-- This is similar to what I consistently observed from my shared office. 
-- In my office even the build for current master is failing. (I'm testing this 
pull request at home now.)
-- The strange thing is that the normal unit tests are not failing for this 
build. In my office they fail as well.

2. There's a bug in creating the symbolic link for dependencies, and also in 
createSymbolicLink.  I'll leave a comment on the diff.




[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77321291
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -1990,31 +1995,26 @@ protected void forceDeleteImpl(String path) throws 
IOException {
 }
 
 /**
- * Creates a symbolic link to the target
+ * Creates a symbolic link to the target and force the creation if the 
target already exists
  * @param dir the parent directory of the link
  * @param targetDir the parent directory of the link's target
- * @param filename the file name of the link
  * @param targetFilename the file name of the links target
+ * @param filename the file name of the link
  * @throws IOException
  */
-public static void createSymlink(String dir, String targetDir,
-String filename, String targetFilename) throws IOException {
+public static void forceCreateSymlink(String dir, String targetDir,
--- End diff --

The order of the parameters targetFilename and filename has been swapped.
Since the parent-directory parameters are ordered link first, then target, it 
would be natural to keep that order for the file names, too.



Re: Storm SQL Phase 3 created

2016-09-02 Thread Xin Wang
How about moving storm-sql-kafka to a new directory, such as
storm-sql-external, that contains the various storm-sql data sources?

2016-09-02 10:31 GMT+08:00 Jungtaek Lim :

> Hi devs,
>
> I just created an epic issue for Storm SQL Phase 3, which tracks efforts to
> add data sources for Storm SQL.
> https://issues.apache.org/jira/browse/STORM-2075
>
> Currently Storm SQL only supports Apache Kafka as producer (input table)
> and consumer (output table), which is not sufficient for industry use cases.
> Since we support various external modules as spouts and bolts, and also as
> Trident state, we can expand them into data sources for Storm SQL.
> The implementation of storm-sql-kafka is simple enough, so adding other
> modules as data sources should be simple, too.
>
> There is still other work in Phase II to make Storm SQL production ready,
> so any contributions to Storm SQL would be really appreciated.
>
> Please let me know if you have any questions or suggestions.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>


[GitHub] storm issue #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1665
  
@HeartSaVioR Can we merge this in? Also, it's very easy to cherry-pick to 
1.x-branch.




[GitHub] storm pull request #1665: STORM-2074: fix storm-kafka-monitor NPE bug

2016-09-02 Thread vesense
Github user vesense commented on a diff in the pull request:

https://github.com/apache/storm/pull/1665#discussion_r77310159
  
--- Diff: 
external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 ---
@@ -89,6 +89,10 @@ public static void main (String args[]) {
 printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " 
and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required  with " +
 OPTION_OLD_CONSUMER_LONG);
 }
+String zkNode = 
commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG);
--- End diff --

We can just remove the check, since 
`curatorFramework.checkExists().forPath(zkPath)` will do the same check for 
zkPath. Thanks.




[GitHub] storm issue #1565: STORM-1970: external project examples refator

2016-09-02 Thread vesense
Github user vesense commented on the issue:

https://github.com/apache/storm/pull/1565
  
Thanks @HeartSaVioR  I see. Will update.




[jira] [Commented] (STORM-2077) KafkaSpout doesn't retry failed tuples

2016-09-02 Thread Manu Zhang (JIRA)

[ 
https://issues.apache.org/jira/browse/STORM-2077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15457702#comment-15457702
 ] 

Manu Zhang commented on STORM-2077:
---

[~tobiasmaier], have you turned on the acker?

> KafkaSpout doesn't retry failed tuples
> --
>
> Key: STORM-2077
> URL: https://issues.apache.org/jira/browse/STORM-2077
> Project: Apache Storm
>  Issue Type: Bug
>  Components: storm-kafka
>Affects Versions: 1.0.2
>Reporter: Tobias Maier
>
> KafkaSpout does not retry all failed tuples.
> We used the following configuration:
> Map props = new HashMap<>();
> props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "c1");
> props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
> ByteArrayDeserializer.class.getName());
> props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
> broker.bootstrapServer());
> KafkaSpoutStreams kafkaSpoutStreams = new 
> KafkaSpoutStreams.Builder(FIELDS_KAFKA_EVENT, new 
> String[]{"test-topic"}).build();
> KafkaSpoutTuplesBuilder kafkaSpoutTuplesBuilder = new 
> KafkaSpoutTuplesBuilder.Builder<>(new 
> KeyValueKafkaSpoutTupleBuilder("test-topic")).build();
> KafkaSpoutRetryService retryService = new 
> KafkaSpoutLoggedRetryExponentialBackoff(KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1),
>  KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.milliSeconds(1), 3, 
> KafkaSpoutLoggedRetryExponentialBackoff.TimeInterval.seconds(1));
> KafkaSpoutConfig config = new 
> KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, kafkaSpoutTuplesBuilder, 
> retryService)
> .setFirstPollOffsetStrategy(UNCOMMITTED_LATEST)
> .setMaxUncommittedOffsets(30)
> .setOffsetCommitPeriodMs(10)
> .setMaxRetries(3)
> .build();
> kafkaSpout = new org.apache.storm.kafka.spout.KafkaSpout<>(config);
> The downstream bolt fails every tuple, and we expect that those tuples will 
> all be replayed. But that's not the case for every tuple.
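
To make the four retry parameters in the report easier to reason about, here
is a minimal sketch of a generic exponential backoff policy. It is an
assumption for illustration, not the formula used by storm-kafka-client or by
the `KafkaSpoutLoggedRetryExponentialBackoff` class in the report: the first
retry waits `initialDelayMs`, each later retry doubles `delayPeriodMs`, the
wait is capped at `maxDelayMs`, and no retry is scheduled after `maxRetries`
failures. The class name `BackoffSketch` is hypothetical.

```java
// Hypothetical illustration only; not the storm-kafka-client retry service.
final class BackoffSketch {
    private final long initialDelayMs;
    private final long delayPeriodMs;
    private final int maxRetries;
    private final long maxDelayMs;

    BackoffSketch(long initialDelayMs, long delayPeriodMs, int maxRetries, long maxDelayMs) {
        this.initialDelayMs = initialDelayMs;
        this.delayPeriodMs = delayPeriodMs;
        this.maxRetries = maxRetries;
        this.maxDelayMs = maxDelayMs;
    }

    /** True while the tuple has failed no more than maxRetries times. */
    boolean shouldRetry(int failCount) {
        return failCount >= 1 && failCount <= maxRetries;
    }

    /** Delay before the retry that follows the failCount-th failure. */
    long delayMs(int failCount) {
        if (failCount < 1) {
            throw new IllegalArgumentException("failCount must be >= 1");
        }
        if (failCount == 1) {
            return Math.min(initialDelayMs, maxDelayMs);
        }
        long delay = delayPeriodMs;
        // Double once per additional failure, stopping early at the cap.
        for (int i = 2; i < failCount && delay < maxDelayMs; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxDelayMs);
    }
}
```

Under this assumed policy, the values from the report (1 ms, 1 ms, 3 retries,
1 s cap) give waits of 1 ms, 1 ms, and 2 ms, after which a tuple that keeps
failing is no longer scheduled for retry.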



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] storm pull request #1642: STORM-2018: Supervisor V2.

2016-09-02 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/1642#discussion_r77299756
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/daemon/supervisor/BasicContainerTest.java 
---
@@ -0,0 +1,459 @@
+package org.apache.storm.daemon.supervisor;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.Config;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.junit.Test;
+
+public class BasicContainerTest {
+public static class CommandRun {
+final List cmd;
+final Map env;
+final File pwd;
+
+public CommandRun(List cmd, Map env, File 
pwd) {
+this.cmd = cmd;
+this.env = env;
+this.pwd = pwd;
+}
+}
+
+public static class MockBasicContainer extends BasicContainer {
+public final List profileCmds = new ArrayList<>();
+public final List workerCmds = new ArrayList<>();
+
+public MockBasicContainer(int port, LocalAssignment assignment, 
Map conf,
+String supervisorId, LocalState localState, 
ResourceIsolationInterface resourceIsolationManager,
+boolean recover) throws IOException {
+super(port, assignment, conf, supervisorId, localState, 
resourceIsolationManager, recover);
+}
+
+public MockBasicContainer(AdvancedFSOps ops, int port, 
LocalAssignment assignment,
+Map conf, Map topoConf, 
String supervisorId, 
+ResourceIsolationInterface resourceIsolationManager, 
LocalState localState,
+String profileCmd) throws IOException {
+super(ops, port, assignment, conf, topoConf, supervisorId, 
resourceIsolationManager, localState, profileCmd);
+}
+
+@Override
+protected Map readTopoConf() throws IOException {
+return new HashMap<>();
+}
+
+@Override
+public void createNewWorkerId() {
+super.createNewWorkerId();
+}
+
+@Override
+public List substituteChildopts(Object value, int 
memOnheap) {
+return super.substituteChildopts(value, memOnheap);
+}
+   
+@Override
+protected boolean runProfilingCommand(List command, 
Map env, String logPrefix,
+File targetDir) throws IOException, InterruptedException {
+profileCmds.add(new CommandRun(command, env, targetDir));
+return true;
+}
+
+@Override
+protected void launchWorkerProcess(List command, 
Map env, String logPrefix,
+ExitCodeCallback processExitCallback, File targetDir) 
throws IOException {
+workerCmds.add(new CommandRun(command, env, targetDir));
+}
+
+@Override
+protected String javaCmd(String cmd) {
+//avoid system dependent things
+return cmd;
+}
+
+@Override
+protected List frameworkClasspath() {
+//We are not really running anything so make this
+// simple to check for
+return Arrays.asList("FRAMEWORK_CP");
+}
+
+@Override
+protected String javaLibraryPath(String stormRoot, Map conf) {
+return "JLP";
+}
+}
+
+@Test
+public void testCreateNewWorkerId() throws Exception {
+final String topoId = "test_topology";
+final int port = 8080;
+LocalAssignment la = new LocalAssignment();
+la.set_topology_id(topoId);
+
+AdvancedFSOps ops = mock(AdvancedFSOps.class);
+
+LocalState ls = mock(LocalState.class);
+
+MockBasicContainer mc = new MockBasicContainer(ops, port, la, new 
HashMap