STORM-697: Fixed incorrect typing for offset

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11194653
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11194653
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11194653

Branch: refs/heads/master
Commit: 11194653f43ce0d4f65d0051270cd86f2191cbc5
Parents: 2f119c6
Author: matt.tieman <matt.tie...@inin.com>
Authored: Tue Mar 3 16:42:31 2015 -0500
Committer: matt.tieman <matt.tie...@inin.com>
Committed: Tue Mar 3 16:42:31 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java    | 2 +-
 .../src/jvm/storm/kafka/MessageMetadataScheme.java          | 2 +-
 .../jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java | 2 +-
 .../storm-kafka/src/jvm/storm/kafka/PartitionManager.java   | 9 ++++++++-
 .../src/jvm/storm/kafka/StringMessageAndMetadataScheme.java | 2 +-
 .../storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java    | 8 ++++----
 6 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 9af49fe..17d0fb7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -211,7 +211,7 @@ public class KafkaUtils {
         return tups;
     }
     
-    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, int offset) {
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, Partition partition, long offset) {
         Iterable<List<Object>> tups;
         ByteBuffer payload = msg.payload();
         if (payload == null) {

http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
index d0dd2be..da7acbf 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
@@ -21,5 +21,5 @@ import backtype.storm.spout.Scheme;
  * limitations under the License.
  */
 public interface MessageMetadataScheme extends Scheme {
-    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset);
+    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index 6226676..5eb20b5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -14,7 +14,7 @@ public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
     }
 
     @SuppressWarnings("unchecked")
-    public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) {
+    public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
         List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
         if (o == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 63e70cf..e1186e3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -134,7 +134,14 @@ public class PartitionManager {
             if (toEmit == null) {
                 return EmitState.NO_EMITTED;
             }
-            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
+            
+            Iterable<List<Object>> tups;
+            if (_spoutConfig.tupleMetaData) {
+                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg, _partition, toEmit.offset);
+            } else {
+                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
+            }
+            
             if (tups != null) {
                 for (List<Object> tup : tups) {
                     collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));

http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
index 262a27c..2dc4c02 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
@@ -12,7 +12,7 @@ public class StringMessageAndMetadataScheme extends StringScheme implements Mess
     public static final String STRING_SCHEME_OFFSET = "offset";
 
     @Override
-    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, int offset) {
+    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
         String stringMessage = StringScheme.deserializeString(message);
         return new Values(stringMessage, partition.partition, offset);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/11194653/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index a7c9b2b..362d721 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -176,7 +176,7 @@ public class KafkaUtilsTest {
         String value = "value";
         Partition mockPartition = Mockito.mock(Partition.class);
         mockPartition.partition = 0;
-        int offset = 0;
+        long offset = 0L;
         
         config.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
         config.tupleMetaData = true;
@@ -187,8 +187,8 @@ public class KafkaUtilsTest {
             Iterable<List<Object>> lists = KafkaUtils.generateTuples(config, msg.message(), mockPartition, offset);
             List<Object> values = lists.iterator().next(); 
             assertEquals("Message is incorrect", value, values.get(0));
-            assertEquals("Offset is incorrect", offset, values.get(1));
-            assertEquals("Partition is incorrect", mockPartition.partition, values.get(2));
+            assertEquals("Partition is incorrect", mockPartition.partition, values.get(1));
+            assertEquals("Offset is incorrect", offset, values.get(2));
         }
     }
     
@@ -197,7 +197,7 @@ public class KafkaUtilsTest {
         String value = "value";
         Partition mockPartition = Mockito.mock(Partition.class);
         mockPartition.partition = 0;
-        int offset = 0;
+        Long offset = 0L;
         
         config.scheme = new SchemeAsMultiScheme(new StringScheme());
         config.tupleMetaData = true;

Reply via email to