Repository: kafka Updated Branches: refs/heads/trunk 2e6177359 -> 6f2f1f984
KAFKA-2626: Handle null keys and value validation properly in OffsetStorageWriter. Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Gwen Shapira Closes #345 from ewencp/kafka-2626-offset-storage-writer-null-values Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6f2f1f98 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6f2f1f98 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6f2f1f98 Branch: refs/heads/trunk Commit: 6f2f1f9843f537b9bda3aa3951a867fdee661761 Parents: 2e61773 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Fri Oct 23 17:01:33 2015 -0700 Committer: Gwen Shapira <csh...@gmail.com> Committed: Fri Oct 23 17:01:33 2015 -0700 ---------------------------------------------------------------------- .../kafka/copycat/storage/OffsetUtils.java | 5 ++ .../storage/OffsetStorageWriterTest.java | 71 +++++++++++++++----- 2 files changed, 59 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java index 8d78a57..9ba7662 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java @@ -34,6 +34,11 @@ public class OffsetUtils { } public static <K, V> void validateFormat(Map<K, V> offsetData) { + // Both keys and values for offsets may be null. For values, this is a useful way to delete offsets or indicate + // that there's not usable concept of offsets in your source system. + if (offsetData == null) + return; + for (Map.Entry<K, V> entry : offsetData.entrySet()) { if (!(entry.getKey() instanceof String)) throw new DataException("Offsets may only use String keys"); http://git-wip-us.apache.org/repos/asf/kafka/blob/6f2f1f98/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java index e33ecd0..3dd0b52 100644 --- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java +++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java @@ -45,15 +45,11 @@ public class OffsetStorageWriterTest { private static final String NAMESPACE = "namespace"; // Copycat format - any types should be accepted here private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key"); - private static final List<Object> OFFSET_KEY_WRAPPED = Arrays.asList(NAMESPACE, OFFSET_KEY); private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12); // Serialized private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(); private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(); - private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED - = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED), - ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED)); @Mock private OffsetBackingStore store; @Mock private Converter keyConverter; @@ -79,7 +75,7 @@ public class OffsetStorageWriterTest { public void testWriteFlush() throws Exception { @SuppressWarnings("unchecked") Callback<Void> callback = PowerMock.createMock(Callback.class); - expectStore(callback, false); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); PowerMock.replayAll(); @@ -91,6 +87,41 @@ public class OffsetStorageWriterTest { PowerMock.verifyAll(); } + // It should be possible to set offset values to null + @Test + public void testWriteNullValueFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null); + + PowerMock.replayAll(); + + writer.offset(OFFSET_KEY, null); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + + // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include + // info about the namespace (connector) + @Test + public void testWriteNullKeyFlush() throws Exception { + @SuppressWarnings("unchecked") + Callback<Void> callback = PowerMock.createMock(Callback.class); + expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); + + PowerMock.replayAll(); + + writer.offset(null, OFFSET_VALUE); + + assertTrue(writer.beginFlush()); + writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); + + PowerMock.verifyAll(); + } + @Test public void testNoOffsetsToFlush() { // If no offsets are flushed, we should finish immediately and not have made any calls to the @@ -112,9 +143,9 @@ public class OffsetStorageWriterTest { @SuppressWarnings("unchecked") final Callback<Void> callback = PowerMock.createMock(Callback.class); // First time the write fails - expectStore(callback, true); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null); // Second time it succeeds - expectStore(callback, false); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); // Third time it has no data to flush so we won't get past beginFlush() PowerMock.replayAll(); @@ -135,7 +166,7 @@ public class OffsetStorageWriterTest { final Callback<Void> callback = PowerMock.createMock(Callback.class); // Trigger the send, but don't invoke the callback so we'll still be mid-flush CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); - expectStore(null, false, allowStoreCompleteCountdown); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown); PowerMock.replayAll(); @@ -165,7 +196,7 @@ public class OffsetStorageWriterTest { CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); // In this test, the write should be cancelled so the callback will not be invoked and is not // passed to the expectStore call - expectStore(null, false, allowStoreCompleteCountdown); + expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown); PowerMock.replayAll(); @@ -180,13 +211,13 @@ public class OffsetStorageWriterTest { PowerMock.verifyAll(); } - private void expectStore(final Callback<Void> callback, final boolean fail) { - expectStore(callback, fail, null); - } - /** * Expect a request to store data to the underlying OffsetBackingStore. * + * @param key the key for the offset + * @param keySerialized serialized version of the key + * @param value the value for the offset + * @param valueSerialized serialized version of the value * @param callback the callback to invoke when completed, or null if the callback isn't * expected to be invoked * @param fail if true, treat @@ -195,14 +226,20 @@ public class OffsetStorageWriterTest { * ensure tests complete. * @return the captured set of ByteBuffer key-value pairs passed to the storage layer */ - private void expectStore(final Callback<Void> callback, + private void expectStore(Map<String, String> key, byte[] keySerialized, + Map<String, Integer> value, byte[] valueSerialized, + final Callback<Void> callback, final boolean fail, final CountDownLatch waitForCompletion) { - EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, OFFSET_KEY_WRAPPED)).andReturn(OFFSET_KEY_SERIALIZED); - EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, OFFSET_VALUE)).andReturn(OFFSET_VALUE_SERIALIZED); + List<Object> keyWrapped = Arrays.asList(NAMESPACE, key); + EasyMock.expect(keyConverter.fromCopycatData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized); + EasyMock.expect(valueConverter.fromCopycatData(NAMESPACE, null, value)).andReturn(valueSerialized); final Capture<Callback<Void>> storeCallback = Capture.newInstance(); - EasyMock.expect(store.set(EasyMock.eq(OFFSETS_SERIALIZED), EasyMock.capture(storeCallback))) + final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap( + keySerialized == null ? null : ByteBuffer.wrap(keySerialized), + valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized)); + EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback))) .andAnswer(new IAnswer<Future<Void>>() { @Override public Future<Void> answer() throws Throwable {