Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a28ff2185 -> f502cbee3
[GOBBLIN-184] Call the flush method of CloseOnFlushWriterWrapper when a FlushControlMessage is received Closes #2040 from htran1/close_on_flush_handler_fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f502cbee Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f502cbee Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f502cbee Branch: refs/heads/master Commit: f502cbee33282c89438613c5882018006c5f3225 Parents: a28ff21 Author: Hung Tran <hut...@linkedin.com> Authored: Wed Aug 9 09:04:03 2017 -0700 Committer: Hung Tran <hut...@linkedin.com> Committed: Wed Aug 9 09:04:03 2017 -0700 ---------------------------------------------------------------------- .../writer/CloseOnFlushWriterWrapper.java | 9 +++- .../writer/CloseOnFlushWriterWrapperTest.java | 46 +++++++++++++++----- 2 files changed, 42 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java index 2c81e52..c244b2d 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapper.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.records.FlushControlMessageHandler; import org.apache.gobblin.stream.RecordEnvelope; import 
org.apache.gobblin.util.Decorator; import org.apache.gobblin.util.FinalState; @@ -126,7 +127,13 @@ public class CloseOnFlushWriterWrapper<D> extends WriterWrapper<D> implements De @Override public ControlMessageHandler getMessageHandler() { - return this.writer.getMessageHandler(); + // if close on flush is configured then create a handler that will invoke the wrapper's flush to perform close + // on flush operations, otherwise return the wrapped writer's handler. + if (this.closeOnFlush) { + return new FlushControlMessageHandler(this); + } else { + return this.writer.getMessageHandler(); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f502cbee/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java index 11b2274..b14793a 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/writer/CloseOnFlushWriterWrapperTest.java @@ -26,6 +26,9 @@ import org.testng.annotations.Test; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.records.ControlMessageHandler; +import org.apache.gobblin.stream.ControlMessage; +import org.apache.gobblin.stream.FlushControlMessage; import org.apache.gobblin.stream.RecordEnvelope; public class CloseOnFlushWriterWrapperTest { @@ -40,12 +43,13 @@ public class CloseOnFlushWriterWrapperTest { byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.flush(); + writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); 
Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertEquals(dummyWriters.get(0).closed, false); - Assert.assertEquals(dummyWriters.get(0).committed, false); + Assert.assertFalse(dummyWriters.get(0).closed); + Assert.assertFalse(dummyWriters.get(0).committed); + Assert.assertTrue(dummyWriters.get(0).handlerCalled); } @Test @@ -59,12 +63,14 @@ public class CloseOnFlushWriterWrapperTest { byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.flush(); + writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertEquals(dummyWriters.get(0).closed, true); - Assert.assertEquals(dummyWriters.get(0).committed, true); + Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertTrue(dummyWriters.get(0).committed); + // handler from CloseOnFlushWriterWrapper should have been called instead + Assert.assertFalse(dummyWriters.get(0).handlerCalled); } @Test @@ -78,22 +84,24 @@ public class CloseOnFlushWriterWrapperTest { byte[] record = new byte[]{'a', 'b', 'c', 'd'}; writer.writeEnvelope(new RecordEnvelope(record)); - writer.flush(); + writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); Assert.assertEquals(dummyWriters.size(), 1); Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(0).flushCount, 1); - Assert.assertEquals(dummyWriters.get(0).closed, true); - Assert.assertEquals(dummyWriters.get(0).committed, true); + Assert.assertTrue(dummyWriters.get(0).closed); + Assert.assertTrue(dummyWriters.get(0).committed); + Assert.assertFalse(dummyWriters.get(0).handlerCalled); writer.writeEnvelope(new RecordEnvelope(record)); - writer.flush(); + 
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush"))); Assert.assertEquals(dummyWriters.size(), 2); Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1); Assert.assertEquals(dummyWriters.get(1).flushCount, 1); - Assert.assertEquals(dummyWriters.get(1).closed, true); - Assert.assertEquals(dummyWriters.get(1).committed, true); + Assert.assertTrue(dummyWriters.get(1).closed); + Assert.assertTrue(dummyWriters.get(1).committed); + Assert.assertFalse(dummyWriters.get(1).handlerCalled); } private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) { @@ -113,6 +121,7 @@ public class CloseOnFlushWriterWrapperTest { private int flushCount = 0; private boolean committed = false; private boolean closed = false; + private boolean handlerCalled = false; DummyWriter() { } @@ -153,6 +162,19 @@ public class CloseOnFlushWriterWrapperTest { } @Override + public ControlMessageHandler getMessageHandler() { + return new ControlMessageHandler() { + @Override + public void handleMessage(ControlMessage message) { + handlerCalled = true; + if (message instanceof FlushControlMessage) { + flush(); + } + } + }; + } + + @Override public void flush() { this.flushCount++; }