Repository: kafka
Updated Branches:
  refs/heads/trunk 2e6177359 -> 6f2f1f984


KAFKA-2626: Handle null keys and value validation properly in 
OffsetStorageWriter.

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Gwen Shapira

Closes #345 from ewencp/kafka-2626-offset-storage-writer-null-values


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f2f1f98
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f2f1f98
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f2f1f98

Branch: refs/heads/trunk
Commit: 6f2f1f9843f537b9bda3aa3951a867fdee661761
Parents: 2e61773
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Fri Oct 23 17:01:33 2015 -0700
Committer: Gwen Shapira <csh...@gmail.com>
Committed: Fri Oct 23 17:01:33 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/storage/OffsetUtils.java      |  5 ++
 .../storage/OffsetStorageWriterTest.java        | 71 +++++++++++++++-----
 2 files changed, 59 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
index 8d78a57..9ba7662 100644
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -34,6 +34,11 @@ public class OffsetUtils {
     }
 
     public static <K, V> void validateFormat(Map<K, V> offsetData) {
+        // Both keys and values for offsets may be null. For values, this is a 
useful way to delete offsets or indicate
+        // that there's no usable concept of offsets in your source system.
+        if (offsetData == null)
+            return;
+
         for (Map.Entry<K, V> entry : offsetData.entrySet()) {
             if (!(entry.getKey() instanceof String))
                 throw new DataException("Offsets may only use String keys");

http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
index e33ecd0..3dd0b52 100644
--- 
a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
+++ 
b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -45,15 +45,11 @@ public class OffsetStorageWriterTest {
     private static final String NAMESPACE = "namespace";
     // Copycat format - any types should be accepted here
     private static final Map<String, String> OFFSET_KEY = 
Collections.singletonMap("key", "key");
-    private static final List<Object> OFFSET_KEY_WRAPPED = 
Arrays.asList(NAMESPACE, OFFSET_KEY);
     private static final Map<String, Integer> OFFSET_VALUE = 
Collections.singletonMap("key", 12);
 
     // Serialized
     private static final byte[] OFFSET_KEY_SERIALIZED = 
"key-serialized".getBytes();
     private static final byte[] OFFSET_VALUE_SERIALIZED = 
"value-serialized".getBytes();
-    private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
-            = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
-            ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
 
     @Mock private OffsetBackingStore store;
     @Mock private Converter keyConverter;
@@ -79,7 +75,7 @@ public class OffsetStorageWriterTest {
     public void testWriteFlush() throws Exception {
         @SuppressWarnings("unchecked")
         Callback<Void> callback = PowerMock.createMock(Callback.class);
-        expectStore(callback, false);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, callback, false, null);
 
         PowerMock.replayAll();
 
@@ -91,6 +87,41 @@ public class OffsetStorageWriterTest {
         PowerMock.verifyAll();
     }
 
+    // It should be possible to set offset values to null
+    @Test
+    public void testWriteNullValueFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, 
false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(OFFSET_KEY, null);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    // It should be possible to use null keys. These aren't actually stored as 
null since the key is wrapped to include
+    // info about the namespace (connector)
+    @Test
+    public void testWriteNullKeyFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, 
callback, false, null);
+
+        PowerMock.replayAll();
+
+        writer.offset(null, OFFSET_VALUE);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
     @Test
     public void testNoOffsetsToFlush() {
         // If no offsets are flushed, we should finish immediately and not 
have made any calls to the
@@ -112,9 +143,9 @@ public class OffsetStorageWriterTest {
         @SuppressWarnings("unchecked")
         final Callback<Void> callback = PowerMock.createMock(Callback.class);
         // First time the write fails
-        expectStore(callback, true);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, callback, true, null);
         // Second time it succeeds
-        expectStore(callback, false);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, callback, false, null);
         // Third time it has no data to flush so we won't get past beginFlush()
 
         PowerMock.replayAll();
@@ -135,7 +166,7 @@ public class OffsetStorageWriterTest {
         final Callback<Void> callback = PowerMock.createMock(Callback.class);
         // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
-        expectStore(null, false, allowStoreCompleteCountdown);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
 
         PowerMock.replayAll();
 
@@ -165,7 +196,7 @@ public class OffsetStorageWriterTest {
         CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
         // In this test, the write should be cancelled so the callback will 
not be invoked and is not
         // passed to the expectStore call
-        expectStore(null, false, allowStoreCompleteCountdown);
+        expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
 
         PowerMock.replayAll();
 
@@ -180,13 +211,13 @@ public class OffsetStorageWriterTest {
         PowerMock.verifyAll();
     }
 
-    private void expectStore(final Callback<Void> callback, final boolean 
fail) {
-        expectStore(callback, fail, null);
-    }
-
     /**
      * Expect a request to store data to the underlying OffsetBackingStore.
      *
+     * @param key the key for the offset
+     * @param keySerialized serialized version of the key
+     * @param value the value for the offset
+     * @param valueSerialized serialized version of the value
      * @param callback the callback to invoke when completed, or null if the 
callback isn't
      *                 expected to be invoked
      * @param fail if true, treat
@@ -195,14 +226,20 @@ public class OffsetStorageWriterTest {
      *                          ensure tests complete.
      * @return the captured set of ByteBuffer key-value pairs passed to the 
storage layer
      */
-    private void expectStore(final Callback<Void> callback,
+    private void expectStore(Map<String, String> key, byte[] keySerialized,
+                             Map<String, Integer> value, byte[] 
valueSerialized,
+                             final Callback<Void> callback,
                              final boolean fail,
                              final CountDownLatch waitForCompletion) {
-        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, 
OFFSET_KEY_WRAPPED)).andReturn(OFFSET_KEY_SERIALIZED);
-        EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, 
OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED);
+        List<Object> keyWrapped = Arrays.asList(NAMESPACE, key);
+        EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, 
keyWrapped)).andReturn(keySerialized);
+        EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, 
value)).andReturn(valueSerialized);
 
         final Capture<Callback<Void>> storeCallback = Capture.newInstance();
-        EasyMock.expect(store.set(EasyMock.eq(OFFSETS_SERIALIZED), 
EasyMock.capture(storeCallback)))
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized = 
Collections.singletonMap(
+                keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
+                valueSerialized == null ? null : 
ByteBuffer.wrap(valueSerialized));
+        EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), 
EasyMock.capture(storeCallback)))
                 .andAnswer(new IAnswer<Future<Void>>() {
                     @Override
                     public Future<Void> answer() throws Throwable {

Reply via email to