[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/storm/pull/2639


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187555046
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -127,8 +130,14 @@ public void setJmsAcknowledgeMode(final int mode) {
 messageHandler = new TransactedSessionMessageHandler();
 break;
 default:
-LOG.warn("Unsupported Acknowledge mode: "
-+ mode + " (See javax.jms.Session for valid values)");
+// individual message ack-ing needs vendor specific mode
+if (individualAcks) {
--- End diff --

This change requires end users to call individualAcks earlier than this 
method. Could we guide this in javadoc or in exception message? Or can we fix 
it? (Maybe it defers verification step in open() so violating fail-fast...)


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-11 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187554061
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -403,50 +274,31 @@ public void ack(Object msgId) {
  * Will only be called if we're transactional or not 
AUTO_ACKNOWLEDGE
  */
 @Override
-public void fail(Object msgId) {
+public void fail(final Object msgId) {
 LOG.warn("Message failed: " + msgId);
-this.pendingMessages.clear();
-this.toCommit.clear();
-synchronized (this.recoveryMutex) {
-this.hasFailures = true;
-}
+messageHandler.fail(msgId);
 }
 
 /**
- * Use the {@link #tupleProducer} to determine which fields are about 
to be emitted.
+ * Use the {@link #tupleProducer} to determine which fields are about
+ * to be emitted.
  *
- * Note that {@link #nextTuple()} always emits to the default 
stream, and thus only fields declared
- * for this stream are used.
+ * Note that {@link #nextTuple()} always emits to the default 
stream,
+ * and thus only fields declared for this stream are used.
  */
 @Override
-public void declareOutputFields(OutputFieldsDeclarer declarer) {
+public void declareOutputFields(final OutputFieldsDeclarer declarer) {
 this.tupleProducer.declareOutputFields(declarer);
 
 }
 
 /**
- * Returns true if the spout has received failures from 
which it has not yet recovered.
- *
- * @return {@code true} if there were failures, {@code false} 
otherwise.
- */
-public boolean hasFailures() {
-return this.hasFailures;
-}
-
-/**
- * Marks a healthy session state.
- */
-protected void recovered() {
-this.hasFailures = false;
-}
-
-/**
- * Sets the periodicity of the timer task that checks for failures and 
recovers the JMS session.
+ * Sets the periodicity of the timer task that
+ * checks for failures and recovers the JMS session.
  *
  * @param period desired wait period
  */
-public void setRecoveryPeriodMs(long period) {
-this.recoveryPeriodMs = period;
+public void setRecoveryPeriodMs(final long period) {
--- End diff --

Could we have patch for 1.x-branch with deprecating the method? I'm OK to 
remove the method in 2.0.0, but you're right for 1.x.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187500706
  
--- Diff: external/storm-jms/pom.xml ---
@@ -94,7 +94,7 @@
 maven-checkstyle-plugin
 
 
-63
+73
--- End diff --

I cant figure out why checkstyle keeps complaining. Running "mvn 
checkstyle:check" doesnt throw any warnings for JMSSpout.java. I have set it to 
64 for the build to pass.

And the rules we have seems too restrictive. (Java-doc for each 
variable/method and line length of 80). Should probably relook so that its not 
a time waste.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187496374
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -403,50 +274,31 @@ public void ack(Object msgId) {
  * Will only be called if we're transactional or not 
AUTO_ACKNOWLEDGE
  */
 @Override
-public void fail(Object msgId) {
+public void fail(final Object msgId) {
 LOG.warn("Message failed: " + msgId);
-this.pendingMessages.clear();
-this.toCommit.clear();
-synchronized (this.recoveryMutex) {
-this.hasFailures = true;
-}
+messageHandler.fail(msgId);
 }
 
 /**
- * Use the {@link #tupleProducer} to determine which fields are about 
to be emitted.
+ * Use the {@link #tupleProducer} to determine which fields are about
+ * to be emitted.
  *
- * Note that {@link #nextTuple()} always emits to the default 
stream, and thus only fields declared
- * for this stream are used.
+ * Note that {@link #nextTuple()} always emits to the default 
stream,
+ * and thus only fields declared for this stream are used.
  */
 @Override
-public void declareOutputFields(OutputFieldsDeclarer declarer) {
+public void declareOutputFields(final OutputFieldsDeclarer declarer) {
 this.tupleProducer.declareOutputFields(declarer);
 
 }
 
 /**
- * Returns true if the spout has received failures from 
which it has not yet recovered.
- *
- * @return {@code true} if there were failures, {@code false} 
otherwise.
- */
-public boolean hasFailures() {
-return this.hasFailures;
-}
-
-/**
- * Marks a healthy session state.
- */
-protected void recovered() {
-this.hasFailures = false;
-}
-
-/**
- * Sets the periodicity of the timer task that checks for failures and 
recovers the JMS session.
+ * Sets the periodicity of the timer task that
+ * checks for failures and recovers the JMS session.
  *
  * @param period desired wait period
  */
-public void setRecoveryPeriodMs(long period) {
-this.recoveryPeriodMs = period;
+public void setRecoveryPeriodMs(final long period) {
--- End diff --

I just left it since its breaking the public API and if someone is using 
this in their code.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187496258
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -262,42 +189,26 @@ public void onMessage(Message msg) {
  * topic/queue.
  */
 @Override
-public void open(Map conf,
- TopologyContext context,
- SpoutOutputCollector collector) {
+public void open(final Map conf,
+ final TopologyContext context,
+ final SpoutOutputCollector spoutOutputCollector) {
 
-if (this.jmsProvider == null) {
-throw new IllegalStateException("JMS provider has not been 
set.");
-}
-if (this.tupleProducer == null) {
-throw new IllegalStateException("JMS Tuple Producer has not 
been set.");
+if (jmsProvider == null) {
+throw new IllegalStateException(
+"JMS provider has not been set.");
 }
-// TODO get the default value from storm instead of hard coding 30 
secs
-Long topologyTimeout =
-((Number) 
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
-if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > 
this.recoveryPeriodMs) {
-LOG.warn("*** WARNING *** : "
- + "Recovery period (" + this.recoveryPeriodMs + " 
ms.) is less then the configured "
- + "'topology.message.timeout.secs' of " + 
topologyTimeout
- + " secs. This could lead to a message replay 
flood!");
+if (tupleProducer == null) {
+throw new IllegalStateException(
+"JMS Tuple Producer has not been set.");
 }
-this.queue = new LinkedBlockingQueue();
-this.toCommit = new TreeSet();
-this.pendingMessages = new HashMap();
-this.collector = collector;
+collector = spoutOutputCollector;
 try {
-ConnectionFactory cf = this.jmsProvider.connectionFactory();
-Destination dest = this.jmsProvider.destination();
-this.connection = cf.createConnection();
-this.session = connection.createSession(false, 
this.jmsAcknowledgeMode);
-MessageConsumer consumer = session.createConsumer(dest);
-consumer.setMessageListener(this);
-this.connection.start();
-if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) 
{
-this.recoveryTimer = new Timer();
-this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 
RECOVERY_DELAY_MS, this.recoveryPeriodMs);
-}
-
+ConnectionFactory cf = jmsProvider.connectionFactory();
+Destination dest = jmsProvider.destination();
+connection = cf.createConnection();
+session = messageHandler.createSession(connection);
--- End diff --

We can throw an exception if `setIndividualAck` is invoked and the ACK mode 
is still the standard ones.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187496121
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
 
 package org.apache.storm.jms.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * A Storm Spout implementation that listens to a JMS topic 
or queue and outputs tuples based on the messages it receives.
+ * A Storm Spout implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
  *
  * JmsSpout instances rely on JmsProducer
  * implementations to obtain the JMS
  * ConnectionFactory and Destination objects 
necessary
  * to connect to a JMS topic/queue.
  *
- * When a JmsSpout receives a JMS message, it delegates to 
an
- * internal JmsTupleProducer instance to create a Storm tuple 
from the incoming message.
+ * When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
  *
  * Typically, developers will supply a custom 
JmsTupleProducer
  * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
 
-/**
- * The logger object instance for this class.
- */
+/** The logger object instance for this class. */
 private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
 
-/**
- * The logger of the recovery task.
- */
-private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
-
-/**
- * Time to sleep between queue polling attempts.
- */
+/** Time to sleep between queue polling attempts. */
 private static final int POLL_INTERVAL_MS = 50;
 
-/**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
-private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
-/**
- * Time to wait before queuing the first recovery task.
- */
-private static final int RECOVERY_DELAY_MS = 10;
-/**
- * Used to safely recover failed JMS sessions across instances.
- */
-private final Serializable recoveryMutex = "RECOVERY_MUTEX";
 /**
  * The acknowledgment mode used for this instance.
  *
  * @see Session
  */
 private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-/**
- * Indicates whether or not this spout should run as a singleton.
- */
+
+/** Sets up the way we want to handle the emit, ack and fails. */
+private transient MessageHandler messageHandler = new MessageHandler();
+
+/** Indicates whether or not this spout should run as a singleton. */
 private boolean distributed = true;
-/**
- * Used to generate tuples from incoming messages.
- */
+
+/** Used to generate tuples from incoming messages. */
 private JmsTupleProducer tupleProducer;
-/**
- * Encapsulates jms related classes needed to communicate with the mq.
- */
+
+/** Encapsulates jms related classes needed to communicate with the 
mq. */
 private 

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187496062
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
 
 package org.apache.storm.jms.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * A Storm Spout implementation that listens to a JMS topic 
or queue and outputs tuples based on the messages it receives.
+ * A Storm Spout implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
  *
  * JmsSpout instances rely on JmsProducer
  * implementations to obtain the JMS
  * ConnectionFactory and Destination objects 
necessary
  * to connect to a JMS topic/queue.
  *
- * When a JmsSpout receives a JMS message, it delegates to 
an
- * internal JmsTupleProducer instance to create a Storm tuple 
from the incoming message.
+ * When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
  *
  * Typically, developers will supply a custom 
JmsTupleProducer
  * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
 
-/**
- * The logger object instance for this class.
- */
+/** The logger object instance for this class. */
 private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
 
-/**
- * The logger of the recovery task.
- */
-private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
-
-/**
- * Time to sleep between queue polling attempts.
- */
+/** Time to sleep between queue polling attempts. */
 private static final int POLL_INTERVAL_MS = 50;
 
-/**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
-private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
-/**
- * Time to wait before queuing the first recovery task.
- */
-private static final int RECOVERY_DELAY_MS = 10;
-/**
- * Used to safely recover failed JMS sessions across instances.
- */
-private final Serializable recoveryMutex = "RECOVERY_MUTEX";
 /**
  * The acknowledgment mode used for this instance.
  *
  * @see Session
  */
 private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-/**
- * Indicates whether or not this spout should run as a singleton.
- */
+
+/** Sets up the way we want to handle the emit, ack and fails. */
+private transient MessageHandler messageHandler = new MessageHandler();
--- End diff --

Good catch. Yes I should make the MessageHandler serializable.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187495978
  
--- Diff: external/storm-jms/pom.xml ---
@@ -94,7 +94,7 @@
 maven-checkstyle-plugin
 
 
-63
+73
--- End diff --

Actually I can see more violations in JMSSpout.java in master branch (43) 
vs the patch (2). I am not sure why. Will fix it anyways.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187486786
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
 
 package org.apache.storm.jms.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * A Storm Spout implementation that listens to a JMS topic 
or queue and outputs tuples based on the messages it receives.
+ * A Storm Spout implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
  *
  * JmsSpout instances rely on JmsProducer
  * implementations to obtain the JMS
  * ConnectionFactory and Destination objects 
necessary
  * to connect to a JMS topic/queue.
  *
- * When a JmsSpout receives a JMS message, it delegates to 
an
- * internal JmsTupleProducer instance to create a Storm tuple 
from the incoming message.
+ * When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
  *
  * Typically, developers will supply a custom 
JmsTupleProducer
  * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
 
-/**
- * The logger object instance for this class.
- */
+/** The logger object instance for this class. */
 private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
 
-/**
- * The logger of the recovery task.
- */
-private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
-
-/**
- * Time to sleep between queue polling attempts.
- */
+/** Time to sleep between queue polling attempts. */
 private static final int POLL_INTERVAL_MS = 50;
 
-/**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
-private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
-/**
- * Time to wait before queuing the first recovery task.
- */
-private static final int RECOVERY_DELAY_MS = 10;
-/**
- * Used to safely recover failed JMS sessions across instances.
- */
-private final Serializable recoveryMutex = "RECOVERY_MUTEX";
 /**
  * The acknowledgment mode used for this instance.
  *
  * @see Session
  */
 private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-/**
- * Indicates whether or not this spout should run as a singleton.
- */
+
+/** Sets up the way we want to handle the emit, ack and fails. */
+private transient MessageHandler messageHandler = new MessageHandler();
+
+/** Indicates whether or not this spout should run as a singleton. */
 private boolean distributed = true;
-/**
- * Used to generate tuples from incoming messages.
- */
+
+/** Used to generate tuples from incoming messages. */
 private JmsTupleProducer tupleProducer;
-/**
- * Encapsulates jms related classes needed to communicate with the mq.
- */
+
+/** Encapsulates jms related classes needed to communicate with the 
mq. */
 private JmsProvider 

[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187488104
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -403,50 +274,31 @@ public void ack(Object msgId) {
  * Will only be called if we're transactional or not 
AUTO_ACKNOWLEDGE
  */
 @Override
-public void fail(Object msgId) {
+public void fail(final Object msgId) {
 LOG.warn("Message failed: " + msgId);
-this.pendingMessages.clear();
-this.toCommit.clear();
-synchronized (this.recoveryMutex) {
-this.hasFailures = true;
-}
+messageHandler.fail(msgId);
 }
 
 /**
- * Use the {@link #tupleProducer} to determine which fields are about 
to be emitted.
+ * Use the {@link #tupleProducer} to determine which fields are about
+ * to be emitted.
  *
- * Note that {@link #nextTuple()} always emits to the default 
stream, and thus only fields declared
- * for this stream are used.
+ * Note that {@link #nextTuple()} always emits to the default 
stream,
+ * and thus only fields declared for this stream are used.
  */
 @Override
-public void declareOutputFields(OutputFieldsDeclarer declarer) {
+public void declareOutputFields(final OutputFieldsDeclarer declarer) {
 this.tupleProducer.declareOutputFields(declarer);
 
 }
 
 /**
- * Returns true if the spout has received failures from 
which it has not yet recovered.
- *
- * @return {@code true} if there were failures, {@code false} 
otherwise.
- */
-public boolean hasFailures() {
-return this.hasFailures;
-}
-
-/**
- * Marks a healthy session state.
- */
-protected void recovered() {
-this.hasFailures = false;
-}
-
-/**
- * Sets the periodicity of the timer task that checks for failures and 
recovers the JMS session.
+ * Sets the periodicity of the timer task that
+ * checks for failures and recovers the JMS session.
  *
  * @param period desired wait period
  */
-public void setRecoveryPeriodMs(long period) {
-this.recoveryPeriodMs = period;
+public void setRecoveryPeriodMs(final long period) {
--- End diff --

This method can be removed.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187487737
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -262,42 +189,26 @@ public void onMessage(Message msg) {
  * topic/queue.
  */
 @Override
-public void open(Map conf,
- TopologyContext context,
- SpoutOutputCollector collector) {
+public void open(final Map conf,
+ final TopologyContext context,
+ final SpoutOutputCollector spoutOutputCollector) {
 
-if (this.jmsProvider == null) {
-throw new IllegalStateException("JMS provider has not been 
set.");
-}
-if (this.tupleProducer == null) {
-throw new IllegalStateException("JMS Tuple Producer has not 
been set.");
+if (jmsProvider == null) {
+throw new IllegalStateException(
+"JMS provider has not been set.");
 }
-// TODO get the default value from storm instead of hard coding 30 
secs
-Long topologyTimeout =
-((Number) 
conf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 
DEFAULT_MESSAGE_TIMEOUT_SECS)).longValue();
-if ((TimeUnit.SECONDS.toMillis(topologyTimeout)) > 
this.recoveryPeriodMs) {
-LOG.warn("*** WARNING *** : "
- + "Recovery period (" + this.recoveryPeriodMs + " 
ms.) is less then the configured "
- + "'topology.message.timeout.secs' of " + 
topologyTimeout
- + " secs. This could lead to a message replay 
flood!");
+if (tupleProducer == null) {
+throw new IllegalStateException(
+"JMS Tuple Producer has not been set.");
 }
-this.queue = new LinkedBlockingQueue();
-this.toCommit = new TreeSet();
-this.pendingMessages = new HashMap();
-this.collector = collector;
+collector = spoutOutputCollector;
 try {
-ConnectionFactory cf = this.jmsProvider.connectionFactory();
-Destination dest = this.jmsProvider.destination();
-this.connection = cf.createConnection();
-this.session = connection.createSession(false, 
this.jmsAcknowledgeMode);
-MessageConsumer consumer = session.createConsumer(dest);
-consumer.setMessageListener(this);
-this.connection.start();
-if (this.isDurableSubscription() && this.recoveryPeriodMs > 0) 
{
-this.recoveryTimer = new Timer();
-this.recoveryTimer.scheduleAtFixedRate(new RecoveryTask(), 
RECOVERY_DELAY_MS, this.recoveryPeriodMs);
-}
-
+ConnectionFactory cf = jmsProvider.connectionFactory();
+Destination dest = jmsProvider.destination();
+connection = cf.createConnection();
+session = messageHandler.createSession(connection);
--- End diff --

We may want to consider the case: users provide mode which is not in JMS 
standard, and also setIndividualAck() is not called.

Now the case is handled as same as AUTO_ACKNOWLEDGE because of providing 
default value of messageHandler. Is it intended? We prevented the case with 
IllegalArgumentException.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187484538
  
--- Diff: external/storm-jms/pom.xml ---
@@ -94,7 +94,7 @@
 maven-checkstyle-plugin
 
 
-63
+73
--- End diff --

We should not increase the number, decrease or no change is only allowed. 
Please fix checkstyle violations based on checkstyle report and keep the number.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187485457
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
 
 package org.apache.storm.jms.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import java.util.HashMap;
+import java.util.Map;
+
 
 /**
- * A Storm Spout implementation that listens to a JMS topic 
or queue and outputs tuples based on the messages it receives.
+ * A Storm Spout implementation that listens to a JMS topic or
+ * queue and outputs tuples based on the messages it receives.
  *
  * JmsSpout instances rely on JmsProducer
  * implementations to obtain the JMS
  * ConnectionFactory and Destination objects 
necessary
  * to connect to a JMS topic/queue.
  *
- * When a JmsSpout receives a JMS message, it delegates to 
an
- * internal JmsTupleProducer instance to create a Storm tuple 
from the incoming message.
+ * When a {@code JmsSpout} receives a JMS message, it delegates to an
+ * internal {@code JmsTupleProducer} instance to create a Storm tuple from
+ * the incoming message.
  *
  * Typically, developers will supply a custom 
JmsTupleProducer
  * implementation appropriate for the expected message content.
  */
 @SuppressWarnings("serial")
-public class JmsSpout extends BaseRichSpout implements MessageListener {
+public class JmsSpout extends BaseRichSpout {
 
-/**
- * The logger object instance for this class.
- */
+/** The logger object instance for this class. */
 private static final Logger LOG = 
LoggerFactory.getLogger(JmsSpout.class);
 
-/**
- * The logger of the recovery task.
- */
-private static final Logger RECOVERY_TASK_LOG = 
LoggerFactory.getLogger(RecoveryTask.class);
-
-/**
- * Time to sleep between queue polling attempts.
- */
+/** Time to sleep between queue polling attempts. */
 private static final int POLL_INTERVAL_MS = 50;
 
-/**
- * The default value for {@link Config#TOPOLOGY_MESSAGE_TIMEOUT_SECS}.
- */
-private static final int DEFAULT_MESSAGE_TIMEOUT_SECS = 30;
-
-/**
- * Time to wait before queuing the first recovery task.
- */
-private static final int RECOVERY_DELAY_MS = 10;
-/**
- * Used to safely recover failed JMS sessions across instances.
- */
-private final Serializable recoveryMutex = "RECOVERY_MUTEX";
 /**
  * The acknowledgment mode used for this instance.
  *
  * @see Session
  */
 private int jmsAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;
-/**
- * Indicates whether or not this spout should run as a singleton.
- */
+
+/** Sets up the way we want to handle the emit, ack and fails. */
+private transient MessageHandler messageHandler = new MessageHandler();
--- End diff --

This must not be `transient`, because assigning `messageHandler` happens 
before serialization and Spout will lose the assigned value when 
deserialization happens. Are any of implementations of MessageHandler 
non-serializable?


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-05-10 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r187484768
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -18,164 +18,124 @@
 
 package org.apache.storm.jms.spout;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeSet;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-import org.apache.storm.Config;
 import org.apache.storm.jms.JmsProvider;
 import org.apache.storm.jms.JmsTupleProducer;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichSpout;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.Connection;
--- End diff --

Reorganizing imports happens now, and it looks like one of checkstyle 
violation.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r183539214
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
  */
 @Override
 public void ack(Object msgId) {
-
 Message msg = this.pendingMessages.remove(msgId);
-JmsMessageID oldest = this.toCommit.first();
-if (msgId.equals(oldest)) {
-if (msg != null) {
-try {
-LOG.debug("Committing...");
-msg.acknowledge();
-LOG.debug("JMS Message acked: " + msgId);
-this.toCommit.remove(msgId);
-} catch (JMSException e) {
-LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
+if (!toCommit.isEmpty()) {
+JmsMessageID oldest = this.toCommit.first();
+if (msgId.equals(oldest)) {
+if (msg != null) {
+try {
+LOG.debug("Committing...");
+msg.acknowledge();
--- End diff --

I am not sure acking the oldest message in JMS is correct even for 
`CLIENT_ACKNOWLEDGE`. This would ack the new messages that have been consumed 
in the session (and possibly emitted) even before the spout received the ACK 
for the message. I guess we should keep removing the message from `toCommit` 
and invoke the JMS ack when its the last message in `toCommit`. (assuming we 
dont consume any other message in the meanwhile).


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-23 Thread arunmahadevan
Github user arunmahadevan commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r183531789
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
  */
 @Override
 public void ack(Object msgId) {
-
 Message msg = this.pendingMessages.remove(msgId);
-JmsMessageID oldest = this.toCommit.first();
-if (msgId.equals(oldest)) {
-if (msg != null) {
-try {
-LOG.debug("Committing...");
-msg.acknowledge();
-LOG.debug("JMS Message acked: " + msgId);
-this.toCommit.remove(msgId);
-} catch (JMSException e) {
-LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
+if (!toCommit.isEmpty()) {
+JmsMessageID oldest = this.toCommit.first();
+if (msgId.equals(oldest)) {
+if (msg != null) {
+try {
+LOG.debug("Committing...");
+msg.acknowledge();
--- End diff --

This piece of code was already there. I am guessing its based on the JMS 
acknowledgement mode.

See - https://docs.oracle.com/cd/E19798-01/821-1841/bncfw/index.html

In `Session.CLIENT_ACKNOWLEDGE` - Acknowledging a consumed message 
automatically acknowledges the receipt of all messages that have been consumed 
by its session, so this logic seems fine. 

The spout is ignoring Auto acknowledgement mode, but I am not sure about 
the other modes like `DUPS_OK_ACKNOWLEDGE` or `SESSION_TRANSACTED` work. cc 
@ptgoetz who might have more context around this.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r183233200
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
  */
 @Override
 public void ack(Object msgId) {
-
 Message msg = this.pendingMessages.remove(msgId);
-JmsMessageID oldest = this.toCommit.first();
-if (msgId.equals(oldest)) {
-if (msg != null) {
-try {
-LOG.debug("Committing...");
-msg.acknowledge();
-LOG.debug("JMS Message acked: " + msgId);
-this.toCommit.remove(msgId);
-} catch (JMSException e) {
-LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
+if (!toCommit.isEmpty()) {
--- End diff --

Could we leave the log message (at least DEBUG) so that we can see which 
messages are ignored while acking due to previous failure?


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r183233219
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
  */
 @Override
 public void ack(Object msgId) {
-
 Message msg = this.pendingMessages.remove(msgId);
-JmsMessageID oldest = this.toCommit.first();
-if (msgId.equals(oldest)) {
-if (msg != null) {
-try {
-LOG.debug("Committing...");
-msg.acknowledge();
-LOG.debug("JMS Message acked: " + msgId);
-this.toCommit.remove(msgId);
-} catch (JMSException e) {
-LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
+if (!toCommit.isEmpty()) {
--- End diff --

Might be better to leave the log message which messages are dropped in 
`fail()` too.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-22 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/storm/pull/2639#discussion_r183233537
  
--- Diff: 
external/storm-jms/src/main/java/org/apache/storm/jms/spout/JmsSpout.java ---
@@ -339,26 +339,26 @@ public void nextTuple() {
  */
 @Override
 public void ack(Object msgId) {
-
 Message msg = this.pendingMessages.remove(msgId);
-JmsMessageID oldest = this.toCommit.first();
-if (msgId.equals(oldest)) {
-if (msg != null) {
-try {
-LOG.debug("Committing...");
-msg.acknowledge();
-LOG.debug("JMS Message acked: " + msgId);
-this.toCommit.remove(msgId);
-} catch (JMSException e) {
-LOG.warn("Error acknowldging JMS message: " + msgId, 
e);
+if (!toCommit.isEmpty()) {
+JmsMessageID oldest = this.toCommit.first();
+if (msgId.equals(oldest)) {
+if (msg != null) {
+try {
+LOG.debug("Committing...");
+msg.acknowledge();
--- End diff --

I'm sorry I'm not familiar with JMS, but could you explain how this 
approach guarantee ack are done for all messages? Looks like it just removes 
the message from `toCommit` if late ack messages come earlier.


---


[GitHub] storm pull request #2639: STORM-3035: fix the issue in JmsSpout.ack when toC...

2018-04-19 Thread arunmahadevan
GitHub user arunmahadevan opened a pull request:

https://github.com/apache/storm/pull/2639

STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/arunmahadevan/storm STORM-3035

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/storm/pull/2639.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2639


commit f091f0ffda8bf6b1fc52981f38cca757d9c98559
Author: Arun Mahadevan 
Date:   2018-04-19T17:52:38Z

STORM-3035: fix the issue in JmsSpout.ack when toCommit is empty




---