Repository: storm
Updated Branches:
  refs/heads/1.x-branch be0d10558 -> 97aeef57e


STORM-2127: Storm-eventhubs should use latest amqp and eventhubs-client versions


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60e6eb9d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60e6eb9d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60e6eb9d

Branch: refs/heads/1.x-branch
Commit: 60e6eb9dd685f1ee434065f0de80bd4b1e45bef6
Parents: bf5d5e6
Author: Ravi Peri <ravip...@microsoft.com>
Authored: Mon Oct 17 15:59:56 2016 -0700
Committer: Ravi Peri <ravip...@microsoft.com>
Committed: Mon Oct 17 15:59:56 2016 -0700

----------------------------------------------------------------------
 external/storm-eventhubs/pom.xml                |  21 +++-
 .../storm/eventhubs/bolt/EventHubBolt.java      |  29 +++--
 .../eventhubs/spout/BinaryEventDataScheme.java  |  66 ++++++++++
 .../storm/eventhubs/spout/EventDataScheme.java  |  38 ++++--
 .../storm/eventhubs/spout/EventHubSpout.java    |   8 +-
 .../eventhubs/spout/EventHubSpoutConfig.java    | 119 ++++++++++++++-----
 .../storm/eventhubs/spout/FieldConstants.java   |   1 +
 .../storm/eventhubs/spout/IEventDataScheme.java |  15 ++-
 .../eventhubs/spout/StringEventDataScheme.java  |  69 +++++++++++
 pom.xml                                         |   2 +
 10 files changed, 307 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml
index 2c87b21..50ad00c 100755
--- a/external/storm-eventhubs/pom.xml
+++ b/external/storm-eventhubs/pom.xml
@@ -30,11 +30,7 @@
     <packaging>jar</packaging>
     <name>storm-eventhubs</name>
     <description>EventHubs Storm Spout</description>
-
-    <properties>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <eventhubs.client.version>0.9.1</eventhubs.client.version>
-    </properties>
+    
     <build>
         <plugins>
             <plugin>
@@ -104,6 +100,21 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.qpid</groupId>
+            <artifactId>qpid-amqp-1-0-common</artifactId>
+            <version>${qpid.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>4.11</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
index ac5018b..5d34c4b 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java
@@ -39,12 +39,11 @@ public class EventHubBolt extends BaseRichBolt {
   private static final long serialVersionUID = 1L;
   private static final Logger logger = LoggerFactory
       .getLogger(EventHubBolt.class);
-  
+
   protected OutputCollector collector;
   protected EventHubSender sender;
   protected EventHubBoltConfig boltConfig;
-  
-  
+
   public EventHubBolt(String connectionString, String entityPath) {
     boltConfig = new EventHubBoltConfig(connectionString, entityPath);
   }
@@ -54,28 +53,29 @@ public class EventHubBolt extends BaseRichBolt {
     boltConfig = new EventHubBoltConfig(userName, password, namespace,
         entityPath, partitionMode);
   }
-  
+
   public EventHubBolt(EventHubBoltConfig config) {
     boltConfig = config;
   }
 
   @Override
-  public void prepare(Map config, TopologyContext context, OutputCollector 
collector) {
+  public void prepare(Map config, TopologyContext context,
+      OutputCollector collector) {
     this.collector = collector;
     String myPartitionId = null;
-    if(boltConfig.getPartitionMode()) {
-      //We can use the task index (starting from 0) as the partition ID
+    if (boltConfig.getPartitionMode()) {
+      // We can use the task index (starting from 0) as the partition ID
       myPartitionId = "" + context.getThisTaskIndex();
     }
     logger.info("creating sender: " + boltConfig.getConnectionString()
         + ", " + boltConfig.getEntityPath() + ", " + myPartitionId);
     try {
       EventHubClient eventHubClient = EventHubClient.create(
-          boltConfig.getConnectionString(), boltConfig.getEntityPath());
+          boltConfig.getConnectionString(),
+          boltConfig.getEntityPath());
       sender = eventHubClient.createPartitionSender(myPartitionId);
-    }
-    catch(Exception ex) {
-      logger.error(ex.getMessage());
+    } catch (Exception ex) {
+      collector.reportError(ex);
       throw new RuntimeException(ex);
     }
 
@@ -86,16 +86,15 @@ public class EventHubBolt extends BaseRichBolt {
     try {
       sender.send(boltConfig.getEventDataFormat().serialize(tuple));
       collector.ack(tuple);
-    }
-    catch(EventHubException ex) {
-      logger.error(ex.getMessage());
+    } catch (EventHubException ex) {
+      collector.reportError(ex);
       collector.fail(tuple);
     }
   }
 
   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
-    
+
   }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
new file mode 100644
index 0000000..1964fa6
--- /dev/null
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java
@@ -0,0 +1,66 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
*******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the raw bytes.
+ *
+ * The resulting tuple would contain two items, the first being the message
+ * bytes, and the second a map of properties that include metadata, which can be
+ * used to determine who processes the message, and how it is processed.
+ */
+public class BinaryEventDataScheme implements IEventDataScheme {
+
+  @Override
+  public List<Object> deserialize(Message message) {
+    final List<Object> fieldContents = new ArrayList<Object>();
+
+    Map metaDataMap = new HashMap();
+    byte[] messageData = new byte[0];
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof Data) {
+        Data data = (Data) section;
+        messageData = data.getValue().getArray();
+      } else if (section instanceof ApplicationProperties) {
+        final ApplicationProperties applicationProperties = 
(ApplicationProperties) section;
+        metaDataMap = applicationProperties.getValue();
+      }
+    }
+
+    fieldContents.add(messageData);
+    fieldContents.add(metaDataMap);
+    return fieldContents;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
index 0e275a5..22f5df4 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java
@@ -19,37 +19,59 @@ package org.apache.storm.eventhubs.spout;
 
 import org.apache.storm.tuple.Fields;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.qpid.amqp_1_0.client.Message;
 import org.apache.qpid.amqp_1_0.type.Section;
 import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
 import org.apache.qpid.amqp_1_0.type.messaging.Data;
 
+/**
+ * An Event Data Scheme which deserializes message payload into the Strings. No
+ * encoding is assumed. The receiver will need to handle parsing of the string
+ * data in appropriate encoding.
+ *
+ * The resulting tuple would contain two items: the message string, and a
+ * map of properties that include metadata, which can be used to determine who
+ * processes the message, and how it is processed.
+ * 
+ * For passing the raw bytes of a message to Bolts, refer to
+ * {@link BinaryEventDataScheme}.
+ */
 public class EventDataScheme implements IEventDataScheme {
 
   private static final long serialVersionUID = 1L;
 
   @Override
   public List<Object> deserialize(Message message) {
-    List<Object> fieldContents = new ArrayList<Object>();
+    final List<Object> fieldContents = new ArrayList<Object>();
+
+    Map metaDataMap = new HashMap();
+    String messageData = "";
 
     for (Section section : message.getPayload()) {
       if (section instanceof Data) {
         Data data = (Data) section;
-        fieldContents.add(new String(data.getValue().getArray()));
-        return fieldContents;
+        messageData = new String(data.getValue().getArray());
       } else if (section instanceof AmqpValue) {
         AmqpValue amqpValue = (AmqpValue) section;
-        fieldContents.add(amqpValue.getValue().toString());
-        return fieldContents;
+        messageData = amqpValue.getValue().toString();
+      } else if (section instanceof ApplicationProperties) {
+        final ApplicationProperties applicationProperties = 
(ApplicationProperties) section;
+        metaDataMap = applicationProperties.getValue();
       }
     }
 
-    return null;
+    fieldContents.add(messageData);
+    fieldContents.add(metaDataMap);
+    return fieldContents;
   }
 
   @Override
   public Fields getOutputFields() {
-    return new Fields(FieldConstants.Message);
+    return new Fields(FieldConstants.Message, FieldConstants.META_DATA);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
index ff40315..479ce17 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java
@@ -121,8 +121,8 @@ public class EventHubSpout extends BaseRichSpout {
         zkEndpointAddress = sb.toString();
       }
       stateStore = new ZookeeperStateStore(zkEndpointAddress,
-          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
-          (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL));
+           
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()),
+           
Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString()));
     }
     stateStore.open();
 
@@ -152,7 +152,7 @@ public class EventHubSpout extends BaseRichSpout {
     try {
       preparePartitions(config, totalTasks, taskIndex, collector);
     } catch (Exception e) {
-      logger.error(e.getMessage());
+      collector.reportError(e);
       throw new RuntimeException(e);
     }
     
@@ -167,7 +167,7 @@ public class EventHubSpout extends BaseRichSpout {
           }
           return concatMetricsDataMaps;
       }
-    }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS));
+    }, 
Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString()));
     logger.info("end open()");
   }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
index 77cd998..b8b8bbf 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java
@@ -23,7 +23,7 @@ import java.util.List;
 import com.microsoft.eventhubs.client.ConnectionStringBuilder;
 
 public class EventHubSpoutConfig implements Serializable {
-  private static final long serialVersionUID = 1L; 
+  private static final long serialVersionUID = 1L;
 
   public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net";
   private final String userName;
@@ -32,52 +32,70 @@ public class EventHubSpoutConfig implements Serializable {
   private final String entityPath;
   private final int partitionCount;
 
-  private String zkConnectionString = null; //if null then use zookeeper used 
by Storm
+  private String zkConnectionString = null; // if null then use zookeeper used
+                        // by Storm
   private int checkpointIntervalInSeconds = 10;
   private int receiverCredits = 1024;
   private int maxPendingMsgsPerPartition = 1024;
-  private long enqueueTimeFilter = 0; //timestamp in millisecond, 0 means 
disabling filter
+  private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means
+                    // disabling filter
   private String connectionString;
   private String topologyName;
-  private IEventDataScheme scheme = new EventDataScheme();
-  private String consumerGroupName = null; //if null then use default consumer 
group
+  private IEventDataScheme scheme = new StringEventDataScheme();
+  private String consumerGroupName = null; // if null then use default
+                        // consumer group
 
-  //These are mandatory parameters
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount) {
+  // These are mandatory parameters
+  public EventHubSpoutConfig(String username, String password,
+      String namespace, String entityPath, int partitionCount) {
     this.userName = username;
     this.password = password;
     this.connectionString = new ConnectionStringBuilder(username, password,
-               namespace).getConnectionString();
+        namespace).getConnectionString();
     this.namespace = namespace;
     this.entityPath = entityPath;
     this.partitionCount = partitionCount;
   }
 
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount, String zkConnectionString) {
+  // Keep this constructor for backward compatibility
+  public EventHubSpoutConfig(String username, String password,
+      String namespace, String entityPath, int partitionCount,
+      String zkConnectionString) {
     this(username, password, namespace, entityPath, partitionCount);
     setZkConnectionString(zkConnectionString);
   }
-  
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-      String entityPath, int partitionCount, String zkConnectionString,
-      int checkpointIntervalInSeconds, int receiverCredits) {
+
+  // Keep this constructor for backward compatibility
+  public EventHubSpoutConfig(String username, String password,
+      String namespace, String entityPath, int partitionCount,
+      String zkConnectionString, int checkpointIntervalInSeconds,
+      int receiverCredits) {
     this(username, password, namespace, entityPath, partitionCount,
         zkConnectionString);
     setCheckpointIntervalInSeconds(checkpointIntervalInSeconds);
     setReceiverCredits(receiverCredits);
   }
 
-  //Keep this constructor for backward compatibility
-  public EventHubSpoutConfig(String username, String password, String 
namespace,
-    String entityPath, int partitionCount, String zkConnectionString,
-    int checkpointIntervalInSeconds, int receiverCredits, int 
maxPendingMsgsPerPartition, long enqueueTimeFilter) {
-    
+  public EventHubSpoutConfig(String username, String password,
+      String namespace, String entityPath, int partitionCount,
+      String zkConnectionString, int checkpointIntervalInSeconds,
+      int receiverCredits, long enqueueTimeFilter) {
     this(username, password, namespace, entityPath, partitionCount,
-        zkConnectionString, checkpointIntervalInSeconds, receiverCredits);
+        zkConnectionString, checkpointIntervalInSeconds,
+        receiverCredits);
+    setEnqueueTimeFilter(enqueueTimeFilter);
+  }
+
+  // Keep this constructor for backward compatibility
+  public EventHubSpoutConfig(String username, String password,
+      String namespace, String entityPath, int partitionCount,
+      String zkConnectionString, int checkpointIntervalInSeconds,
+      int receiverCredits, int maxPendingMsgsPerPartition,
+      long enqueueTimeFilter) {
+
+    this(username, password, namespace, entityPath, partitionCount,
+        zkConnectionString, checkpointIntervalInSeconds,
+        receiverCredits);
     setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition);
     setEnqueueTimeFilter(enqueueTimeFilter);
   }
@@ -102,6 +120,11 @@ public class EventHubSpoutConfig implements Serializable {
     zkConnectionString = value;
   }
 
+  public EventHubSpoutConfig withZkConnectionString(String value) {
+    setZkConnectionString(value);
+    return this;
+  }
+
   public int getCheckpointIntervalInSeconds() {
     return checkpointIntervalInSeconds;
   }
@@ -109,7 +132,12 @@ public class EventHubSpoutConfig implements Serializable {
   public void setCheckpointIntervalInSeconds(int value) {
     checkpointIntervalInSeconds = value;
   }
-  
+
+  public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) {
+    setCheckpointIntervalInSeconds(value);
+    return this;
+  }
+
   public int getReceiverCredits() {
     return receiverCredits;
   }
@@ -117,7 +145,12 @@ public class EventHubSpoutConfig implements Serializable {
   public void setReceiverCredits(int value) {
     receiverCredits = value;
   }
-  
+
+  public EventHubSpoutConfig withReceiverCredits(int value) {
+    setReceiverCredits(value);
+    return this;
+  }
+
   public int getMaxPendingMsgsPerPartition() {
     return maxPendingMsgsPerPartition;
   }
@@ -125,7 +158,12 @@ public class EventHubSpoutConfig implements Serializable {
   public void setMaxPendingMsgsPerPartition(int value) {
     maxPendingMsgsPerPartition = value;
   }
-  
+
+  public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) {
+    setMaxPendingMsgsPerPartition(value);
+    return this;
+  }
+
   public long getEnqueueTimeFilter() {
     return enqueueTimeFilter;
   }
@@ -134,6 +172,11 @@ public class EventHubSpoutConfig implements Serializable {
     enqueueTimeFilter = value;
   }
 
+  public EventHubSpoutConfig withEnqueueTimeFilter(long value) {
+    setEnqueueTimeFilter(value);
+    return this;
+  }
+
   public String getTopologyName() {
     return topologyName;
   }
@@ -142,6 +185,11 @@ public class EventHubSpoutConfig implements Serializable {
     topologyName = value;
   }
 
+  public EventHubSpoutConfig withTopologyName(String value) {
+    setTopologyName(value);
+    return this;
+  }
+
   public IEventDataScheme getEventDataScheme() {
     return scheme;
   }
@@ -150,6 +198,11 @@ public class EventHubSpoutConfig implements Serializable {
     this.scheme = scheme;
   }
 
+  public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) {
+    setEventDataScheme(value);
+    return this;
+  }
+
   public String getConsumerGroupName() {
     return consumerGroupName;
   }
@@ -158,6 +211,11 @@ public class EventHubSpoutConfig implements Serializable {
     consumerGroupName = value;
   }
 
+  public EventHubSpoutConfig withConsumerGroupName(String value) {
+    setConsumerGroupName(value);
+    return this;
+  }
+
   public List<String> getPartitionList() {
     List<String> partitionList = new ArrayList<String>();
 
@@ -174,6 +232,11 @@ public class EventHubSpoutConfig implements Serializable {
 
   public void setTargetAddress(String targetFqnAddress) {
     this.connectionString = new ConnectionStringBuilder(userName, password,
-               namespace, targetFqnAddress).getConnectionString();
+        namespace, targetFqnAddress).getConnectionString();
+  }
+
+  public EventHubSpoutConfig withTargetAddress(String targetFqnAddress) {
+    setTargetAddress(targetFqnAddress);
+    return this;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
index bd655d6..b238391 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java
@@ -22,4 +22,5 @@ public class FieldConstants {
   public static final String PartitionKey = "partitionKey";
   public static final String Offset = "offset";
   public static final String Message = "message";
+  public static final String META_DATA = "metadata";
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
index b7e03b4..652e77d 100755
--- 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java
@@ -24,7 +24,20 @@ import org.apache.qpid.amqp_1_0.client.Message;
 
 public interface IEventDataScheme extends Serializable {
 
+  /**
+   * Deserialize an AMQP Message into a Tuple.
+   *
+   * @see #getOutputFields() for the list of fields the tuple will contain.
+   *
+   * @param message The Message to Deserialize.
+   * @return A tuple containing the deserialized fields of the message.
+   */
   List<Object> deserialize(Message message);
 
+  /**
+   * Retrieve the Fields that are present on tuples created by this object.
+   *
+   * @return The Fields that are present on tuples created by this object.
+   */
   Fields getOutputFields();
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
----------------------------------------------------------------------
diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
new file mode 100644
index 0000000..e6eb6de
--- /dev/null
+++ 
b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ 
*******************************************************************************/
+package org.apache.storm.eventhubs.spout;
+
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.amqp_1_0.client.Message;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * An Event Data Scheme which deserializes message payload into the Strings.
+ * No encoding is assumed. The receiver will need to handle parsing of the 
+ * string data in appropriate encoding.
+ *
+ * Note: Unlike other schemes provided, this scheme does not include any 
+ * metadata. 
+ * 
+ * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} 
+ */
+public class StringEventDataScheme implements IEventDataScheme {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public List<Object> deserialize(Message message) {
+    final List<Object> fieldContents = new ArrayList<Object>();
+
+    for (Section section : message.getPayload()) {
+      if (section instanceof Data) {
+        Data data = (Data) section;
+        fieldContents.add(new String(data.getValue().getArray()));
+      } else if (section instanceof AmqpValue) {
+        AmqpValue amqpValue = (AmqpValue) section;
+        fieldContents.add(amqpValue.getValue().toString());
+      }
+    }
+    
+    return fieldContents;
+  }
+
+  @Override
+  public Fields getOutputFields() {
+    return new Fields(FieldConstants.Message);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a1466c9..acf22e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -267,6 +267,8 @@
         <aetherVersion>1.0.0.v20140518</aetherVersion>
         <mavenVersion>3.1.0</mavenVersion>
         <wagonVersion>1.0</wagonVersion>
+        <qpid.version>0.32</qpid.version>
+        <eventhubs.client.version>1.0.1</eventhubs.client.version>
     </properties>
 
     <modules>

Reply via email to