[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@HeartSaVioR thanks for your careful review, all comments addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73185119
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap<Long, TupleInfo> pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map<String, String> credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map<Integer, Task> idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new 
RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73178126
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap<Long, TupleInfo> pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map<String, String> credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map<Integer, Task> idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new 
RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73177311
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+
+private final SpoutExecutor executor;
+private final Task taskData;
+private final int taskId;
+private final MutableLong emittedCount;
+private final boolean hasAckers;
+private final Random random;
+private final Boolean isEventLoggers;
+private final Boolean isDebug;
+private final RotatingMap<Long, TupleInfo> pending;
+
+@SuppressWarnings("unused")
+public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, 
Task taskData, int taskId,
+MutableLong emittedCount, boolean 
hasAckers, Random random,
+Boolean isEventLoggers, Boolean 
isDebug, RotatingMap<Long, TupleInfo> pending) {
+this.executor = executor;
+this.taskData = taskData;
+this.taskId = taskId;
+this.emittedCount = emittedCount;
+this.hasAckers = hasAckers;
+this.random = random;
+this.isEventLoggers = isEventLoggers;
+this.isDebug = isDebug;
+this.pending = pending;
+}
+
+@Override
+public List emit(String streamId, List tuple, Object 
messageId) {
+return sendSpoutMsg(streamId, tuple, messageId, null);
+}
+
+@Override
+public void emitDirect(int taskId, String streamId, List 
tuple, Object messageId) {
+sendSpoutMsg(streamId, tuple, messageId, taskId);
+}
+
+@Override
+public long getPendingCount() {
+return pending.size();
+}
+
+@Override
+public void reportError(Throwable error) {
+executor.getReportError().report(error);
+}
+
+private List sendSpoutMsg(String stream, List values, 
Object messageId, Integer outTaskId) {
+emittedCount.increment();
+
+List outTasks;
+if (outTaskId != null) {
+outTasks = taskData.getOutgoingTasks(outTaskId, stream, 
values);
+} else {
+outTasks = taskData.getOutgoingTasks(stream, values);
+}
+
+List ackSeq = new ArrayList<>();
+boolean needAck = (messageId != null) && hasAckers;
+
+long rootId = MessageId.generateId(random);
+for (Integer t : outTasks) {
+MessageId msgId;
+if (needAck) {
+long as = MessageId.generateId(random);
+msgId = MessageId.makeRootId(rootId, as);
+ackSeq.add(as);
+} else {
+msgId = MessageId.makeUnanchored();
+}
+
+TupleImpl tuple = new 
TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, 
msgId);
+executor.getExecutorTransfer().transfer(t, tuple);
+}
+if (isEventLoggers) {
+executor.sendToEventLogger(executor, taskDat

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73177059
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java 
---
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+private final BoltExecutor executor;
+private final Task taskData;
+private final int taskId;
+private final Random random;
+private final boolean isEventLoggers;
+private final boolean isDebug;
+
+public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, 
int taskId, Random random,
+   boolean isEventLoggers, boolean 
isDebug) {
+this.executor = executor;
+this.taskData = taskData;
+this.taskId = taskId;
+this.random = random;
+this.isEventLoggers = isEventLoggers;
+this.isDebug = isDebug;
+}
+
+public List emit(String streamId, Collection anchors, 
List tuple) {
+return boltEmit(streamId, anchors, tuple, null);
+}
+
+@Override
+public void emitDirect(int taskId, String streamId, Collection 
anchors, List tuple) {
+boltEmit(streamId, anchors, tuple, taskId);
+}
+
+private List boltEmit(String streamId, Collection 
anchors, List values, Integer targetTaskId) {
+List outTasks;
+if (targetTaskId != null) {
+outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, 
values);
+} else {
+outTasks = taskData.getOutgoingTasks(streamId, values);
+}
+
+for (Integer t : outTasks) {
+Map<Long, Long> anchorsToIds = new HashMap<>();
+if (anchors != null) {
+for (Tuple a : anchors) {
+long edgeId = MessageId.generateId(random);
+((TupleImpl) a).updateAckVal(edgeId);
+for (Long root_id : 
a.getMessageId().getAnchorsToIds().keySet()) {
+putXor(anchorsToIds, root_id, edgeId);
+}
+}
+}
+MessageId msgId = MessageId.makeId(anchorsToIds);
+TupleImpl tupleExt = new 
TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
+executor.getExecutorTransfer().transfer(t, tupleExt);
+}
+if (isEventLoggers) {
+executor.sendToEventLogger(executor, taskData, values, 
executor.getComponentId(), null, random);
+}
+return outTasks;
--- End diff --

changed `getOutgoingTasks` method to ensure this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastruct

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73176900
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java 
---
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.bolt;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.hooks.info.BoltAckInfo;
+import org.apache.storm.hooks.info.BoltFailInfo;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BoltOutputCollectorImpl implements IOutputCollector {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BoltOutputCollectorImpl.class);
+
+private final BoltExecutor executor;
+private final Task taskData;
+private final int taskId;
+private final Random random;
+private final boolean isEventLoggers;
+private final boolean isDebug;
+
+public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, 
int taskId, Random random,
+   boolean isEventLoggers, boolean 
isDebug) {
+this.executor = executor;
+this.taskData = taskData;
+this.taskId = taskId;
+this.random = random;
+this.isEventLoggers = isEventLoggers;
+this.isDebug = isDebug;
+}
+
+public List emit(String streamId, Collection anchors, 
List tuple) {
+return boltEmit(streamId, anchors, tuple, null);
+}
+
+@Override
+public void emitDirect(int taskId, String streamId, Collection 
anchors, List tuple) {
+boltEmit(streamId, anchors, tuple, taskId);
+}
+
+private List boltEmit(String streamId, Collection 
anchors, List values, Integer targetTaskId) {
+List outTasks;
+if (targetTaskId != null) {
+outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, 
values);
+} else {
+outTasks = taskData.getOutgoingTasks(streamId, values);
+}
+
+for (Integer t : outTasks) {
+Map<Long, Long> anchorsToIds = new HashMap<>();
+if (anchors != null) {
+for (Tuple a : anchors) {
+long edgeId = MessageId.generateId(random);
--- End diff --

you're correct, will address


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73174655
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<Strin

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-02 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73174615
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<Strin

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r73085409
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap<Long, TupleInfo> pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map<String, String> credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map<Integer, Task> idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new 
RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961638
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap<Long, TupleInfo> pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map<String, String> credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map<Integer, Task> idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
+this.spouts = new ArrayList<>();
+for (Task task : idToTask.values()) {
+this.spouts.add((ISpout) task.getTaskObject());
+}
+this.pending = new RotatingMap<>(2, new 
RotatingMap.ExpiredCallback<Long, TupleInfo>() {
+@Override
+public void expire(Long key, TupleInfo tupleInfo) {
+Long timeDelta = null;
+if (tupleInfo.getTimestamp() != 0) {
+timeDelta = Time.deltaMs(tupleInfo.getTimestamp());
+

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-08-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72961410
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,575 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<Strin

[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-07-31 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@HeartSaVioR your comments are addressed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-31 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72922608
  
--- Diff: 
storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---
@@ -403,17 +403,18 @@
 (def bolt-prepared? (atom false))
 (defbolt prepare-tracked-bolt [] {:prepare true}
   [conf context collector]  
-  (reset! bolt-prepared? true)
   (bolt
(execute [tuple]
+(reset! bolt-prepared? true)
--- End diff --

The basic idea was that, in the original code, `spout-opened?` or 
`bolt-prepared?` are set to true in executor initialization phase, while after 
porting to java, this behavior is deferred to `execute/nextTuple`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-31 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72920746
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -179,23 +180,22 @@ public BuiltinMetrics getBuiltInMetrics() {
 private TopologyContext mkTopologyContext(StormTopology topology) 
throws IOException {
 Map conf = (Map) workerData.get("conf");
 return new TopologyContext(
-topology,
-(Map) workerData.get("storm-conf"),
-(Map<Integer, String>) workerData.get("task->component"),
-(Map<String, List>) 
workerData.get("component->sorted-tasks"),
-(Map<String, Map<String, Fields>>) 
workerData.get("component->stream->fields"),
-(String) workerData.get("storm-id"),
-
ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf,
 (String) workerData.get("storm-id"))),
-ConfigUtils.workerPidsRoot(conf, (String) 
workerData.get("worker-id")),
-taskId,
-(Integer) workerData.get("port"),
-(List) workerData.get("task-ids"),
-(Map<String, Object>) 
workerData.get("default-shared-resources"),
-(Map<String, Object>) workerData.get("user-shared-resources"),
-(Map<String, Object>) executorData.get("shared-executor-data"),
-(Map<Integer, Map<Integer, Map<String, IMetric>>>) 
executorData.get("interval->task->metric-registry"),
-(clojure.lang.Atom) 
executorData.get("open-or-prepare-was-called?")
-);
+topology,
+(Map) workerData.get("storm-conf"),
--- End diff --

Oh, changed in StormCommon, but missed here. Will address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-29 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72778919
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -539,10 +557,35 @@ protected IAuthorizer 
mkAuthorizationHandlerImpl(String klassName, Map conf) thr
 if (aznHandler != null) {
 aznHandler.prepare(conf);
 }
-LOG.debug("authorization class name:{}, class:{}, 
handler:{}",klassName, aznClass, aznHandler);
+LOG.debug("authorization class name:{}, class:{}, 
handler:{}", klassName, aznClass, aznHandler);
 }
 }
 
 return aznHandler;
 }
+
+@SuppressWarnings("unchecked")
+public static WorkerTopologyContext makeWorkerContext(Map<String, 
Object> workerData) {
+try {
+StormTopology stormTopology = (StormTopology) 
workerData.get(Constants.SYSTEM_TOPOLOGY);
+Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+Map<Integer, String> taskToComponent = (Map<Integer, String>) 
workerData.get(Constants.TASK_TO_COMPONENT);
+Map<String, List> componentToSortedTasks =
+(Map<String, List>) 
workerData.get(Constants.COMPONENT_TO_SOTRTED_TASKS);
--- End diff --

ah, good catch! will address.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-29 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72778909
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -539,10 +557,35 @@ protected IAuthorizer 
mkAuthorizationHandlerImpl(String klassName, Map conf) thr
 if (aznHandler != null) {
 aznHandler.prepare(conf);
 }
-LOG.debug("authorization class name:{}, class:{}, 
handler:{}",klassName, aznClass, aznHandler);
+LOG.debug("authorization class name:{}, class:{}, 
handler:{}", klassName, aznClass, aznHandler);
 }
 }
 
 return aznHandler;
 }
+
+@SuppressWarnings("unchecked")
+public static WorkerTopologyContext makeWorkerContext(Map<String, 
Object> workerData) {
+try {
+StormTopology stormTopology = (StormTopology) 
workerData.get(Constants.SYSTEM_TOPOLOGY);
+Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+Map<Integer, String> taskToComponent = (Map<Integer, String>) 
workerData.get(Constants.TASK_TO_COMPONENT);
+Map<String, List> componentToSortedTasks =
+(Map<String, List>) 
workerData.get(Constants.COMPONENT_TO_SOTRTED_TASKS);
+Map<String, Map<String, Fields>> componentToStreamToFields =
+(Map<String, Map<String, Fields>>) 
workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+String stormId = (String) workerData.get(Constants.STORM_ID);
+Map conf = (Map) workerData.get(Constants.CONF);
+Integer port = (Integer) workerData.get(Constants.PORT);
+String codeDir = 
ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf,
 stormId));
+String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+List workerTasks = (List) 
workerData.get(Constants.TASK_IDS);
+Map<String, Object> defaultResources = (Map<String, Object>) 
workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+Map<String, Object> userResources = (Map<String, Object>) 
workerData.get(Constants.USER_SHARED_RESOURCES);
+return new WorkerTopologyContext(stormTopology, stormConf, 
taskToComponent, componentToSortedTasks,
+componentToStreamToFields, stormId, codeDir, pidDir, 
port, workerTasks, defaultResources, userResources);
+} catch (IOException e) {
+throw Utils.wrapInRuntime(e);
--- End diff --

ConfigUtils.supervisorStormDistRoot throws IOException


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-29 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r72778899
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -2364,4 +2374,23 @@ public static long bitXor(Long a, Long b) {
 return rtn;
 }
 
+/**
+ * converts a clojure PersistentMap to java HashMap
+ */
+public static Map<String, Object> convertMap(Map map) {
--- End diff --

Will change method name. For parameter type, this method is called in 
`Executor.mkExecutor` which passes in `Map workerData`, which should later be 
replaced with a normal java map after worker is ported to java, so I personally 
prefer the generic Map type instead of using APersistentMap.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70237264
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java
 ---
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.ExecutorTransfer;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.tuple.MessageId;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.MutableLong;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {
+private final Logger LOG = LoggerFactory.getLogger(getClass());
+
+private final SpoutExecutor executor;
+private final Task taskData;
+private final int taskId;
+private final MutableLong emittedCount;
+private final boolean hasAckers;
+private final Random random;
+private final Boolean isEventLoggers;
+private final Boolean isDebug;
+private final RotatingMap<Long, TupleInfo> pending;
+
+public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, 
Task taskData, int taskId,
+MutableLong emittedCount, boolean 
hasAckers, Random random,
+Boolean isEventLoggers, Boolean 
isDebug, RotatingMap<Long, TupleInfo> pending) {
+this.executor = executor;
+this.taskData = taskData;
+this.taskId = taskId;
+this.emittedCount = emittedCount;
+this.hasAckers = hasAckers;
+this.random = random;
+this.isEventLoggers = isEventLoggers;
+this.isDebug = isDebug;
+this.pending = pending;
+}
+
+@Override
+public List emit(String streamId, List tuple, Object 
messageId) {
+return sendSpoutMsg(streamId, tuple, messageId, null);
+}
+
+@Override
+public void emitDirect(int taskId, String streamId, List 
tuple, Object messageId) {
+sendSpoutMsg(streamId, tuple, messageId, taskId);
+}
+
+@Override
+public long getPendingCount() {
+return pending.size();
+}
+
+@Override
+public void reportError(Throwable error) {
+executor.getReportError().report(error);
+}
+
+private List sendSpoutMsg(String stream, List values, 
Object messageId, Integer outTaskId) {
+emittedCount.increment();
+
+java.util.List outTasks;
--- End diff --

thanks, addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@harshach thanks for your comments, all updated. I also made some changes 
to code format/styles in StormCommon.java, if that's ok.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205892
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<String, IMetric>>> 
intervalToTaskToMetricToRegistry;
+protected final Map<String, Map<String, 
LoadAwareCustomStreamG

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205841
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<String, IMetric>>> 
intervalToTaskToMetricToRegistry;
+protected final Map<String, Map<String, 
LoadAwareCustomStreamG

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205741
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
--- End diff --

ok, addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205731
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -179,23 +180,22 @@ public BuiltinMetrics getBuiltInMetrics() {
 private TopologyContext mkTopologyContext(StormTopology topology) 
throws IOException {
 Map conf = (Map) workerData.get("conf");
 return new TopologyContext(
-topology,
-(Map) workerData.get("storm-conf"),
-(Map<Integer, String>) workerData.get("task->component"),
-(Map<String, List>) 
workerData.get("component->sorted-tasks"),
-(Map<String, Map<String, Fields>>) 
workerData.get("component->stream->fields"),
-(String) workerData.get("storm-id"),
-
ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf,
 (String) workerData.get("storm-id"))),
-ConfigUtils.workerPidsRoot(conf, (String) 
workerData.get("worker-id")),
-taskId,
-(Integer) workerData.get("port"),
-(List) workerData.get("task-ids"),
-(Map<String, Object>) 
workerData.get("default-shared-resources"),
-(Map<String, Object>) workerData.get("user-shared-resources"),
-(Map<String, Object>) executorData.get("shared-executor-data"),
-(Map<Integer, Map<Integer, Map<String, IMetric>>>) 
executorData.get("interval->task->metric-registry"),
-(clojure.lang.Atom) 
executorData.get("open-or-prepare-was-called?")
-);
+topology,
+(Map) workerData.get("storm-conf"),
--- End diff --

addressed, moved them into Constants.java


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205708
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -539,10 +542,34 @@ protected IAuthorizer 
mkAuthorizationHandlerImpl(String klassName, Map conf) thr
 if (aznHandler != null) {
 aznHandler.prepare(conf);
 }
-LOG.debug("authorization class name:{}, class:{}, 
handler:{}",klassName, aznClass, aznHandler);
+LOG.debug("authorization class name:{}, class:{}, 
handler:{}", klassName, aznClass, aznHandler);
 }
 }
 
 return aznHandler;
 }
+
+public static WorkerTopologyContext makeWorkerContext(Map<String, 
Object> workerData) {
+try {
+StormTopology stormTopology = (StormTopology) 
workerData.get("system-topology");
--- End diff --

addressed, moved them into Constants class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205629
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -77,7 +78,7 @@ public static StormCommon setInstance(StormCommon common) 
{
 return oldInstance;
 }
 
-private static final Logger LOG = 
LoggerFactory.getLogger(StormCommon.class);
+private static final Logger LOG = getLogger(StormCommon.class);
--- End diff --

addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70205623
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Acker.java ---
@@ -101,7 +101,7 @@ public void execute(Tuple input) {
 }
 curr.failed = true;
 pending.put(id, curr);
-} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
+} else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) {
--- End diff --

addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-10 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r70203481
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java ---
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import clojure.lang.IFn;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.dsl.ProducerType;
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.cluster.ClusterStateContext;
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.cluster.DaemonType;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.GrouperFactory;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.bolt.BoltExecutor;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.executor.error.ReportError;
+import org.apache.storm.executor.error.ReportErrorAndDie;
+import org.apache.storm.executor.spout.SpoutExecutor;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.DebugOptions;
+import org.apache.storm.generated.Grouping;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.DisruptorBackpressureCallback;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.WorkerBackpressureThread;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+public abstract class Executor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(Executor.class);
+
+protected final Map workerData;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final List executorId;
+protected final List taskIds;
+protected final String componentId;
+protected final AtomicBoolean openOrPrepareWasCalled;
+protected final Map stormConf;
+protected final Map conf;
+protected final String stormId;
+protected final HashMap sharedExecutorData;
+protected final AtomicBoolean stormActive;
+protected final AtomicReference<Map<String, DebugOptions>> 
stormComponentDebug;
+protected final Runnable suicideFn;
+protected final IStormClusterState stormClusterState;
+protected final Map<Integer, String> taskToComponent;
+protected CommonStats stats;
+protected final Map<Integer, Map<Integer, Map<String, IMetric>>> 
intervalToTaskToMetricToRegistry;
+protected final Map<String, Map<String, 
LoadAwareCustomStreamG

[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-07-07 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@hustfxj could you take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-07-05 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r69555784
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java ---
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.spout;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.ICredentialsListener;
+import org.apache.storm.daemon.Acker;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
+import org.apache.storm.executor.Executor;
+import org.apache.storm.executor.TupleInfo;
+import org.apache.storm.hooks.info.SpoutAckInfo;
+import org.apache.storm.hooks.info.SpoutFailInfo;
+import org.apache.storm.spout.ISpout;
+import org.apache.storm.spout.ISpoutWaitStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpoutExecutor extends Executor {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SpoutExecutor.class);
+
+private final ISpoutWaitStrategy spoutWaitStrategy;
+private Integer maxSpoutPending;
+private final AtomicBoolean lastActive;
+private List spouts;
+private List outputCollectors;
+private final MutableLong emittedCount;
+private final MutableLong emptyEmitStreak;
+private final SpoutThrottlingMetrics spoutThrottlingMetrics;
+private final boolean hasAckers;
+private RotatingMap<Long, TupleInfo> pending;
+private final boolean backPressureEnabled;
+
+public SpoutExecutor(final Map workerData, final List 
executorId, Map<String, String> credentials) {
+super(workerData, executorId, credentials);
+this.spoutWaitStrategy = Utils.newInstance((String) 
stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY));
+this.spoutWaitStrategy.prepare(stormConf);
+
+this.backPressureEnabled = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false);
+
+this.lastActive = new AtomicBoolean(false);
+this.hasAckers = StormCommon.hasAckers(stormConf);
+this.emittedCount = new MutableLong(0);
+this.emptyEmitStreak = new MutableLong(0);
+this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
+}
+
+@Override
+public void init(final Map<Integer, Task> idToTask) {
+LOG.info("Opening spout {}:{}", componentId, idToTask.keySet());
+this.idToTask = idToTask;
+this.maxSpoutPending = 
Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * 
idToTask.size();
--- End diff --

The default value of max pending is null, in the original executor.clj:
```
^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
```
max-spout-pending could be null

thus in the code usage, it's like `max-spout-pending and ...`, while I 
changed it into java in this way:
`maxSpoutPending !=0 && ...`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-06-24 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@satishd upmerged. Strange that 
`org.apache.storm.messaging.netty-unit-test` fails while all tests including ut 
& integration tests pass on my mac.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-06-17 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@abhishekagarwal87 no hurry, please take your time~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1499: (1.x) STORM-1911 IClusterMetricsConsumer should use secon...

2016-06-17 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1499
  
@HeartSaVioR could you use Time.currentTimeSecs instead? there's already a 
method to get current time secs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-06-17 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@abhishekagarwal87 could you help review again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1482: STORM-1876: Option to build storm-kafka and storm-kafka-c...

2016-06-13 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1482
  
Should we change the README.md accordingly?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java

2016-06-11 Thread unsleepy22
Github user unsleepy22 commented on the issue:

https://github.com/apache/storm/pull/1445
  
@abhishekagarwal87 thanks again for your review, all comments addressed 
except for ones explained inline above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-06-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r66716382
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/local_executor.clj ---
@@ -0,0 +1,43 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-executor
--- End diff --

I'm not very sure. This file is added for test, in testing.clj, the 
`local-transfer-executor-tuple` method will be replaced at runtime, and 
requires the method name as an argument. Translating it into Java will probably 
fail to do this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-06-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r66715672
  
--- Diff: storm-core/src/jvm/org/apache/storm/executor/BaseExecutor.java ---
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor;
+
+import com.google.common.collect.Lists;
+import com.lmax.disruptor.EventHandler;
+import java.net.UnknownHostException;
+import org.apache.storm.Config;
+import org.apache.storm.Constants;
+import org.apache.storm.StormTimer;
+import org.apache.storm.daemon.StormCommon;
+import org.apache.storm.daemon.Task;
+import org.apache.storm.executor.error.IReportError;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.AddressedTuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.DisruptorQueue;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.Callable;
+
+public abstract class BaseExecutor implements Callable, EventHandler {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(BaseExecutor.class);
+
+protected final ExecutorData executorData;
+protected final Map stormConf;
+protected final String componentId;
+protected final WorkerTopologyContext workerTopologyContext;
+protected final IReportError reportError;
+protected final Callable sampler;
+protected final Random rand;
+protected final DisruptorQueue transferQueue;
+protected final DisruptorQueue receiveQueue;
+protected final Map<Integer, Task> idToTask;
+protected final Map<String, String> credentials;
+protected final Boolean isDebug;
+protected final Boolean isEventLoggers;
+protected String hostname;
+
+public BaseExecutor(ExecutorData executorData, Map<Integer, Task> 
idToTask, Map<String, String> credentials) {
+this.executorData = executorData;
+this.stormConf = executorData.getStormConf();
+this.componentId = executorData.getComponentId();
+this.workerTopologyContext = 
executorData.getWorkerTopologyContext();
+this.reportError = executorData.getReportError();
+this.sampler = executorData.getSampler();
+this.rand = new Random(Utils.secureRandomLong());
+this.transferQueue = executorData.getBatchTransferWorkerQueue();
+this.receiveQueue = executorData.getReceiveQueue();
+this.idToTask = idToTask;
+this.credentials = credentials;
+this.isDebug = 
Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false);
+this.isEventLoggers = StormCommon.hasEventLoggers(stormConf);
+
+try {
+this.hostname = Utils.hostname(stormConf);
+} catch (UnknownHostException ignored) {
+this.hostname = "";
+}
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void onEvent(Object event, long seq, boolean endOfBatch) throws 
Exception {
+ArrayList addressedTuples = 
(ArrayList) event;
+for (AddressedTuple addressedTuple : addressedTuples) {
+TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
+int taskId = addressedTuple.getDest();
+if (isDebug) {
+LOG.info("Processing received message FOR {} TUPLE: {}", 
taskId, tuple);
+}
+if (taskId != AddressedTuple.BROADCAST_DEST) {
+tupleActionFn(taskId, tuple);
+} else {
+for (Integer t : ex

[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...

2016-06-11 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1445#discussion_r66714834
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java ---
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.executor.error;
+
+public interface IReportError {
--- End diff --

Actually there's already an IErrorReporter under org.apache.storm.task 
package. I think we can remove this IReportError class, and extend 
ErrorReporter(i.e., ReportError) from the existing IErrorReporter, what do you 
think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-28 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1445#issuecomment-97509
  
@abhishekagarwal87 
For your Point 2, after more thoughts, I think it might be a little 
difficult to move BaseExecutor into Executor. Before creating the real 
executor, we need to init some context in advance i.e., ExecutorData, so that 
we know whether we want a SpoutExecutor or BoltExecutor, but if we move this 
into Executor as an abstract class, this will be that we are trying to create a 
BaseExecutor before we know what to create exactly. 

I think it's better to rename Executor class to be ExecutorMaker or 
ExecutorUtil or something, what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-28 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1445#issuecomment-95424
  
@abhishekagarwal87 thanks for the review, comments inline
1. Why is ExecutorData a standalone class? Can we not put the fields of 
ExecutorData inside the Executor class itself. We can pass around the Executor 
object itself.
[Cody] My intention was to keep Executor itself as clean as possible, 
surely we can put the fields of ExecutorData into Executor, but this might make 
Executor quite large.

2. How is BaseExecutor different from Executor?
[Cody] Admittedly this is kind of confusing. Currently Executor is more 
like a util class while subclasses of BaseExecutor do the real work. I think 
I'm ok to make Executor as an abstract class like just like BaseExecutor and 
remove BaseExecutor.

3. I assume ExecutorCommons is supposed to be the code which will be called 
by both spout and bolt. Can it move to the Executor superclass itself?
[Cody] Since ExecutorCommon contains only static methods, which might be 
called in OutputCollector, i.e., Spout/BoltOutputCollectorImpl, I would not put 
it as a super class, still, I'm ok to put it into Executor instead of another 
ExecutorCommon class (might make Executor even larger...).

4. Does ExecutorShutdown needs to be in its own class?
[Cody] My thought was to expose ExecutorShutdown to worker, which is a 
simplified interface(actually the original clojure code does the same way).  
Otherwise we have to implement Shutdownable, IRunningExecutor interfaces all in 
Executor(as well as interfaces from BaseExecutor). 

More thoughts on current design: It more or less follows the design of 
JStorm, and I personally would keep a unified structure for both executor and 
worker, i.e., they will have ExecutorData/WorkerData, which contain running 
information for the executor/worker; they will return Shutdownable's like 
ExecutorShutdown/WorkerShutdown, which are simplified interfaces.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1445#issuecomment-221794972
  
fixed integration tests as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-25 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1445#issuecomment-22142
  
upmerged with master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-25 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1445#issuecomment-221550255
  
All tests have been fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-24 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1445

[STORM-1277] port backtype.storm.daemon.executor to java

Sorry for the long delay on this.

This PR is not 100% completed yet, things remaining:
1. a few tests fail and I'm still working on this (basically with method 
replacement of `local-executor/local-transfer-executor-tuple`, which was 
originally `executor/mk-executor-transfer-fn` in testing.clj, I'm not familiar 
with this and still trying to figure out how, appreciate any help)
2. my code base is behind master, so after all tests pass I will make a 
full merge from master
Hopefully I can finish the problems by this week.

Still, the base structure/design of the executor won't change, so I think I 
can get reviews from you guys first on the design.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm storm-executor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1445.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1445


commit b13ac017938c3a55181d33faf77570be0d2e001e
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-04-29T09:58:50Z

port executor to java
things remaining:
1. a few tests to be fixed
2. need to upmerge with master




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-24 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1444#issuecomment-221465548
  
sorry close and re-submit


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-24 Thread unsleepy22
Github user unsleepy22 closed the pull request at:

https://github.com/apache/storm/pull/1444


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...

2016-05-24 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1444

[STORM-1277] port backtype.storm.daemon.executor to java

Sorry for the long delay on this.

This PR is not 100% completed yet, things remaining:
1. a few tests will fail and I'm still working on this (basically with 
method replacements like local-executor/local-transfer-executor-tuple, which 
was originally executor/mk-executor-transfer-fn)
2. my code base is behind master, so after all tests pass I will make a 
full merge from master
Hopefully I can finish the problems by this week.

Still, the base structure/design of the executor won't change, so I think I 
can get reviews from you first on the design.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm storm-executor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1444.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1444


commit 3da8aa0c7c9854c82920ab25ea15859dc9a4658d
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-04-29T09:58:50Z

init commit for executor

commit 69111064945ef7cad7706875739fbe0f1168f31e
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-11T09:29:25Z

refined code, fixed bugs

commit e3cb37373f108055460dd40ef3d66aa306ef0b6e
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-11T13:18:49Z

fixed bugs

commit 2f209adef20af5a43aac67c5420d838d8934b5a3
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-23T09:40:42Z

added debug logs

commit b0efb563d7f0f3a5c99c18675871618dd1b52b20
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-23T13:37:33Z

fixed type cast error

commit c1cfcc2ece8de6b51561b00ea1b39fe5d4f05c25
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-24T09:12:20Z

messaging-test ok

commit ed2454d8429687d1fe2f7feb71b5e8fc87b434f3
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-24T10:52:30Z

fixed tuple-test error

commit eab32b22e8cd08612b8db8cebf4f025df351eae4
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-24T11:26:21Z

minor changes

commit 5883acf4d557c51e7002a5a4120e1c51d352c279
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-25T03:22:06Z

changes for testing

commit c4ec8bacf38fb36bf2ec9ff352c21e7edef3297b
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-25T03:31:40Z

revert changes to acker & testing

commit da454e90a3606b09b469f460a3e8f52104666162
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-05-25T03:33:15Z

revert changes to testing




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...

2016-05-07 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1324#issuecomment-217691630
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...

2016-05-03 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1324#discussion_r61899624
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---
@@ -47,17 +68,71 @@ public void prepare(Map stormConf, TopologyContext 
context, OutputCollector coll
 }
 _metricsConsumer.prepare(stormConf, _registrationArgument, 
context, collector);
 _collector = collector;
+_taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+_taskExecuteThread.setDaemon(true);
+_taskExecuteThread.start();
 }
 
 @Override
 public void execute(Tuple input) {
-
_metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), 
(Collection)input.getValue(1));
+// remove older tasks if task queue exceeds the max size
+if (_taskQueue.size() > _maxRetainMetricTuples) {
+while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
--- End diff --

I guess we may run into a concurrent issue here, if while statement 
suffices, it poll one item, while at the same time, metrics consumer consumers 
one item, anyway, I don't think this matters too much so I'm ok to keep it as 
is.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...

2016-05-03 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1324#discussion_r61898673
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java ---
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class FilterByMetricName implements MetricsFilter {
+
+private List whitelistPattern;
+private List blacklistPattern;
+private boolean noneSpecified = false;
+
+public FilterByMetricName(List whitelistPattern, List 
blacklistPattern) {
+// guard NPE
+if (whitelistPattern == null) {
+this.whitelistPattern = Collections.emptyList();
+} else {
+this.whitelistPattern = 
convertPatternStringsToPatternInstances(whitelistPattern);
+}
+
+// guard NPE
+if (blacklistPattern == null) {
+this.blacklistPattern = Collections.emptyList();
+} else {
+this.blacklistPattern = 
convertPatternStringsToPatternInstances(blacklistPattern);
+}
+
+if (this.whitelistPattern.isEmpty() && 
this.blacklistPattern.isEmpty()) {
+noneSpecified = true;
+} else if (!this.whitelistPattern.isEmpty() && 
!this.blacklistPattern.isEmpty()) {
+throw new IllegalArgumentException("You have to specify either 
includes or excludes, or none.");
+}
+}
+
+private ArrayList 
convertPatternStringsToPatternInstances(List patterns) {
+return Lists.newArrayList(Iterators.transform(patterns.iterator(), 
new Function<String, Pattern>() {
+@Override
+public Pattern apply(String s) {
+return Pattern.compile(s);
+}
+}));
+}
+
+@Override
+public boolean apply(IMetricsConsumer.DataPoint dataPoint) {
+if (noneSpecified) {
+return true;
+}
+
+String metricName = dataPoint.name;
+
+if (!whitelistPattern.isEmpty()) {
--- End diff --

OK, I personally prefer the latter approach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1731 (1.x) Avoid looking up debug / back...

2016-04-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1362#issuecomment-214960183
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1729 Get rid of reflections while record...

2016-04-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1361#issuecomment-214711443
  
+1
BTW. can we have more performance tests in more cases so that we can see 
the overall performance change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...

2016-04-25 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1352#issuecomment-214360458
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...

2016-04-24 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1352#issuecomment-214096038
  
Maybe it's better to put multi-cluster support in 2.0, in which I think 
multi-cluster can be supported at least in web ui. So we can leave it be for 
now if we don't see the need from end users.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...

2016-04-22 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1352#issuecomment-213391769
  
for the ClusterInfo class, do we need to add a "cluster.name" property? In 
a real metrics consumer case, the external storage may want to consumer a bunch 
of cluster metrics and display them, thus a "cluster.name" may be necessary to 
distinguish different clusters.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...

2016-04-22 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1352#issuecomment-213390625
  
I couldn't comment on nimbus.clj, how about rename 
"cluster-consumer-executors" to "cluster-metrics-consumers" since we have 
"executors" in tasks already.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...

2016-04-22 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1324#issuecomment-213362512
  
Also, as addressed in STORM-1698, we may have to drop metric points when 
metrics consumer itself fails to catch up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...

2016-04-22 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1324#discussion_r60715825
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java ---
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric.filter;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import org.apache.storm.metric.api.IMetricsConsumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class FilterByMetricName implements MetricsFilter {
+
+private List whitelistPattern;
+private List blacklistPattern;
+private boolean noneSpecified = false;
+
+public FilterByMetricName(List whitelistPattern, List 
blacklistPattern) {
+// guard NPE
+if (whitelistPattern == null) {
+this.whitelistPattern = Collections.emptyList();
+} else {
+this.whitelistPattern = 
convertPatternStringsToPatternInstances(whitelistPattern);
+}
+
+// guard NPE
+if (blacklistPattern == null) {
+this.blacklistPattern = Collections.emptyList();
+} else {
+this.blacklistPattern = 
convertPatternStringsToPatternInstances(blacklistPattern);
+}
+
+if (this.whitelistPattern.isEmpty() && 
this.blacklistPattern.isEmpty()) {
+noneSpecified = true;
+} else if (!this.whitelistPattern.isEmpty() && 
!this.blacklistPattern.isEmpty()) {
+throw new IllegalArgumentException("You have to specify either 
includes or excludes, or none.");
+}
+}
+
+private ArrayList 
convertPatternStringsToPatternInstances(List patterns) {
+return Lists.newArrayList(Iterators.transform(patterns.iterator(), 
new Function<String, Pattern>() {
+@Override
+public Pattern apply(String s) {
+return Pattern.compile(s);
+}
+}));
+}
+
+@Override
+public boolean apply(IMetricsConsumer.DataPoint dataPoint) {
+if (noneSpecified) {
+return true;
+}
+
+String metricName = dataPoint.name;
+
+if (!whitelistPattern.isEmpty()) {
--- End diff --

My concern about pattern match is, regular expression matching is pretty 
CPU-intensive, if possible, I would prefer a simple "String.contains" match, 
which may be much faster and less CPU-intensive.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1698 Asynchronous MetricsConsumerBolt

2016-04-22 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1322#issuecomment-213352632
  
I suppose this PR has been merged into 
https://github.com/apache/storm/pull/1324 ? I'll skip this one.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1687] divide by zero in StatsUtil

2016-04-05 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1312#issuecomment-206103981
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1655 (1.x) Flux doesn't set return code ...

2016-03-24 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1254#issuecomment-200819815
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-23 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1218#issuecomment-200667388
  
@abhishekagarwal87 done~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-23 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1218#issuecomment-200613922
  
thanks, addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-23 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1218#discussion_r57266090
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.api.StateMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.TopologyContext;
+
+public class BuiltinMetricsUtil {
+public static BuiltinMetrics mkData(String type, Object stats) {
+if (StatsUtil.SPOUT.equals(type)) {
+return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
+}
+return new BuiltinBoltMetrics((BoltExecutorStats) stats);
+}
+
+public static void registerIconnectionServerMetric(Object server, Map 
stormConf, TopologyContext context) {
+if (server instanceof IStatefulObject) {
+registerMetric("__recv-iconnection", new 
StateMetric((IStatefulObject) server), stormConf, context);
+}
+}
+
+public static void registerIconnectionClientMetrics(final Map 
nodePort2socket, Map stormConf, TopologyContext context) {
+IMetric metric = new IMetric() {
+@Override
+public Object getValueAndReset() {
+Map<Object, Object> ret = new HashMap<>();
+for (Object o : nodePort2socket.entrySet()) {
+Map.Entry entry = (Map.Entry) o;
+Object nodePort = entry.getKey();
+Object connection = entry.getValue();
+if (connection instanceof IStatefulObject) {
+ret.put(nodePort, ((IStatefulObject) 
connection).getState());
+}
+}
+return ret;
+}
+};
+registerMetric("__send-iconnection", metric, stormConf, context);
+}
+
+public static void registerQueueMetrics(Map queues, Map stormConf, 
TopologyContext context) {
+for (Object o : queues.entrySet()) {
--- End diff --

addressed


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-23 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1218#issuecomment-200407083
  
ping @revans2 , could you take time to have a look? this PR blocks task.clj.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1229: port backtype.storm.metric.testing...

2016-03-20 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1238#discussion_r56766748
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java ---
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Table;
+
+import org.apache.storm.metric.api.IMetricsConsumer;
+import org.apache.storm.task.IErrorReporter;
+import org.apache.storm.task.TopologyContext;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+public class FakeMetricConsumer implements IMetricsConsumer {
+
+public static final Table<String, String, Multimap<Integer, Object>> 
buffer = HashBasedTable.create();
--- End diff --

why is this static? can't we make it an instance field?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Fix logging for LoggingMetricsConsumer STORM-5...

2016-03-20 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1221#issuecomment-197652505
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-18 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1218#issuecomment-198410768
  
@revans2 could you take a look? Since porting task.clj depends on this, I'd 
like this to be merged as soon as possible and since stats.clj is done, 
builtin-metrics is much simpler.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-16 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1218#discussion_r56305899
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.metric.api.IStatefulObject;
+import org.apache.storm.metric.api.StateMetric;
+import org.apache.storm.stats.BoltExecutorStats;
+import org.apache.storm.stats.SpoutExecutorStats;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.task.TopologyContext;
+
+public class BuiltinMetricsUtil {
+public static BuiltinMetrics mkData(String type, Object stats) {
+if (StatsUtil.SPOUT.equals(type)) {
+return new BuiltinSpoutMetrics((SpoutExecutorStats) stats);
+}
+return new BuiltinBoltMetrics((BoltExecutorStats) stats);
+}
+
+public static void registerIconnectionServerMetric(Object server, Map 
stormConf, TopologyContext context) {
+if (server instanceof IStatefulObject) {
+registerMetric("__recv-iconnection", new 
StateMetric((IStatefulObject) server), stormConf, context);
+}
+}
+
+public static void registerIconnectionClientMetrics(final Map 
nodePort2socket, Map stormConf, TopologyContext context) {
+IMetric metric = new IMetric() {
+@Override
+public Object getValueAndReset() {
+Map<Object, Object> ret = new HashMap<>();
+for (Object o : nodePort2socket.entrySet()) {
--- End diff --

No, it has to be cast to Map.Entry inside the `for` loop code because it's 
actually a clojure map which has no generic definition. Java compiler will 
complain if I replace `Object o` to `Map.Entry o`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java

2016-03-16 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1218

[STORM-1268] port builtin-metrics to java



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm STORM-1268

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1218.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1218


commit 5ddab72ae9cc13bc7f680daa44c27ee9fb7b87ef
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-03-16T06:09:59Z

port builtin-metrics to java




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-03-14 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-196277268
  
@revans2 any update on this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1618: Add the option of passing config d...

2016-03-11 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1204#issuecomment-195396027
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1605] use '/usr/bin/env python' to chec...

2016-03-09 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1196

[STORM-1605] use '/usr/bin/env python' to check python version

Current python version check is hard-coded and it cannot detect python 
2.7.x versions. Changed to /usr/bin/env to keep consistent with storm.py

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm STORM-1605

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1196


commit 20f1497c213c97a8a72b2f43039ad59ce9ce5169
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-03-09T08:05:04Z

use '/usr/bin/env python' to check python version




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-03-08 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-194115823
  
@revans2 you're right about the possible NPE, I've updated code.
Also I added a lot of method comments.
I have tested web ui with new code and compared stats data and all looks 
good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1606] print the information of testcase...

2016-03-06 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1189#issuecomment-192884522
  
Sorry, I didn't take a deep look. +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1606] print the information of testcase...

2016-03-06 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1189#discussion_r55135284
  
--- Diff: dev-tools/travis/print-errors-from-test-reports.py ---
@@ -55,6 +55,10 @@ def print_error_reports_from_report_file(file_path):
 if fail is not None:
 print_detail_information(testcase, fail)
 
+failure = testcase.find("failure")
--- End diff --

I would suspect the validity of this change since "fail" is a substring of 
"failure", the script should not miss "failure"s by searching "fail".


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1590] port defmeters/defgauge/defhistog...

2016-03-05 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1171#issuecomment-192615168
  
@revans2 could you take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-03-04 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-192244223
  
@revans2 I get your idea and you don't have to go through all of them, my 
initial thought was to keep it compatible with original clojure data 
structures, but I'm ok to change all of them, I will update in a day or two~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1283: port backtype.storm.MockAutoCred t...

2016-03-03 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1176#issuecomment-191824439
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1171#issuecomment-191044704
  
@revans2 changed according to your comments, also changed all the other 
files. start-metrics-reporters in common.clj is changed to java too. Please 
help review again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-191034221
  
@abhishekagarwal87 changed according to your comments
@revans2 added logs to .gitignore and maven-antrun-plugin to delete logs 
directory after tests
please help review again


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1171#discussion_r54667164
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java ---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.metric;
+
+import clojure.lang.IFn;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.Metric;
+import com.codahale.metrics.MetricRegistry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings("unchecked")
+public class StormMetricsRegistry {
+private static final Logger LOG = 
LoggerFactory.getLogger(StormMetricsRegistry.class);
+private static final MetricRegistry metrics = new MetricRegistry();
--- End diff --

Sure, will change


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...

2016-03-01 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1171

port defmeters/defgauge/defhistogram... to java for all of our code to use

This PR is under STORM-1590, I changed nimbus.clj only currently and would 
like to hear your opinion first before moving on.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm STORM-1590

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1171.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1171


commit a23533cac02c53742e83e1049783c255489521f7
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-03-01T15:34:58Z

change defmeter and defgauge in nimbus to java metrics code




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190727124
  
Ah, you're right, I get you!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190713869
  
@abhishekagarwal87 your change is more careful & considerate, but changing 
defaults.yaml would be simpler and it's pretty reasonable to set storm.log.dir 
to "logs" by default, what do you think? Also, @revans2 what's your opinion on 
this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190684819
  
Probably won't work since storm.home is not set when running tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-03-01 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190621208
  
@abhishekagarwal87 changed according to your comments, please help review 
again, rat check also passes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-02-29 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-190526220
  
@revans2 yes I see the exception too when running your test, but looks odd 
because I've tested all storm ui pages in distributed mode with java stats and 
everything looks good. The exception is as follows, StatsUtil.filterSysStreams 
expects a map of {window -> stream id -> value}, while in your test it gets a 
map of {stream id -> value}. I suppose this happens in local mode only? Though 
I can manage to make through your test by changing this method a little.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1244: port backtype.storm.command.upload...

2016-02-29 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1166#discussion_r54429162
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java ---
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.command;
+
+import org.apache.storm.StormSubmitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class UploadCredentials {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(UploadCredentials.class);
+
+public static void main(String[] args) throws Exception {
+Map<String, Object> cl = CLI.opt("f", "file", null)
+.arg("topologyName", CLI.FIRST_WINS)
+.arg("rawCredentials", CLI.INTO_LIST)
+.parse(args);
+
+String credentialFile = (String) cl.get("f");
+java.util.List rawCredentials = (java.util.List) 
cl.get("rawCredentials");
--- End diff --

you can import java.util.List 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-29 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190194178
  
@abhishekagarwal87 , I get you. My opinion would be:
1. revert changes to ConfigUtils class
2. set "storm.log.dir: logs" in defaults.yaml
3. add "**/logs/**" to apache-rat-plugin excludes
4. remain supervisor-test the same with my commit because settting 
"storm.log.dir = logs OR /tmp/logs" will break supervisor-test anyway.

What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-29 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190108980
  
@abhishekagarwal87 meaning that add `storm.log.dir: /logs` to defaults.yaml?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-02-28 Thread unsleepy22
Github user unsleepy22 commented on a diff in the pull request:

https://github.com/apache/storm/pull/1147#discussion_r54365079
  
--- Diff: storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java 
---
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.stats;
+
+import clojure.lang.PersistentVector;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.storm.metric.internal.MultiCountStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+
+@SuppressWarnings("unchecked")
+public class BoltExecutorStats extends CommonStats {
+
+public static final String ACKED = "acked";
+public static final String FAILED = "failed";
+public static final String EXECUTED = "executed";
+public static final String PROCESS_LATENCIES = "process-latencies";
+public static final String EXECUTE_LATENCIES = "execute-latencies";
+
+public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, 
PROCESS_LATENCIES, EXECUTE_LATENCIES};
+
+public BoltExecutorStats() {
+super();
+
+put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS));
+put(PROCESS_LATENCIES, new 
MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
+put(EXECUTE_LATENCIES, new 
MultiLatencyStatAndMetric(NUM_STAT_BUCKETS));
+}
+
+public MultiCountStatAndMetric getAcked() {
+return (MultiCountStatAndMetric) this.get(ACKED);
+}
+
+public MultiCountStatAndMetric getFailed() {
+return (MultiCountStatAndMetric) this.get(FAILED);
+}
+
+public MultiCountStatAndMetric getExecuted() {
+return (MultiCountStatAndMetric) this.get(EXECUTED);
+}
+
+public MultiLatencyStatAndMetric getProcessLatencies() {
+return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES);
+}
+
+public MultiLatencyStatAndMetric getExecuteLatencies() {
+return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES);
+}
+
+public void boltExecuteTuple(String component, String stream, long 
latencyMs) {
+Object key = PersistentVector.create(component, stream);
+this.getExecuted().incBy(key, this.rate);
+this.getExecuteLatencies().record(key, latencyMs);
+}
+
+public void boltAckedTuple(String component, String stream, long 
latencyMs) {
+Object key = PersistentVector.create(component, stream);
+this.getAcked().incBy(key, this.rate);
+this.getProcessLatencies().record(key, latencyMs);
+}
+
+public void boltFailedTuple(String component, String stream, long 
latencyMs) {
+Object key = PersistentVector.create(component, stream);
+this.getFailed().incBy(key, this.rate);
+
+}
+
+public Map renderStats() {
+cleanupStats();
+Map ret = new HashMap();
+ret.putAll(valueStats(CommonStats.COMMON_FIELDS));
+ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS));
+StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT);
--- End diff --

@revans2 I've changed all the others according to your comments, my only 
problem is renderStats, do you mean that I only need to maintain ExecutorStats 
in memory without any stats Maps while running? Although renderStats is only 
used in thriftify-zk-worker-hb, when workers do heartbeats(see 
worker/do-executor-heartbeats method), it will use a clojure PersistentMap data 
structure, which messes with ExecutorStats, should I turn it into ExecutorStats 
too?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If 

[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-28 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1161#issuecomment-190003786
  
Build failure looks odd, I have all tests pass locally.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-28 Thread unsleepy22
GitHub user unsleepy22 opened a pull request:

https://github.com/apache/storm/pull/1161

[Storm-1579] Fix NoSuchFileException when running tests in storm-core

The old code takes "storm.local.dir" property as logDir while changes in 
STORM-1552 used ConfigUtils.getLogDir, which doesn't take this property, 
causing the ultimate event log directory to be "/logs/" and resulting in 
NoSuchFileException because permission is denied to create /logs directory.

The change of code causes 3 test cases to fail in supervisor_test, so I 
changed them accordingly to make all tests pass.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm STORM-1579

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1161.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1161


commit 034f0cf107403100650d6eb65e7168f62133864a
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-02-28T14:33:21Z

fix STORM-1579, checks storm.local.dir property/conf when getting storm log 
dir

commit 504c11b8ead80e186ff0de83dbdece2337cd1162
Author: 卫乐 <weiyue...@taobao.com>
Date:   2016-02-28T14:42:56Z

append "/logs" to "storm.local.dir" property when non-null




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-28 Thread unsleepy22
Github user unsleepy22 closed the pull request at:

https://github.com/apache/storm/pull/1155


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-27 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1155#issuecomment-189629712
  
My change was quite simple: just adding back the logic to check 
"storm.local.dir" property/conf, but there're 3 test failures in 
"org.apache.storm.supervisor-test" (all the others pass without exceptions), 
I've no idea why, can someone please help take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-02-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-189345949
  
Thanks @revans2 , I'll address all your comments~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1155#issuecomment-189291719
  
@abhishekagarwal87 they're in test-reports, when running tests, you'll get 
messages like "Exception: java.lang.ClassCastException thrown from the 
UncaughtExceptionHandler in thread "Thread-35-__eventlogger-executor[2 2]", by 
looking into the detailed test-reports, you'll get NoSuchFileException.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...

2016-02-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1155#issuecomment-189268264
  
There're test failures, I'll look into it first.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java

2016-02-26 Thread unsleepy22
Github user unsleepy22 commented on the pull request:

https://github.com/apache/storm/pull/1147#issuecomment-189262832
  
@revans2 could you please help review this PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   >