Repository: storm Updated Branches: refs/heads/1.x-branch be0d10558 -> 97aeef57e
STORM-2127: Storm-eventhubs should use latest amqp and eventhubs-client versions Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/60e6eb9d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/60e6eb9d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/60e6eb9d Branch: refs/heads/1.x-branch Commit: 60e6eb9dd685f1ee434065f0de80bd4b1e45bef6 Parents: bf5d5e6 Author: Ravi Peri <ravip...@microsoft.com> Authored: Mon Oct 17 15:59:56 2016 -0700 Committer: Ravi Peri <ravip...@microsoft.com> Committed: Mon Oct 17 15:59:56 2016 -0700 ---------------------------------------------------------------------- external/storm-eventhubs/pom.xml | 21 +++- .../storm/eventhubs/bolt/EventHubBolt.java | 29 +++-- .../eventhubs/spout/BinaryEventDataScheme.java | 66 ++++++++++ .../storm/eventhubs/spout/EventDataScheme.java | 38 ++++-- .../storm/eventhubs/spout/EventHubSpout.java | 8 +- .../eventhubs/spout/EventHubSpoutConfig.java | 119 ++++++++++++++----- .../storm/eventhubs/spout/FieldConstants.java | 1 + .../storm/eventhubs/spout/IEventDataScheme.java | 15 ++- .../eventhubs/spout/StringEventDataScheme.java | 69 +++++++++++ pom.xml | 2 + 10 files changed, 307 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/pom.xml b/external/storm-eventhubs/pom.xml index 2c87b21..50ad00c 100755 --- a/external/storm-eventhubs/pom.xml +++ b/external/storm-eventhubs/pom.xml @@ -30,11 +30,7 @@ <packaging>jar</packaging> <name>storm-eventhubs</name> <description>EventHubs Storm Spout</description> - - <properties> - <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <eventhubs.client.version>0.9.1</eventhubs.client.version> - </properties> + <build> <plugins> <plugin> 
@@ -104,6 +100,21 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-client-jms</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-amqp-1-0-common</artifactId> + <version>${qpid.version}</version> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java index ac5018b..5d34c4b 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/bolt/EventHubBolt.java @@ -39,12 +39,11 @@ public class EventHubBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; private static final Logger logger = LoggerFactory .getLogger(EventHubBolt.class); - + protected OutputCollector collector; protected EventHubSender sender; protected EventHubBoltConfig boltConfig; - - + public EventHubBolt(String connectionString, String entityPath) { boltConfig = new EventHubBoltConfig(connectionString, entityPath); } @@ -54,28 +53,29 @@ public class EventHubBolt extends BaseRichBolt { boltConfig = new EventHubBoltConfig(userName, password, namespace, entityPath, partitionMode); } - + public EventHubBolt(EventHubBoltConfig config) { boltConfig = config; } @Override - public void prepare(Map config, TopologyContext 
context, OutputCollector collector) { + public void prepare(Map config, TopologyContext context, + OutputCollector collector) { this.collector = collector; String myPartitionId = null; - if(boltConfig.getPartitionMode()) { - //We can use the task index (starting from 0) as the partition ID + if (boltConfig.getPartitionMode()) { + // We can use the task index (starting from 0) as the partition ID myPartitionId = "" + context.getThisTaskIndex(); } logger.info("creating sender: " + boltConfig.getConnectionString() + ", " + boltConfig.getEntityPath() + ", " + myPartitionId); try { EventHubClient eventHubClient = EventHubClient.create( - boltConfig.getConnectionString(), boltConfig.getEntityPath()); + boltConfig.getConnectionString(), + boltConfig.getEntityPath()); sender = eventHubClient.createPartitionSender(myPartitionId); - } - catch(Exception ex) { - logger.error(ex.getMessage()); + } catch (Exception ex) { + collector.reportError(ex); throw new RuntimeException(ex); } @@ -86,16 +86,15 @@ public class EventHubBolt extends BaseRichBolt { try { sender.send(boltConfig.getEventDataFormat().serialize(tuple)); collector.ack(tuple); - } - catch(EventHubException ex) { - logger.error(ex.getMessage()); + } catch (EventHubException ex) { + collector.reportError(ex); collector.fail(tuple); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - + } } http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java new file mode 100644 index 0000000..1964fa6 --- /dev/null +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/BinaryEventDataScheme.java @@ -0,0 
+1,66 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An Event Data Scheme which deserializes message payload into the raw bytes. + * + * The resulting tuple would contain two items, the first being the message + * bytes, and the second a map of properties that include metadata, which can be + * used to determine who processes the message, and how it is processed. 
+ */ +public class BinaryEventDataScheme implements IEventDataScheme { + + @Override + public List<Object> deserialize(Message message) { + final List<Object> fieldContents = new ArrayList<Object>(); + + Map metaDataMap = new HashMap(); + byte[] messageData = new byte[0]; + + for (Section section : message.getPayload()) { + if (section instanceof Data) { + Data data = (Data) section; + messageData = data.getValue().getArray(); + } else if (section instanceof ApplicationProperties) { + final ApplicationProperties applicationProperties = (ApplicationProperties) section; + metaDataMap = applicationProperties.getValue(); + } + } + + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + return fieldContents; + } + + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message, FieldConstants.META_DATA); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java index 0e275a5..22f5df4 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventDataScheme.java @@ -19,37 +19,59 @@ package org.apache.storm.eventhubs.spout; import org.apache.storm.tuple.Fields; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; + import org.apache.qpid.amqp_1_0.client.Message; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; import 
org.apache.qpid.amqp_1_0.type.messaging.Data; +/** + * An Event Data Scheme which deserializes message payload into the Strings. No + * encoding is assumed. The receiver will need to handle parsing of the string + * data in appropriate encoding. + * + * The resulting tuple would contain two items: the message string, and a + * map of properties that include metadata, which can be used to determine who + * processes the message, and how it is processed. + * + * For passing the raw bytes of a message to Bolts, refer to + * {@link BinaryEventDataScheme}. + */ public class EventDataScheme implements IEventDataScheme { private static final long serialVersionUID = 1L; @Override public List<Object> deserialize(Message message) { - List<Object> fieldContents = new ArrayList<Object>(); + final List<Object> fieldContents = new ArrayList<Object>(); + + Map metaDataMap = new HashMap(); + String messageData = ""; for (Section section : message.getPayload()) { if (section instanceof Data) { Data data = (Data) section; - fieldContents.add(new String(data.getValue().getArray())); - return fieldContents; + messageData = new String(data.getValue().getArray()); } else if (section instanceof AmqpValue) { AmqpValue amqpValue = (AmqpValue) section; - fieldContents.add(amqpValue.getValue().toString()); - return fieldContents; + messageData = amqpValue.getValue().toString(); + } else if (section instanceof ApplicationProperties) { + final ApplicationProperties applicationProperties = (ApplicationProperties) section; + metaDataMap = applicationProperties.getValue(); } } - return null; + fieldContents.add(messageData); + fieldContents.add(metaDataMap); + return fieldContents; } @Override public Fields getOutputFields() { - return new Fields(FieldConstants.Message); + return new Fields(FieldConstants.Message, FieldConstants.META_DATA); } -} +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java index ff40315..479ce17 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpout.java @@ -121,8 +121,8 @@ public class EventHubSpout extends BaseRichSpout { zkEndpointAddress = sb.toString(); } stateStore = new ZookeeperStateStore(zkEndpointAddress, - (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES), - (Integer)config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL)); + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_TIMES).toString()), + Integer.parseInt(config.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL).toString())); } stateStore.open(); @@ -152,7 +152,7 @@ public class EventHubSpout extends BaseRichSpout { try { preparePartitions(config, totalTasks, taskIndex, collector); } catch (Exception e) { - logger.error(e.getMessage()); + collector.reportError(e); throw new RuntimeException(e); } @@ -167,7 +167,7 @@ public class EventHubSpout extends BaseRichSpout { } return concatMetricsDataMaps; } - }, (Integer)config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)); + }, Integer.parseInt(config.get(Config.TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS).toString())); logger.info("end open()"); } http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java ---------------------------------------------------------------------- diff --git 
a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java index 77cd998..b8b8bbf 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/EventHubSpoutConfig.java @@ -23,7 +23,7 @@ import java.util.List; import com.microsoft.eventhubs.client.ConnectionStringBuilder; public class EventHubSpoutConfig implements Serializable { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; public static final String EH_SERVICE_FQDN_SUFFIX = "servicebus.windows.net"; private final String userName; @@ -32,52 +32,70 @@ public class EventHubSpoutConfig implements Serializable { private final String entityPath; private final int partitionCount; - private String zkConnectionString = null; //if null then use zookeeper used by Storm + private String zkConnectionString = null; // if null then use zookeeper used + // by Storm private int checkpointIntervalInSeconds = 10; private int receiverCredits = 1024; private int maxPendingMsgsPerPartition = 1024; - private long enqueueTimeFilter = 0; //timestamp in millisecond, 0 means disabling filter + private long enqueueTimeFilter = 0; // timestamp in millisecond, 0 means + // disabling filter private String connectionString; private String topologyName; - private IEventDataScheme scheme = new EventDataScheme(); - private String consumerGroupName = null; //if null then use default consumer group + private IEventDataScheme scheme = new StringEventDataScheme(); + private String consumerGroupName = null; // if null then use default + // consumer group - //These are mandatory parameters - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount) { + // These are mandatory 
parameters + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount) { this.userName = username; this.password = password; this.connectionString = new ConnectionStringBuilder(username, password, - namespace).getConnectionString(); + namespace).getConnectionString(); this.namespace = namespace; this.entityPath = entityPath; this.partitionCount = partitionCount; } - //Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString) { + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString) { this(username, password, namespace, entityPath, partitionCount); setZkConnectionString(zkConnectionString); } - - //Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString, - int checkpointIntervalInSeconds, int receiverCredits) { + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits) { this(username, password, namespace, entityPath, partitionCount, zkConnectionString); setCheckpointIntervalInSeconds(checkpointIntervalInSeconds); setReceiverCredits(receiverCredits); } - //Keep this constructor for backward compatibility - public EventHubSpoutConfig(String username, String password, String namespace, - String entityPath, int partitionCount, String zkConnectionString, - int checkpointIntervalInSeconds, int receiverCredits, int maxPendingMsgsPerPartition, long enqueueTimeFilter) { - + public 
EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, long enqueueTimeFilter) { this(username, password, namespace, entityPath, partitionCount, - zkConnectionString, checkpointIntervalInSeconds, receiverCredits); + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); + setEnqueueTimeFilter(enqueueTimeFilter); + } + + // Keep this constructor for backward compatibility + public EventHubSpoutConfig(String username, String password, + String namespace, String entityPath, int partitionCount, + String zkConnectionString, int checkpointIntervalInSeconds, + int receiverCredits, int maxPendingMsgsPerPartition, + long enqueueTimeFilter) { + + this(username, password, namespace, entityPath, partitionCount, + zkConnectionString, checkpointIntervalInSeconds, + receiverCredits); setMaxPendingMsgsPerPartition(maxPendingMsgsPerPartition); setEnqueueTimeFilter(enqueueTimeFilter); } @@ -102,6 +120,11 @@ public class EventHubSpoutConfig implements Serializable { zkConnectionString = value; } + public EventHubSpoutConfig withZkConnectionString(String value) { + setZkConnectionString(value); + return this; + } + public int getCheckpointIntervalInSeconds() { return checkpointIntervalInSeconds; } @@ -109,7 +132,12 @@ public class EventHubSpoutConfig implements Serializable { public void setCheckpointIntervalInSeconds(int value) { checkpointIntervalInSeconds = value; } - + + public EventHubSpoutConfig withCheckpointIntervalInSeconds(int value) { + setCheckpointIntervalInSeconds(value); + return this; + } + public int getReceiverCredits() { return receiverCredits; } @@ -117,7 +145,12 @@ public class EventHubSpoutConfig implements Serializable { public void setReceiverCredits(int value) { receiverCredits = value; } - + + public EventHubSpoutConfig withReceiverCredits(int value) { + setReceiverCredits(value); + return this; + } + 
public int getMaxPendingMsgsPerPartition() { return maxPendingMsgsPerPartition; } @@ -125,7 +158,12 @@ public class EventHubSpoutConfig implements Serializable { public void setMaxPendingMsgsPerPartition(int value) { maxPendingMsgsPerPartition = value; } - + + public EventHubSpoutConfig withMaxPendingMsgsPerPartition(int value) { + setMaxPendingMsgsPerPartition(value); + return this; + } + public long getEnqueueTimeFilter() { return enqueueTimeFilter; } @@ -134,6 +172,11 @@ public class EventHubSpoutConfig implements Serializable { enqueueTimeFilter = value; } + public EventHubSpoutConfig withEnqueueTimeFilter(long value) { + setEnqueueTimeFilter(value); + return this; + } + public String getTopologyName() { return topologyName; } @@ -142,6 +185,11 @@ public class EventHubSpoutConfig implements Serializable { topologyName = value; } + public EventHubSpoutConfig withTopologyName(String value) { + setTopologyName(value); + return this; + } + public IEventDataScheme getEventDataScheme() { return scheme; } @@ -150,6 +198,11 @@ public class EventHubSpoutConfig implements Serializable { this.scheme = scheme; } + public EventHubSpoutConfig withEventDataScheme(IEventDataScheme value) { + setEventDataScheme(value); + return this; + } + public String getConsumerGroupName() { return consumerGroupName; } @@ -158,6 +211,11 @@ public class EventHubSpoutConfig implements Serializable { consumerGroupName = value; } + public EventHubSpoutConfig withConsumerGroupName(String value) { + setConsumerGroupName(value); + return this; + } + public List<String> getPartitionList() { List<String> partitionList = new ArrayList<String>(); @@ -174,6 +232,11 @@ public class EventHubSpoutConfig implements Serializable { public void setTargetAddress(String targetFqnAddress) { this.connectionString = new ConnectionStringBuilder(userName, password, - namespace, targetFqnAddress).getConnectionString(); + namespace, targetFqnAddress).getConnectionString(); + } + + public EventHubSpoutConfig 
withTargetAddress(String targetFqnAddress) { + setTargetAddress(targetFqnAddress); + return this; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java index bd655d6..b238391 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/FieldConstants.java @@ -22,4 +22,5 @@ public class FieldConstants { public static final String PartitionKey = "partitionKey"; public static final String Offset = "offset"; public static final String Message = "message"; + public static final String META_DATA = "metadata"; } http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java index b7e03b4..652e77d 100755 --- a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/IEventDataScheme.java @@ -24,7 +24,20 @@ import org.apache.qpid.amqp_1_0.client.Message; public interface IEventDataScheme extends Serializable { + /** + * Deserialize an AMQP Message into a Tuple. + * + * @see #getOutputFields() for the list of fields the tuple will contain. + * + * @param message The Message to Deserialize. 
+ * @return A tuple containing the deserialized fields of the message. + */ List<Object> deserialize(Message message); + /** + * Retrieve the Fields that are present on tuples created by this object. + * + * @return The Fields that are present on tuples created by this object. + */ Fields getOutputFields(); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java ---------------------------------------------------------------------- diff --git a/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java new file mode 100644 index 0000000..e6eb6de --- /dev/null +++ b/external/storm-eventhubs/src/main/java/org/apache/storm/eventhubs/spout/StringEventDataScheme.java @@ -0,0 +1,69 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ *******************************************************************************/ +package org.apache.storm.eventhubs.spout; + +import org.apache.storm.tuple.Fields; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.amqp_1_0.client.Message; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.storm.tuple.Fields; + +/** + * An Event Data Scheme which deserializes message payload into the Strings. + * No encoding is assumed. The receiver will need to handle parsing of the + * string data in appropriate encoding. + * + * Note: Unlike other schemes provided, this scheme does not include any + * metadata. + * + * For metadata please refer to {@link BinaryEventDataScheme}, {@link EventDataScheme} + */ +public class StringEventDataScheme implements IEventDataScheme { + + private static final long serialVersionUID = 1L; + + @Override + public List<Object> deserialize(Message message) { + final List<Object> fieldContents = new ArrayList<Object>(); + + for (Section section : message.getPayload()) { + if (section instanceof Data) { + Data data = (Data) section; + fieldContents.add(new String(data.getValue().getArray())); + } else if (section instanceof AmqpValue) { + AmqpValue amqpValue = (AmqpValue) section; + fieldContents.add(amqpValue.getValue().toString()); + } + } + + return fieldContents; + } + + @Override + public Fields getOutputFields() { + return new Fields(FieldConstants.Message); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/60e6eb9d/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a1466c9..acf22e7 100644 --- a/pom.xml +++ b/pom.xml @@ -267,6 +267,8 @@ 
<aetherVersion>1.0.0.v20140518</aetherVersion> <mavenVersion>3.1.0</mavenVersion> <wagonVersion>1.0</wagonVersion> + <qpid.version>0.32</qpid.version> + <eventhubs.client.version>1.0.1</eventhubs.client.version> </properties> <modules>