[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @HeartSaVioR thanks for your careful review, all comments addressed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73185119 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMap<Long, TupleInfo> pending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map<String, String> credentials) { +super(workerData, 
executorId, credentials); +this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map<Integer, Task> idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +
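For readers following the `pending` map in the quoted diff: the sketch below illustrates, under assumptions, how a bucketed rotating map with an expiry callback can behave. It is not Storm's `RotatingMap`. The class name `RotatingMapSketch` and all of its methods are hypothetical and exist only for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical illustration only; NOT org.apache.storm.utils.RotatingMap.
final class RotatingMapSketch<K, V> {
    // Newest bucket is at the head, oldest at the tail.
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();
    private final BiConsumer<K, V> onExpire;

    RotatingMapSketch(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be >= 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addFirst(new HashMap<>());
        }
        this.onExpire = onExpire;
    }

    // Insert into the newest bucket, dropping any older copy of the key.
    void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    // Remove the key from whichever bucket holds it; null if absent.
    V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    int size() {
        int total = 0;
        for (Map<K, V> bucket : buckets) {
            total += bucket.size();
        }
        return total;
    }

    // Drop the oldest bucket, add a fresh one, and report every dropped entry.
    void rotate() {
        Map<K, V> dead = buckets.removeLast();
        buckets.addFirst(new HashMap<>());
        dead.forEach(onExpire);
    }

    public static void main(String[] args) {
        List<Long> expired = new ArrayList<>();
        RotatingMapSketch<Long, String> pending =
                new RotatingMapSketch<>(2, (k, v) -> expired.add(k));
        pending.put(42L, "tuple-info");
        pending.rotate(); // entry moves from the newest to the oldest bucket
        pending.rotate(); // entry is dropped and the callback fires
        System.out.println("expired keys: " + expired);
    }
}
```

With two buckets, as in the quoted constructor call, an entry that is never removed survives one rotation and is expired on the second, so the effective timeout lies between one and two rotation intervals.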
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73178126 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73177311 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class SpoutOutputCollectorImpl implements ISpoutOutputCollector { + +private final SpoutExecutor executor; +private final Task taskData; +private final int taskId; +private final MutableLong emittedCount; +private final boolean hasAckers; +private final Random random; +private final Boolean isEventLoggers; +private final Boolean isDebug; +private final RotatingMap<Long, TupleInfo> pending; + +@SuppressWarnings("unused") +public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId, +MutableLong emittedCount, boolean hasAckers, Random random, +Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.emittedCount = emittedCount; +this.hasAckers = hasAckers; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +this.pending = pending; +} + +@Override +public List emit(String streamId, List tuple, Object messageId) { +return sendSpoutMsg(streamId, tuple, messageId, null); +} + +@Override +public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { +sendSpoutMsg(streamId, tuple, messageId, taskId); +} + +@Override +public long getPendingCount() { +return pending.size(); +} + +@Override +public void reportError(Throwable error) { +executor.getReportError().report(error); +} + 
+private List sendSpoutMsg(String stream, List values, Object messageId, Integer outTaskId) { +emittedCount.increment(); + +List outTasks; +if (outTaskId != null) { +outTasks = taskData.getOutgoingTasks(outTaskId, stream, values); +} else { +outTasks = taskData.getOutgoingTasks(stream, values); +} + +List ackSeq = new ArrayList<>(); +boolean needAck = (messageId != null) && hasAckers; + +long rootId = MessageId.generateId(random); +for (Integer t : outTasks) { +MessageId msgId; +if (needAck) { +long as = MessageId.generateId(random); +msgId = MessageId.makeRootId(rootId, as); +ackSeq.add(as); +} else { +msgId = MessageId.makeUnanchored(); +} + +TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId); +executor.getExecutorTransfer().transfer(t, tuple); +} +if (isEventLoggers) { +executor.sendToEventLogger(executor, taskDat
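The quoted diff is cut off before it shows what happens to `ackSeq`. As a hedged illustration of the general XOR-based acking idea (an assumption about the truncated part, not a reproduction of it), the sketch below folds the per-edge ids into one value and shows that XOR-ing the same ids back in returns it to zero. `AckValueSketch`, `xorAll`, and `ackAll` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of XOR-based ack tracking; not Storm code.
final class AckValueSketch {
    // Fold all edge ids into a single tracking value.
    static long xorAll(List<Long> edgeIds) {
        long acc = 0L;
        for (long id : edgeIds) {
            acc ^= id;
        }
        return acc;
    }

    // Apply acks for the given edge ids to a tracking value.
    static long ackAll(long trackingValue, List<Long> ackedIds) {
        for (long id : ackedIds) {
            trackingValue ^= id;
        }
        return trackingValue;
    }

    public static void main(String[] args) {
        List<Long> edges = Arrays.asList(0x1L, 0x2L, 0x4L);
        long initial = xorAll(edges);            // value registered for the root id
        long afterAcks = ackAll(initial, edges); // every edge acked exactly once
        System.out.println("fully acked: " + (afterAcks == 0L));
    }
}
```

The design point is that the tracker needs only one `long` per root id, regardless of how many tuples descend from it, because each id is XOR-ed in once when created and once when acked.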
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73177059 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +Map<Long, Long> anchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); +((TupleImpl) a).updateAckVal(edgeId); +for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) { +putXor(anchorsToIds, root_id, edgeId); +} +} +} +MessageId msgId = MessageId.makeId(anchorsToIds); +TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId); +executor.getExecutorTransfer().transfer(t, tupleExt); +} +if (isEventLoggers) { +executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random); +} +return outTasks; --- End diff -- changed `getOutgoingTasks` method to ensure this.
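The body of `putXor` is not shown in the quoted diff. A minimal sketch of what such a helper could look like, assuming it XORs the new edge id into whatever value is already stored for the root id (treating a missing entry as zero), is given below. The class name `PutXorSketch` is hypothetical, and the method body is an assumption rather than the code under review.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the real putXor body is not visible in the diff.
final class PutXorSketch {
    // XOR edgeId into the value tracked for rootId (missing entry counts as 0).
    static void putXor(Map<Long, Long> pending, Long rootId, long edgeId) {
        Long current = pending.get(rootId);
        long base = (current == null) ? 0L : current;
        pending.put(rootId, base ^ edgeId);
    }

    public static void main(String[] args) {
        Map<Long, Long> anchorsToIds = new HashMap<>();
        putXor(anchorsToIds, 10L, 5L); // first edge for root 10
        putXor(anchorsToIds, 10L, 3L); // second edge for the same root
        putXor(anchorsToIds, 20L, 9L); // a different root is tracked separately
        System.out.println(anchorsToIds);
    }
}
```

Keeping one XOR-ed value per root id lets a tuple anchored to several inputs carry a single entry for each spout tuple it descends from.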
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73176900 --- Diff: storm-core/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java --- @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.bolt; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.hooks.info.BoltAckInfo; +import org.apache.storm.hooks.info.BoltFailInfo; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BoltOutputCollectorImpl implements IOutputCollector { + +private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class); + +private final BoltExecutor executor; +private final Task taskData; +private final int taskId; +private final Random random; +private final boolean isEventLoggers; +private final boolean isDebug; + +public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random, + boolean isEventLoggers, boolean isDebug) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +} + +public List emit(String streamId, Collection anchors, List tuple) { +return boltEmit(streamId, anchors, tuple, null); +} + +@Override +public void emitDirect(int taskId, String streamId, Collection anchors, List tuple) { +boltEmit(streamId, anchors, tuple, taskId); +} + +private List boltEmit(String streamId, Collection anchors, List values, Integer targetTaskId) { +List outTasks; +if (targetTaskId != null) { +outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values); +} else { +outTasks = taskData.getOutgoingTasks(streamId, values); +} 
+ +for (Integer t : outTasks) { +Map<Long, Long> anchorsToIds = new HashMap<>(); +if (anchors != null) { +for (Tuple a : anchors) { +long edgeId = MessageId.generateId(random); --- End diff -- you're correct, will address
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73174655 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; +protected final Runnable suicideFn; +protected final IStormClusterState stormClusterState; +protected final Map<Integer, String> taskToComponent; +protected CommonStats stats; +protected final Map<Integer, Map<Integer, Map<Strin
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73174615 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r73085409 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961638 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMap<Long, TupleInfo> pending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map<String, String> credentials) { +super(workerData, 
executorId, credentials); +this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + +this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map<Integer, Task> idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); +this.spouts = new ArrayList<>(); +for (Task task : idToTask.values()) { +this.spouts.add((ISpout) task.getTaskObject()); +} +this.pending = new RotatingMap<>(2, new RotatingMap.ExpiredCallback<Long, TupleInfo>() { +@Override +public void expire(Long key, TupleInfo tupleInfo) { +Long timeDelta = null; +if (tupleInfo.getTimestamp() != 0) { +timeDelta = Time.deltaMs(tupleInfo.getTimestamp()); +
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72961410 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,575 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import 
org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; +protected final Runnable suicideFn; +protected final IStormClusterState stormClusterState; +protected final Map<Integer, String> taskToComponent; +protected CommonStats stats; +protected final Map<Integer, Map<Integer, Map<Strin
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @HeartSaVioR your comments are addressed.
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72922608

--- Diff: storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---
@@ -403,17 +403,18 @@
 (def bolt-prepared? (atom false))
 (defbolt prepare-tracked-bolt [] {:prepare true}
 [conf context collector]
- (reset! bolt-prepared? true)
 (bolt
 (execute [tuple]
+(reset! bolt-prepared? true)
--- End diff --

The basic idea was that, in the original code, `spout-opened?` and `bolt-prepared?` are set to true in the executor initialization phase, whereas after porting to Java this is deferred to `execute`/`nextTuple`.
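To picture the test change described above, here is a minimal Java sketch in which the tracking flag is flipped on the first `execute` call rather than in `prepare`. `TrackedBoltSketch` and its methods are hypothetical stand-ins written for illustration; they are not Storm's `IBolt` interface and not code from this pull request.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a tracked bolt; not Storm's IBolt interface.
class TrackedBoltSketch {
    // Flag a test can poll to learn that the component has started processing.
    final AtomicBoolean prepared = new AtomicBoolean(false);

    // Before the test change, the flag would have been set here.
    void prepare() {
        // Intentionally leaves the flag untouched.
    }

    // After the test change, the flag is set once a tuple is processed.
    void execute(Object tuple) {
        prepared.set(true);
    }

    public static void main(String[] args) {
        TrackedBoltSketch bolt = new TrackedBoltSketch();
        bolt.prepare();
        System.out.println("after prepare: " + bolt.prepared.get());
        bolt.execute("tuple");
        System.out.println("after execute: " + bolt.prepared.get());
    }
}
```

A test that waits on the flag then only proceeds once the component is actually running, regardless of when initialization happens.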
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72920746

--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -179,23 +180,22 @@ public BuiltinMetrics getBuiltInMetrics() {
 private TopologyContext mkTopologyContext(StormTopology topology) throws IOException {
 Map conf = (Map) workerData.get("conf");
 return new TopologyContext(
-topology,
-(Map) workerData.get("storm-conf"),
-(Map<Integer, String>) workerData.get("task->component"),
-(Map<String, List>) workerData.get("component->sorted-tasks"),
-(Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"),
-(String) workerData.get("storm-id"),
- ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))),
-ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")),
-taskId,
-(Integer) workerData.get("port"),
-(List) workerData.get("task-ids"),
-(Map<String, Object>) workerData.get("default-shared-resources"),
-(Map<String, Object>) workerData.get("user-shared-resources"),
-(Map<String, Object>) executorData.get("shared-executor-data"),
-(Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"),
-(clojure.lang.Atom) executorData.get("open-or-prepare-was-called?")
-);
+topology,
+(Map) workerData.get("storm-conf"),
--- End diff --

Oh, changed in StormCommon, but missed here. Will address.
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72778919

--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -539,10 +557,35 @@ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) thr
 if (aznHandler != null) {
 aznHandler.prepare(conf);
 }
-LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
 }
 }
 return aznHandler;
 }
+
+@SuppressWarnings("unchecked")
+public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+try {
+StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+Map<String, List> componentToSortedTasks =
+(Map<String, List>) workerData.get(Constants.COMPONENT_TO_SOTRTED_TASKS);
--- End diff --

ah, good catch! will address.
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72778909

--- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java ---
@@ -539,10 +557,35 @@ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) thr
 if (aznHandler != null) {
 aznHandler.prepare(conf);
 }
-LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler);
+LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler);
 }
 }
 return aznHandler;
 }
+
+@SuppressWarnings("unchecked")
+public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) {
+try {
+StormTopology stormTopology = (StormTopology) workerData.get(Constants.SYSTEM_TOPOLOGY);
+Map stormConf = (Map) workerData.get(Constants.STORM_CONF);
+Map<Integer, String> taskToComponent = (Map<Integer, String>) workerData.get(Constants.TASK_TO_COMPONENT);
+Map<String, List> componentToSortedTasks =
+(Map<String, List>) workerData.get(Constants.COMPONENT_TO_SOTRTED_TASKS);
+Map<String, Map<String, Fields>> componentToStreamToFields =
+(Map<String, Map<String, Fields>>) workerData.get(Constants.COMPONENT_TO_STREAM_TO_FIELDS);
+String stormId = (String) workerData.get(Constants.STORM_ID);
+Map conf = (Map) workerData.get(Constants.CONF);
+Integer port = (Integer) workerData.get(Constants.PORT);
+String codeDir = ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, stormId));
+String pidDir = ConfigUtils.workerPidsRoot(conf, stormId);
+List workerTasks = (List) workerData.get(Constants.TASK_IDS);
+Map<String, Object> defaultResources = (Map<String, Object>) workerData.get(Constants.DEFAULT_SHARED_RESOURCES);
+Map<String, Object> userResources = (Map<String, Object>) workerData.get(Constants.USER_SHARED_RESOURCES);
+return new WorkerTopologyContext(stormTopology, stormConf, taskToComponent, componentToSortedTasks,
+componentToStreamToFields, stormId, codeDir, pidDir, port, workerTasks, defaultResources, userResources);
+} catch (IOException e) {
+throw Utils.wrapInRuntime(e);
--- End diff --

ConfigUtils.supervisorStormDistRoot throws IOException
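The catch block in the quoted hunk follows a common Java pattern: a helper declares a checked `IOException`, and the caller rethrows it unchecked so that it needs no `throws` clause. The sketch below illustrates that pattern only. `distRoot`, `makeContextPath`, the `/tmp/stormdist/` path and the local `wrapInRuntime` are hypothetical stand-ins, and the wrapping rule shown is an assumption rather than a copy of Storm's `Utils.wrapInRuntime`.

```java
import java.io.IOException;

class WrapCheckedSketch {
    // Hypothetical stand-in for a path helper that declares a checked IOException.
    static String distRoot(String stormId) throws IOException {
        if (stormId == null || stormId.isEmpty()) {
            throw new IOException("missing storm id");
        }
        return "/tmp/stormdist/" + stormId;
    }

    // Local helper (assumed behavior): pass runtime exceptions through, wrap checked ones.
    static RuntimeException wrapInRuntime(Exception e) {
        return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
    }

    // The caller has no throws clause; the checked exception surfaces as a RuntimeException.
    static String makeContextPath(String stormId) {
        try {
            return distRoot(stormId);
        } catch (IOException e) {
            throw wrapInRuntime(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(makeContextPath("topo-1"));
    }
}
```

The original `IOException` stays reachable through `getCause()`, so no diagnostic information is lost by the wrapping.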
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r72778899

--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -2364,4 +2374,23 @@ public static long bitXor(Long a, Long b) {
 return rtn;
 }
+/**
+ * converts a clojure PersistentMap to java HashMap
+ */
+public static Map<String, Object> convertMap(Map map) {
--- End diff --

Will change the method name. As for the parameter type: this method is called in `Executor.mkExecutor`, which passes in `Map workerData`. That argument should later be replaced with a normal Java map once the worker is ported to Java, so I personally prefer the generic `Map` type over `APersistentMap`.
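The design choice argued for here, accepting the generic `Map` interface rather than a Clojure-specific type, can be sketched as follows. This is an illustration under stated assumptions, not the body of the method in the pull request: `ConvertMapSketch.toJavaMap` is a hypothetical name, and the key handling assumes keys render as `:name` (as Clojure keywords do via `toString`) and simply strips the leading colon.

```java
import java.util.HashMap;
import java.util.Map;

class ConvertMapSketch {
    // Accepts any Map implementation, so callers can later pass a plain Java map
    // without a signature change.
    static Map<String, Object> toJavaMap(Map<?, ?> source) {
        Map<String, Object> result = new HashMap<>(source.size());
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            // Assumption: keys print as ":name"; drop the leading colon if present.
            String key = String.valueOf(entry.getKey());
            if (key.startsWith(":")) {
                key = key.substring(1);
            }
            result.put(key, entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Object, Object> workerData = new HashMap<>();
        workerData.put(":storm-id", "topo-1");
        workerData.put(":port", 6700);
        System.out.println(toJavaMap(workerData));
    }
}
```

Because the parameter is the interface type, the same method keeps working whether the caller holds a Clojure persistent map or a `java.util.HashMap`.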
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70237264 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutOutputCollectorImpl.java --- @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.ExecutorTransfer; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutOutputCollector; +import org.apache.storm.tuple.MessageId; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.MutableLong; +import org.apache.storm.utils.RotatingMap; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SpoutOutputCollectorImpl implements ISpoutOutputCollector { +private final Logger LOG = LoggerFactory.getLogger(getClass()); + +private final SpoutExecutor executor; +private final Task taskData; +private final int taskId; +private final MutableLong emittedCount; +private final boolean hasAckers; +private final Random random; +private final Boolean isEventLoggers; +private final Boolean isDebug; +private final RotatingMap<Long, TupleInfo> pending; + +public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId, +MutableLong emittedCount, boolean hasAckers, Random random, +Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) { +this.executor = executor; +this.taskData = taskData; +this.taskId = taskId; +this.emittedCount = emittedCount; +this.hasAckers = hasAckers; +this.random = random; +this.isEventLoggers = isEventLoggers; +this.isDebug = isDebug; +this.pending = pending; +} + +@Override +public List emit(String streamId, List tuple, Object messageId) { +return sendSpoutMsg(streamId, tuple, messageId, null); +} + +@Override +public void emitDirect(int taskId, String streamId, List tuple, Object messageId) { +sendSpoutMsg(streamId, tuple, messageId, 
taskId); +} + +@Override +public long getPendingCount() { +return pending.size(); +} + +@Override +public void reportError(Throwable error) { +executor.getReportError().report(error); +} + +private List sendSpoutMsg(String stream, List values, Object messageId, Integer outTaskId) { +emittedCount.increment(); + +java.util.List outTasks; --- End diff -- thanks, addressed
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @harshach thanks for your comments, all updated. I also made some changes to code format/styles in StormCommon.java, if that's ok.
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205892 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import 
org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; +protected final Runnable suicideFn; +protected final IStormClusterState stormClusterState; +protected final Map<Integer, String> taskToComponent; +protected CommonStats stats; +protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry; +protected final Map<String, Map<String, LoadAwareCustomStreamG
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205841 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import 
org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; +protected final Runnable suicideFn; +protected final IStormClusterState stormClusterState; +protected final Map<Integer, String> taskToComponent; +protected CommonStats stats; +protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry; +protected final Map<String, Map<String, LoadAwareCustomStreamG
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205741 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import 
org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; --- End diff -- ok, addressed
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205731 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java --- @@ -179,23 +180,22 @@ public BuiltinMetrics getBuiltInMetrics() { private TopologyContext mkTopologyContext(StormTopology topology) throws IOException { Map conf = (Map) workerData.get("conf"); return new TopologyContext( -topology, -(Map) workerData.get("storm-conf"), -(Map<Integer, String>) workerData.get("task->component"), -(Map<String, List>) workerData.get("component->sorted-tasks"), -(Map<String, Map<String, Fields>>) workerData.get("component->stream->fields"), -(String) workerData.get("storm-id"), - ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(conf, (String) workerData.get("storm-id"))), -ConfigUtils.workerPidsRoot(conf, (String) workerData.get("worker-id")), -taskId, -(Integer) workerData.get("port"), -(List) workerData.get("task-ids"), -(Map<String, Object>) workerData.get("default-shared-resources"), -(Map<String, Object>) workerData.get("user-shared-resources"), -(Map<String, Object>) executorData.get("shared-executor-data"), -(Map<Integer, Map<Integer, Map<String, IMetric>>>) executorData.get("interval->task->metric-registry"), -(clojure.lang.Atom) executorData.get("open-or-prepare-was-called?") -); +topology, +(Map) workerData.get("storm-conf"), --- End diff -- addressed, moved them into Constants.java
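To illustrate the "moved them into Constants.java" change, here is a minimal sketch of replacing repeated string keys with named constants. The class `WorkerDataKeys` and its field names are hypothetical; the thread does not show which names were actually added to Constants.java.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the workerData map keys. The real constants were
// moved into Constants.java and may well be named differently.
final class WorkerDataKeys {
    static final String STORM_CONF = "storm-conf";
    static final String STORM_ID = "storm-id";
    static final String PORT = "port";

    private WorkerDataKeys() {}
}

final class WorkerDataKeysDemo {
    // Reads the port through a named constant instead of a repeated string literal,
    // so a typo in the key becomes a compile error rather than a silent null.
    static Integer port(Map<String, Object> workerData) {
        return (Integer) workerData.get(WorkerDataKeys.PORT);
    }

    public static void main(String[] args) {
        Map<String, Object> workerData = new HashMap<>();
        workerData.put(WorkerDataKeys.STORM_ID, "demo-topology-1");
        workerData.put(WorkerDataKeys.PORT, 6700);
        System.out.println(port(workerData));
    }
}
```

The key strings themselves ("storm-conf", "storm-id", "port") are taken from the diff above; only the constant names are assumed.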
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205708 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java --- @@ -539,10 +542,34 @@ protected IAuthorizer mkAuthorizationHandlerImpl(String klassName, Map conf) thr if (aznHandler != null) { aznHandler.prepare(conf); } -LOG.debug("authorization class name:{}, class:{}, handler:{}",klassName, aznClass, aznHandler); +LOG.debug("authorization class name:{}, class:{}, handler:{}", klassName, aznClass, aznHandler); } } return aznHandler; } + +public static WorkerTopologyContext makeWorkerContext(Map<String, Object> workerData) { +try { +StormTopology stormTopology = (StormTopology) workerData.get("system-topology"); --- End diff -- addressed, moved them into Constants class
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205629 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/StormCommon.java --- @@ -77,7 +78,7 @@ public static StormCommon setInstance(StormCommon common) { return oldInstance; } -private static final Logger LOG = LoggerFactory.getLogger(StormCommon.class); +private static final Logger LOG = getLogger(StormCommon.class); --- End diff -- addressed
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70205623 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Acker.java --- @@ -101,7 +101,7 @@ public void execute(Tuple input) { } curr.failed = true; pending.put(id, curr); -} else if (ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { +} else if(ACKER_RESET_TIMEOUT_STREAM_ID.equals(streamId)) { --- End diff -- addressed
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r70203481 --- Diff: storm-core/src/jvm/org/apache/storm/executor/Executor.java --- @@ -0,0 +1,567 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import clojure.lang.IFn; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.cluster.ClusterStateContext; +import org.apache.storm.cluster.ClusterUtils; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.GrouperFactory; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.bolt.BoltExecutor; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.executor.error.ReportError; +import org.apache.storm.executor.error.ReportErrorAndDie; +import org.apache.storm.executor.spout.SpoutExecutor; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.DebugOptions; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadAwareCustomStreamGrouping; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import 
org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.DisruptorBackpressureCallback; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.WorkerBackpressureThread; +import org.json.simple.JSONValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class Executor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(Executor.class); + +protected final Map workerData; +protected final WorkerTopologyContext workerTopologyContext; +protected final List executorId; +protected final List taskIds; +protected final String componentId; +protected final AtomicBoolean openOrPrepareWasCalled; +protected final Map stormConf; +protected final Map conf; +protected final String stormId; +protected final HashMap sharedExecutorData; +protected final AtomicBoolean stormActive; +protected final AtomicReference<Map<String, DebugOptions>> stormComponentDebug; +protected final Runnable suicideFn; +protected final IStormClusterState stormClusterState; +protected final Map<Integer, String> taskToComponent; +protected CommonStats stats; +protected final Map<Integer, Map<Integer, Map<String, IMetric>>> intervalToTaskToMetricToRegistry; +protected final Map<String, Map<String, LoadAwareCustomStreamG
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @hustfxj could you take a look?
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r69555784 --- Diff: storm-core/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java --- @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor.spout; + +import com.google.common.collect.ImmutableMap; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.ICredentialsListener; +import org.apache.storm.daemon.Acker; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics; +import org.apache.storm.executor.Executor; +import org.apache.storm.executor.TupleInfo; +import org.apache.storm.hooks.info.SpoutAckInfo; +import org.apache.storm.hooks.info.SpoutFailInfo; +import org.apache.storm.spout.ISpout; +import org.apache.storm.spout.ISpoutWaitStrategy; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class SpoutExecutor extends Executor { + +private static final Logger LOG = LoggerFactory.getLogger(SpoutExecutor.class); + +private final ISpoutWaitStrategy spoutWaitStrategy; +private Integer maxSpoutPending; +private final AtomicBoolean lastActive; +private List spouts; +private List outputCollectors; +private final MutableLong emittedCount; +private final MutableLong emptyEmitStreak; +private final SpoutThrottlingMetrics spoutThrottlingMetrics; +private final boolean hasAckers; +private RotatingMap<Long, TupleInfo> pending; +private final boolean backPressureEnabled; + +public SpoutExecutor(final Map workerData, final List executorId, Map<String, String> credentials) { +super(workerData, executorId, credentials); +this.spoutWaitStrategy = Utils.newInstance((String) stormConf.get(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY)); +this.spoutWaitStrategy.prepare(stormConf); + 
+this.backPressureEnabled = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE), false); + +this.lastActive = new AtomicBoolean(false); +this.hasAckers = StormCommon.hasAckers(stormConf); +this.emittedCount = new MutableLong(0); +this.emptyEmitStreak = new MutableLong(0); +this.spoutThrottlingMetrics = new SpoutThrottlingMetrics(); +} + +@Override +public void init(final Map<Integer, Task> idToTask) { +LOG.info("Opening spout {}:{}", componentId, idToTask.keySet()); +this.idToTask = idToTask; +this.maxSpoutPending = Utils.getInt(stormConf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING), 0) * idToTask.size(); --- End diff --
The default value of max pending is null. In the original executor.clj:
```
^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
```
max-spout-pending can therefore be null, so the Clojure code guards its uses with `max-spout-pending and ...`. In the Java port I default it to 0 and guard with `maxSpoutPending != 0 && ...` instead.
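A minimal, self-contained sketch of the null-to-zero translation described above. The helper `getInt` is a local stand-in for `Utils.getInt`, and the throttling condition (`pendingSize >= maxSpoutPending`) is an assumption about what the elided `...` contains; neither is copied from the PR.

```java
import java.util.HashMap;
import java.util.Map;

final class MaxSpoutPendingSketch {
    // Local stand-in for Utils.getInt(value, default): falls back to the
    // default when the config entry is absent (null).
    static int getInt(Object value, int defaultValue) {
        return value == null ? defaultValue : ((Number) value).intValue();
    }

    // 0 plays the role that a null max-spout-pending plays in the Clojure code:
    // "no limit configured". Multiplying by the task count keeps 0 at 0.
    static int maxSpoutPending(Map<String, Object> conf, int taskCount) {
        return getInt(conf.get("topology.max.spout.pending"), 0) * taskCount;
    }

    // Assumed shape of the guard: throttle only when a limit is set
    // and the number of pending tuples has reached it.
    static boolean reachedMaxPending(int maxSpoutPending, int pendingSize) {
        return maxSpoutPending != 0 && pendingSize >= maxSpoutPending;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        System.out.println(reachedMaxPending(maxSpoutPending(conf, 2), 1000));
        conf.put("topology.max.spout.pending", 10);
        System.out.println(reachedMaxPending(maxSpoutPending(conf, 2), 20));
    }
}
```

One consequence of using 0 as the sentinel is that an explicitly configured limit of 0 is indistinguishable from "not set"; under this sketch both mean no throttling.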
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @satishd upmerged. Strange that `org.apache.storm.messaging.netty-unit-test` fails, while all tests (unit and integration) pass on my Mac.
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @abhishekagarwal87 no hurry, please take your time~
[GitHub] storm issue #1499: (1.x) STORM-1911 IClusterMetricsConsumer should use secon...
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1499 @HeartSaVioR could you use `Time.currentTimeSecs` instead? There's already a method to get the current time in seconds.
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @abhishekagarwal87 could you help review again?
[GitHub] storm issue #1482: STORM-1876: Option to build storm-kafka and storm-kafka-c...
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1482 Should we change the README.md accordingly?
[GitHub] storm issue #1445: [STORM-1277] port backtype.storm.daemon.executor to java
Github user unsleepy22 commented on the issue: https://github.com/apache/storm/pull/1445 @abhishekagarwal87 thanks again for your review. All comments are addressed except for the ones explained inline above.
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r66716382 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_executor.clj --- @@ -0,0 +1,43 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.daemon.local-executor --- End diff -- I'm not sure. This file was added for testing: in testing.clj, the `local-transfer-executor-tuple` function is replaced at runtime, and that replacement requires the function name as an argument. Translating it into Java would probably break this.
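The concern above is that a Clojure function can be rebound by name at runtime, whereas a plain Java static method cannot. As a hedged illustration only (this is not what the PR does, and every name below is hypothetical), one way to keep a comparable test hook in Java is to route the call through a replaceable function reference:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: a transfer function that tests can swap at runtime.
final class LocalTransferHook {
    // Stand-in for the real destination queue.
    static final List<String> QUEUE = new ArrayList<>();

    // Current implementation; defaults to appending to QUEUE.
    private static volatile Consumer<String> transferFn = QUEUE::add;

    private LocalTransferHook() {}

    // Callers always go through this method, never through the field directly.
    static void transfer(String tuple) {
        transferFn.accept(tuple);
    }

    // Installs a replacement and returns the previous implementation
    // so that a test can restore it afterwards.
    static Consumer<String> replace(Consumer<String> replacement) {
        Consumer<String> previous = transferFn;
        transferFn = replacement;
        return previous;
    }
}
```

A test would call `replace(...)` with a capturing function, exercise the code under test, and then call `replace(previous)` to restore the default. Unlike the Clojure approach, the hook has to be designed into the class up front.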
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r66715672 --- Diff: storm-core/src/jvm/org/apache/storm/executor/BaseExecutor.java --- @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.executor; + +import com.google.common.collect.Lists; +import com.lmax.disruptor.EventHandler; +import java.net.UnknownHostException; +import org.apache.storm.Config; +import org.apache.storm.Constants; +import org.apache.storm.StormTimer; +import org.apache.storm.daemon.StormCommon; +import org.apache.storm.daemon.Task; +import org.apache.storm.executor.error.IReportError; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.AddressedTuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.DisruptorQueue; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; +import java.util.concurrent.Callable; + +public abstract class BaseExecutor implements Callable, EventHandler { + +private static final Logger LOG = LoggerFactory.getLogger(BaseExecutor.class); + +protected final ExecutorData executorData; +protected final Map stormConf; +protected final String componentId; +protected final WorkerTopologyContext workerTopologyContext; +protected final IReportError reportError; +protected final Callable sampler; +protected final Random rand; +protected final DisruptorQueue transferQueue; +protected final DisruptorQueue receiveQueue; +protected final Map<Integer, Task> idToTask; +protected final Map<String, String> credentials; +protected final Boolean isDebug; +protected final Boolean isEventLoggers; +protected String hostname; + +public BaseExecutor(ExecutorData executorData, Map<Integer, Task> idToTask, Map<String, String> credentials) { +this.executorData = executorData; +this.stormConf = executorData.getStormConf(); +this.componentId = executorData.getComponentId(); +this.workerTopologyContext = executorData.getWorkerTopologyContext(); +this.reportError = executorData.getReportError(); 
+this.sampler = executorData.getSampler(); +this.rand = new Random(Utils.secureRandomLong()); +this.transferQueue = executorData.getBatchTransferWorkerQueue(); +this.receiveQueue = executorData.getReceiveQueue(); +this.idToTask = idToTask; +this.credentials = credentials; +this.isDebug = Utils.getBoolean(stormConf.get(Config.TOPOLOGY_DEBUG), false); +this.isEventLoggers = StormCommon.hasEventLoggers(stormConf); + +try { +this.hostname = Utils.hostname(stormConf); +} catch (UnknownHostException ignored) { +this.hostname = ""; +} +} + +@SuppressWarnings("unchecked") +@Override +public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception { +ArrayList addressedTuples = (ArrayList) event; +for (AddressedTuple addressedTuple : addressedTuples) { +TupleImpl tuple = (TupleImpl) addressedTuple.getTuple(); +int taskId = addressedTuple.getDest(); +if (isDebug) { +LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple); +} +if (taskId != AddressedTuple.BROADCAST_DEST) { +tupleActionFn(taskId, tuple); +} else { +for (Integer t : ex
[GitHub] storm pull request #1445: [STORM-1277] port backtype.storm.daemon.executor t...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1445#discussion_r66714834 --- Diff: storm-core/src/jvm/org/apache/storm/executor/error/IReportError.java --- @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.executor.error; + +public interface IReportError { --- End diff -- Actually, there's already an IErrorReporter in the org.apache.storm.task package. I think we can remove this IReportError interface and have ErrorReporter (i.e., ReportError) implement the existing IErrorReporter. What do you think?
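A small sketch of the proposal above: drop the new interface and implement the existing one. `IErrorReporterSketch` is a local stand-in for `org.apache.storm.task.IErrorReporter`, assumed here to expose a single `reportError(Throwable)` method, and `RecordingErrorReporter` is a hypothetical replacement for ReportError.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for org.apache.storm.task.IErrorReporter (assumed shape).
interface IErrorReporterSketch {
    void reportError(Throwable error);
}

// Hypothetical reporter that implements the existing interface directly,
// so no separate IReportError type is needed.
final class RecordingErrorReporter implements IErrorReporterSketch {
    final List<String> reported = new ArrayList<>();

    @Override
    public void reportError(Throwable error) {
        // A real reporter would write to cluster state; this one just records.
        reported.add(error.getClass().getSimpleName() + ": " + error.getMessage());
    }
}
```

Callers that already accept the existing interface could then take the executor's reporter without an adapter.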
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1445#issuecomment-97509 @abhishekagarwal87 Regarding your point 2: after more thought, I think it would be difficult to merge BaseExecutor into Executor. Before creating the real executor, we need to initialize some context (i.e., ExecutorData) so that we know whether we want a SpoutExecutor or a BoltExecutor. If we merged that setup into Executor as an abstract class, we would be trying to construct the base executor before we know which concrete type to create. I think it's better to rename the Executor class to ExecutorMaker, ExecutorUtil, or something similar. What do you think?
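The ordering problem described above can be sketched as a factory that builds the context first and only then picks the concrete executor. All class and method names below are hypothetical simplifications, not the PR's actual types.

```java
// Hypothetical sketch of an "ExecutorMaker": context first, concrete type second.
final class ExecutorMakerSketch {
    enum ComponentType { SPOUT, BOLT }

    // Stand-in for ExecutorData: information gathered before any executor exists.
    static final class ExecutorContext {
        final String componentId;
        final ComponentType type;

        ExecutorContext(String componentId, ComponentType type) {
            this.componentId = componentId;
            this.type = type;
        }
    }

    // Stand-in for BaseExecutor: abstract, so it can only be created via a subclass.
    abstract static class BaseExec {
        final ExecutorContext context;

        BaseExec(ExecutorContext context) {
            this.context = context;
        }

        abstract String describe();
    }

    static final class SpoutExec extends BaseExec {
        SpoutExec(ExecutorContext context) { super(context); }
        @Override String describe() { return "spout:" + context.componentId; }
    }

    static final class BoltExec extends BaseExec {
        BoltExec(ExecutorContext context) { super(context); }
        @Override String describe() { return "bolt:" + context.componentId; }
    }

    // The factory needs the context to decide which subclass to instantiate.
    static BaseExec mkExecutor(ExecutorContext context) {
        switch (context.type) {
            case SPOUT:
                return new SpoutExec(context);
            case BOLT:
                return new BoltExec(context);
            default:
                throw new IllegalArgumentException("Unknown component type: " + context.type);
        }
    }

    private ExecutorMakerSketch() {}
}
```

Later messages in this archive show the PR ending up with an abstract `Executor` class that `SpoutExecutor` extends, so this sketch reflects the design under discussion at this point rather than the final layout.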
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1445#issuecomment-95424
@abhishekagarwal87 thanks for the review, comments inline.

1. Why is ExecutorData a standalone class? Can we not put the fields of ExecutorData inside the Executor class itself? We can pass around the Executor object itself.
[Cody] My intention was to keep Executor itself as clean as possible. We could certainly put the fields of ExecutorData into Executor, but that might make Executor quite large.

2. How is BaseExecutor different from Executor?
[Cody] Admittedly this is somewhat confusing. Currently Executor is more like a util class, while subclasses of BaseExecutor do the real work. I'm OK with making Executor an abstract class, just like BaseExecutor, and removing BaseExecutor.

3. I assume ExecutorCommons is supposed to be the code which will be called by both spout and bolt. Can it move to the Executor superclass itself?
[Cody] ExecutorCommon contains only static methods, which might be called from an OutputCollector (i.e., Spout/BoltOutputCollectorImpl), so I would not make it a superclass. Still, I'm OK with moving its methods into Executor instead of keeping a separate ExecutorCommon class (although this might make Executor even larger).

4. Does ExecutorShutdown need to be in its own class?
[Cody] My thought was to expose ExecutorShutdown to the worker as a simplified interface (the original Clojure code works the same way). Otherwise we would have to implement the Shutdownable and IRunningExecutor interfaces in Executor itself, along with the interfaces from BaseExecutor.

More thoughts on the current design: it more or less follows the design of JStorm, and I would personally keep a unified structure for both executor and worker. Each has an ExecutorData/WorkerData object that holds its running information, and each returns a Shutdownable such as ExecutorShutdown/WorkerShutdown, which is a simplified interface.
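As a rough illustration of the "simplified interface" idea in point 4, the sketch below returns a small shutdown handle instead of exposing the whole executor. `ShutdownableSketch` is a local stand-in for Storm's Shutdownable (assumed to declare a single `shutdown()` method), and the cleanup-hook design is an assumption made for the example, not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Storm's Shutdownable interface (assumed shape).
interface ShutdownableSketch {
    void shutdown();
}

// Hypothetical handle given to the worker: it can stop the executor
// but sees none of the executor's internal state.
final class ExecutorShutdownSketch implements ShutdownableSketch {
    private final List<Runnable> cleanups;
    private boolean shutDown = false;

    ExecutorShutdownSketch(List<Runnable> cleanups) {
        this.cleanups = new ArrayList<>(cleanups);
    }

    @Override
    public synchronized void shutdown() {
        if (shutDown) {
            return; // calling shutdown twice is a no-op
        }
        shutDown = true;
        for (Runnable cleanup : cleanups) {
            cleanup.run(); // e.g. interrupt threads, halt queues, clean up tasks
        }
    }

    synchronized boolean isShutDown() {
        return shutDown;
    }
}
```

The worker would hold only a list of such handles, which keeps its view of executors and of its own WorkerShutdown counterpart structurally similar.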
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
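The layout discussed in the reply above (a state holder plus a narrow shutdown handle returned to the worker) could be sketched roughly as follows. This is a hypothetical illustration only: the field names, the locally redeclared `Shutdownable` interface, and the method bodies are assumptions made for the sketch, not the code in the pull request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExecutorLayoutSketch {

    /** Stand-in for a Shutdownable interface, redeclared so the sketch compiles alone. */
    interface Shutdownable {
        void shutdown();
    }

    /** Hypothetical holder for an executor's runtime state (fields are illustrative). */
    static final class ExecutorData {
        final String componentId;
        final List<Integer> taskIds;
        volatile boolean active = true;

        ExecutorData(String componentId, List<Integer> taskIds) {
            this.componentId = componentId;
            this.taskIds = new ArrayList<>(taskIds);
        }
    }

    /** Narrow handle given to the worker: it exposes shutdown and nothing else. */
    static final class ExecutorShutdown implements Shutdownable {
        private final ExecutorData data;

        ExecutorShutdown(ExecutorData data) {
            this.data = data;
        }

        @Override
        public void shutdown() {
            data.active = false;
        }
    }

    /** Abstract base holding the shared state; spout/bolt subclasses would extend it. */
    abstract static class Executor {
        protected final ExecutorData data;

        Executor(ExecutorData data) {
            this.data = data;
        }

        /** The caller only ever sees the simplified Shutdownable view. */
        Shutdownable start() {
            return new ExecutorShutdown(data);
        }
    }

    public static void main(String[] args) {
        ExecutorData data = new ExecutorData("spout-1", Arrays.asList(1, 2));
        Executor executor = new Executor(data) { };
        Shutdownable handle = executor.start();
        handle.shutdown();
        System.out.println("active after shutdown: " + data.active);
    }
}
```

The trade-off raised in the thread shows up here: folding `ExecutorData` into `Executor` would remove one class but put every field on the base class.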
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1445#issuecomment-221794972 fixed integration tests as well.
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1445#issuecomment-22142 upmerged with master.
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1445#issuecomment-221550255 All tests have been fixed.
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1445

[STORM-1277] port backtype.storm.daemon.executor to java

Sorry for the long delay on this. This PR is not 100% complete yet. Things remaining:

1. A few tests fail and I'm still working on them (basically the method replacement of `local-executor/local-transfer-executor-tuple`, which was originally `executor/mk-executor-transfer-fn` in testing.clj; I'm not familiar with this and am still trying to figure out how to do it, so I'd appreciate any help).
2. My code base is behind master, so after all tests pass I will do a full merge from master.

Hopefully I can finish these by this week. Still, the base structure/design of the executor won't change, so I think I can get reviews from you guys on the design first.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm storm-executor

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1445.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1445

commit b13ac017938c3a55181d33faf77570be0d2e001e
Author: 卫乐 <weiyue...@taobao.com>
Date: 2016-04-29T09:58:50Z

port executor to java

things remaining:
1. a few tests to be fixed
2. need to upmerge with master
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1444#issuecomment-221465548 Sorry, closing this and re-submitting.
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
Github user unsleepy22 closed the pull request at: https://github.com/apache/storm/pull/1444
[GitHub] storm pull request: [STORM-1277] port backtype.storm.daemon.execut...
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1444

[STORM-1277] port backtype.storm.daemon.executor to java

Sorry for the long delay on this. This PR is not 100% complete yet. Things remaining:

1. A few tests will fail and I'm still working on them (basically method replacements like local-executor/local-transfer-executor-tuple, which was originally executor/mk-executor-transfer-fn).
2. My code base is behind master, so after all tests pass I will do a full merge from master.

Hopefully I can finish these by this week. Still, the base structure/design of the executor won't change, so I think I can get reviews from you on the design first.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/unsleepy22/storm storm-executor

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1444.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1444

commit 3da8aa0c7c9854c82920ab25ea15859dc9a4658d Author: 卫乐 <weiyue...@taobao.com> Date: 2016-04-29T09:58:50Z init commit for executor
commit 69111064945ef7cad7706875739fbe0f1168f31e Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-11T09:29:25Z refined code, fixed bugs
commit e3cb37373f108055460dd40ef3d66aa306ef0b6e Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-11T13:18:49Z fixed bugs
commit 2f209adef20af5a43aac67c5420d838d8934b5a3 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-23T09:40:42Z added debug logs
commit b0efb563d7f0f3a5c99c18675871618dd1b52b20 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-23T13:37:33Z fixed type cast error
commit c1cfcc2ece8de6b51561b00ea1b39fe5d4f05c25 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-24T09:12:20Z messaging-test ok
commit ed2454d8429687d1fe2f7feb71b5e8fc87b434f3 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-24T10:52:30Z fixed tuple-test error
commit eab32b22e8cd08612b8db8cebf4f025df351eae4 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-24T11:26:21Z minor changes
commit 5883acf4d557c51e7002a5a4120e1c51d352c279 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-25T03:22:06Z changes for testing
commit c4ec8bacf38fb36bf2ec9ff352c21e7edef3297b Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-25T03:31:40Z revert changes to acker & testing
commit da454e90a3606b09b469f460a3e8f52104666162 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-05-25T03:33:15Z revert changes to testing
[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1324#issuecomment-217691630 +1
[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1324#discussion_r61899624

--- Diff: storm-core/src/jvm/org/apache/storm/metric/MetricsConsumerBolt.java ---
@@ -47,17 +68,71 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
 }
 _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector);
 _collector = collector;
+_taskExecuteThread = new Thread(new MetricsHandlerRunnable());
+_taskExecuteThread.setDaemon(true);
+_taskExecuteThread.start();
 }
 @Override
 public void execute(Tuple input) {
- _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
+// remove older tasks if task queue exceeds the max size
+if (_taskQueue.size() > _maxRetainMetricTuples) {
+while (_taskQueue.size() - 1 > _maxRetainMetricTuples) {
--- End diff --

I guess we may run into a concurrency issue here: if the while condition holds, it polls one item, while at the same time the metrics consumer consumes one item. Anyway, I don't think this matters too much, so I'm OK with keeping it as is.
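The check-then-poll race described in this comment can be illustrated with a small sketch. It is not the code from the pull request: the class name, the use of `LinkedBlockingDeque`, and the `String` payload are assumptions chosen to keep the example self-contained. It shows one way to drop the oldest entries when a bounded queue is full, and where a concurrent consumer can cause one extra entry to be dropped.

```java
import java.util.concurrent.LinkedBlockingDeque;

public class BoundedMetricsQueueSketch {
    // Hypothetical bounded queue standing in for the bolt's task queue.
    private final LinkedBlockingDeque<String> queue;

    public BoundedMetricsQueueSketch(int capacity) {
        this.queue = new LinkedBlockingDeque<>(capacity);
    }

    /** Adds an item, evicting the oldest entries while the queue is full. */
    public void addDroppingOldest(String item) {
        // offer() returns false instead of blocking when the queue is at capacity.
        while (!queue.offer(item)) {
            // If a concurrent consumer freed a slot after the failed offer(),
            // this poll() removes one more old item than strictly needed.
            // That is the harmless race the review comment points out.
            queue.poll();
        }
    }

    /** Removes and returns the oldest item, or null if the queue is empty. */
    public String poll() {
        return queue.poll();
    }

    public int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        BoundedMetricsQueueSketch q = new BoundedMetricsQueueSketch(2);
        q.addDroppingOldest("a");
        q.addDroppingOldest("b");
        q.addDroppingOldest("c"); // queue is full, so the oldest item is evicted first
        System.out.println("size=" + q.size() + ", head=" + q.poll());
    }
}
```

For metrics, losing one extra stale data point under contention is usually acceptable, which matches the reviewer's conclusion that the code can stay as is.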
[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1324#discussion_r61898673 --- Diff: storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java --- @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +public class FilterByMetricName implements MetricsFilter { + +private List whitelistPattern; +private List blacklistPattern; +private boolean noneSpecified = false; + +public FilterByMetricName(List whitelistPattern, List blacklistPattern) { +// guard NPE +if (whitelistPattern == null) { +this.whitelistPattern = Collections.emptyList(); +} else { +this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern); +} + +// guard NPE +if (blacklistPattern == null) { +this.blacklistPattern = Collections.emptyList(); +} else { +this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern); +} + +if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) { +noneSpecified = true; +} else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) { +throw new IllegalArgumentException("You have to specify either includes or excludes, or none."); +} +} + +private ArrayList convertPatternStringsToPatternInstances(List patterns) { +return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() { +@Override +public Pattern apply(String s) { +return Pattern.compile(s); +} +})); +} + +@Override +public boolean apply(IMetricsConsumer.DataPoint dataPoint) { +if (noneSpecified) { +return true; +} + +String metricName = dataPoint.name; + +if (!whitelistPattern.isEmpty()) { --- End diff -- OK, I personally prefer the latter approach. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request: STORM-1731 (1.x) Avoid looking up debug / back...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1362#issuecomment-214960183 +1
[GitHub] storm pull request: STORM-1729 Get rid of reflections while record...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1361#issuecomment-214711443 +1 BTW, can we have performance tests covering more cases, so that we can see the overall performance change?
[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1352#issuecomment-214360458 +1
[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1352#issuecomment-214096038 Maybe it's better to put multi-cluster support in 2.0, where I think multi-cluster can be supported at least in the web UI. So we can leave it as is for now if we don't see the need from end users.
[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1352#issuecomment-213391769 For the ClusterInfo class, do we need to add a "cluster.name" property? In a real metrics consumer case, the external storage may want to consume metrics from a bunch of clusters and display them, so a "cluster.name" may be necessary to distinguish different clusters.
[GitHub] storm pull request: STORM-1723 (1.x) Introduce ClusterMetricsConsu...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1352#issuecomment-213390625 I couldn't comment on nimbus.clj directly. How about renaming "cluster-consumer-executors" to "cluster-metrics-consumers", since we already have "executors" in tasks?
[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1324#issuecomment-213362512 Also, as addressed in STORM-1698, we may have to drop metric points when the metrics consumer itself fails to keep up.
[GitHub] storm pull request: STORM-1700 Introduce 'whitelist' / 'blacklist'...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1324#discussion_r60715825 --- Diff: storm-core/src/jvm/org/apache/storm/metric/filter/FilterByMetricName.java --- @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.metric.filter; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; +import org.apache.storm.metric.api.IMetricsConsumer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.regex.Pattern; + +public class FilterByMetricName implements MetricsFilter { + +private List whitelistPattern; +private List blacklistPattern; +private boolean noneSpecified = false; + +public FilterByMetricName(List whitelistPattern, List blacklistPattern) { +// guard NPE +if (whitelistPattern == null) { +this.whitelistPattern = Collections.emptyList(); +} else { +this.whitelistPattern = convertPatternStringsToPatternInstances(whitelistPattern); +} + +// guard NPE +if (blacklistPattern == null) { +this.blacklistPattern = Collections.emptyList(); +} else { +this.blacklistPattern = convertPatternStringsToPatternInstances(blacklistPattern); +} + +if (this.whitelistPattern.isEmpty() && this.blacklistPattern.isEmpty()) { +noneSpecified = true; +} else if (!this.whitelistPattern.isEmpty() && !this.blacklistPattern.isEmpty()) { +throw new IllegalArgumentException("You have to specify either includes or excludes, or none."); +} +} + +private ArrayList convertPatternStringsToPatternInstances(List patterns) { +return Lists.newArrayList(Iterators.transform(patterns.iterator(), new Function<String, Pattern>() { +@Override +public Pattern apply(String s) { +return Pattern.compile(s); +} +})); +} + +@Override +public boolean apply(IMetricsConsumer.DataPoint dataPoint) { +if (noneSpecified) { +return true; +} + +String metricName = dataPoint.name; + +if (!whitelistPattern.isEmpty()) { --- End diff -- My concern about pattern match is, regular expression matching is pretty CPU-intensive, if possible, I would prefer a simple "String.contains" match, which may be much faster and less CPU-intensive. 
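The trade-off raised in the review comment on FilterByMetricName (regular expression matching versus a plain `String.contains` check) can be sketched as below. The class and method names are hypothetical, and the use of `Matcher.find()` is an assumption, since the matching call is not visible in the quoted diff. No performance numbers are claimed here; the sketch only shows that the two approaches have the same shape and can be swapped behind one method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class MetricNameMatchSketch {

    /** Substring check: true if the name contains any of the given fragments. */
    static boolean matchesBySubstring(String metricName, List<String> fragments) {
        for (String fragment : fragments) {
            if (metricName.contains(fragment)) {
                return true;
            }
        }
        return false;
    }

    /** Regex check against patterns compiled once up front (find() is an assumption). */
    static boolean matchesByPattern(String metricName, List<Pattern> patterns) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(metricName).find()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> fragments = Arrays.asList("emit", "ack");
        List<Pattern> patterns = Arrays.asList(Pattern.compile("^__emit.*"));

        System.out.println(matchesBySubstring("__emit-count", fragments));
        System.out.println(matchesByPattern("__emit-count", patterns));
    }
}
```

Substring matching gives up anchors and wildcards, so it is only a drop-in replacement when the configured whitelist or blacklist entries are literal name fragments.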
[GitHub] storm pull request: STORM-1698 Asynchronous MetricsConsumerBolt
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1322#issuecomment-213352632 I suppose this PR has been merged into https://github.com/apache/storm/pull/1324 ? I'll skip this one.
[GitHub] storm pull request: [STORM-1687] divide by zero in StatsUtil
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1312#issuecomment-206103981 +1
[GitHub] storm pull request: STORM-1655 (1.x) Flux doesn't set return code ...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1254#issuecomment-200819815 +1
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1218#issuecomment-200667388 @abhishekagarwal87 done~
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1218#issuecomment-200613922 thanks, addressed
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1218#discussion_r57266090 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.api.StateMetric; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.TopologyContext; + +public class BuiltinMetricsUtil { +public static BuiltinMetrics mkData(String type, Object stats) { +if (StatsUtil.SPOUT.equals(type)) { +return new BuiltinSpoutMetrics((SpoutExecutorStats) stats); +} +return new BuiltinBoltMetrics((BoltExecutorStats) stats); +} + +public static void registerIconnectionServerMetric(Object server, Map stormConf, TopologyContext context) { +if (server instanceof IStatefulObject) { +registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), stormConf, context); +} +} + +public static void registerIconnectionClientMetrics(final Map nodePort2socket, Map stormConf, TopologyContext context) { +IMetric metric = new IMetric() { +@Override +public Object getValueAndReset() { +Map<Object, Object> ret = new HashMap<>(); +for (Object o : nodePort2socket.entrySet()) { +Map.Entry entry = (Map.Entry) o; +Object nodePort = entry.getKey(); +Object connection = entry.getValue(); +if (connection instanceof IStatefulObject) { +ret.put(nodePort, ((IStatefulObject) connection).getState()); +} +} +return ret; +} +}; +registerMetric("__send-iconnection", metric, stormConf, context); +} + +public static void registerQueueMetrics(Map queues, Map stormConf, TopologyContext context) { +for (Object o : queues.entrySet()) { --- End diff -- addressed --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1218#issuecomment-200407083 ping @revans2, could you take some time to have a look? This PR blocks the port of task.clj.
[GitHub] storm pull request: STORM-1229: port backtype.storm.metric.testing...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1238#discussion_r56766748 --- Diff: storm-core/test/jvm/org/apache/storm/metric/FakeMetricConsumer.java --- @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Multimap; +import com.google.common.collect.Table; + +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.IErrorReporter; +import org.apache.storm.task.TopologyContext; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +public class FakeMetricConsumer implements IMetricsConsumer { + +public static final Table<String, String, Multimap<Integer, Object>> buffer = HashBasedTable.create(); --- End diff -- why is this static? can't we make it an instance field? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] storm pull request: Fix logging for LoggingMetricsConsumer STORM-5...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1221#issuecomment-197652505 +1
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1218#issuecomment-198410768 @revans2 could you take a look? Porting task.clj depends on this, so I'd like it to be merged as soon as possible. Since stats.clj is already done, builtin-metrics is much simpler to review.
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1218#discussion_r56305899 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/metrics/BuiltinMetricsUtil.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.Config; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.metric.api.IStatefulObject; +import org.apache.storm.metric.api.StateMetric; +import org.apache.storm.stats.BoltExecutorStats; +import org.apache.storm.stats.SpoutExecutorStats; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.task.TopologyContext; + +public class BuiltinMetricsUtil { +public static BuiltinMetrics mkData(String type, Object stats) { +if (StatsUtil.SPOUT.equals(type)) { +return new BuiltinSpoutMetrics((SpoutExecutorStats) stats); +} +return new BuiltinBoltMetrics((BoltExecutorStats) stats); +} + +public static void registerIconnectionServerMetric(Object server, Map stormConf, TopologyContext context) { +if (server instanceof IStatefulObject) { +registerMetric("__recv-iconnection", new StateMetric((IStatefulObject) server), stormConf, context); +} +} + +public static void registerIconnectionClientMetrics(final Map nodePort2socket, Map stormConf, TopologyContext context) { +IMetric metric = new IMetric() { +@Override +public Object getValueAndReset() { +Map<Object, Object> ret = new HashMap<>(); +for (Object o : nodePort2socket.entrySet()) { --- End diff -- No, it has to be cast to Map.Entry inside the `for` loop code because it's actually a clojure map which has no generic definition. Java compiler will complain if I replace `Object o` to `Map.Entry o` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
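The point made in this reply can be reproduced in isolation. In the sketch below (class and method names are hypothetical), the parameter is a raw `Map`, as in the diff; its `entrySet()` is therefore a raw `Set`, the enhanced `for` loop yields `Object`, and the cast to `Map.Entry` has to happen inside the loop body.

```java
import java.util.HashMap;
import java.util.Map;

@SuppressWarnings({"rawtypes", "unchecked"})
public class RawMapIterationSketch {

    /** Copies entries out of a raw (non-generic) Map, such as one handed over from Clojure. */
    static Map<Object, Object> copyEntries(Map rawMap) {
        Map<Object, Object> ret = new HashMap<>();
        // With a raw Map, entrySet() is a raw Set, so the loop variable must be Object.
        // Writing "for (Map.Entry e : rawMap.entrySet())" is rejected by the compiler
        // with an incompatible-types error.
        for (Object o : rawMap.entrySet()) {
            Map.Entry entry = (Map.Entry) o;
            ret.put(entry.getKey(), entry.getValue());
        }
        return ret;
    }

    public static void main(String[] args) {
        Map raw = new HashMap();
        raw.put("node-1:6700", "connected");
        System.out.println(copyEntries(raw));
    }
}
```

If the parameter could be declared as `Map<?, ?>` instead, the loop could use `Map.Entry<?, ?>` directly; whether that is practical for the methods in the diff is not something the thread settles.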
[GitHub] storm pull request: [STORM-1268] port builtin-metrics to java
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1218 [STORM-1268] port builtin-metrics to java You can merge this pull request into a Git repository by running: $ git pull https://github.com/unsleepy22/storm STORM-1268 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1218.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1218 commit 5ddab72ae9cc13bc7f680daa44c27ee9fb7b87ef Author: 卫乐 <weiyue...@taobao.com> Date: 2016-03-16T06:09:59Z port builtin-metrics to java
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-196277268 @revans2 any update on this?
[GitHub] storm pull request: STORM-1618: Add the option of passing config d...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1204#issuecomment-195396027 +1
[GitHub] storm pull request: [STORM-1605] use '/usr/bin/env python' to chec...
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1196 [STORM-1605] use '/usr/bin/env python' to check python version The current Python version check is hard-coded and cannot detect Python 2.7.x versions. Changed it to use /usr/bin/env python, consistent with storm.py. You can merge this pull request into a Git repository by running: $ git pull https://github.com/unsleepy22/storm STORM-1605 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1196.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1196 commit 20f1497c213c97a8a72b2f43039ad59ce9ce5169 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-03-09T08:05:04Z use '/usr/bin/env python' to check python version
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-194115823 @revans2 you're right about the possible NPE; I've updated the code. I also added a lot of method comments. I have tested the web UI with the new code and compared the stats data, and all looks good.
[GitHub] storm pull request: [STORM-1606] print the information of testcase...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1189#issuecomment-192884522 Sorry, I didn't take a deep look. +1
[GitHub] storm pull request: [STORM-1606] print the information of testcase...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1189#discussion_r55135284 --- Diff: dev-tools/travis/print-errors-from-test-reports.py --- @@ -55,6 +55,10 @@ def print_error_reports_from_report_file(file_path): if fail is not None: print_detail_information(testcase, fail) +failure = testcase.find("failure") --- End diff -- I doubt this change is valid: since "fail" is a substring of "failure", the script should not miss "failure"s when searching for "fail".
[GitHub] storm pull request: [STORM-1590] port defmeters/defgauge/defhistog...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1171#issuecomment-192615168 @revans2 could you take a look?
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-192244223 @revans2 I get your idea, and you don't have to go through all of them. My initial thought was to keep it compatible with the original Clojure data structures, but I'm OK with changing all of them. I will update in a day or two.
[GitHub] storm pull request: STORM-1283: port backtype.storm.MockAutoCred t...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1176#issuecomment-191824439 +1
[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1171#issuecomment-191044704 @revans2 changed according to your comments, and also changed all the other files. start-metrics-reporters in common.clj is now ported to Java too. Please help review again.
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-191034221 @abhishekagarwal87 changed according to your comments. @revans2 added logs to .gitignore and used maven-antrun-plugin to delete the logs directory after tests. Please help review again.
[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1171#discussion_r54667164 --- Diff: storm-core/src/jvm/org/apache/storm/metric/StormMetricsRegistry.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metric; + +import clojure.lang.IFn; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings("unchecked") +public class StormMetricsRegistry { +private static final Logger LOG = LoggerFactory.getLogger(StormMetricsRegistry.class); +private static final MetricRegistry metrics = new MetricRegistry(); --- End diff -- Sure, will change
[GitHub] storm pull request: port defmeters/defgauge/defhistogram... to jav...
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1171 port defmeters/defgauge/defhistogram... to java for all of our code to use This PR is under STORM-1590. So far I have changed only nimbus.clj and would like to hear your opinion before moving on. You can merge this pull request into a Git repository by running: $ git pull https://github.com/unsleepy22/storm STORM-1590 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1171.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1171 commit a23533cac02c53742e83e1049783c255489521f7 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-03-01T15:34:58Z change defmeter and defgauge in nimbus to java metrics code
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190727124 Ah, you're right, I get you!
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190713869 @abhishekagarwal87 your change is more careful and considerate, but changing defaults.yaml would be simpler, and it's pretty reasonable to set storm.log.dir to "logs" by default. What do you think? Also, @revans2 what's your opinion on this?
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190684819 Probably won't work since storm.home is not set when running tests.
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190621208 @abhishekagarwal87 changed according to your comments, please help review again. The rat check also passes.
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-190526220 @revans2 yes, I see the exception too when running your test, but it looks odd because I've tested all Storm UI pages in distributed mode with the Java stats and everything looks good. The cause of the exception is as follows: StatsUtil.filterSysStreams expects a map of {window -> stream id -> value}, while in your test it gets a map of {stream id -> value}. I suppose this happens in local mode only? In any case, I can make your test pass by changing this method a little.
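The shape mismatch described above can be sketched as follows. This is an illustrative stand-in, not the actual `StatsUtil.filterSysStreams` implementation: the class name, the typed signature, and the rule that system stream ids start with `__` are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class FilterSysStreamsSketch {

    // Assumption for illustration: system stream ids are those starting with "__".
    static boolean isSystemStream(String streamId) {
        return streamId.startsWith("__");
    }

    // Expects the nested shape {window -> {stream id -> value}} and drops system streams
    // from every window, returning a new map and leaving the input untouched.
    static Map<String, Map<String, Long>> filterSysStreams(Map<String, Map<String, Long>> stats) {
        Map<String, Map<String, Long>> ret = new HashMap<>();
        for (Map.Entry<String, Map<String, Long>> windowEntry : stats.entrySet()) {
            Map<String, Long> kept = new HashMap<>();
            for (Map.Entry<String, Long> streamEntry : windowEntry.getValue().entrySet()) {
                if (!isSystemStream(streamEntry.getKey())) {
                    kept.put(streamEntry.getKey(), streamEntry.getValue());
                }
            }
            ret.put(windowEntry.getKey(), kept);
        }
        return ret;
    }

    public static void main(String[] args) {
        Map<String, Long> streams = new HashMap<>();
        streams.put("default", 5L);
        streams.put("__ack_ack", 3L);
        Map<String, Map<String, Long>> stats = new HashMap<>();
        stats.put("600", streams);
        System.out.println(filterSysStreams(stats));
    }
}
```

With untyped maps, as in code ported from Clojure, passing the flat shape {stream id -> value} to a method written like this would treat each numeric value as an inner map and fail at runtime with a cast error, which is one plausible way a mismatch of this kind surfaces.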
[GitHub] storm pull request: STORM-1244: port backtype.storm.command.upload...
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1166#discussion_r54429162 --- Diff: storm-core/src/jvm/org/apache/storm/command/UploadCredentials.java --- @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.command; + +import org.apache.storm.StormSubmitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileReader; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +public class UploadCredentials { + +private static final Logger LOG = LoggerFactory.getLogger(UploadCredentials.class); + +public static void main(String[] args) throws Exception { +Map<String, Object> cl = CLI.opt("f", "file", null) +.arg("topologyName", CLI.FIRST_WINS) +.arg("rawCredentials", CLI.INTO_LIST) +.parse(args); + +String credentialFile = (String) cl.get("f"); +java.util.List rawCredentials = (java.util.List) cl.get("rawCredentials"); --- End diff -- you can import java.util.List
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190194178 @abhishekagarwal87, I get you. My opinion would be: 1. revert the changes to the ConfigUtils class 2. set "storm.log.dir: logs" in defaults.yaml 3. add "**/logs/**" to the apache-rat-plugin excludes 4. keep supervisor-test the same as in my commit, because setting "storm.log.dir = logs OR /tmp/logs" will break supervisor-test anyway. What do you think?
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190108980 @abhishekagarwal87 do you mean adding `storm.log.dir: /logs` to defaults.yaml?
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on a diff in the pull request: https://github.com/apache/storm/pull/1147#discussion_r54365079 --- Diff: storm-core/src/jvm/org/apache/storm/stats/BoltExecutorStats.java --- @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.stats; + +import clojure.lang.PersistentVector; +import java.util.HashMap; +import java.util.Map; +import org.apache.storm.metric.internal.MultiCountStatAndMetric; +import org.apache.storm.metric.internal.MultiLatencyStatAndMetric; + +@SuppressWarnings("unchecked") +public class BoltExecutorStats extends CommonStats { + +public static final String ACKED = "acked"; +public static final String FAILED = "failed"; +public static final String EXECUTED = "executed"; +public static final String PROCESS_LATENCIES = "process-latencies"; +public static final String EXECUTE_LATENCIES = "execute-latencies"; + +public static final String[] BOLT_FIELDS = {ACKED, FAILED, EXECUTED, PROCESS_LATENCIES, EXECUTE_LATENCIES}; + +public BoltExecutorStats() { +super(); + +put(ACKED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); +put(FAILED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); +put(EXECUTED, new MultiCountStatAndMetric(NUM_STAT_BUCKETS)); +put(PROCESS_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); +put(EXECUTE_LATENCIES, new MultiLatencyStatAndMetric(NUM_STAT_BUCKETS)); +} + +public MultiCountStatAndMetric getAcked() { +return (MultiCountStatAndMetric) this.get(ACKED); +} + +public MultiCountStatAndMetric getFailed() { +return (MultiCountStatAndMetric) this.get(FAILED); +} + +public MultiCountStatAndMetric getExecuted() { +return (MultiCountStatAndMetric) this.get(EXECUTED); +} + +public MultiLatencyStatAndMetric getProcessLatencies() { +return (MultiLatencyStatAndMetric) this.get(PROCESS_LATENCIES); +} + +public MultiLatencyStatAndMetric getExecuteLatencies() { +return (MultiLatencyStatAndMetric) this.get(EXECUTE_LATENCIES); +} + +public void boltExecuteTuple(String component, String stream, long latencyMs) { +Object key = PersistentVector.create(component, stream); +this.getExecuted().incBy(key, this.rate); +this.getExecuteLatencies().record(key, latencyMs); +} + +public void boltAckedTuple(String component, String stream, long 
latencyMs) { +Object key = PersistentVector.create(component, stream); +this.getAcked().incBy(key, this.rate); +this.getProcessLatencies().record(key, latencyMs); +} + +public void boltFailedTuple(String component, String stream, long latencyMs) { +Object key = PersistentVector.create(component, stream); +this.getFailed().incBy(key, this.rate); + +} + +public Map renderStats() { +cleanupStats(); +Map ret = new HashMap(); +ret.putAll(valueStats(CommonStats.COMMON_FIELDS)); +ret.putAll(valueStats(BoltExecutorStats.BOLT_FIELDS)); +StatsUtil.putRawKV(ret, StatsUtil.TYPE, StatsUtil.KW_BOLT); --- End diff -- @revans2 I've changed all the others according to your comments; my only problem is renderStats. Do you mean that I only need to maintain ExecutorStats in memory, without any stats Maps, while running? Although renderStats is only used in thriftify-zk-worker-hb, when workers send heartbeats (see the worker/do-executor-heartbeats method) they use a Clojure PersistentMap data structure, which conflicts with ExecutorStats. Should I turn it into ExecutorStats too?
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1161#issuecomment-190003786 The build failure looks odd; all tests pass for me locally.
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
GitHub user unsleepy22 opened a pull request: https://github.com/apache/storm/pull/1161 [Storm-1579] Fix NoSuchFileException when running tests in storm-core The old code takes "storm.local.dir" property as logDir while changes in STORM-1552 used ConfigUtils.getLogDir, which doesn't take this property, causing the ultimate event log directory to be "/logs/" and resulting in NoSuchFileException because permission is denied to create /logs directory. The change of code causes 3 test cases to fail in supervisor_test, so I changed them accordingly to make all tests pass. You can merge this pull request into a Git repository by running: $ git pull https://github.com/unsleepy22/storm STORM-1579 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1161.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1161 commit 034f0cf107403100650d6eb65e7168f62133864a Author: 卫乐 <weiyue...@taobao.com> Date: 2016-02-28T14:33:21Z fix STORM-1579, checks storm.local.dir property/conf when getting storm log dir commit 504c11b8ead80e186ff0de83dbdece2337cd1162 Author: 卫乐 <weiyue...@taobao.com> Date: 2016-02-28T14:42:56Z append "/logs" to "storm.local.dir" property when non-null
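The fallback described in this pull request can be sketched roughly as below. This is not the actual `ConfigUtils.getLogDir` code: the class `LogDirSketch`, the method `resolveLogDir`, and the treatment of an unset `storm.home` as an empty string are assumptions chosen to reproduce the "/logs" result mentioned in the description.

```java
public class LogDirSketch {

    // Hypothetical resolution order, following the two commit messages:
    // 1. if storm.local.dir is set, use it with "/logs" appended;
    // 2. otherwise fall back to <storm.home>/logs.
    static String resolveLogDir(String stormLocalDir, String stormHome) {
        if (stormLocalDir != null) {
            return stormLocalDir + "/logs";
        }
        // Assumption: an unset storm.home behaves like an empty prefix, which yields "/logs",
        // a directory that an unprivileged test run usually cannot create.
        String home = (stormHome == null) ? "" : stormHome;
        return home + "/logs";
    }

    public static void main(String[] args) {
        // Neither property set, as when running tests without the fix.
        System.out.println(resolveLogDir(null, null));
        // storm.local.dir set, as the fix intends.
        System.out.println(resolveLogDir("/tmp/storm-local", null));
    }
}
```

Checking `storm.local.dir` first keeps test runs inside a writable directory even when `storm.home` is not defined.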
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 closed the pull request at: https://github.com/apache/storm/pull/1155
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1155#issuecomment-189629712 My change was quite simple: just adding back the logic to check the "storm.local.dir" property/conf. However, there are 3 test failures in "org.apache.storm.supervisor-test" (all the others pass without exceptions) and I have no idea why. Can someone please help take a look?
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-189345949 Thanks @revans2 , I'll address all your comments~
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1155#issuecomment-189291719 @abhishekagarwal87 they're in test-reports. When running tests, you'll get messages like "Exception: java.lang.ClassCastException thrown from the UncaughtExceptionHandler in thread "Thread-35-__eventlogger-executor[2 2]"". By looking into the detailed test-reports, you'll find the NoSuchFileException.
[GitHub] storm pull request: [Storm-1579] Fix NoSuchFileException when runn...
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1155#issuecomment-189268264 There are test failures; I'll look into them first.
[GitHub] storm pull request: [STORM-1252] port backtype.storm.stats to java
Github user unsleepy22 commented on the pull request: https://github.com/apache/storm/pull/1147#issuecomment-189262832 @revans2 could you please help review this PR?