[GitHub] storm issue #2518: STORM-2902: Some improvements for storm-rocketmq module
Github user hustfxj commented on the issue: https://github.com/apache/storm/pull/2518 +1. Thank you, @vesense. ---
[GitHub] storm pull request #2518: STORM-2902: Some improvements for storm-rocketmq m...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/2518#discussion_r179943857 --- Diff: external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java --- @@ -60,14 +58,14 @@ public class RocketMqSpout implements IRichSpout { // TODO add metrics --- End diff -- Not only add metrics, but also print logs for MqSpout and MqBolt. ---
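As a rough illustration of the review suggestion (counters plus log output), here is a minimal sketch that uses only the JDK. `SpoutStats` and its methods are hypothetical names, not part of storm-rocketmq, and the sketch deliberately does not use Storm's own metrics API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;

// Hypothetical helper (not in storm-rocketmq): counts emitted and failed
// messages and logs a summary line every `logEvery` emits.
class SpoutStats {
    private static final Logger LOG = Logger.getLogger(SpoutStats.class.getName());

    private final AtomicLong emitted = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();
    private final long logEvery;

    SpoutStats(long logEvery) {
        if (logEvery <= 0) {
            throw new IllegalArgumentException("logEvery must be positive");
        }
        this.logEvery = logEvery;
    }

    // Call once per emitted tuple; logs a summary on every logEvery-th call.
    void recordEmit() {
        long n = emitted.incrementAndGet();
        if (n % logEvery == 0) {
            LOG.info(summary());
        }
    }

    // Call when a message fails; logs the message id at warning level.
    void recordFail(String messageId) {
        failed.incrementAndGet();
        LOG.warning("Message failed: " + messageId);
    }

    long emittedCount() {
        return emitted.get();
    }

    long failedCount() {
        return failed.get();
    }

    String summary() {
        return "emitted=" + emitted.get() + " failed=" + failed.get();
    }
}
```

A spout or bolt could hold one such object and call `recordEmit()` or `recordFail(id)` at the matching points; in a real topology the counters would more likely be registered with Storm's metrics system.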
[GitHub] storm issue #2024: STORM-2349: Add one RocketMQ plugin for the Apache Storm
Github user hustfxj commented on the issue: https://github.com/apache/storm/pull/2024 @vesense thank you. +1 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/2024#discussion_r111697081 --- Diff: external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java --- @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.rocketmq.spout; + +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.storm.Config; +import org.apache.storm.rocketmq.DefaultMessageRetryManager; +import org.apache.storm.rocketmq.MessageRetryManager; +import org.apache.storm.rocketmq.MessageSet; +import org.apache.storm.rocketmq.RocketMQConfig; +import org.apache.storm.rocketmq.SpoutConfig; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean; +import static org.apache.storm.rocketmq.RocketMQUtils.getInteger; + +/** + * RocketMQSpout uses MQPushConsumer as the default implementation. 
+ * PushConsumer is a high level consumer API, wrapping the pulling details + * Looks like broker push messages to consumer + */ +public class RocketMQSpout implements IRichSpout { +// TODO add metrics + +private MQPushConsumer consumer; +private SpoutOutputCollector collector; +private BlockingQueue queue; + +private Properties properties; +private MessageRetryManager messageRetryManager; + +public RocketMQSpout(Properties properties) { +this.properties = properties; +} + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +Validate.notEmpty(properties, "Consumer properties can not be empty"); +boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false); + +int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString())); +queue = new LinkedBlockingQueue<>(queueSize); + +consumer = new DefaultMQPushConsumer(); +RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context); + +if (ordered) { +consumer.registerMessageListener(new MessageListenerOrderly() { +@Override +public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { +if (process(msgs)) { +return ConsumeOrderlyStatus.SUCCESS; +} else { +return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; +} +}
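The quoted diff is cut off before `process(msgs)` is defined, so its real body is not visible here. The sketch below shows one plausible shape for such a hand-off, assuming the listener thread offers each batch to the bounded queue and reports failure when the queue is full. `BoundedHandoff` is a hypothetical name, and only JDK classes are used.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the spout's hand-off logic; the actual
// process(msgs) method is not visible in the quoted diff.
class BoundedHandoff<T> {
    private final BlockingQueue<List<T>> queue;

    BoundedHandoff(int queueSize) {
        this.queue = new LinkedBlockingQueue<>(queueSize);
    }

    // Called from the consumer's listener thread. Returns false when the
    // queue is full, so the caller can ask the broker to redeliver later
    // instead of blocking the listener or dropping the batch.
    boolean process(List<T> msgs) {
        if (msgs == null || msgs.isEmpty()) {
            return true; // nothing to hand off
        }
        return queue.offer(msgs); // non-blocking; false if no capacity
    }

    // Called from the spout thread; returns null when no batch is waiting.
    List<T> poll() {
        return queue.poll();
    }
}
```

With this shape, a `false` return maps naturally onto `SUSPEND_CURRENT_QUEUE_A_MOMENT` in the ordered listener, while the spout thread drains batches with `poll()`.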
[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/2024#discussion_r111294519 --- Diff: external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java --- @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.rocketmq.spout; + +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.storm.Config; +import org.apache.storm.rocketmq.DefaultMessageRetryManager; +import org.apache.storm.rocketmq.MessageRetryManager; +import org.apache.storm.rocketmq.MessageSet; +import org.apache.storm.rocketmq.RocketMQConfig; +import org.apache.storm.rocketmq.SpoutConfig; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean; +import static org.apache.storm.rocketmq.RocketMQUtils.getInteger; + +/** + * RocketMQSpout uses MQPushConsumer as the default implementation. 
+ * PushConsumer is a high level consumer API, wrapping the pulling details + * Looks like broker push messages to consumer + */ +public class RocketMQSpout implements IRichSpout { +// TODO add metrics + +private MQPushConsumer consumer; +private SpoutOutputCollector collector; +private BlockingQueue queue; + +private Properties properties; +private MessageRetryManager messageRetryManager; + +public RocketMQSpout(Properties properties) { +this.properties = properties; +} + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +Validate.notEmpty(properties, "Consumer properties can not be empty"); +boolean ordered = getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false); + +int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString())); +queue = new LinkedBlockingQueue<>(queueSize); + +consumer = new DefaultMQPushConsumer(); +RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context); + +if (ordered) { +consumer.registerMessageListener(new MessageListenerOrderly() { +@Override +public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { +if (process(msgs)) { +return ConsumeOrderlyStatus.SUCCESS; +} else { +return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; +} +}
[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/2024#discussion_r111292663 --- Diff: external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java --- @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.rocketmq.spout; + +import org.apache.commons.lang.Validate; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.MQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.storm.Config; +import org.apache.storm.rocketmq.DefaultMessageRetryManager; +import org.apache.storm.rocketmq.MessageRetryManager; +import org.apache.storm.rocketmq.MessageSet; +import org.apache.storm.rocketmq.RocketMQConfig; +import org.apache.storm.rocketmq.SpoutConfig; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean; +import static org.apache.storm.rocketmq.RocketMQUtils.getInteger; + +/** + * RocketMQSpout uses MQPushConsumer as the default implementation. 
+ * PushConsumer is a high level consumer API, wrapping the pulling details + * Looks like broker push messages to consumer + */ +public class RocketMQSpout implements IRichSpout { +// TODO add metrics + +private MQPushConsumer consumer; +private SpoutOutputCollector collector; +private boolean ordered; +private BlockingQueue queue; + +private Properties properties; +private MessageRetryManager messageRetryManager; + +public RocketMQSpout(Properties properties) { +this.properties = properties; +} + +@Override +public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { +Validate.notEmpty(properties, "Consumer properties can not be empty"); +ordered = getBoolean(properties,RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false); + +int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, Utils.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING))); +queue = new LinkedBlockingQueue<>(queueSize); + +consumer = new DefaultMQPushConsumer(); +RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer)consumer, context); + +if (ordered) { +consumer.registerMessageListener(new MessageListenerOrderly() { +@Override +public ConsumeOrderlyStatus consumeMessage(List msgs, + ConsumeOrderlyContext context) { +if (process(msgs)) { +return ConsumeOrderlyStatus.SUCCESS; +} else { +return ConsumeOrderlyS
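The queue size above falls back to the value stored under `Config.TOPOLOGY_MAX_SPOUT_PENDING`, which may be unset in a topology configuration. The following is a hedged sketch of a null-tolerant lookup; `ConfInts.getInt` is a hypothetical helper and is not Storm's `Utils.getInt`.

```java
import java.util.Map;

// Hypothetical helper: reads an int from a loosely typed config map and
// falls back to a default when the key is absent or mapped to null.
class ConfInts {
    static int getInt(Map<String, Object> conf, String key, int defaultValue) {
        Object value = conf.get(key);
        if (value == null) {
            return defaultValue;
        }
        if (value instanceof Number) {
            return ((Number) value).intValue(); // e.g. Integer or Long from YAML
        }
        // Fall back to parsing the string form, tolerating stray whitespace.
        return Integer.parseInt(value.toString().trim());
    }
}
```

For example, `ConfInts.getInt(conf, "topology.max.spout.pending", 1000)` returns 1000 when the key is unset, instead of throwing.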
[GitHub] storm issue #1200: Tests - STORM-1235, STORM-1236, STORM-1237, STORM-1238, S...
Github user hustfxj commented on the issue: https://github.com/apache/storm/pull/1200 Thank you @abhishekagarwal87 +1 ---
[GitHub] storm pull request: remove unnecessary registerSerialization about...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1278#issuecomment-219647600 We register the class "org.apache.storm.transactional.TransactionAttempt" in SerializationFactory, but we register the class "org.apache.storm.trident.topology.TransactionAttempt" in MasterBatchCoordinator. So serialization errors will occur when we run a Trident topology. What do you think, @revans2 @satishd? ---
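The concern is that two classes share the simple name `TransactionAttempt` but live in different packages, so registering one does not cover the other. The toy model below illustrates this with a registry keyed by fully qualified class name. `ToyRegistry` is hypothetical and is neither Kryo nor Storm's `SerializationFactory`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy registry keyed by fully qualified class name (illustration only).
class ToyRegistry {
    private final Map<String, Integer> ids = new LinkedHashMap<>();

    // Assigns the next free id to a class name the first time it is seen.
    void register(String className) {
        ids.putIfAbsent(className, ids.size());
    }

    // Looks up the id; fails for any name that was never registered.
    int idFor(String className) {
        Integer id = ids.get(className);
        if (id == null) {
            throw new IllegalStateException("Class is not registered: " + className);
        }
        return id;
    }
}
```

Registering `org.apache.storm.transactional.TransactionAttempt` and then looking up `org.apache.storm.trident.topology.TransactionAttempt` fails in this model, because the two names are distinct keys.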
[GitHub] storm pull request: STORM-1769 Added a test to check local nimbus ...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1403#issuecomment-217434349 +1 ---
[GitHub] storm pull request: STORM-1764: Pacemaker is throwing some stack t...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1397#issuecomment-217050548 good, +1. Thank you @knusbaum ---
[GitHub] storm pull request: STORM-1761: Storm-Solr Example Throws ArrayInd...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1394#issuecomment-216716374 +1 ---
[GitHub] storm pull request: [STORM-1707] Remove two minute timeout after w...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1370#issuecomment-216446889 LGTM ---
[GitHub] storm pull request: STORM-1754: Correct java version in 0.10.x sto...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1385#issuecomment-216408694 +1 ---
[GitHub] storm pull request: STORM-1731 (1.x) Avoid looking up debug / back...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1362#issuecomment-214968261 it's amazing, +1 ---
[GitHub] storm pull request: minor: fix `storm.py` broken link
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1349#issuecomment-213344710 +1 ---
[GitHub] storm pull request: [STORM-1712] make storage plugin for transacti...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1342 [STORM-1712] make storage plugin for transactional state [not to be merged yet] This is not to be merged yet. We now support storing transactional state in HBase and other stores. I would like to hear your opinions. If this looks OK, I will be glad to create the PR and merge the code. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm transactional Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1342.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1342 commit e30a4e685a0446a8a5ac092d48db4581968565a4 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-04-15T16:42:02Z make storage plugin for transactional state ---
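The PR's actual plugin interface is not shown in this message. As a sketch of what a pluggable store for transactional state could look like, the block below defines a small contract, an in-memory implementation, and a loader that instantiates a class by name. `StateStorage`, `InMemoryStateStorage`, and `StateStorageLoader` are all hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical plugin contract; the PR's real interface is not shown here.
interface StateStorage {
    void setData(String path, byte[] data);

    byte[] getData(String path); // returns null when nothing is stored

    void delete(String path);
}

// Simple in-memory backend; an HBase-backed class would implement the
// same interface.
class InMemoryStateStorage implements StateStorage {
    private final Map<String, byte[]> store = new ConcurrentHashMap<>();

    @Override
    public void setData(String path, byte[] data) {
        store.put(path, data.clone()); // defensive copy
    }

    @Override
    public byte[] getData(String path) {
        byte[] data = store.get(path);
        return data == null ? null : data.clone();
    }

    @Override
    public void delete(String path) {
        store.remove(path);
    }
}

class StateStorageLoader {
    // Instantiates the backend named in configuration through its no-arg
    // constructor, so the backend can be swapped without code changes.
    static StateStorage load(String className) {
        try {
            return (StateStorage) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load storage plugin " + className, e);
        }
    }
}
```

Selecting the backend by class name keeps callers dependent only on the interface, which is the usual motivation for a storage plugin.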
[GitHub] storm pull request: STORM-1713: Replace NotImplementedException wi...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1341#issuecomment-210549291 LGTM ---
[GitHub] storm pull request: STORM-1693: Move stats cleanup to executor shu...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1333#issuecomment-209193820 +1 ---
[GitHub] storm pull request: [STORM-1286] port kill_workers to java.
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1310#issuecomment-205645664 Thank you, +1. ---
[GitHub] storm pull request: STORM-1670 LocalState#get(String) can throw Fi...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1297#issuecomment-204610269 +1 ---
[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1281#issuecomment-204199412 great +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-204198660 @revans2 Thank you again. In fact, I can't run tests with the profiling options because of my JDK. ---
[GitHub] storm pull request: [STORM-1667] Log the IO exception when deletin...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1280#issuecomment-203765818 Thank you. I will merge it [1257](https://github.com/apache/storm/pull/1257) ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203761199 @revans2 @longdafeng Thank you very much. I have addressed your comments, and we will follow up on the remaining items in the JIRAs. ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r58003203 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java --- @@ -0,0 +1,408 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.workermanager; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class DefaultWorkerManager implements IWorkerManager { + +private static Logger LOG = LoggerFactory.getLogger(DefaultWorkerManager.class); + +private Map conf; +private CgroupManager resourceIsolationManager; +private boolean runWorkerAsUser; + +@Override +public void prepareWorker(Map conf, Localizer localizer) { +this.conf = conf; +if (Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), false)) { +try { +this.resourceIsolationManager = Utils.newInstance((String) conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN)); +this.resourceIsolationManager.prepare(conf); +LOG.info("Using resource isolation plugin {} {}", conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager); +} catch (IOException e) { +throw Utils.wrapInRuntime(e); +} +} else { +this.resourceIsolationManager = null; +} +this.runWorkerAsUser = Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false); +} + +@Override +public IWorkerResult launchWorker(String supervisorId, String assignmentId, String stormId, Long port, String workerId, WorkerResources resources, +Utils.ExitCodeCallable workerExitCallback) { +try { + +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String stormOptions = ConfigUtils.concatIfNotNull(System.getProperty("storm.options")); +String 
stormConfFile = ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file")); +String workerTmpDir = ConfigUtils.workerTmpRoot(conf, workerId); + +String stormLogDir = ConfigUtils.getLogDir(); +String stormLogConfDir = (String) (conf.get(Config.STORM_LOG4J2_CONF_DIR)); + +String stormLog4j2ConfDir; +if (StringUtils.isNotBlank(stormLogConfDir)) { +if (Utils.isAbsolutePath(stormLogConfDir)) { +stormLog4j2ConfDir = stormLogConfDir; +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + stormLogConfDir; +} +} else { +stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR + "log4j2"; +} + +String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, stormId); + +String jlp = jlp(stormRoot, conf); + +String stormJar = ConfigUtils.supervisorStormJarPath(stormRoot); + +Map stormConf = ConfigUtils.readSupervisorStormConf(conf, stormId); + +String workerClassPath = getWorkerClassPath(stormJar, stormConf); + +Objec
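The log4j2 directory resolution in the quoted diff has three branches: an absolute configured path is used as-is, a relative one is resolved against the Storm home, and a blank one falls back to `log4j2` under the Storm home. A JDK-only sketch of the same branching follows. `Log4j2DirResolver` is a hypothetical name, and `File.separator` and `File.isAbsolute()` are assumed stand-ins for `Utils.FILE_PATH_SEPARATOR` and `Utils.isAbsolutePath`.

```java
import java.io.File;

// Hypothetical helper mirroring the branch structure of the quoted diff
// with JDK calls only.
class Log4j2DirResolver {
    static String resolve(String stormHome, String configuredDir) {
        boolean blank = configuredDir == null || configuredDir.trim().isEmpty();
        if (blank) {
            // Nothing configured: default to <stormHome>/log4j2.
            return stormHome + File.separator + "log4j2";
        }
        if (new File(configuredDir).isAbsolute()) {
            // Absolute paths are used unchanged.
            return configuredDir;
        }
        // Relative paths are resolved against the Storm home directory.
        return stormHome + File.separator + configuredDir;
    }
}
```

Keeping the resolution in one small function like this makes the three cases easy to unit test separately from worker launch.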
[GitHub] storm pull request: STORM-1669 (1.x): Fix SolrUpdateBolt flush bug
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1284#issuecomment-203759675 nice +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57997792 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java --- @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.daemon.supervisor.SupervisorUtils; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.generated.ProfileRequest; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.*; + +public class RunProfilerActions implements Runnable { +private static Logger LOG = LoggerFactory.getLogger(RunProfilerActions.class); + +private Map conf; +private IStormClusterState stormClusterState; +private String hostName; + +private String profileCmd; + +private SupervisorData supervisorData; + +private class ActionExitCallback implements Utils.ExitCodeCallable { +private String stormId; +private ProfileRequest profileRequest; +private String logPrefix; + +public ActionExitCallback(String stormId, ProfileRequest profileRequest, String logPrefix) { +this.stormId = stormId; +this.profileRequest = profileRequest; +this.logPrefix = logPrefix; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} profile-action exited for {}", logPrefix, exitCode); +try { +stormClusterState.deleteTopologyProfileRequests(stormId, profileRequest); --- End diff -- @revans2 That is what I meant before. Now I see that I did the wrong thing; we should add it back in. That way we can guarantee the jprofileStop operation even if errors happened during jprofileStart. ---
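As I read the comment above, the intent is that the pending profile request is removed when the profiler command exits, whatever its exit code, so that a later stop action is not blocked by a failed start. A minimal sketch of that idea follows. It is not the code from the PR: `InMemoryStore` and `ExitCallback` are hypothetical stand-ins for `IStormClusterState` and `ActionExitCallback`, and the string keys are invented for illustration.

```java
import java.util.HashSet;
import java.util.Set;

public class ProfileCleanupSketch {

    /** Hypothetical stand-in for the cluster state that tracks pending profile requests. */
    static class InMemoryStore {
        private final Set<String> pending = new HashSet<>();

        void add(String stormId, String action) {
            pending.add(stormId + ":" + action);
        }

        void delete(String stormId, String action) {
            pending.remove(stormId + ":" + action);
        }

        boolean isPending(String stormId, String action) {
            return pending.contains(stormId + ":" + action);
        }
    }

    /** Hypothetical exit callback: deletes the request regardless of the exit code. */
    static class ExitCallback {
        private final InMemoryStore store;
        private final String stormId;
        private final String action;

        ExitCallback(InMemoryStore store, String stormId, String action) {
            this.store = store;
            this.stormId = stormId;
            this.action = action;
        }

        void call(int exitCode) {
            try {
                System.out.println("profile-action " + action + " exited with " + exitCode);
            } finally {
                // Always clean up, so a failed start cannot leave a stale request behind.
                store.delete(stormId, action);
            }
        }
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        store.add("topo-1", "JPROFILE_START");
        // Simulate a profiler command that failed with a non-zero exit code.
        new ExitCallback(store, "topo-1", "JPROFILE_START").call(1);
        System.out.println("still pending: " + store.isPending("topo-1", "JPROFILE_START"));
    }
}
```

The `finally` block is the design choice being illustrated: cleanup does not depend on the command having succeeded.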
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57994377 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map<String, Map<String, Object>> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map<String, List> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); --- End diff -- Of course, I will follow up with a JIRA for this. ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57991093 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java --- @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor.timer; + +import org.apache.storm.Config; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.daemon.supervisor.SupervisorData; +import org.apache.storm.generated.SupervisorInfo; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class SupervisorHeartbeat implements Runnable { + + private final IStormClusterState stormClusterState; + private final String supervisorId; + private final Map conf; + private final SupervisorData supervisorData; + +public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) { +this.stormClusterState = supervisorData.getStormClusterState(); +this.supervisorId = supervisorData.getSupervisorId(); +this.supervisorData = supervisorData; +this.conf = conf; +} + +private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData supervisorData) { +SupervisorInfo supervisorInfo = new SupervisorInfo(); +supervisorInfo.set_time_secs(Time.currentTimeSecs()); +supervisorInfo.set_hostname(supervisorData.getHostName()); +supervisorInfo.set_assignment_id(supervisorData.getAssignmentId()); + +List usedPorts = new ArrayList<>(); + usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet()); +supervisorInfo.set_used_ports(usedPorts); +List metaDatas = (List)supervisorData.getiSupervisor().getMetadata(); +List portList = new ArrayList<>(); +if (metaDatas != null){ +for (Object data : metaDatas){ +Integer port = Utils.getInt(data); +if (port != null) +portList.add(port.longValue()); +} +} + +supervisorInfo.set_meta(portList); +supervisorInfo.set_scheduler_meta((Map<String, String>) conf.get(Config.SUPERVISOR_SCHEDULER_META)); + supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime()); +supervisorInfo.set_version(supervisorData.getStormVersion()); +supervisorInfo.set_resources_map(mkSupervisorCapacities(conf)); +return 
supervisorInfo; +} + +private Map<String, Double> mkSupervisorCapacities(Map conf) { +Map<String, Double> ret = new HashMap<String, Double>(); +Double mem = (double) (conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB)); +ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem); +Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY)); +ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu); +return ret; +} --- End diff -- Of course, I'll be only too pleased to follow up on the JIRA later. ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57990981 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,271 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.ProcessSimulator; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; +public static void setInstance(SupervisorUtils u) { +_instance = u; +} +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process processLauncher(Map conf, String user, List commandPrefix, List args, Map<String, String> environment, final String logPreFix, + final Utils.ExitCodeCallable exitCodeCallback, File dir) throws IOException { +if (StringUtils.isBlank(user)) { +throw new IllegalArgumentException("User cannot be blank when calling processLauncher."); +} +String wlinitial = (String) (conf.get(Config.SUPERVISOR_WORKER_LAUNCHER)); +String stormHome = ConfigUtils.concatIfNotNull(System.getProperty("storm.home")); +String wl; +if (StringUtils.isNotBlank(wlinitial)) { +wl = wlinitial; +} else { +wl = stormHome + "/bin/worker-launcher"; +} +List commands = new ArrayList<>(); +if (commandPrefix != null){ +commands.addAll(commandPrefix); +} +commands.add(wl); +commands.add(user); +commands.addAll(args); +LOG.info("Running as user: {} command: {}", user, commands); +return 
Utils.launchProcess(commands, environment, logPreFix, exitCodeCallback, dir); +} + +public static int processLauncherAndWait(Map conf, String user, List args, final Map<String, String> environment, final String logPreFix) +throws IOException { +int ret = 0; +Process process = processLauncher(conf, user, null, args, environment, logPreFix, null, null); +if (StringUtils.isNotBlank(logPreFix)) +Utils.readAndLogStream(logPreFix, process.getInputStream()); +try { +process.waitFor(); +} catch (InterruptedException e) { +LOG.info("{} interrupted.", logPreFix); +} +ret = process.exitValue(); +return ret; +} + +public static void setupStormCodeDir(Map conf, Map stormConf, String dir) throws IOException { +if (Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) { +String logPrefix = "setup conf for " + dir; +List commands = new ArrayList<>(); +commands.add("code-dir"); +commands.add(dir); +processLauncherAndWait(conf, (String) (stormConf
[GitHub] storm pull request: STORM-1271: Port backtype.storm.daemon.task to...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1249#discussion_r57912312 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java --- @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon; + +import org.apache.storm.Config; +import org.apache.storm.Thrift; +import org.apache.storm.daemon.metrics.BuiltinMetrics; +import org.apache.storm.daemon.metrics.BuiltinMetricsUtil; +import org.apache.storm.generated.Bolt; +import org.apache.storm.generated.ComponentObject; +import org.apache.storm.generated.JavaObject; +import org.apache.storm.generated.ShellComponent; +import org.apache.storm.generated.SpoutSpec; +import org.apache.storm.generated.StateSpoutSpec; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.grouping.LoadMapping; +import org.apache.storm.hooks.ITaskHook; +import org.apache.storm.hooks.info.EmitInfo; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.spout.ShellSpout; +import org.apache.storm.stats.CommonStats; +import org.apache.storm.task.ShellBolt; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.TupleImpl; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; + +public class Task { + +private static final Logger LOG = LoggerFactory.getLogger(Task.class); + +private Map executorData; +private Map workerData; +private TopologyContext systemTopologyContext; +private TopologyContext userTopologyContext; +private WorkerTopologyContext workerTopologyContext; +private LoadMapping loadMapping; +private Integer taskId; +private String componentId; +private Object taskObject; --- End diff -- componentObject is ok for me ---
[GitHub] storm pull request: remove unnecessary registerSerialization about...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1278 remove unnecessary registerSerialization about TransactionAttempt because we will register the class in the SerializationFactory Very minor. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm minor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1278.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1278 commit a48e9535fd8bcd6de195a70af6102535b60d56d6 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-28T05:37:42Z Merge branch 'master' of github.com:apache/storm commit 3812b2fa04cc7eb86035d402f3c407af3b4daffe Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-30T08:56:12Z Merge branch 'master' of github.com:apache/storm commit 94eb38356366115df1995b6950539a2c14a7ac99 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-30T15:38:44Z remove unnecessary registerSerialization about TransactionAttempt because we will register the class in the SerializationFactory ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-203337583 @revans2 OK, I will file JIRAs for local_supervisor.clj and "remove localSyncProcess" after porting worker.clj. I am not sure whether anyone is using getAssignmentId(), although I agree with @zhuoliu. ---
[GitHub] storm pull request: Fix minor bug in RAS Tests
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1207#issuecomment-203193862 nice. +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57828359 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,64 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. 
+(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData Supervisor SupervisorUtils] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) + +(defn launch-local-worker [supervisorData stormId port workerId resources] + (let [conf (.getConf supervisorData) + pid (Utils/uuid) +worker (worker/mk-worker conf + (.getSharedContext supervisorData) + stormId + (.getAssignmentId supervisorData) + (int port) + workerId)] +(ConfigUtils/setWorkerUserWSE conf workerId "") +(ProcessSimulator/registerProcess pid worker) +(.put (.getWorkerThreadPids supervisorData) workerId pid) +)) +(defn shutdown-local-worker [supervisorData worker-manager workerId] + (log-message "shutdown-local-worker") + (let [supervisor-id (.getSupervisorId supervisorData) +worker-pids (.getWorkerThreadPids supervisorData) +dead-workers (.getDeadWorkers supervisorData)] +(.shutdownWorker worker-manager supervisor-id workerId worker-pids) --- End diff -- In the old code, killing a worker called `supervisor/shutdown-worker`, which performed some actions and then called `ProcessSimulator/killProcess` and `supervisor/try-cleanup-worker`. Now `try-cleanup-worker` has moved to the worker-manager, where it is called `cleanupWorker`, so `shutdown-local-worker` is similar to `supervisor/shutdown-worker`. As for why we override `shutdown-local-worker`: maybe I can't explain it clearly.
You can refer to testing.clj:
```
(defmacro capture-changed-workers
  [& body]
  `(let [launch-captured# (atom {})
         shutdown-captured# (atom {})]
     (with-var-roots [local-supervisor/launch-local-worker (mk-capture-launch-fn launch-captured#)
                      local-supervisor/shutdown-local-worker (mk-capture-shutdown-fn shutdown-captured#)]
       ~@body
       {:launched @launch-captured#
        :shutdown @shutdown-captured#})))
```
---
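The `capture-changed-workers` macro quoted above temporarily rebinds `launch-local-worker` and `shutdown-local-worker` so that tests record the calls instead of starting or stopping workers. For readers more at home in Java, roughly the same idea can be sketched with a swappable instance, similar in spirit to the `setInstance`/`resetInstance` pair in `SupervisorUtils`. This is only an illustration under assumptions: `WorkerOps`, `CapturingOps`, and `ops()` are hypothetical names and are not part of Storm.

```java
import java.util.ArrayList;
import java.util.List;

public class CaptureSketch {

    /** Hypothetical worker operations; a real implementation would start and stop workers. */
    static class WorkerOps {
        void launch(String workerId) {
            System.out.println("launch " + workerId);
        }

        void shutdown(String workerId) {
            System.out.println("shutdown " + workerId);
        }
    }

    /** Test double that records calls instead of performing them. */
    static class CapturingOps extends WorkerOps {
        final List<String> launchedIds = new ArrayList<>();
        final List<String> shutdownIds = new ArrayList<>();

        @Override
        void launch(String workerId) {
            launchedIds.add(workerId);
        }

        @Override
        void shutdown(String workerId) {
            shutdownIds.add(workerId);
        }
    }

    private static final WorkerOps DEFAULT = new WorkerOps();
    private static WorkerOps instance = DEFAULT;

    static void setInstance(WorkerOps ops) {
        instance = ops;
    }

    static void resetInstance() {
        instance = DEFAULT;
    }

    static WorkerOps ops() {
        return instance;
    }

    public static void main(String[] args) {
        CapturingOps capture = new CapturingOps();
        setInstance(capture);
        try {
            // Code under test goes through ops(), so the calls are captured.
            ops().launch("w-1");
            ops().shutdown("w-1");
        } finally {
            // Restore the default, as with-var-roots does when its body exits.
            resetInstance();
        }
        System.out.println("launched=" + capture.launchedIds + " shutdown=" + capture.shutdownIds);
    }
}
```

This is why the local-mode functions need to stay overridable: a test can only capture the calls if they are routed through a replaceable binding.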
[GitHub] storm pull request: Shade Objenesis in storm-core
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1274#issuecomment-202865541 Yes, you are right. Objenesis is missing from storm-core. I suggest we integrate the Objenesis code directly, so that we do not have to rely on the Objenesis jar. ---
[GitHub] storm pull request: STORM-1660: remove flux gitignore file and mov...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1267#issuecomment-202233180 +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1257#discussion_r57544806 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java --- @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.storm.Config; +import org.apache.storm.scheduler.ISupervisor; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public class StandaloneSupervisor implements ISupervisor { +private String supervisorId; +private Map conf; + +@Override +public void prepare(Map stormConf, String schedulerLocalDir) { +try { +LocalState localState = new LocalState(schedulerLocalDir); +String supervisorId = localState.getSupervisorId(); +if (supervisorId == null) { +supervisorId = generateSupervisorId(); +localState.setSupervisorId(supervisorId); +} +this.conf = stormConf; +this.supervisorId = supervisorId; +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public String getSupervisorId() { +return supervisorId; +} + +@Override +public String getAssignmentId() { --- End diff -- I agree with you ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-201153034 @jerrypeng @revans2 can you look at it again? Thank you very much! ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1257#issuecomment-201152514 Sorry, I accidentally deleted the old "supervisor" branch, so I have reopened the supervisor PR. The PR is the same as before. If you want to look at the old comments, see #1184 (https://github.com/apache/storm/pull/1184). ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1257 [STORM-1279] port backtype.storm.daemon.supervisor to java 1. Port supervisor.clj to Java; 2. Update all calls to the supervisor; 3. The supervisor's class hierarchy is as follows: SupervisorManger is the supervisor's manager, which can clean up and shut down the supervisor; SyncSupervisorEvent is responsible for downloading/removing assignments and topologies' files; SyncProcessEvent is responsible for starting/killing workers; SupervisorUtils holds common methods; 4. Create local-supervisor.clj for local mode; 5. Fix the supervisor test failures on Windows. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm supervisor Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1257.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1257 commit 08934e29982d3936c9e247a8d7bac563053f869f Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-02-26T04:38:23Z port Supervisor to java commit b281c735f0089d24407af67586a1b41de45ac382 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-02-26T05:15:56Z update supervisor's structure commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-02T01:00:37Z update test codes about supervisor commit b09b4129d845aff6be285ea1748b842499c40e0b Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-04T04:14:41Z Merge branch 'master' into supervisor commit 42bacde20ea86867b874395532aa034cfad4f120 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-06T08:05:14Z Merge branch 'master' into supervisor commit 465a4b89521a4ac15b81969009133bdfa12d0655 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-10T12:12:18Z commit 184dc4a5c3fa8c9662ab224a82f33cc687b95c4b Author: xiaojian.fxj
<xiaojian@alibaba-inc.com> Date: 2016-03-10T14:17:06Z sdf commit 65ce9d2e03be5f5c4defa8342bfbefe9f59adcf9 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-10T14:57:01Z Merge branch 'master' into supervisor commit f78c36d7cc9ca82c6aa4e073f07279650a14fd45 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-10T15:20:33Z remove setLocalizer commit 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-10T15:26:42Z Merge branch 'master' into supervisor commit 95bf67347cad7c11aeaf55b7588e627be298d1c2 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-10T15:49:52Z resolve conflict when merge with master commit cc95d4f708efa123e5fc908bea15545f7139655b Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-11T00:03:00Z sdf commit a1e473526b5d9074ae1f9ff98162ddc78e426a73 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-14T08:54:36Z add the plugin to use for manager worker commit b49c99541ae9c2c3f86d9823c64d30765f7716c6 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-14T10:56:59Z Merge branch 'master' into supervisor commit 42928c2182cf2b755c6f98ad039b2e858787dfe4 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-14T16:16:19Z start worker successfully commit 56f27e5d58d7abd1bdd9aff95dfb862540b166ef Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-16T06:02:10Z Merge branch 'master' of github.com:apache/storm commit d63167cc4a13289ef46b5fa1650621c57b191d3b Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-17T01:29:54Z Merge branch 'master' of github.com:apache/storm commit 2e2ffb29df039e9339e7b2b44352c744efb5caf0 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-18T13:16:44Z Merge branch 'master' of github.com:apache/storm commit 28867372a4fc96d744ccd00a27d9e38dab2bd49e Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-23T03:10:08Z Merge branch 'master' of github.com:apache/storm commit 
f03b8bec105e88282211bf3e7dd4be4aeed484d8 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-23T05:53:00Z Merge branch 'master' into supervisor and update supervisor based STORM-1631 commit 724f5d2cea8debea8c6fb6a0d42d275880636834 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-23T17:29:20Z update commit 0100898ce9006cedd66c61b082001d1d455e5199 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-25T01:09:53Z Merge branch 'master' of github.com:apache/storm commit 753648927bb2c82443ede9525200bb6197f8d3b6 Author: xiaojian
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj closed the pull request at: https://github.com/apache/storm/pull/1184 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57414646 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map<String, Map<String, Object>> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map<String, List> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map<Integer, LocalAssignment> allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) { +if (supervisorData.getiSupervisor().confirmAssigned(entry.
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-201114123 @redsanket @knusbaum @revans2 can you look at it again? ---
[GitHub] storm pull request: STORM-1655 (1.x) Flux doesn't set return code ...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1254#issuecomment-201091348 +1 ---
[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...
Github user hustfxj closed the pull request at: https://github.com/apache/storm/pull/1250 ---
[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1250#issuecomment-200444929 @revans2 It is used in a non-thread-safe way, especially in the spout/bolt threads, so we think the change may not make sense. Should I close the PR? ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57193403 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java --- @@ -0,0 +1,632 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.storm.Config; +import org.apache.storm.blobstore.BlobStore; +import org.apache.storm.blobstore.ClientBlobStore; +import org.apache.storm.cluster.IStateStorage; +import org.apache.storm.cluster.IStormClusterState; +import org.apache.storm.event.EventManager; +import org.apache.storm.generated.*; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.LocalizedResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.*; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.JarURLConnection; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class SyncSupervisorEvent implements Runnable { + +private static final Logger LOG = LoggerFactory.getLogger(SyncSupervisorEvent.class); + +private EventManager syncSupEventManager; +private EventManager syncProcessManager; +private IStormClusterState stormClusterState; +private LocalState localState; +private SyncProcessEvent syncProcesses; +private SupervisorData supervisorData; + +public SyncSupervisorEvent(SupervisorData supervisorData, SyncProcessEvent syncProcesses, EventManager syncSupEventManager, +EventManager syncProcessManager) { + +this.syncProcesses = syncProcesses; +this.syncSupEventManager = syncSupEventManager; +this.syncProcessManager = syncProcessManager; +this.stormClusterState = supervisorData.getStormClusterState(); +this.localState = supervisorData.getLocalState(); +this.supervisorData = supervisorData; +} + +@Override +public void run() { +try { +Map conf = supervisorData.getConf(); +Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager); +List stormIds = stormClusterState.assignments(syncCallback); +Map<String, Map<String, Object>> assignmentsSnapshot = +getAssignmentsSnapshot(stormClusterState, stormIds, supervisorData.getAssignmentVersions().get(), syncCallback); +Map<String, List> stormIdToProfilerActions = getProfileActions(stormClusterState, stormIds); + +Set allDownloadedTopologyIds = SupervisorUtils.readDownLoadedStormIds(conf); +Map<String, String> stormcodeMap = readStormCodeLocations(assignmentsSnapshot); +Map<Integer, LocalAssignment> existingAssignment = localState.getLocalAssignmentsMap(); +if (existingAssignment == null) { +existingAssignment = new HashMap<>(); +} + +Map<Integer, LocalAssignment> allAssignment = +readAssignments(assignmentsSnapshot, existingAssignment, supervisorData.getAssignmentId(), supervisorData.getSyncRetry()); + +Map<Integer, LocalAssignment> newAssignment = new HashMap<>(); +Set assignedStormIds = new HashSet<>(); + +for (Map.Entry<Integer, LocalAssignment> entry : allAssignment.entrySet()) { +if (supervisorData.getiSupervisor().confirmAssigned(entry.
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r57185550 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,428 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. 
to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers + * launch + */ +public class SyncProcessEvent implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; +private SupervisorData supervisorData; +public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new ExecutorInfo(-1, -1); + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +Map<Integer, LocalAssignment> assignedExecutors = localState.getLocalAssignmentsMap(); + +if (assignedExecutors == null) { +assignedExecutors = new HashMap<>(); +} +int now = Time.currentTimeSecs(); + +Map<String, 
StateHeartbeat> localWorkerStats = getLocalWorkerStats(supervisorData, assignedExecutors, now); + +Set keeperWorkerIds = new HashSet<>(); +
[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1250#issuecomment-200402469 @revans2 ThreadLocalRandom is 20% faster than XORShiftRandom, but ThreadLocalRandom is static. Yes, we can't use XORShiftRandom in executor.clj right now because of thread safety. But if we ensure that every spout/bolt thread has its own XORShiftRandom object, then we can. ---
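The "one generator per spout/bolt thread" idea in the comment above could look roughly like the following. This is only a sketch under stated assumptions: `XorShiftSketch` and `PerThreadRandom` are hypothetical names, the generator is a plain 64-bit xorshift written for illustration, and none of it is the XORShiftRandom class from the PR.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical, non-thread-safe xorshift generator (NOT the PR's XORShiftRandom). */
final class XorShiftSketch {
    private long state;

    XorShiftSketch(long seed) {
        // A xorshift generator must never hold an all-zero state.
        this.state = (seed == 0L) ? 0x9E3779B97F4A7C15L : seed;
    }

    long nextLong() {
        // 64-bit xorshift with the (13, 7, 17) shift triple.
        long x = state;
        x ^= x << 13;
        x ^= x >>> 7;
        x ^= x << 17;
        state = x;
        return x;
    }

    int nextInt(int bound) {
        // floorMod keeps the result in [0, bound) even when nextLong() is negative.
        return (int) Math.floorMod(nextLong(), (long) bound);
    }
}

/** Hypothetical holder that gives every thread its own generator. */
final class PerThreadRandom {
    // Each thread lazily creates its own instance, so no locking is needed.
    private static final ThreadLocal<XorShiftSketch> LOCAL =
        ThreadLocal.withInitial(() -> new XorShiftSketch(ThreadLocalRandom.current().nextLong()));

    private PerThreadRandom() {
    }

    static XorShiftSketch current() {
        return LOCAL.get();
    }
}
```

A spout or bolt thread would call `PerThreadRandom.current().nextInt(n)`. Because the instance never leaves the thread that created it, the generator itself does not have to be thread safe.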
[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1250 [STORM-1650] improve performance by XORShiftRandom XORShiftRandom has much better performance than Random, so I use XORShiftRandom to replace Random in some places. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm rand Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1250.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1250 commit 56f27e5d58d7abd1bdd9aff95dfb862540b166ef Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-16T06:02:10Z Merge branch 'master' of github.com:apache/storm commit d63167cc4a13289ef46b5fa1650621c57b191d3b Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-17T01:29:54Z Merge branch 'master' of github.com:apache/storm commit 2e2ffb29df039e9339e7b2b44352c744efb5caf0 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-18T13:16:44Z Merge branch 'master' of github.com:apache/storm commit 28867372a4fc96d744ccd00a27d9e38dab2bd49e Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-23T03:10:08Z Merge branch 'master' of github.com:apache/storm commit b3c4d810be30a98b6c874abe535dd82bc2d4e13c Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-23T12:22:34Z improve performance by XORShiftRandom ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-200214770 @jerrypeng I have merged your bug fix. Thank you. ---
[GitHub] storm pull request: STORM-1648: drpc spout reconnect on failure
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1246#issuecomment-200195424 It may not be necessary, but I still +1. ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-200183338 @knusbaum I have removed the JMX code and now use a Gauge to report the stats. I think a Gauge is enough. Can you look at it again? ---
[GitHub] storm pull request: Make StormTimer join task thread on close
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1244#issuecomment-200175487 good, +1 ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-199555267 @knusbaum I also preferred going to Coda Hale for the stats earlier. It means we will drop the JMX MBean, but I totally agree with you. ---
[GitHub] storm pull request: STORM-1229: port backtype.storm.metric.testing...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1238#issuecomment-199075001 @abhishekagarwal87 thank you. It would be better to use an AtomicReference in place of the Map, so that we can avoid using "synchronized". Also, does HashBasedTable.create() generate the structure "HashMap<R, HashMap<C, V>>" rather than "Multimap<R, HashMap<C, V>>"? ---
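One way to read the AtomicReference suggestion above is a copy-on-write snapshot: readers take the current immutable map without locking, and writers swap in a new copy. The sketch below is an assumption about what was meant, not code from the PR, and `SnapshotHolder` with its methods is a hypothetical name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical holder; the names are illustrative and not taken from the PR. */
final class SnapshotHolder {
    // The reference always points at an immutable map, so readers need no lock.
    private final AtomicReference<Map<String, Long>> ref =
        new AtomicReference<>(Collections.<String, Long>emptyMap());

    /** Lock-free read of the current snapshot. */
    Map<String, Long> snapshot() {
        return ref.get();
    }

    /** Copy-on-write update: build a new map and swap it in atomically. */
    void add(String key, long delta) {
        ref.updateAndGet(old -> {
            Map<String, Long> copy = new HashMap<>(old);
            copy.merge(key, delta, Long::sum);
            return Collections.unmodifiableMap(copy);
        });
    }
}
```

The trade-off is that every write copies the whole map, so this only pays off when reads greatly outnumber writes and the map stays small.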
[GitHub] storm pull request: hotfix: parent version for pom.xml in storm-mo...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1228#issuecomment-198152909 +1 good ---
[GitHub] storm pull request: [STORM-1636] - Supervisor shutdown with worker...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1226#issuecomment-197706290 Nice catch. I considered this when I ported supervisor.clj to Java. Thank you @jerrypeng ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-197645359 @redsanket @abhishekagarwal87 I have addressed your comments, thank you. @redsanket I am also curious where the JMX-related code is implemented. I couldn't find it, so I removed the JMX-related pacemaker.register() call. ---
[GitHub] storm pull request: Fix logging for LoggingMetricsConsumer STORM-5...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1221#issuecomment-197705609 +1 ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-198351433 @redsanket @knusbaum thank you. I have addressed your comments ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197645649 @revans2 can you look at it again ? ---
[GitHub] storm pull request: STORM-1630 Add guide page for Windows users
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1220#issuecomment-197224808 @HeartSaVioR sorry, I think we shouldn't add it now. It is superfluous, so it would be better to link to an official guide from MS, as you do. ---
[GitHub] storm pull request: [STORM-1631] - Storm CGroup bugs 1) when launc...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1216#issuecomment-197209965 It looks good to me, +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-197155837 @jerrypeng Of course ---
[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1209#issuecomment-197155218 @srdo I mean that we can report errors to ZooKeeper whether or not the option is enabled. And the metrics should record not only the counter but also the timeout time. ---
[GitHub] storm pull request: STORM-1629 (For 1.x) Files/move doesn't work p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1215#issuecomment-197152524 +1 ---
[GitHub] storm pull request: [STORM-1624] add maven central status in READM...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1212#issuecomment-196584158 +1 ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-196393789 Anyone is welcome to help me review this PR. From my perspective, I hope we can accelerate the first-phase work. ---
[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1209#issuecomment-196220384 The spout itself emits messages through SpoutOutputCollector's emit(). If lots of messages fail, the acker will trigger the SpoutOutputCollector to re-emit those failed messages. A deadlock may then happen: downstream bolts may be slow to handle messages, which blocks emit(), so the spout/acker thread blocks. The other messages sent by those bolts then can't be handled by the acker, so the bolts block too. This scenario could be called a "loop deadlock". I want to say that this PR is sound for this scenario, because it lets us find the deadlock in time. ---
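To make "find the deadlock in time" concrete, here is a minimal sketch of a hang detector: the executor thread marks when it enters and leaves a user call, and a separate monitor thread asks whether the current call has run past a timeout. This is an illustration under assumptions, not the PR's implementation; `HangDetectorSketch` and its method names are hypothetical, and the caller passes timestamps in explicitly.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical detector for a call (e.g. execute() or nextTuple()) that never returns. */
final class HangDetectorSketch {
    private static final long IDLE = -1L;

    private final long timeoutMs;
    // Start time of the call in progress, or IDLE when no call is running.
    private final AtomicLong callStartedAtMs = new AtomicLong(IDLE);

    HangDetectorSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called by the executor thread right before invoking user code. */
    void enter(long nowMs) {
        callStartedAtMs.set(nowMs);
    }

    /** Called by the executor thread right after user code returns. */
    void exit() {
        callStartedAtMs.set(IDLE);
    }

    /** Called by a monitor thread; true if the current call has exceeded the timeout. */
    boolean isStuck(long nowMs) {
        long started = callStartedAtMs.get();
        return started != IDLE && nowMs - started > timeoutMs;
    }
}
```

A monitor could poll `isStuck(System.currentTimeMillis())` periodically and, when it returns true, bump a metric or report an error for the stuck component, as discussed elsewhere in this thread.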
[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1209#issuecomment-196213692 It looks good. I also hope to see this done through both the metrics system and by writing an error into ZooKeeper that would show up on the UI for the component that is stuck, as @revans2 and @bastiliu said. Then users can see for themselves what is happening. ---
[GitHub] storm pull request: !!! DO NOT MERGE !!! STORM-1617 preview !!! DO...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1203#issuecomment-195316832 I really support the subversion repo. As far as I am concerned, the asf-site itself is currently not convenient for users to browse and learn from, and it lacks detailed documentation. ---
[GitHub] storm pull request: Fix incorrect comment in default.yaml
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1201#issuecomment-195164936 +1 ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55701258 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.utils.PathUtils; +import org.apache.storm.Config; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; + +public static void setInstance(SupervisorUtils u) { +_instance = u; +} + +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Map<String, String> environment, final String logPreFix, --- End diff -- @revans2 agree. ---
[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1192#issuecomment-194905475 @zhuoliu If it's OK, can you merge it? ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55694794 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java --- @@ -0,0 +1,669 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.container.cgroup.CgroupManager; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.LocalAssignment; +import org.apache.storm.generated.WorkerResources; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.eclipse.jetty.util.ConcurrentHashSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.*; + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove file - rmr + * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new worker + * ids, write new "approved workers" to LS 5. create local dir for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for workers + * launch + */ +public class SyncProcessEvent extends ShutdownWork implements Runnable { + +private static Logger LOG = LoggerFactory.getLogger(SyncProcessEvent.class); + +private LocalState localState; + +private SupervisorData supervisorData; + +private class ProcessExitCallback implements Utils.ExitCodeCallable { +private final String logPrefix; +private final String workerId; + +public ProcessExitCallback(String logPrefix, String workerId) { +this.logPrefix = logPrefix; +this.workerId = workerId; +} + +@Override +public Object call() throws Exception { +return null; +} + +@Override +public Object call(int exitCode) { +LOG.info("{} exited with code: {}", logPrefix, exitCode); +supervisorData.getDeadWorkers().add(workerId); +return null; +} +} + +public SyncProcessEvent(){ + +} +public SyncProcessEvent(SupervisorData supervisorData) { +init(supervisorData); +} + +//TODO: initData is intended to local supervisor, so we will remove them after porting worker.clj to java +public void init(SupervisorData supervisorData){ +this.supervisorData = supervisorData; +this.localState = supervisorData.getLocalState(); +} + + +/** + * 1. to kill are those in allocated that are dead or disallowed 2. kill the ones that should be dead - read pids, kill -9 and individually remove file - + * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. of the rest, figure out what assignments aren't yet satisfied 4. generate new + * worker ids, write new "approved workers" to LS 5. create local dir for worker id 5. 
launch new workers (give worker-id, port, and supervisor-id) 6. wait + * for workers launch + */ +@Override +public void run() { +LOG.debug("Syncing processes"); +try { +Map conf = supervisorData.getConf(); +Map<Integer, LocalAssignment> assignedExecutors = localState.getL
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55694293 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java --- @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon.supervisor; + +import org.apache.commons.lang.StringUtils; +import org.apache.curator.utils.PathUtils; +import org.apache.storm.Config; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.localizer.LocalResource; +import org.apache.storm.localizer.Localizer; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.LocalState; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.URLDecoder; +import java.util.*; + +public class SupervisorUtils { + +private static final Logger LOG = LoggerFactory.getLogger(SupervisorUtils.class); + +private static final SupervisorUtils INSTANCE = new SupervisorUtils(); +private static SupervisorUtils _instance = INSTANCE; + +public static void setInstance(SupervisorUtils u) { +_instance = u; +} + +public static void resetInstance() { +_instance = INSTANCE; +} + +public static Process workerLauncher(Map conf, String user, List args, Map<String, String> environment, final String logPreFix, --- End diff -- Yes. In fact, I am confused by the name. ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1184#discussion_r55689859 --- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj --- @@ -0,0 +1,61 @@ +;; Licensed to the Apache Software Foundation (ASF) under one +;; or more contributor license agreements. See the NOTICE file +;; distributed with this work for additional information +;; regarding copyright ownership. The ASF licenses this file +;; to you under the Apache License, Version 2.0 (the +;; "License"); you may not use this file except in compliance +;; with the License. You may obtain a copy of the License at +;; +;; http://www.apache.org/licenses/LICENSE-2.0 +;; +;; Unless required by applicable law or agreed to in writing, software +;; distributed under the License is distributed on an "AS IS" BASIS, +;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +;; See the License for the specific language governing permissions and +;; limitations under the License. +(ns org.apache.storm.daemon.local-supervisor + (:import [org.apache.storm.daemon.supervisor SyncProcessEvent SupervisorData ShutdownWork Supervisor] + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm ProcessSimulator]) + (:use [org.apache.storm.daemon common] +[org.apache.storm log]) + (:require [org.apache.storm.daemon [worker :as worker] ]) + (:require [clojure.string :as str]) + (:gen-class)) --- End diff -- OK! I will follow up on the JIRA to move local_supervisor.clj to Java after this PR is merged into master. ---
[GitHub] storm pull request: [STORM-1609] Netty Client is not best effort d...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1194#issuecomment-194629551 +1. I have a suggestion. We drop messages because the Channel is not in a good state. I would prefer that we put the messages into a buffer when the Channel is not in a good state. Of course, we should drop the buffer if the Channel is still not in a good state. ---
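The buffering idea in the comment above can be sketched as follows. This is a minimal illustration under stated assumptions, not Storm's Netty Client: the class `BufferingSender`, its `maxFailedFlushes` limit, and the boolean `channelReady` flag are hypothetical names introduced here, and messages are plain strings. Messages are queued while the channel is not ready, flushed in arrival order once it is, and the whole buffer is dropped if the channel stays unready for too many consecutive attempts.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch of "buffer first, drop later"; not Storm's Netty Client. */
class BufferingSender {
    private final Deque<String> pending = new ArrayDeque<>(); // messages waiting for a usable channel
    private final List<String> sent = new ArrayList<>();      // stands in for the real network write
    private final int maxFailedFlushes;                        // assumed limit before giving up
    private int failedFlushes = 0;
    private int dropped = 0;

    BufferingSender(int maxFailedFlushes) {
        this.maxFailedFlushes = maxFailedFlushes;
    }

    /** Queue the message, then try to flush everything queued so far. */
    void send(String message, boolean channelReady) {
        pending.addLast(message);
        flush(channelReady);
    }

    /** Flush in arrival order if the channel is ready; otherwise count the failure. */
    void flush(boolean channelReady) {
        if (channelReady) {
            while (!pending.isEmpty()) {
                sent.add(pending.pollFirst());
            }
            failedFlushes = 0;
            return;
        }
        failedFlushes++;
        if (failedFlushes > maxFailedFlushes) {
            // The channel is still not in a good state: drop the whole buffer.
            dropped += pending.size();
            pending.clear();
            failedFlushes = 0;
        }
    }

    List<String> sent() { return sent; }
    int pendingCount() { return pending.size(); }
    int droppedCount() { return dropped; }

    public static void main(String[] args) {
        BufferingSender sender = new BufferingSender(2);
        sender.send("a", false); // buffered
        sender.send("b", false); // buffered
        sender.send("c", true);  // channel recovered: all three are flushed in order
        System.out.println(sender.sent());
    }
}
```

Limiting the number of failed attempts keeps the buffer from growing indefinitely while the channel is down; the limit value itself is an arbitrary choice in this sketch.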
[GitHub] storm pull request: [STORM-1605] use '/usr/bin/env python' to chec...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1196#issuecomment-194607393 @unsleepy22 It looks good to me. Many *.py files in Storm start with "#!/usr/bin/python". Do you think we should update them together or not? ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-194280649 @abhishekagarwal87 Thank you. I only hope that the details stay the same as before. ---
[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1156#discussion_r55503487 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java --- @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon; + +import com.codahale.metrics.Meter; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.generated.*; +import org.apache.storm.logging.ThriftAccessLogger; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.security.auth.*; +import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase; +import org.apache.storm.ui.FilterConfiguration; +import org.apache.storm.ui.IConfigurator; +import org.apache.storm.ui.UIHelpers; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.thrift.TException; +import org.eclipse.jetty.server.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.Servlet; +import java.security.Principal; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, AutoCloseable { + +private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class); +private final Long timeoutCheckSecs = 5L; + +private Map conf; + +private ThriftServer handlerServer; +private ThriftServer invokeServer; +private IHttpCredentialsPlugin httpCredsHandler; + +private Thread clearThread; + +private IAuthorizer authorizer; + +private Servlet httpServlet; + +private AtomicInteger ctr = new AtomicInteger(0); +private ConcurrentHashMap<String, ConcurrentLinkedQueue> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue>(); + +private static class InternalRequest { +public final Semaphore sem; +public final int startTimeSecs; +public final String function; +public final DRPCRequest request; +public volatile Object result; + +public InternalRequest(String function, DRPCRequest request) { +sem = new Semaphore(0); +startTimeSecs = 
Time.currentTimeSecs(); +this.function = function; +this.request = request; +} +} + +private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>(); + +private final static Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests"); +private final static Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls"); +private final static Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls"); +private final static Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls"); +private final static Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls"); +private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls"); + +public DrpcServer(Map conf) { +this.conf = conf; +this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER))); +initClearThread(); +} + +public void setHttpServlet(Servlet httpServlet) { +this.httpServlet = httpServlet; +} + +private ThriftServer initHandlerServer(final DrpcServer service)
[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1156#discussion_r55503401 --- Diff: storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java --- @@ -0,0 +1,356 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.daemon; + +import com.codahale.metrics.Meter; +import com.google.common.collect.ImmutableMap; +import org.apache.commons.lang.StringUtils; +import org.apache.storm.Config; +import org.apache.storm.generated.*; +import org.apache.storm.logging.ThriftAccessLogger; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.security.auth.*; +import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase; +import org.apache.storm.ui.FilterConfiguration; +import org.apache.storm.ui.IConfigurator; +import org.apache.storm.ui.UIHelpers; +import org.apache.storm.utils.Time; +import org.apache.storm.utils.Utils; +import org.apache.storm.utils.VersionInfo; +import org.apache.thrift.TException; +import org.eclipse.jetty.server.Server; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.servlet.Servlet; +import java.security.Principal; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class DrpcServer implements DistributedRPC.Iface, DistributedRPCInvocations.Iface, AutoCloseable { + +private static final Logger LOG = LoggerFactory.getLogger(DrpcServer.class); +private final Long timeoutCheckSecs = 5L; + +private Map conf; + +private ThriftServer handlerServer; +private ThriftServer invokeServer; +private IHttpCredentialsPlugin httpCredsHandler; + +private Thread clearThread; + +private IAuthorizer authorizer; + +private Servlet httpServlet; + +private AtomicInteger ctr = new AtomicInteger(0); +private ConcurrentHashMap<String, ConcurrentLinkedQueue> requestQueues = new ConcurrentHashMap<String, ConcurrentLinkedQueue>(); + +private static class InternalRequest { +public final Semaphore sem; +public final int startTimeSecs; +public final String function; +public final DRPCRequest request; +public volatile Object result; + +public InternalRequest(String function, DRPCRequest request) { +sem = new Semaphore(0); +startTimeSecs = 
Time.currentTimeSecs(); +this.function = function; +this.request = request; +} +} + +private ConcurrentHashMap<String, InternalRequest> outstandingRequests = new ConcurrentHashMap<>(); + +private final static Meter meterHttpRequests = StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests"); +private final static Meter meterExecuteCalls = StormMetricsRegistry.registerMeter("drpc:num-execute-calls"); +private final static Meter meterResultCalls = StormMetricsRegistry.registerMeter("drpc:num-result-calls"); +private final static Meter meterFailRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls"); +private final static Meter meterFetchRequestCalls = StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls"); +private final static Meter meterShutdownCalls = StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls"); + +public DrpcServer(Map conf) { +this.conf = conf; +this.authorizer = mkAuthorizationHandler((String) (this.conf.get(Config.DRPC_AUTHORIZER))); +initClearThread(); +} + +public void setHttpServlet(Servlet httpServlet) { +this.httpServlet = httpServlet; +} + +private ThriftServer initHandlerServer(final DrpcServer service)
[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1192#issuecomment-194179024 @zhuoliu @satishd @knusbaum @abhishekagarwal87 Thank you. I have addressed your comments. ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1195#issuecomment-194168476 When we start up nimbus before starting up pacemaker, nimbus will die, because nimbus can't read the workers' heartbeats via "heartbeat-storms". In my opinion, it shouldn't be like this. @revans2 What do you think? ---
[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1195 [STORM-1611] port org.apache.storm.pacemaker.pacemaker to java 1 port pacemaker_test to java; 2 update all the calls to cluster; 3 remove the JMX-related pacemaker.register; 4 fix the bug related to #1071; this was my mistake in that work. You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm pacemaker Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1195.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1195 commit 83c72d5d3f80797be473368c60e2f3deb7b49e90 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-08T12:58:36Z port pacemaker.clj_test.clj to java commit f22673af95de6386528b829b09af5ad500d6ac0d Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-09T07:41:37Z fix bug @STORM-1273 ---
[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1156#issuecomment-193758244 @revans2 @abhishekagarwal87 could you take a look? ---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request: https://github.com/apache/storm/pull/1184#issuecomment-193738634 @zhuoliu @abhishekagarwal87 Thank you. I will consider your suggestions and adjust the class hierarchy. Of course, I hope others can help review the supervisor and give me some comments. Then I will revise the code in one pass, to avoid conflicting suggestions and repeated modifications. Thank you again. ---
[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...
GitHub user hustfxj opened a pull request: https://github.com/apache/storm/pull/1192 [STORM-1610] port pacemaker_state_factory_test.clj to java You can merge this pull request into a Git repository by running: $ git pull https://github.com/hustfxj/storm pacemaker-state-factory-test Alternatively you can review and apply these changes as the patch at: https://github.com/apache/storm/pull/1192.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1192 commit c1a240cd6f76fa4ac4db2c26c28b4dd8fd1c3d24 Author: xiaojian.fxj <xiaojian@alibaba-inc.com> Date: 2016-03-08T10:52:47Z port pacemaker_state_factory_test.clj to java ---
[GitHub] storm pull request: STORM-1233: Port AuthUtilsTest to java
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1191#discussion_r55305746 --- Diff: storm-core/test/jvm/org/apache/storm/security/auth/AuthUtilsTest.java --- @@ -0,0 +1,227 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.security.auth; + +import java.io.IOException; +import java.io.File; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.Subject; + +import org.apache.commons.codec.binary.Hex; +import org.apache.storm.Config; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.junit.Test; +import org.mockito.Mockito; + +public class AuthUtilsTest { + +@Test(expected=IOException.class) +public void getOptionsThrowsOnMissingSectionTest() throws IOException { +Configuration mockConfig = Mockito.mock(Configuration.class); +AuthUtils.get(mockConfig, "bogus-section", ""); +} + +@Test +public void getNonExistentSectionTest() throws IOException { +Map<String, String> optionMap = new HashMap<String, String>(); +AppConfigurationEntry entry = Mockito.mock(AppConfigurationEntry.class); + +Mockito.<Map<String, ?>>when(entry.getOptions()).thenReturn(optionMap); +String section = "bogus-section"; +Configuration mockConfig = Mockito.mock(Configuration.class); +Mockito.when(mockConfig.getAppConfigurationEntry(section)) + .thenReturn(new AppConfigurationEntry[] {entry}); +Assert.assertNull( +AuthUtils.get(mockConfig, section, "nonexistent-key")); +} + +@Test +public void getFirstValueForValidKeyTest() throws IOException { +Map<String, String> optionMap = new HashMap<String, String>(); +optionMap.put("existent-key", "foo"); +AppConfigurationEntry entry = Mockito.mock(AppConfigurationEntry.class); + +Mockito.<Map<String, ?>>when(entry.getOptions()).thenReturn(optionMap); +String section = "bogus-section"; +Configuration mockConfig = Mockito.mock(Configuration.class); +Mockito.when(mockConfig.getAppConfigurationEntry(section)) + .thenReturn(new 
AppConfigurationEntry[] {entry}); +Assert.assertEquals( +AuthUtils.get(mockConfig, section, "existent-key"), "foo"); +} --- End diff -- The test logic is different from Clojure's test-returns-first-value-for-valid-key. ---
[GitHub] storm pull request: STORM-1233: Port AuthUtilsTest to java
Github user hustfxj commented on a diff in the pull request: https://github.com/apache/storm/pull/1191#discussion_r55303331 --- Diff: storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java --- @@ -81,35 +80,88 @@ public static Configuration GetConfiguration(Map storm_conf) { } /** - * Pull a set of keys out of a Configuration. - * @param conf The config to pull the key/value pairs out of. - * @param conf_entry The app configuration entry name to get stuff from. - * @return Return a map of the configs in conf. + * Get configurations for a section + * @param configuration The config to pull the key/value pairs out of. + * @param section The app configuration entry name to get stuff from. + * @return Return array of config entries or null if configuration is null */ -public static SortedMap<String, ?> PullConfig(Configuration conf, -String conf_entry) throws IOException { -if(conf == null) { +public static AppConfigurationEntry[] getEntries(Configuration configuration, +String section) throws IOException { +if (configuration == null) { return null; } -AppConfigurationEntry configurationEntries[] = conf.getAppConfigurationEntry(conf_entry); -if(configurationEntries == null) { -String errorMessage = "Could not find a '" + conf_entry -+ "' entry in this configuration: Client cannot start."; + +AppConfigurationEntry configurationEntries[] = configuration.getAppConfigurationEntry(section); +if (configurationEntries == null) { +String errorMessage = "Could not find a '"+ section + "' entry in this configuration."; throw new IOException(errorMessage); } +return configurationEntries; +} +/** + * Pull a set of keys out of a Configuration. + * @param configuration The config to pull the key/value pairs out of. + * @param section The app configuration entry name to get stuff from. + * @return Return a map of the configs in conf. 
+ */ +public static SortedMap<String, ?> pullConfig(Configuration configuration, +String section) throws IOException { +AppConfigurationEntry[] configurationEntries = AuthUtils.getEntries(configuration, section); + +if (configurationEntries == null) { +return null; +} + TreeMap<String, Object> results = new TreeMap<>(); -for(AppConfigurationEntry entry: configurationEntries) { +for (AppConfigurationEntry entry: configurationEntries) { Map<String, ?> options = entry.getOptions(); -for(String key : options.keySet()) { +for (String key : options.keySet()) { results.put(key, options.get(key)); } } + return results; } /** + * Pull a the value given section and key from Configuration + * @param configuration The config to pull the key/value pairs out of. + * @param section The app configuration entry name to get stuff from. + * @param key The key to look up inside of the section + * @return Return a the String value of the configuration value + */ +public static String get(Configuration configuration, String section, String key) throws IOException { +AppConfigurationEntry[] configurationEntries = AuthUtils.getEntries(configuration, section); + +if (configurationEntries == null){ +return null; +} + +for (AppConfigurationEntry entry: configurationEntries) { +Object val = entry.getOptions().get(key); +if (val != null) +return (String)val; +} +return null; +} + +/** + * Instantiate class with klassName and return instance + * @param klassName The class name + * @return Instance of type T + */ +private static T makeClass(String klassName) { +try { +Class klass = Class.forName(klassName); +return (T)klass.newInstance(); +} catch (Exception e) { +throw new RuntimeException(e); +} +} --- End diff -- I think it's better to replace this method by Utils.newInstance --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub
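For reference, the kind of reflective helper under discussion can be written generically as below. This is a standalone sketch under stated assumptions: `ReflectionSketch` is a hypothetical class introduced here, and it does not call Storm's `Utils.newInstance`, whose exact signature is not shown in this thread. It loads a class by name, invokes its no-argument constructor, and wraps any reflection failure in a `RuntimeException`, much as the `makeClass` method in the diff does.

```java
import java.util.List;

/** Hypothetical stand-in for a shared "instantiate by class name" utility. */
final class ReflectionSketch {
    private ReflectionSketch() {
        // static helper only
    }

    /**
     * Load the named class and call its no-argument constructor.
     * The unchecked cast mirrors the (T) cast in the quoted makeClass method;
     * the caller is responsible for asking for a compatible type.
     */
    @SuppressWarnings("unchecked")
    static <T> T newInstance(String className) {
        try {
            Class<?> klass = Class.forName(className);
            return (T) klass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers missing classes, missing constructors, and constructor failures.
            throw new RuntimeException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        List<String> list = ReflectionSketch.newInstance("java.util.ArrayList");
        list.add("created reflectively");
        System.out.println(list);
    }
}
```

Keeping one such helper in a shared utility class avoids each caller repeating the same try/catch, which is the motivation behind the review comment.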
[GitHub] storm pull request: [STORM-1606] print the information of testcase...
Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1189#discussion_r55138526

    --- Diff: dev-tools/travis/print-errors-from-test-reports.py ---
    @@ -55,6 +55,10 @@ def print_error_reports_from_report_file(file_path):
             if fail is not None:
                 print_detail_information(testcase, fail)
     
    +        failure = testcase.find("failure")
    --- End diff --

    @unsleepy22 The "find(match)" API of Python's ElementTree returns the first subelement that matches "match". So the match is against a subelement, not against a substring.
---
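The point about find(match) can be checked with a minimal sketch using Python's standard xml.etree.ElementTree. The report below and the helper name failing_testcase_names are invented for illustration; they are not taken from the Storm test reports or from print-errors-from-test-reports.py.

```python
import xml.etree.ElementTree as ET

# Hypothetical JUnit-style report; the content is made up for this example.
REPORT = """
<testsuite>
  <testcase name="passes"/>
  <testcase name="fails">
    <failure message="expected 1 but was 2">stack trace here</failure>
  </testcase>
  <testcase name="mentions_failure_in_text">
    <system-out>the word failure appears only as text</system-out>
  </testcase>
</testsuite>
"""


def failing_testcase_names(xml_text):
    """Return the names of testcases that have a direct <failure> child."""
    root = ET.fromstring(xml_text.strip())
    names = []
    for testcase in root.iter("testcase"):
        # find() matches child elements by tag (or path), never by text
        # content, and returns None when no such child exists.
        if testcase.find("failure") is not None:
            names.append(testcase.get("name"))
    return names


print(failing_testcase_names(REPORT))
```

Only the testcase with an actual failure child element is reported; the one that merely contains the word "failure" in its text is not.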
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
GitHub user hustfxj reopened a pull request:

    https://github.com/apache/storm/pull/1184

    [STORM-1279] port backtype.storm.daemon.supervisor to java

    1. Port supervisor.clj to Java.
    2. Update all calls to the supervisor.
    3. The supervisor's class hierarchy is as follows:
       - SupervisorManger is the supervisor's manager, which can clean up and shut down the supervisor;
       - SyncSupervisorEvent is responsible for downloading/removing assignments and topologies' files;
       - SyncProcessEvent is responsible for starting/killing workers;
       - SupervisorUtils holds common methods.
    4. Create local-supervisor.clj for local mode.
    5. Fix the supervisor test failures on Windows.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hustfxj/storm supervisor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1184

commit 08934e29982d3936c9e247a8d7bac563053f869f
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T04:38:23Z

    port Supervisor to java

commit b281c735f0089d24407af67586a1b41de45ac382
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T05:15:56Z

    update supervisor's structure

commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-02T01:00:37Z

    update test codes about supervisor

commit b09b4129d845aff6be285ea1748b842499c40e0b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-04T04:14:41Z

    Merge branch 'master' into supervisor

commit 42bacde20ea86867b874395532aa034cfad4f120
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-06T08:05:14Z

    Merge branch 'master' into supervisor
---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj closed the pull request at:

    https://github.com/apache/storm/pull/1184
---
[GitHub] storm pull request: [STORM-1606] print the information of testcase...
GitHub user hustfxj opened a pull request:

    https://github.com/apache/storm/pull/1189

    [STORM-1606] print the information of testcase which is on failure

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hustfxj/storm travis

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1189

commit 812031ff7e3017dfcbff4c3434fbd3c2437dcb33
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-06T08:24:22Z

    print the information of testcase which is on failure
---
[GitHub] storm pull request: [STORM-1269] port backtype.storm.daemon.common...
Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1185#discussion_r55002193

    --- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
    @@ -2302,4 +2302,54 @@ public Object call() {
         public static long bitXor(Long a, Long b) {
             return a ^ b;
         }
    +
    +    public static Integer parseInt(Object o) {
    +        if (o == null) {
    +            return null;
    +        }
    +
    +        if (o instanceof String) {
    +            return Integer.parseInt(String.valueOf(o));
    +        } else if (o instanceof Long) {
    +            long value = (Long) o;
    +            return (int) value;
    +        } else if (o instanceof Integer) {
    +            return (Integer) o;
    +        } else {
    +            throw new RuntimeException("Invalid value " + o.getClass().getName() + " " + o);
    +        }
    +    }
    +
    +    public static Integer parseInt(Object o, int defaultValue) {
    --- End diff --

    Utils already has methods for this. They are named getInt().
---
[GitHub] storm pull request: STORM-1603 Storm UT fails on supervisor test i...
Github user hustfxj commented on a diff in the pull request:

    https://github.com/apache/storm/pull/1183#discussion_r55001806

    --- Diff: storm-core/test/clj/backtype/storm/supervisor_test.clj ---
    @@ -261,8 +260,8 @@
                                    (str "-Dstorm.id=" mock-storm-id)
                                    (str "-Dworker.id=" mock-worker-id)
                                    (str "-Dworker.port=" mock-port)
    -                               "-Dstorm.log.dir=/logs"
    -                               "-Dlog4j.configurationFile=/log4j2/worker.xml"
    +                               (str "-Dstorm.log.dir=" file-path-separator "logs")
    +                               (str "-Dlog4j.configurationFile=" file-path-separator "log4j2" file-path-separator "worker.xml")
    --- End diff --

    @HeartSaVioR Sorry, I didn't see STORM-1463 earlier. I think we should find all of these cases and fix them together. Thank you @HeartSaVioR
---
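The change in the diff above replaces hard-coded "/" with the platform's path separator when building the expected JVM options. A rough Python sketch of the same idea follows; it is not the Clojure test code, and the helper name worker_path_options is hypothetical. Python's os.sep stands in for the test's file-path-separator.

```python
import os


def worker_path_options(sep=os.sep):
    """Build the two path-bearing JVM options using the given separator."""
    log_dir = sep + "logs"
    log4j_conf = sep + sep.join(["log4j2", "worker.xml"])
    return [
        "-Dstorm.log.dir=" + log_dir,
        "-Dlog4j.configurationFile=" + log4j_conf,
    ]


# The same code yields Unix-style or Windows-style expectations.
print(worker_path_options("/"))
print(worker_path_options("\\"))
```

Passing the separator as a parameter keeps the expected strings in step with whatever platform the test runs on, which is why a test with literal "/" paths can pass on Linux yet fail on Windows.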
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
Github user hustfxj commented on the pull request:

    https://github.com/apache/storm/pull/1184#issuecomment-192118347

    The supervisor test succeeds on my Windows machine, but CI reports 2 failures in supervisor-test.clj. I can't see supervisor-test.xml, so I don't know where the failures come from. I did some manual testing and things look good. I hope everyone can review it and give me some suggestions. With pleasure!
---
[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...
GitHub user hustfxj opened a pull request:

    https://github.com/apache/storm/pull/1184

    [STORM-1279] port backtype.storm.daemon.supervisor to java

    1. Port supervisor.clj to Java.
    2. Update all calls to the supervisor.
    3. The supervisor's class hierarchy is as follows:
       - SupervisorManger is the supervisor's manager, which can clean up and shut down the supervisor;
       - SyncSupervisorEvent is responsible for downloading/removing assignments and topologies' files;
       - SyncProcessEvent is responsible for starting/killing workers;
       - SupervisorUtils holds common methods.
    4. Fix the supervisor test failures on Windows.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/hustfxj/storm supervisor

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1184

commit 08934e29982d3936c9e247a8d7bac563053f869f
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T04:38:23Z

    port Supervisor to java

commit b281c735f0089d24407af67586a1b41de45ac382
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T05:15:56Z

    update supervisor's structure

commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-02T01:00:37Z

    update test codes about supervisor

commit b09b4129d845aff6be285ea1748b842499c40e0b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-04T04:14:41Z

    Merge branch 'master' into supervisor
---