Repository: flink Updated Branches: refs/heads/master 879bb1bb0 -> ac2137cfa
[FLINK-3777] Add openIF/closeIF methods to IF lifecycle This closes #1903 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2137cf Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2137cf Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2137cf Branch: refs/heads/master Commit: ac2137cfa5e63bd4f53a4b7669dc591ab210093f Parents: 879bb1b Author: Flavio Pompermaier <f.pomperma...@gmail.com> Authored: Tue Apr 26 15:42:48 2016 +0200 Committer: zentol <ches...@apache.org> Committed: Tue Apr 26 16:00:20 2016 +0200 ---------------------------------------------------------------------- .../api/common/io/ReplicatingInputFormat.java | 20 ++++++++++++-- .../flink/api/common/io/RichInputFormat.java | 20 ++++++++++++++ .../common/operators/GenericDataSourceBase.java | 10 ++++++- .../api/common/io/FileInputFormatTest.java | 3 ++ .../operators/GenericDataSourceBaseTest.java | 8 ++++++ .../operators/util/TestRichInputFormat.java | 24 +++++++++++++++- .../flink/runtime/operators/DataSourceTask.java | 11 +++++++- .../runtime/operators/DataSourceTaskTest.java | 29 +++++++++++++++++++- 8 files changed, 118 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java index a084f64..14dc8f4 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java @@ -119,18 +119,32 @@ public final class ReplicatingInputFormat<OT, S extends InputSplit> extends Rich 
} @Override - public void setRuntimeContext(RuntimeContext context){ - if(this.replicatedIF instanceof RichInputFormat){ + public void setRuntimeContext(RuntimeContext context) { + if (this.replicatedIF instanceof RichInputFormat) { ((RichInputFormat)this.replicatedIF).setRuntimeContext(context); } } @Override public RuntimeContext getRuntimeContext(){ - if(this.replicatedIF instanceof RichInputFormat){ + if (this.replicatedIF instanceof RichInputFormat) { return ((RichInputFormat)this.replicatedIF).getRuntimeContext(); } else{ throw new RuntimeException("The underlying input format to this ReplicatingInputFormat isn't context aware"); } } + + @Override + public void openInputFormat() { + if (this.replicatedIF instanceof RichInputFormat) { + ((RichInputFormat)this.replicatedIF).openInputFormat(); + } + } + + @Override + public void closeInputFormat() { + if (this.replicatedIF instanceof RichInputFormat) { + ((RichInputFormat)this.replicatedIF).closeInputFormat(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java index 0c23e13..188be50 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java @@ -49,4 +49,24 @@ public abstract class RichInputFormat<OT, T extends InputSplit> implements Input "it in one of the other life cycle methods."); } } + + /** + * Opens this InputFormat instance. This method is called once per parallel instance. + * Resources should be allocated in this method (e.g. database connections, caches, etc.).
+ * + * @see InputFormat + */ + public void openInputFormat() { + //do nothing here, just for subclasses + } + + /** + * Closes this InputFormat instance. This method is called once per parallel instance. + * Resources allocated during {@link #openInputFormat()} should be closed in this method. + * + * @see InputFormat + */ + public void closeInputFormat() { + //do nothing here, just for subclasses + } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java index 0794cc8..e80c99f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java @@ -210,10 +210,13 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O protected List<OUT> executeOnCollections(RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception { @SuppressWarnings("unchecked") InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject(); + //configure the input format inputFormat.configure(this.parameters); - if(inputFormat instanceof RichInputFormat){ + //open the input format + if (inputFormat instanceof RichInputFormat) { ((RichInputFormat) inputFormat).setRuntimeContext(ctx); + ((RichInputFormat) inputFormat).openInputFormat(); } List<OUT> result = new ArrayList<OUT>(); @@ -235,6 +238,11 @@ public class GenericDataSourceBase<OUT, T extends InputFormat<OUT, ?>> extends O inputFormat.close(); } + //close the input format + if (inputFormat instanceof RichInputFormat) { + ((RichInputFormat) 
inputFormat).closeInputFormat(); + } + return result; } http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index 5aac540..63cb966 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -387,6 +387,7 @@ public class FileInputFormatTest { inputFormat.setFilePath(tempFile.toURI().toString()); inputFormat.configure(config); + inputFormat.openInputFormat(); FileInputSplit[] inputSplits = inputFormat.createInputSplits(3); @@ -399,6 +400,8 @@ public class FileInputFormatTest { } } } + + inputFormat.closeInputFormat(); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java index bda2fb6..083039a 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java @@ -81,11 +81,19 @@ public class GenericDataSourceBaseTest implements java.io.Serializable { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.disableObjectReuse(); + assertEquals(false, in.hasBeenClosed()); + assertEquals(false, in.hasBeenOpened()); 
List<String> resultMutableSafe = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + assertEquals(true, in.hasBeenClosed()); + assertEquals(true, in.hasBeenOpened()); in.reset(); executionConfig.enableObjectReuse(); + assertEquals(false, in.hasBeenClosed()); + assertEquals(false, in.hasBeenOpened()); List<String> resultRegular = source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, executionConfig, cpTasks, accumulatorMap), executionConfig); + assertEquals(true, in.hasBeenClosed()); + assertEquals(true, in.hasBeenOpened()); assertEquals(asList(TestIOData.RICH_NAMES), resultMutableSafe); assertEquals(asList(TestIOData.RICH_NAMES), resultRegular); http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java index 0945391..0bcd018 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java @@ -33,6 +33,8 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N private static final int NUM = 5; private static final String[] NAMES = TestIOData.NAMES; private int count = 0; + private boolean openCalled = false; + private boolean closeCalled = false; @Override public boolean reachedEnd() throws IOException { @@ -46,7 +48,27 @@ public class TestRichInputFormat extends GenericInputFormat<String> implements N getRuntimeContext().getNumberOfParallelSubtasks(); } - public void reset(){ + public void reset() { count = 0; + openCalled = false; + closeCalled = false; 
+ } + + @Override + public void openInputFormat() { + openCalled = true; + } + + @Override + public void closeInputFormat() { + closeCalled = true; + } + + public boolean hasBeenOpened() { + return openCalled; + } + + public boolean hasBeenClosed() { + return closeCalled; } } http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java index 5eec40f..0c525ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java @@ -97,9 +97,11 @@ public class DataSourceTask<OT> extends AbstractInvokable { // -------------------------------------------------------------------- LOG.debug(getLogString("Starting data source operator")); - if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){ + if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) { ((RichInputFormat) this.format).setRuntimeContext(createRuntimeContext()); LOG.debug(getLogString("Rich Source detected. Initializing runtime context.")); + ((RichInputFormat) this.format).openInputFormat(); + LOG.debug(getLogString("Rich Source detected. 
Opening the InputFormat.")); } ExecutionConfig executionConfig = getExecutionConfig(); @@ -192,6 +194,13 @@ public class DataSourceTask<OT> extends AbstractInvokable { } } finally { BatchTask.clearWriters(eventualOutputs); + // -------------------------------------------------------------------- + // Closing + // -------------------------------------------------------------------- + if (this.format != null && RichInputFormat.class.isAssignableFrom(this.format.getClass())) { + ((RichInputFormat) this.format).closeInputFormat(); + LOG.debug(getLogString("Rich Source detected. Closing the InputFormat.")); + } } if (!this.taskCanceled) { http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java index 96ae700..8f0642e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java @@ -23,6 +23,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -94,7 +95,18 @@ public class DataSourceTaskTest extends TaskTestBase { Assert.fail("Invoke method caused exception."); } - Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(), + try { + Field formatField = DataSourceTask.class.getDeclaredField("format"); + formatField.setAccessible(true); + MockInputFormat inputFormat = (MockInputFormat) formatField.get(testTask); + Assert.assertTrue("Invalid status of the input format. 
Expected for opened: true, Actual: " + inputFormat.opened, inputFormat.opened); + Assert.assertTrue("Invalid status of the input format. Expected for closed: true, Actual: " + inputFormat.closed, inputFormat.closed); + } catch (Exception e) { + System.err.println(e); + Assert.fail("Reflection error while trying to validate inputFormat status."); + } + + Assert.assertTrue("Invalid output size. Expected: " + (keyCnt*valCnt) + " Actual: " + this.outList.size(), this.outList.size() == keyCnt * valCnt); HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt); @@ -237,6 +249,9 @@ public class DataSourceTaskTest extends TaskTestBase { private final IntValue key = new IntValue(); private final IntValue value = new IntValue(); + + private boolean opened = false; + private boolean closed = false; @Override public Record readRecord(Record target, byte[] record, int offset, int numBytes) { @@ -255,6 +270,18 @@ public class DataSourceTaskTest extends TaskTestBase { target.setField(1, this.value); return target; } + + public void openInputFormat() { + //ensure this is called only once + Assert.assertFalse("Invalid status of the input format. Expected for opened: false, Actual: " + opened, opened); + opened = true; + } + + public void closeInputFormat() { + //ensure this is called only once + Assert.assertFalse("Invalid status of the input format. Expected for closed: false, Actual: " + closed, closed); + closed = true; + } } public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {