[GitHub] storm issue #2518: STORM-2902: Some improvements for storm-rocketmq module

2018-04-19 Thread hustfxj
Github user hustfxj commented on the issue:

https://github.com/apache/storm/pull/2518
  
+1 Thank you for @vesense 


---


[GitHub] storm pull request #2518: STORM-2902: Some improvements for storm-rocketmq m...

2018-04-08 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/2518#discussion_r179943857
  
--- Diff: 
external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMqSpout.java
 ---
@@ -60,14 +58,14 @@
 public class RocketMqSpout implements IRichSpout {
 // TODO add metrics
--- End diff --

not only add metrcis, but also print log for MqSpout and MqBolt


---


[GitHub] storm issue #2024: STORM-2349: Add one RocketMQ plugin for the Apache Storm

2017-04-20 Thread hustfxj
Github user hustfxj commented on the issue:

https://github.com/apache/storm/pull/2024
  
@vesense thank you. +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...

2017-04-16 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/2024#discussion_r111697081
  
--- Diff: 
external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
 ---
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.MessageSet;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMQSpout implements IRichSpout {
+// TODO add metrics
+
+private MQPushConsumer consumer;
+private SpoutOutputCollector collector;
+private BlockingQueue queue;
+
+private Properties properties;
+private MessageRetryManager messageRetryManager;
+
+public RocketMQSpout(Properties properties) {
+this.properties = properties;
+}
+
+@Override
+public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+Validate.notEmpty(properties, "Consumer properties can not be 
empty");
+boolean ordered = getBoolean(properties, 
RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+
+int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, 
Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+queue = new LinkedBlockingQueue<>(queueSize);
+
+consumer = new DefaultMQPushConsumer();
+RocketMQConfig.buildConsumerConfigs(properties, 
(DefaultMQPushConsumer)consumer, context);
+
+if (ordered) {
+consumer.registerMessageListener(new MessageListenerOrderly() {
+@Override
+public ConsumeOrderlyStatus 
consumeMessage(List msgs,
+   
ConsumeOrderlyContext context) {
+if (process(msgs)) {
+return ConsumeOrderlyStatus.SUCCESS;
+} else {
+return 
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+}
+}

[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...

2017-04-12 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/2024#discussion_r111294519
  
--- Diff: 
external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
 ---
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.MessageSet;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMQSpout implements IRichSpout {
+// TODO add metrics
+
+private MQPushConsumer consumer;
+private SpoutOutputCollector collector;
+private BlockingQueue queue;
+
+private Properties properties;
+private MessageRetryManager messageRetryManager;
+
+public RocketMQSpout(Properties properties) {
+this.properties = properties;
+}
+
+@Override
+public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+Validate.notEmpty(properties, "Consumer properties can not be 
empty");
+boolean ordered = getBoolean(properties, 
RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+
+int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, 
Integer.parseInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING).toString()));
+queue = new LinkedBlockingQueue<>(queueSize);
+
+consumer = new DefaultMQPushConsumer();
+RocketMQConfig.buildConsumerConfigs(properties, 
(DefaultMQPushConsumer)consumer, context);
+
+if (ordered) {
+consumer.registerMessageListener(new MessageListenerOrderly() {
+@Override
+public ConsumeOrderlyStatus 
consumeMessage(List msgs,
+   
ConsumeOrderlyContext context) {
+if (process(msgs)) {
+return ConsumeOrderlyStatus.SUCCESS;
+} else {
+return 
ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
+}
+}

[GitHub] storm pull request #2024: STORM-2349: Add one RocketMQ plugin for the Apache...

2017-04-12 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/2024#discussion_r111292663
  
--- Diff: 
external/storm-rocketmq/src/main/java/org/apache/storm/rocketmq/spout/RocketMQSpout.java
 ---
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.rocketmq.spout;
+
+import org.apache.commons.lang.Validate;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import 
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import 
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.storm.Config;
+import org.apache.storm.rocketmq.DefaultMessageRetryManager;
+import org.apache.storm.rocketmq.MessageRetryManager;
+import org.apache.storm.rocketmq.MessageSet;
+import org.apache.storm.rocketmq.RocketMQConfig;
+import org.apache.storm.rocketmq.SpoutConfig;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.apache.storm.rocketmq.RocketMQUtils.getBoolean;
+import static org.apache.storm.rocketmq.RocketMQUtils.getInteger;
+
+/**
+ * RocketMQSpout uses MQPushConsumer as the default implementation.
+ * PushConsumer is a high level consumer API, wrapping the pulling details
+ * Looks like broker push messages to consumer
+ */
+public class RocketMQSpout implements IRichSpout {
+// TODO add metrics
+
+private MQPushConsumer consumer;
+private SpoutOutputCollector collector;
+private boolean ordered;
+private BlockingQueue queue;
+
+private Properties properties;
+private MessageRetryManager messageRetryManager;
+
+public RocketMQSpout(Properties properties) {
+this.properties = properties;
+}
+
+@Override
+public void open(Map conf, TopologyContext context, 
SpoutOutputCollector collector) {
+Validate.notEmpty(properties, "Consumer properties can not be 
empty");
+ordered = 
getBoolean(properties,RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);
+
+int queueSize = getInteger(properties, SpoutConfig.QUEUE_SIZE, 
Utils.getInt(conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)));
+queue = new LinkedBlockingQueue<>(queueSize);
+
+consumer = new DefaultMQPushConsumer();
+RocketMQConfig.buildConsumerConfigs(properties, 
(DefaultMQPushConsumer)consumer, context);
+
+if (ordered) {
+consumer.registerMessageListener(new MessageListenerOrderly() {
+@Override
+public ConsumeOrderlyStatus 
consumeMessage(List msgs,
+   
ConsumeOrderlyContext context) {
+if (process(msgs)) {
+return ConsumeOrderlyStatus.SUCCESS;
+} else {
+return 
ConsumeOrderlyS

[GitHub] storm issue #1200: Tests - STORM-1235, STORM-1236, STORM-1237, STORM-1238, S...

2016-07-07 Thread hustfxj
Github user hustfxj commented on the issue:

https://github.com/apache/storm/pull/1200
  
Thank you @abhishekagarwal87 +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: remove unnecessary registerSerialization about...

2016-05-17 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1278#issuecomment-219647600
  
we register the class "org.apache.storm.transactional.TransactionAttempt" 
in SerializationFactory , But we register the class 
"org.apache.storm.trident.topology.TransactionAttempt" in 
MasterBatchCoordinator. So it will occur serialization errors when we run 
trident topology. Do you Think ? @revans2 @satishd  


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1769 Added a test to check local nimbus ...

2016-05-06 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1403#issuecomment-217434349
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1764: Pacemaker is throwing some stack t...

2016-05-04 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1397#issuecomment-217050548
  
good, +1. Thank you @knusbaum 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1761: Storm-Solr Example Throws ArrayInd...

2016-05-03 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1394#issuecomment-216716374
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1707] Remove two minute timeout after w...

2016-05-03 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1370#issuecomment-216446889
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1754: Correct java version in 0.10.x sto...

2016-05-02 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1385#issuecomment-216408694
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1731 (1.x) Avoid looking up debug / back...

2016-04-26 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1362#issuecomment-214968261
  
it's amazing, +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: minor: fix `storm.py` broken link

2016-04-22 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1349#issuecomment-213344710
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1712] make storage plugin for transacti...

2016-04-15 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1342

[STORM-1712] make storage plugin for transactional state [not to be merged 
yet]

This is not to be merged yet. Now we support storage transactional state to 
hbase, and so on. I want to hear your opinion. If Ok, I am pleasure to create 
the PR and merge the code.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm transactional

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1342


commit e30a4e685a0446a8a5ac092d48db4581968565a4
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-04-15T16:42:02Z

make storage plugin for transactional state




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1713: Replace NotImplementedException wi...

2016-04-15 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1341#issuecomment-210549291
  
LGTM


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1693: Move stats cleanup to executor shu...

2016-04-12 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1333#issuecomment-209193820
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1286] port kill_workers to java.

2016-04-04 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1310#issuecomment-205645664
  
Thank you, +1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1670 LocalState#get(String) can throw Fi...

2016-04-01 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1297#issuecomment-204610269
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1668: Fix silent failing of flux for set...

2016-03-31 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1281#issuecomment-204199412
  
great +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-31 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1257#issuecomment-204198660
  
@revans2  Thank you again. In fact I can't do some testing with profiling 
options due to my jdk.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1667] Log the IO exception when deletin...

2016-03-30 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1280#issuecomment-203765818
  
Thank you. I will merge it [1257] 
(https://github.com/apache/storm/pull/1257)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1257#issuecomment-203761199
  
@revans2 @longdafeng  Thank you very much. I have addressed your comments. 
And we will follow the  JIRAs about some other things. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r58003203
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/workermanager/DefaultWorkerManager.java
 ---
@@ -0,0 +1,408 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.workermanager;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class DefaultWorkerManager implements IWorkerManager {
+
+private static Logger LOG = 
LoggerFactory.getLogger(DefaultWorkerManager.class);
+
+private Map conf;
+private CgroupManager resourceIsolationManager;
+private boolean runWorkerAsUser;
+
+@Override
+public void prepareWorker(Map conf, Localizer localizer) {
+this.conf = conf;
+if 
(Utils.getBoolean(conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN_ENABLE), 
false)) {
+try {
+this.resourceIsolationManager = Utils.newInstance((String) 
conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN));
+this.resourceIsolationManager.prepare(conf);
+LOG.info("Using resource isolation plugin {} {}", 
conf.get(Config.STORM_RESOURCE_ISOLATION_PLUGIN), resourceIsolationManager);
+} catch (IOException e) {
+throw Utils.wrapInRuntime(e);
+}
+} else {
+this.resourceIsolationManager = null;
+}
+this.runWorkerAsUser = 
Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false);
+}
+
+@Override
+public IWorkerResult launchWorker(String supervisorId, String 
assignmentId, String stormId, Long port, String workerId, WorkerResources 
resources,
+Utils.ExitCodeCallable workerExitCallback) {
+try {
+
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String stormOptions = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.options"));
+String stormConfFile = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.conf.file"));
+String workerTmpDir = ConfigUtils.workerTmpRoot(conf, 
workerId);
+
+String stormLogDir = ConfigUtils.getLogDir();
+String stormLogConfDir = (String) 
(conf.get(Config.STORM_LOG4J2_CONF_DIR));
+
+String stormLog4j2ConfDir;
+if (StringUtils.isNotBlank(stormLogConfDir)) {
+if (Utils.isAbsolutePath(stormLogConfDir)) {
+stormLog4j2ConfDir = stormLogConfDir;
+} else {
+stormLog4j2ConfDir = stormHome + 
Utils.FILE_PATH_SEPARATOR + stormLogConfDir;
+}
+} else {
+stormLog4j2ConfDir = stormHome + Utils.FILE_PATH_SEPARATOR 
+ "log4j2";
+}
+
+String stormRoot = ConfigUtils.supervisorStormDistRoot(conf, 
stormId);
+
+String jlp = jlp(stormRoot, conf);
+
+String stormJar = 
ConfigUtils.supervisorStormJarPath(stormRoot);
+
+Map stormConf = ConfigUtils.readSupervisorStormConf(conf, 
stormId);
+
+String workerClassPath = getWorkerClassPath(stormJar, 
stormConf);
+
+Objec

[GitHub] storm pull request: STORM-1669 (1.x): Fix SolrUpdateBolt flush bug

2016-03-30 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1284#issuecomment-203759675
  
nice +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57997792
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/RunProfilerActions.java
 ---
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.generated.ProfileAction;
+import org.apache.storm.generated.ProfileRequest;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.*;
+
+public class RunProfilerActions implements Runnable {
+private static Logger LOG = 
LoggerFactory.getLogger(RunProfilerActions.class);
+
+private Map conf;
+private IStormClusterState stormClusterState;
+private String hostName;
+
+private String profileCmd;
+
+private SupervisorData supervisorData;
+
+private class ActionExitCallback implements Utils.ExitCodeCallable {
+private String stormId;
+private ProfileRequest profileRequest;
+private String logPrefix;
+
+public ActionExitCallback(String stormId, ProfileRequest 
profileRequest, String logPrefix) {
+this.stormId = stormId;
+this.profileRequest = profileRequest;
+this.logPrefix = logPrefix;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} profile-action exited for {}", logPrefix, 
exitCode);
+try {
+stormClusterState.deleteTopologyProfileRequests(stormId, 
profileRequest);
--- End diff --

@revans2 I mean it before. Now I find I  did the wrong thing. We should add 
it back in. Thus we can guarantee  operation jprofileStop  even if errors 
happened when operating jprofileStart.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57994377
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java 
---
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+private EventManager syncSupEventManager;
+private EventManager syncProcessManager;
+private IStormClusterState stormClusterState;
+private LocalState localState;
+private SyncProcessEvent syncProcesses;
+private SupervisorData supervisorData;
+
+public SyncSupervisorEvent(SupervisorData supervisorData, 
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
+EventManager syncProcessManager) {
+
+this.syncProcesses = syncProcesses;
+this.syncSupEventManager = syncSupEventManager;
+this.syncProcessManager = syncProcessManager;
+this.stormClusterState = supervisorData.getStormClusterState();
+this.localState = supervisorData.getLocalState();
+this.supervisorData = supervisorData;
+}
+
+@Override
+public void run() {
+try {
+Map conf = supervisorData.getConf();
+Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
+List stormIds = 
stormClusterState.assignments(syncCallback);
+Map<String, Map<String, Object>> assignmentsSnapshot =
+getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
+Map<String, List> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
--- End diff --

Of course, I will follow up a JIR for this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57991093
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java
 ---
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor.timer;
+
+import org.apache.storm.Config;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.daemon.supervisor.SupervisorData;
+import org.apache.storm.generated.SupervisorInfo;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SupervisorHeartbeat implements Runnable {
+
+ private final IStormClusterState stormClusterState;
+ private final String supervisorId;
+ private final Map conf;
+ private final SupervisorData supervisorData;
+
+public SupervisorHeartbeat(Map conf, SupervisorData supervisorData) {
+this.stormClusterState = supervisorData.getStormClusterState();
+this.supervisorId = supervisorData.getSupervisorId();
+this.supervisorData = supervisorData;
+this.conf = conf;
+}
+
+private SupervisorInfo buildSupervisorInfo(Map conf, SupervisorData 
supervisorData) {
+SupervisorInfo supervisorInfo = new SupervisorInfo();
+supervisorInfo.set_time_secs(Time.currentTimeSecs());
+supervisorInfo.set_hostname(supervisorData.getHostName());
+supervisorInfo.set_assignment_id(supervisorData.getAssignmentId());
+
+List usedPorts = new ArrayList<>();
+
usedPorts.addAll(supervisorData.getCurrAssignment().get().keySet());
+supervisorInfo.set_used_ports(usedPorts);
+List metaDatas = 
(List)supervisorData.getiSupervisor().getMetadata();
+List portList = new ArrayList<>();
+if (metaDatas != null){
+for (Object data : metaDatas){
+Integer port = Utils.getInt(data);
+if (port != null)
+portList.add(port.longValue());
+}
+}
+
+supervisorInfo.set_meta(portList);
+supervisorInfo.set_scheduler_meta((Map<String, String>) 
conf.get(Config.SUPERVISOR_SCHEDULER_META));
+
supervisorInfo.set_uptime_secs(supervisorData.getUpTime().upTime());
+supervisorInfo.set_version(supervisorData.getStormVersion());
+supervisorInfo.set_resources_map(mkSupervisorCapacities(conf));
+return supervisorInfo;
+}
+
+private Map<String, Double> mkSupervisorCapacities(Map conf) {
+Map<String, Double> ret = new HashMap<String, Double>();
+Double mem = (double) 
(conf.get(Config.SUPERVISOR_MEMORY_CAPACITY_MB));
+ret.put(Config.SUPERVISOR_MEMORY_CAPACITY_MB, mem);
+Double cpu = (double) (conf.get(Config.SUPERVISOR_CPU_CAPACITY));
+ret.put(Config.SUPERVISOR_CPU_CAPACITY, cpu);
+return ret;
+}
--- End diff --

Of course, I'll be only too pleased to follow on the JIRA later


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57990981
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.ProcessSimulator;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process processLauncher(Map conf, String user, 
List commandPrefix, List args, Map<String, String> environment, 
final String logPreFix,
+  final Utils.ExitCodeCallable 
exitCodeCallback, File dir) throws IOException {
+if (StringUtils.isBlank(user)) {
+throw new IllegalArgumentException("User cannot be blank when 
calling processLauncher.");
+}
+String wlinitial = (String) 
(conf.get(Config.SUPERVISOR_WORKER_LAUNCHER));
+String stormHome = 
ConfigUtils.concatIfNotNull(System.getProperty("storm.home"));
+String wl;
+if (StringUtils.isNotBlank(wlinitial)) {
+wl = wlinitial;
+} else {
+wl = stormHome + "/bin/worker-launcher";
+}
+List commands = new ArrayList<>();
+if (commandPrefix != null){
+commands.addAll(commandPrefix);
+}
+commands.add(wl);
+commands.add(user);
+commands.addAll(args);
+LOG.info("Running as user: {} command: {}", user, commands);
+return Utils.launchProcess(commands, environment, logPreFix, 
exitCodeCallback, dir);
+}
+
+public static int processLauncherAndWait(Map conf, String user, 
List args, final Map<String, String> environment, final String 
logPreFix)
+throws IOException {
+int ret = 0;
+Process process = processLauncher(conf, user, null, args, 
environment, logPreFix, null, null);
+if (StringUtils.isNotBlank(logPreFix))
+Utils.readAndLogStream(logPreFix, process.getInputStream());
+try {
+process.waitFor();
+} catch (InterruptedException e) {
+LOG.info("{} interrupted.", logPreFix);
+}
+ret = process.exitValue();
+return ret;
+}
+
+public static void setupStormCodeDir(Map conf, Map stormConf, String 
dir) throws IOException {
+if 
(Utils.getBoolean(conf.get(Config.SUPERVISOR_RUN_WORKER_AS_USER), false)) {
+String logPrefix = "setup conf for " + dir;
+List commands = new ArrayList<>();
+commands.add("code-dir");
+commands.add(dir);
+processLauncherAndWait(conf, (String) 
(stormConf

[GitHub] storm pull request: STORM-1271: Port backtype.storm.daemon.task to...

2016-03-30 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1249#discussion_r57912312
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.Config;
+import org.apache.storm.Thrift;
+import org.apache.storm.daemon.metrics.BuiltinMetrics;
+import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
+import org.apache.storm.generated.Bolt;
+import org.apache.storm.generated.ComponentObject;
+import org.apache.storm.generated.JavaObject;
+import org.apache.storm.generated.ShellComponent;
+import org.apache.storm.generated.SpoutSpec;
+import org.apache.storm.generated.StateSpoutSpec;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.grouping.LoadMapping;
+import org.apache.storm.hooks.ITaskHook;
+import org.apache.storm.hooks.info.EmitInfo;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.ShellSpout;
+import org.apache.storm.stats.CommonStats;
+import org.apache.storm.task.ShellBolt;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.TupleImpl;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+public class Task {
+
+private static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+private Map executorData;
+private Map workerData;
+private TopologyContext systemTopologyContext;
+private TopologyContext userTopologyContext;
+private WorkerTopologyContext workerTopologyContext;
+private LoadMapping loadMapping;
+private Integer taskId;
+private String componentId;
+private Object taskObject;
--- End diff --

componentObject  is ok for me


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: remove unnecessary registerSerialization about...

2016-03-30 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1278

remove unnecessary registerSerialization about TransactionAttempt because 
we will register the class in the SerializationFactory

very minor. 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm minor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1278.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1278


commit a48e9535fd8bcd6de195a70af6102535b60d56d6
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-28T05:37:42Z

Merge branch 'master' of github.com:apache/storm

commit 3812b2fa04cc7eb86035d402f3c407af3b4daffe
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-30T08:56:12Z

Merge branch 'master' of github.com:apache/storm

commit 94eb38356366115df1995b6950539a2c14a7ac99
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-30T15:38:44Z

remove unnecessary registerSerialization about TransactionAttempt because 
we will register the class in the SerializationFactory




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-30 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1257#issuecomment-203337583
  
@revans2  ok, I will file the JIRAs about the local_supervisor.clj and 
"remove localSyncProcess" after porting worker.clj.  I am not sure whether 
someone is using the getAssignmentId(), although I agree with @zhuoliu .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Fix minor bug in RAS Tests

2016-03-29 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1207#issuecomment-203193862
  
nice. +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-29 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57828359
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj 
---
@@ -0,0 +1,64 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-supervisor
+  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent 
SupervisorData Supervisor SupervisorUtils]
+   [org.apache.storm.utils Utils ConfigUtils]
+   [org.apache.storm ProcessSimulator])
+  (:use [org.apache.storm.daemon common]
+[org.apache.storm log])
+  (:require [org.apache.storm.daemon [worker :as worker] ])
+  (:require [clojure.string :as str])
+  (:gen-class))
+
+(defn launch-local-worker [supervisorData stormId port workerId resources]
+  (let [conf (.getConf supervisorData)
+ pid (Utils/uuid)
+worker (worker/mk-worker conf
+ (.getSharedContext supervisorData)
+ stormId
+ (.getAssignmentId supervisorData)
+ (int port)
+ workerId)]
+(ConfigUtils/setWorkerUserWSE conf workerId "")
+(ProcessSimulator/registerProcess pid worker)
+(.put (.getWorkerThreadPids supervisorData) workerId pid)
+))
+(defn shutdown-local-worker [supervisorData worker-manager workerId]
+  (log-message "shutdown-local-worker")
+  (let [supervisor-id (.getSupervisorId supervisorData)
+worker-pids (.getWorkerThreadPids supervisorData)
+dead-workers (.getDeadWorkers supervisorData)]
+(.shutdownWorker worker-manager supervisor-id workerId worker-pids)
--- End diff --

In the old code when killing a worker we would call 
`supervisor/shutdown-worker`, it will perform some acitons, and call 
`ProcessSimulator/killProcess` & `supervisor/try-cleanup-worker`. Now we move 
`try-cleanup-worker` to the worker-manager, it is called `cleanupWorker` . So 
the `shutdown-local-worker` is similar to the `supervisor/shutddown-worker`. 
Why do we override the `shutdown-local-worker`. Maybe I can't explain it 
clearly. You can refer to testing.clj :
```
(defmacro capture-changed-workers
  [& body]
  `(let [launch-captured# (atom {})
 shutdown-captured# (atom {})]
 (with-var-roots [local-supervisor/launch-local-worker 
(mk-capture-launch-fn launch-captured#)
  local-supervisor/shutdown-local-worker 
(mk-capture-shutdown-fn shutdown-captured#)]
 ~@body
 {:launched @launch-captured#
  :shutdown @shutdown-captured#})))
```



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Shade Objenesis in storm-core

2016-03-29 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1274#issuecomment-202865541
  
Yes, you are right. Objenesis is missing from storm-core. I suggest we 
should Integrate the Objenesis's code in order to avoid to rely on the 
Objenesis jar. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1660: remove flux gitignore file and mov...

2016-03-27 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1267#issuecomment-202233180
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-27 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1257#discussion_r57544806
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/StandaloneSupervisor.java 
---
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.storm.Config;
+import org.apache.storm.scheduler.ISupervisor;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+public class StandaloneSupervisor implements ISupervisor {
+private String supervisorId;
+private Map conf;
+
+@Override
+public void prepare(Map stormConf, String schedulerLocalDir) {
+try {
+LocalState localState = new LocalState(schedulerLocalDir);
+String supervisorId = localState.getSupervisorId();
+if (supervisorId == null) {
+supervisorId = generateSupervisorId();
+localState.setSupervisorId(supervisorId);
+}
+this.conf = stormConf;
+this.supervisorId = supervisorId;
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public String getSupervisorId() {
+return supervisorId;
+}
+
+@Override
+public String getAssignmentId() {
--- End diff --

I agree with you


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-24 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1257#issuecomment-201153034
  
@jerrypeng @revans2  can you look at it again? Thank you very much!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-24 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1257#issuecomment-201152514
  
Sorry  I deleted the old branch about "supervisor" without attention, So I 
reopen the PR about supervisor. The PR is the same as before. If want look at 
thee old comments , you can link the  
1184(https://github.com/apache/storm/pull/1184). 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-24 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1257

[STORM-1279] port backtype.storm.daemon.supervisor to java

1 port supervisor.clj to java;
2 Update all the callings to supervisor;
3 Supervisor's class hierarchy as follows:
SupervisorManger is supervisor' manger which can clean and shutdown 
supervisor;
SyncSupervisorEvent is responsible for downloading/removing assignments and 
topologys' files;
SyncProcessEvent is responsible for starting/killing workers;
SupervisorUtils have common methods;
4 create local-supervisor.clj for local mode.
5. fix the fails on supervisor test in Windows

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm supervisor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1257.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1257


commit 08934e29982d3936c9e247a8d7bac563053f869f
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T04:38:23Z

port Supervisor to java

commit b281c735f0089d24407af67586a1b41de45ac382
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T05:15:56Z

update supervisor's structure

commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-02T01:00:37Z

update test codes about supervisor

commit b09b4129d845aff6be285ea1748b842499c40e0b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-04T04:14:41Z

Merge branch 'master' into supervisor

commit 42bacde20ea86867b874395532aa034cfad4f120
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-06T08:05:14Z

Merge branch 'master' into supervisor

commit 465a4b89521a4ac15b81969009133bdfa12d0655
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T12:12:18Z



commit 184dc4a5c3fa8c9662ab224a82f33cc687b95c4b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T14:17:06Z

sdf

commit 65ce9d2e03be5f5c4defa8342bfbefe9f59adcf9
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T14:57:01Z

Merge branch 'master' into supervisor

commit f78c36d7cc9ca82c6aa4e073f07279650a14fd45
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T15:20:33Z

remove setLocalizer

commit 69c8b3c31d4ee528aea58f716b092c24ba6b0b1a
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T15:26:42Z

Merge branch 'master' into supervisor

commit 95bf67347cad7c11aeaf55b7588e627be298d1c2
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-10T15:49:52Z

resolve conflict when merge with master

commit cc95d4f708efa123e5fc908bea15545f7139655b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-11T00:03:00Z

sdf

commit a1e473526b5d9074ae1f9ff98162ddc78e426a73
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-14T08:54:36Z

add the plugin to use for manager worker

commit b49c99541ae9c2c3f86d9823c64d30765f7716c6
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-14T10:56:59Z

Merge branch 'master' into supervisor

commit 42928c2182cf2b755c6f98ad039b2e858787dfe4
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-14T16:16:19Z

start worker successfully

commit 56f27e5d58d7abd1bdd9aff95dfb862540b166ef
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-16T06:02:10Z

Merge branch 'master' of github.com:apache/storm

commit d63167cc4a13289ef46b5fa1650621c57b191d3b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-17T01:29:54Z

Merge branch 'master' of github.com:apache/storm

commit 2e2ffb29df039e9339e7b2b44352c744efb5caf0
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-18T13:16:44Z

Merge branch 'master' of github.com:apache/storm

commit 28867372a4fc96d744ccd00a27d9e38dab2bd49e
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-23T03:10:08Z

Merge branch 'master' of github.com:apache/storm

commit f03b8bec105e88282211bf3e7dd4be4aeed484d8
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-23T05:53:00Z

Merge branch 'master' into supervisor and update supervisor based 
STORM-1631

commit 724f5d2cea8debea8c6fb6a0d42d275880636834
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-23T17:29:20Z

update

commit 0100898ce9006cedd66c61b082001d1d455e5199
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-25T01:09:53Z

Merge branch 'master' of github.com:apache/storm

commit 753648927bb2c82443ede9525200bb6197f8d3b6
Author: xiaojian

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-24 Thread hustfxj
Github user hustfxj closed the pull request at:

https://github.com/apache/storm/pull/1184


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-24 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r57414646
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java 
---
@@ -0,0 +1,632 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+private EventManager syncSupEventManager;
+private EventManager syncProcessManager;
+private IStormClusterState stormClusterState;
+private LocalState localState;
+private SyncProcessEvent syncProcesses;
+private SupervisorData supervisorData;
+
+public SyncSupervisorEvent(SupervisorData supervisorData, 
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
+EventManager syncProcessManager) {
+
+this.syncProcesses = syncProcesses;
+this.syncSupEventManager = syncSupEventManager;
+this.syncProcessManager = syncProcessManager;
+this.stormClusterState = supervisorData.getStormClusterState();
+this.localState = supervisorData.getLocalState();
+this.supervisorData = supervisorData;
+}
+
+@Override
+public void run() {
+try {
+Map conf = supervisorData.getConf();
+Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
+List stormIds = 
stormClusterState.assignments(syncCallback);
+Map<String, Map<String, Object>> assignmentsSnapshot =
+getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
+Map<String, List> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
+
+Set allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
+Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
+Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
+if (existingAssignment == null) {
+existingAssignment = new HashMap<>();
+}
+
+Map<Integer, LocalAssignment> allAssignment =
+readAssignments(assignmentsSnapshot, 
existingAssignment, supervisorData.getAssignmentId(), 
supervisorData.getSyncRetry());
+
+Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
+Set assignedStormIds = new HashSet<>();
+
+for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
+if 
(supervisorData.getiSupervisor().confirmAssigned(entry.

[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-24 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-201114123
  
@redsanket @knusbaum @revans2  can you look it again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1655 (1.x) Flux doesn't set return code ...

2016-03-24 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1254#issuecomment-201091348
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...

2016-03-23 Thread hustfxj
Github user hustfxj closed the pull request at:

https://github.com/apache/storm/pull/1250


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...

2016-03-23 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1250#issuecomment-200444929
  
@revans2  It is used in a non-thread safe way, especialy spout/bolt thread. 
So we think it may not make sense. So I will close the PR?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-23 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r57193403
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncSupervisorEvent.java 
---
@@ -0,0 +1,632 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.storm.Config;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.ClientBlobStore;
+import org.apache.storm.cluster.IStateStorage;
+import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.event.EventManager;
+import org.apache.storm.generated.*;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.LocalizedResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.*;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.JarURLConnection;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SyncSupervisorEvent implements Runnable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SyncSupervisorEvent.class);
+
+private EventManager syncSupEventManager;
+private EventManager syncProcessManager;
+private IStormClusterState stormClusterState;
+private LocalState localState;
+private SyncProcessEvent syncProcesses;
+private SupervisorData supervisorData;
+
+public SyncSupervisorEvent(SupervisorData supervisorData, 
SyncProcessEvent syncProcesses, EventManager syncSupEventManager,
+EventManager syncProcessManager) {
+
+this.syncProcesses = syncProcesses;
+this.syncSupEventManager = syncSupEventManager;
+this.syncProcessManager = syncProcessManager;
+this.stormClusterState = supervisorData.getStormClusterState();
+this.localState = supervisorData.getLocalState();
+this.supervisorData = supervisorData;
+}
+
+@Override
+public void run() {
+try {
+Map conf = supervisorData.getConf();
+Runnable syncCallback = new EventManagerPushCallback(this, 
syncSupEventManager);
+List stormIds = 
stormClusterState.assignments(syncCallback);
+Map<String, Map<String, Object>> assignmentsSnapshot =
+getAssignmentsSnapshot(stormClusterState, stormIds, 
supervisorData.getAssignmentVersions().get(), syncCallback);
+Map<String, List> stormIdToProfilerActions = 
getProfileActions(stormClusterState, stormIds);
+
+Set allDownloadedTopologyIds = 
SupervisorUtils.readDownLoadedStormIds(conf);
+Map<String, String> stormcodeMap = 
readStormCodeLocations(assignmentsSnapshot);
+Map<Integer, LocalAssignment> existingAssignment = 
localState.getLocalAssignmentsMap();
+if (existingAssignment == null) {
+existingAssignment = new HashMap<>();
+}
+
+Map<Integer, LocalAssignment> allAssignment =
+readAssignments(assignmentsSnapshot, 
existingAssignment, supervisorData.getAssignmentId(), 
supervisorData.getSyncRetry());
+
+Map<Integer, LocalAssignment> newAssignment = new HashMap<>();
+Set assignedStormIds = new HashSet<>();
+
+for (Map.Entry<Integer, LocalAssignment> entry : 
allAssignment.entrySet()) {
+if 
(supervisorData.getiSupervisor().confirmAssigned(entry.

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-23 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r57185550
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.daemon.supervisor.workermanager.IWorkerManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+private  SupervisorData supervisorData;
+public static final ExecutorInfo SYSTEM_EXECUTOR_INFO = new 
ExecutorInfo(-1, -1);
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map conf = supervisorData.getConf();
+Map<Integer, LocalAssignment> assignedExecutors = 
localState.getLocalAssignmentsMap();
+
+if (assignedExecutors == null) {
+assignedExecutors = new HashMap<>();
+}
+int now = Time.currentTimeSecs();
+
+Map<String, StateHeartbeat> localWorkerStats = 
getLocalWorkerStats(supervisorData, assignedExecutors, now);
+
+Set keeperWorkerIds = new HashSet<>();
+ 

[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...

2016-03-23 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1250#issuecomment-200402469
  
@revans2  ThreadLocalRandom is 20% faster than XORShiftRandom. But 
ThreadLocalRandom is static.Yes, we can't use XORShiftRandom in 
executor.clj due to thread safety now. But if we assure every spout/bolt thread 
has itself XORShiftRandom object. Thus we  can. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1650] improve performance by XORShiftRa...

2016-03-23 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1250

[STORM-1650] improve performance by XORShiftRandom

XORShiftRandom have much better performance than Random, So I use 
XORShiftRandom replace Random at some places

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm rand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1250.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1250


commit 56f27e5d58d7abd1bdd9aff95dfb862540b166ef
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-16T06:02:10Z

Merge branch 'master' of github.com:apache/storm

commit d63167cc4a13289ef46b5fa1650621c57b191d3b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-17T01:29:54Z

Merge branch 'master' of github.com:apache/storm

commit 2e2ffb29df039e9339e7b2b44352c744efb5caf0
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-18T13:16:44Z

Merge branch 'master' of github.com:apache/storm

commit 28867372a4fc96d744ccd00a27d9e38dab2bd49e
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-23T03:10:08Z

Merge branch 'master' of github.com:apache/storm

commit b3c4d810be30a98b6c874abe535dd82bc2d4e13c
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-23T12:22:34Z

improve performance by XORShiftRandom




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-23 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1184#issuecomment-200214770
  
@jerrypeng  I have merge your bug fix.Thank you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1648: drpc spout reconnect on failure

2016-03-22 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1246#issuecomment-200195424
  
It may not be necessary, but I still +1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-22 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-200183338
  
@knusbaum  I have remove the JMX, and use Gauge report the stats. I think 
Gauge is enough. can you look at it again?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Make StormTimer join task thread on close

2016-03-22 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1244#issuecomment-200175487
  
good, +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-21 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-199555267
  
@knusbaum  I also prefer to go to Coda Hale for the stats before, It means 
we will drop the JMX Mbean. But I agree with you totaly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1229: port backtype.storm.metric.testing...

2016-03-20 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1238#issuecomment-199075001
  
@abhishekagarwal87  thank you. It have better use AtomicReference to 
replace the Map, then we can avoid to use  the "synchronized". And  do 
HashBasedTable.create() generate the struct "HashMap<R, HashMap<C, V>>", but 
not "Multimap<R, HashMap<C, V>>" ? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: hotfix: parent version for pom.xml in storm-mo...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1228#issuecomment-198152909
  
+1 good


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1636] - Supervisor shutdown with worker...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1226#issuecomment-197706290
  
nice catch. I have considered this when I port supervisor.clj to java. Than 
you @jerrypeng 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-197645359
  
@redsanket @abhishekagarwal87  I have addressed your comments, thank you. 
@redsanket  I also am curious where is the jmx related code implemented. But I 
don't found, so I removed pacemaker.register () about jmx.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Fix logging for LoggingMetricsConsumer STORM-5...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1221#issuecomment-197705609
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-198351433
  
@redsanket @knusbaum  thank you. I have addressed your comments


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-19 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1184#issuecomment-197645649
  
@revans2  can you look at it again ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1630 Add guide page for Windows users

2016-03-16 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1220#issuecomment-197224808
  
@HeartSaVioR  sorry I think we shouldn't add it now. It is superfluous, so 
that's more better to a official guide from MS, linking that page, as you do. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1631] - Storm CGroup bugs 1) when launc...

2016-03-16 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1216#issuecomment-197209965
  
It looks good to me, +1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-15 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1184#issuecomment-197155837
  
@jerrypeng  Of course


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-15 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-197155218
  
@srdo  I mean that we can report errors into Zookeeper whether the option 
is enabled. And the metrisc is not only record the counter, but also the 
timeout time. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1629 (For 1.x) Files/move doesn't work p...

2016-03-15 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1215#issuecomment-197152524
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1624] add maven central status in READM...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1212#issuecomment-196584158
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-196393789
  
Every can help me review this PR. In my perspective , I hope we can 
accelerate the first phrase job.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196220384
  
Spout itself emits messages by SpoutOutputCollector 's emit().  If lots of 
messages failed, then acker will trigger SpoutOutputCollector emits those 
failed messages. It may happen dead lock. Because down bolts may slow to handle 
messsages and it will block emit(),  then spout/acker thread will block.  Thus 
others messages which is send by those can't be handled by acker. So the bolts 
will block. The scene may be called "loop dead lock".  I want say that this PR 
is sound to this scene. Because It can make us find the dead lock in time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-956: When the execute() or nextTuple() h...

2016-03-14 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1209#issuecomment-196213692
  
It looks to good.  I also hope we should see this done through both the 
metrics system and through writing an error into zookeeper that would show up 
on the UI for the component that is stuck as @revans2  @bastiliu  said. Then 
let users manually see what is happening.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: !!! DO NOT MERGE !!! STORM-1617 preview !!! DO...

2016-03-11 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1203#issuecomment-195316832
  
 I really support  the subversion repo. As far as I am concerned,now the 
asf-site itself is not convenient for users to browse and learn. And it  is  
lack of detailed documentation .


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: Fix incorrect comment in default.yaml

2016-03-10 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1201#issuecomment-195164936
  
+1


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-10 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55701258
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map<String, String> environment, final String logPreFix,
--- End diff --

@revans2  agree. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...

2016-03-10 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1192#issuecomment-194905475
  
@zhuoliu  if  it's ok, can you merge it ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-10 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55694794
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SyncProcessEvent.java ---
@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.CgroupManager;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.generated.LocalAssignment;
+import org.apache.storm.generated.WorkerResources;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. kill 
the ones that should be dead - read pids, kill -9 and individually remove file 
- rmr
+ * heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and log) 3. 
of the rest, figure out what assignments aren't yet satisfied 4. generate new 
worker
+ * ids, write new "approved workers" to LS 5. create local dir for worker 
id 5. launch new workers (give worker-id, port, and supervisor-id) 6. wait for 
workers
+ * launch
+ */
+public class SyncProcessEvent extends ShutdownWork implements Runnable {
+
+private static Logger LOG = 
LoggerFactory.getLogger(SyncProcessEvent.class);
+
+private  LocalState localState;
+
+private SupervisorData supervisorData;
+
+private class ProcessExitCallback implements Utils.ExitCodeCallable {
+private final String logPrefix;
+private final String workerId;
+
+public ProcessExitCallback(String logPrefix, String workerId) {
+this.logPrefix = logPrefix;
+this.workerId = workerId;
+}
+
+@Override
+public Object call() throws Exception {
+return null;
+}
+
+@Override
+public Object call(int exitCode) {
+LOG.info("{} exited with code: {}", logPrefix, exitCode);
+supervisorData.getDeadWorkers().add(workerId);
+return null;
+}
+}
+
+public SyncProcessEvent(){
+
+}
+public SyncProcessEvent(SupervisorData supervisorData) {
+init(supervisorData);
+}
+
+//TODO: initData is intended to local supervisor, so we will remove 
them after porting worker.clj to java
+public void init(SupervisorData supervisorData){
+this.supervisorData = supervisorData;
+this.localState = supervisorData.getLocalState();
+}
+
+
+/**
+ * 1. to kill are those in allocated that are dead or disallowed 2. 
kill the ones that should be dead - read pids, kill -9 and individually remove 
file -
+ * rmr heartbeat dir, rmdir pid dir, rmdir id dir (catch exception and 
log) 3. of the rest, figure out what assignments aren't yet satisfied 4. 
generate new
+ * worker ids, write new "approved workers" to LS 5. create local dir 
for worker id 5. launch new workers (give worker-id, port, and supervisor-id) 
6. wait
+ * for workers launch
+ */
+@Override
+public void run() {
+LOG.debug("Syncing processes");
+try {
+Map conf = supervisorData.getConf();
+Map<Integer, LocalAssignment> assignedExecutors = 
localState.getL

[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-10 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55694293
  
--- Diff: 
storm-core/src/jvm/org/apache/storm/daemon/supervisor/SupervisorUtils.java ---
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon.supervisor;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.utils.PathUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.LSWorkerHeartbeat;
+import org.apache.storm.localizer.LocalResource;
+import org.apache.storm.localizer.Localizer;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.LocalState;
+import org.apache.storm.utils.Utils;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URLDecoder;
+import java.util.*;
+
+public class SupervisorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(SupervisorUtils.class);
+
+private static final SupervisorUtils INSTANCE = new SupervisorUtils();
+private static SupervisorUtils _instance = INSTANCE;
+
+public static void setInstance(SupervisorUtils u) {
+_instance = u;
+}
+
+public static void resetInstance() {
+_instance = INSTANCE;
+}
+
+public static Process workerLauncher(Map conf, String user, 
List args, Map<String, String> environment, final String logPreFix,
--- End diff --

Yes. I am confused with the name in fact.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-10 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1184#discussion_r55689859
  
--- Diff: storm-core/src/clj/org/apache/storm/daemon/local_supervisor.clj 
---
@@ -0,0 +1,61 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.local-supervisor
+  (:import [org.apache.storm.daemon.supervisor SyncProcessEvent 
SupervisorData ShutdownWork Supervisor]
+   [org.apache.storm.utils Utils ConfigUtils]
+   [org.apache.storm ProcessSimulator])
+  (:use [org.apache.storm.daemon common]
+[org.apache.storm log])
+  (:require [org.apache.storm.daemon [worker :as worker] ])
+  (:require [clojure.string :as str])
+  (:gen-class))
--- End diff --

ok! I will follow up the JIRA to move local_supervisor.clj to java after 
this PR is merged into master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1609] Netty Client is not best effort d...

2016-03-09 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1194#issuecomment-194629551
  
+1. I have a suggestion.  we drop messages because Channel is not in good 
state. I hope we should put the messages into buffer when Channel is not in 
good state. Of course, we should drop the buffer if the channel is still not in 
good state.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1605] use '/usr/bin/env python' to chec...

2016-03-09 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1196#issuecomment-194607393
  
@unsleepy22  It loos good to me.  Many *.py files start with 
"#!/usr/bin/python" in storm . So do you think we should update them together 
or not?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-09 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-194280649
  
@abhishekagarwal87  Thank you. But I hope only it is the same as before for 
the details.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...

2016-03-09 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1156#discussion_r55503487
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java ---
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.Meter;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.*;
+import org.apache.storm.logging.ThriftAccessLogger;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.*;
+import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.IConfigurator;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.thrift.TException;
+import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import java.security.Principal;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DrpcServer implements DistributedRPC.Iface, 
DistributedRPCInvocations.Iface, AutoCloseable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DrpcServer.class);
+private final Long timeoutCheckSecs = 5L;
+
+private Map conf;
+
+private ThriftServer handlerServer;
+private ThriftServer invokeServer;
+private IHttpCredentialsPlugin httpCredsHandler;
+
+private Thread clearThread;
+
+private IAuthorizer authorizer;
+
+private Servlet httpServlet;
+
+private AtomicInteger ctr = new AtomicInteger(0);
+private ConcurrentHashMap<String, ConcurrentLinkedQueue> 
requestQueues = new ConcurrentHashMap<String, 
ConcurrentLinkedQueue>();
+
+private static class InternalRequest {
+public final Semaphore sem;
+public final int startTimeSecs;
+public final String function;
+public final DRPCRequest request;
+public volatile Object result;
+
+public InternalRequest(String function, DRPCRequest request) {
+sem = new Semaphore(0);
+startTimeSecs = Time.currentTimeSecs();
+this.function = function;
+this.request = request;
+}
+}
+
+private ConcurrentHashMap<String, InternalRequest> outstandingRequests 
= new ConcurrentHashMap<>();
+
+private final static Meter meterHttpRequests = 
StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+private final static Meter meterExecuteCalls = 
StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
+private final static Meter meterResultCalls = 
StormMetricsRegistry.registerMeter("drpc:num-result-calls");
+private final static Meter meterFailRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
+private final static Meter meterFetchRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
+private final static Meter meterShutdownCalls = 
StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+
+public DrpcServer(Map conf) {
+this.conf = conf;
+this.authorizer = mkAuthorizationHandler((String) 
(this.conf.get(Config.DRPC_AUTHORIZER)));
+initClearThread();
+}
+
+public void setHttpServlet(Servlet httpServlet) {
+this.httpServlet = httpServlet;
+}
+
+private ThriftServer initHandlerServer(final DrpcServer service)

[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...

2016-03-09 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1156#discussion_r55503401
  
--- Diff: storm-core/src/jvm/org/apache/storm/daemon/DrpcServer.java ---
@@ -0,0 +1,356 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import com.codahale.metrics.Meter;
+import com.google.common.collect.ImmutableMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.generated.*;
+import org.apache.storm.logging.ThriftAccessLogger;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.security.auth.*;
+import org.apache.storm.security.auth.authorizer.DRPCAuthorizerBase;
+import org.apache.storm.ui.FilterConfiguration;
+import org.apache.storm.ui.IConfigurator;
+import org.apache.storm.ui.UIHelpers;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.utils.VersionInfo;
+import org.apache.thrift.TException;
+import org.eclipse.jetty.server.Server;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.Servlet;
+import java.security.Principal;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class DrpcServer implements DistributedRPC.Iface, 
DistributedRPCInvocations.Iface, AutoCloseable {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(DrpcServer.class);
+private final Long timeoutCheckSecs = 5L;
+
+private Map conf;
+
+private ThriftServer handlerServer;
+private ThriftServer invokeServer;
+private IHttpCredentialsPlugin httpCredsHandler;
+
+private Thread clearThread;
+
+private IAuthorizer authorizer;
+
+private Servlet httpServlet;
+
+private AtomicInteger ctr = new AtomicInteger(0);
+private ConcurrentHashMap<String, ConcurrentLinkedQueue> 
requestQueues = new ConcurrentHashMap<String, 
ConcurrentLinkedQueue>();
+
+private static class InternalRequest {
+public final Semaphore sem;
+public final int startTimeSecs;
+public final String function;
+public final DRPCRequest request;
+public volatile Object result;
+
+public InternalRequest(String function, DRPCRequest request) {
+sem = new Semaphore(0);
+startTimeSecs = Time.currentTimeSecs();
+this.function = function;
+this.request = request;
+}
+}
+
+private ConcurrentHashMap<String, InternalRequest> outstandingRequests 
= new ConcurrentHashMap<>();
+
+private final static Meter meterHttpRequests = 
StormMetricsRegistry.registerMeter("drpc:num-execute-http-requests");
+private final static Meter meterExecuteCalls = 
StormMetricsRegistry.registerMeter("drpc:num-execute-calls");
+private final static Meter meterResultCalls = 
StormMetricsRegistry.registerMeter("drpc:num-result-calls");
+private final static Meter meterFailRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-failRequest-calls");
+private final static Meter meterFetchRequestCalls = 
StormMetricsRegistry.registerMeter("drpc:num-fetchRequest-calls");
+private final static Meter meterShutdownCalls = 
StormMetricsRegistry.registerMeter("drpc:num-shutdown-calls");
+
+public DrpcServer(Map conf) {
+this.conf = conf;
+this.authorizer = mkAuthorizationHandler((String) 
(this.conf.get(Config.DRPC_AUTHORIZER)));
+initClearThread();
+}
+
+public void setHttpServlet(Servlet httpServlet) {
+this.httpServlet = httpServlet;
+}
+
+private ThriftServer initHandlerServer(final DrpcServer service)

[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...

2016-03-09 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1192#issuecomment-194179024
  
@zhuoliu @satishd @knusbaum @abhishekagarwal87  Thank you. I have addressed 
your comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-08 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1195#issuecomment-194168476
  
when we start up nimbus before starting up pacemaker, the nimbus will die. 
Because nimbus can't read the workers' heartbeats by "heartbeat-storms". In my 
opinion, it shouldn't be like this. @revans2  Do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1611] port org.apache.storm.pacemaker.p...

2016-03-08 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1195

[STORM-1611] port org.apache.storm.pacemaker.pacemaker to java

1 port pacemaker_test to java;
2 Update all the callings to cluster;
3 remove pacemaker.register about jmx;
4 fix the bug about #1071 , this is my fault on this worker.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm pacemaker

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1195.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1195


commit 83c72d5d3f80797be473368c60e2f3deb7b49e90
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-08T12:58:36Z

port pacemaker.clj_test.clj to java

commit f22673af95de6386528b829b09af5ad500d6ac0d
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-09T07:41:37Z

fix bug @STORM-1273




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1270] [STORM-1274] port drpc ...

2016-03-08 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1156#issuecomment-193758244
  
@revans2 @abhishekagarwal87 could you take a look?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-08 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1184#issuecomment-193738634
  
@zhuoliu @abhishekagarwal87  Thank you. I will consider your suggestions, 
and adjust class hierarchy. Of course, I hope others can help review the 
supervisor and give me some comments. Then I will revise the codes together in 
order to avoid to different suggestions and repeat modifications. Thank you 
again.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1610] port pacemaker_state_factory_test...

2016-03-08 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1192

[STORM-1610] port pacemaker_state_factory_test.clj to java



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm pacemaker-state-factory-test

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1192.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1192


commit c1a240cd6f76fa4ac4db2c26c28b4dd8fd1c3d24
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-08T10:52:47Z

port pacemaker_state_factory_test.clj to java




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1233: Port AuthUtilsTest to java

2016-03-07 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1191#discussion_r55305746
  
--- Diff: 
storm-core/test/jvm/org/apache/storm/security/auth/AuthUtilsTest.java ---
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.security.auth;
+
+import java.io.IOException;
+import java.io.File;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.Subject;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.storm.Config;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class AuthUtilsTest {
+
+@Test(expected=IOException.class)
+public void getOptionsThrowsOnMissingSectionTest() throws IOException {
+Configuration mockConfig = Mockito.mock(Configuration.class);
+AuthUtils.get(mockConfig, "bogus-section", "");
+}
+
+@Test
+public void getNonExistentSectionTest() throws IOException {
+Map<String, String> optionMap = new HashMap<String, String>();
+AppConfigurationEntry entry = 
Mockito.mock(AppConfigurationEntry.class);
+
+Mockito.<Map<String, 
?>>when(entry.getOptions()).thenReturn(optionMap);
+String section = "bogus-section";
+Configuration mockConfig = Mockito.mock(Configuration.class);
+Mockito.when(mockConfig.getAppConfigurationEntry(section))
+   .thenReturn(new AppConfigurationEntry[] {entry});
+Assert.assertNull(
+AuthUtils.get(mockConfig, section, "nonexistent-key"));
+}
+
+@Test
+public void getFirstValueForValidKeyTest() throws IOException {
+Map<String, String> optionMap = new HashMap<String, String>();
+optionMap.put("existent-key", "foo");
+AppConfigurationEntry entry = 
Mockito.mock(AppConfigurationEntry.class);
+
+Mockito.<Map<String, 
?>>when(entry.getOptions()).thenReturn(optionMap);
+String section = "bogus-section";
+Configuration mockConfig = Mockito.mock(Configuration.class);
+Mockito.when(mockConfig.getAppConfigurationEntry(section))
+   .thenReturn(new AppConfigurationEntry[] {entry});
+Assert.assertEquals(
+AuthUtils.get(mockConfig, section, "existent-key"), "foo");
+}
--- End diff --

The test logic is different with clojure's 
test-returns-first-value-for-valid-key


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1233: Port AuthUtilsTest to java

2016-03-07 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1191#discussion_r55303331
  
--- Diff: storm-core/src/jvm/org/apache/storm/security/auth/AuthUtils.java 
---
@@ -81,35 +80,88 @@ public static Configuration GetConfiguration(Map 
storm_conf) {
 }
 
 /**
- * Pull a set of keys out of a Configuration.
- * @param conf The config to pull the key/value pairs out of.
- * @param conf_entry The app configuration entry name to get stuff 
from.
- * @return Return a map of the configs in conf.
+ * Get configurations for a section
+ * @param configuration The config to pull the key/value pairs out of.
+ * @param section The app configuration entry name to get stuff from.
+ * @return Return array of config entries or null if configuration is 
null
  */
-public static SortedMap<String, ?> PullConfig(Configuration conf,
-String conf_entry) throws 
IOException {
-if(conf == null) {
+public static AppConfigurationEntry[] getEntries(Configuration 
configuration, 
+String section) throws 
IOException {
+if (configuration == null) {
 return null;
 }
-AppConfigurationEntry configurationEntries[] = 
conf.getAppConfigurationEntry(conf_entry);
-if(configurationEntries == null) {
-String errorMessage = "Could not find a '" + conf_entry
-+ "' entry in this configuration: Client cannot start.";
+
+AppConfigurationEntry configurationEntries[] = 
configuration.getAppConfigurationEntry(section);
+if (configurationEntries == null) {
+String errorMessage = "Could not find a '"+ section + "' entry 
in this configuration.";
 throw new IOException(errorMessage);
 }
+return configurationEntries;
+}
 
+/**
+ * Pull a set of keys out of a Configuration.
+ * @param configuration The config to pull the key/value pairs out of.
+ * @param section The app configuration entry name to get stuff from.
+ * @return Return a map of the configs in conf.
+ */
+public static SortedMap<String, ?> pullConfig(Configuration 
configuration,
+String section) throws 
IOException {
+AppConfigurationEntry[] configurationEntries = 
AuthUtils.getEntries(configuration, section);
+
+if (configurationEntries == null) {
+return null;
+}
+
 TreeMap<String, Object> results = new TreeMap<>();
 
-for(AppConfigurationEntry entry: configurationEntries) {
+for (AppConfigurationEntry entry: configurationEntries) {
 Map<String, ?> options = entry.getOptions();
-for(String key : options.keySet()) {
+for (String key : options.keySet()) {
 results.put(key, options.get(key));
 }
 }
+
 return results;
 }
 
 /**
+ * Pull a the value given section and key from Configuration
+ * @param configuration The config to pull the key/value pairs out of.
+ * @param section The app configuration entry name to get stuff from.
+ * @param key The key to look up inside of the section
+ * @return Return a the String value of the configuration value
+ */
+public static String get(Configuration configuration, String section, 
String key) throws IOException {
+AppConfigurationEntry[] configurationEntries = 
AuthUtils.getEntries(configuration, section);
+
+if (configurationEntries == null){
+return null;
+}
+
+for (AppConfigurationEntry entry: configurationEntries) {
+Object val = entry.getOptions().get(key);
+if (val != null)
+return (String)val;
+}
+return null;
+}
+
+/**
+ * Instantiate class with klassName and return instance
+ * @param klassName The class name
+ * @return Instance of type T
+ */
+private static  T makeClass(String klassName) {
+try {
+Class klass = Class.forName(klassName);
+return (T)klass.newInstance();
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
+}
--- End diff --

I think it's better to replace this method by  Utils.newInstance


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub 

[GitHub] storm pull request: [STORM-1606] print the information of testcase...

2016-03-06 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1189#discussion_r55138526
  
--- Diff: dev-tools/travis/print-errors-from-test-reports.py ---
@@ -55,6 +55,10 @@ def print_error_reports_from_report_file(file_path):
 if fail is not None:
 print_detail_information(testcase, fail)
 
+failure = testcase.find("failure")
--- End diff --

@unsleepy22  the  api "find(match)" of Python ElementTree finds the first 
sub element matching match. So the match is sub element, but not sub string


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-06 Thread hustfxj
GitHub user hustfxj reopened a pull request:

https://github.com/apache/storm/pull/1184

[STORM-1279] port backtype.storm.daemon.supervisor to java

1 port supervisor.clj to java;
2 Update all the callings to supervisor;
3 Supervisor's class hierarchy as follows:
  ``SupervisorManger is supervisor' manger which can clean and shutdown 
supervisor;``
  ``SyncSupervisorEvent is responsible for downloading/removing 
assignments and topologys' files;``
  ``SyncProcessEvent is responsible for starting/killing workers;``
  ``SupervisorUtils have common methods;``
4 create local-supervisor.clj for local mode.
5. fix the fails on supervisor test in Windows 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm supervisor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1184


commit 08934e29982d3936c9e247a8d7bac563053f869f
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T04:38:23Z

port Supervisor to java

commit b281c735f0089d24407af67586a1b41de45ac382
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T05:15:56Z

update supervisor's structure

commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-02T01:00:37Z

update test codes about supervisor

commit b09b4129d845aff6be285ea1748b842499c40e0b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-04T04:14:41Z

Merge branch 'master' into supervisor

commit 42bacde20ea86867b874395532aa034cfad4f120
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-06T08:05:14Z

Merge branch 'master' into supervisor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-06 Thread hustfxj
Github user hustfxj closed the pull request at:

https://github.com/apache/storm/pull/1184


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1606] print the information of testcase...

2016-03-06 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1189

[STORM-1606] print the  information of testcase which is on failure



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm travis

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1189.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1189


commit 812031ff7e3017dfcbff4c3434fbd3c2437dcb33
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-06T08:24:22Z

print the  information of testcase which is on failure




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1269] port backtype.storm.daemon.common...

2016-03-04 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1185#discussion_r55002193
  
--- Diff: storm-core/src/jvm/org/apache/storm/utils/Utils.java ---
@@ -2302,4 +2302,54 @@ public Object call() {
 public static long bitXor(Long a, Long b) {
 return a ^ b;
 }
+
+public static Integer parseInt(Object o) {
+if (o == null) {
+return null;
+}
+
+if (o instanceof String) {
+return Integer.parseInt(String.valueOf(o));
+} else if (o instanceof Long) {
+long value = (Long) o;
+return (int) value;
+} else if (o instanceof Integer) {
+return (Integer) o;
+} else {
+throw new RuntimeException("Invalid value " + 
o.getClass().getName() + " " + o);
+}
+}
+
+public static Integer parseInt(Object o, int defaultValue) {
--- End diff --

Utils has the methods . The name is getInt()


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: STORM-1603 Storm UT fails on supervisor test i...

2016-03-04 Thread hustfxj
Github user hustfxj commented on a diff in the pull request:

https://github.com/apache/storm/pull/1183#discussion_r55001806
  
--- Diff: storm-core/test/clj/backtype/storm/supervisor_test.clj ---
@@ -261,8 +260,8 @@
 (str "-Dstorm.id=" mock-storm-id)
 (str "-Dworker.id=" mock-worker-id)
 (str "-Dworker.port=" mock-port)
-   "-Dstorm.log.dir=/logs"
-   
"-Dlog4j.configurationFile=/log4j2/worker.xml"
+(str "-Dstorm.log.dir=" 
file-path-separator "logs")
+(str "-Dlog4j.configurationFile=" 
file-path-separator "log4j2" file-path-separator "worker.xml")
--- End diff --

@HeartSaVioR  sorry, I didn't see STORM-1463 before.  I think we should 
find whole of things and fix it.  Thank you @HeartSaVioR 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-03 Thread hustfxj
Github user hustfxj commented on the pull request:

https://github.com/apache/storm/pull/1184#issuecomment-192118347
  
success on supervisor test in my computer of  windows, but CI have 2 
failures about supervisor-test.clj. I can't see the supervisor-test.xml. So I 
don't kown where failures come ? I didi some manual testing and things look 
good. I hope everyone can review it and give me some suggests. With my pressure!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] storm pull request: [STORM-1279] port backtype.storm.daemon.superv...

2016-03-03 Thread hustfxj
GitHub user hustfxj opened a pull request:

https://github.com/apache/storm/pull/1184

[STORM-1279] port backtype.storm.daemon.supervisor to java

1 port supervisor.clj to java;
2 Update all the callings to supervisor;
3 Supervisor's class hierarchy as follows:
  SupervisorManger is supervisor' manger which can clean and shutdown 
supervisor;
  SyncSupervisorEvent is responsible for downloading/removing 
assignments and topologys' files;
  SyncProcessEvent is responsible for starting/killing workers;
  SupervisorUtils have common methods;
4. fix the fails on supervisor test in Windows 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/hustfxj/storm supervisor

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/1184.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1184


commit 08934e29982d3936c9e247a8d7bac563053f869f
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T04:38:23Z

port Supervisor to java

commit b281c735f0089d24407af67586a1b41de45ac382
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-02-26T05:15:56Z

update supervisor's structure

commit 19fcafbd0fe1cbee49e797824c47ba1f6b727270
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-02T01:00:37Z

update test codes about supervisor

commit b09b4129d845aff6be285ea1748b842499c40e0b
Author: xiaojian.fxj <xiaojian@alibaba-inc.com>
Date:   2016-03-04T04:14:41Z

Merge branch 'master' into supervisor




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


  1   2   3   >