Repository: flink
Updated Branches:
  refs/heads/master 879bb1bb0 -> ac2137cfa


[FLINK-3777] Add openIF/closeIF methods to IF lifecycle

This closes #1903


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac2137cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac2137cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac2137cf

Branch: refs/heads/master
Commit: ac2137cfa5e63bd4f53a4b7669dc591ab210093f
Parents: 879bb1b
Author: Flavio Pompermaier <f.pomperma...@gmail.com>
Authored: Tue Apr 26 15:42:48 2016 +0200
Committer: zentol <ches...@apache.org>
Committed: Tue Apr 26 16:00:20 2016 +0200

----------------------------------------------------------------------
 .../api/common/io/ReplicatingInputFormat.java   | 20 ++++++++++++--
 .../flink/api/common/io/RichInputFormat.java    | 20 ++++++++++++++
 .../common/operators/GenericDataSourceBase.java | 10 ++++++-
 .../api/common/io/FileInputFormatTest.java      |  3 ++
 .../operators/GenericDataSourceBaseTest.java    |  8 ++++++
 .../operators/util/TestRichInputFormat.java     | 24 +++++++++++++++-
 .../flink/runtime/operators/DataSourceTask.java | 11 +++++++-
 .../runtime/operators/DataSourceTaskTest.java   | 29 +++++++++++++++++++-
 8 files changed, 118 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
index a084f64..14dc8f4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/ReplicatingInputFormat.java
@@ -119,18 +119,32 @@ public final class ReplicatingInputFormat<OT, S extends 
InputSplit> extends Rich
        }
 
        @Override
-       public void setRuntimeContext(RuntimeContext context){
-               if(this.replicatedIF instanceof RichInputFormat){
+       public void setRuntimeContext(RuntimeContext context) {
+               if (this.replicatedIF instanceof RichInputFormat) {
                        
((RichInputFormat)this.replicatedIF).setRuntimeContext(context);
                }
        }
 
        @Override
        public RuntimeContext getRuntimeContext(){
-               if(this.replicatedIF instanceof RichInputFormat){
+               if (this.replicatedIF instanceof RichInputFormat) {
                        return 
((RichInputFormat)this.replicatedIF).getRuntimeContext();
                } else{
                        throw new RuntimeException("The underlying input format 
to this ReplicatingInputFormat isn't context aware");
                }
        }
+
+       @Override
+       public void openInputFormat() {
+               if (this.replicatedIF instanceof RichInputFormat) {
+                       ((RichInputFormat)this.replicatedIF).openInputFormat();
+               }
+       }
+
+       @Override
+       public void closeInputFormat() {
+               if (this.replicatedIF instanceof RichInputFormat) {
+                       ((RichInputFormat)this.replicatedIF).closeInputFormat();
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java 
b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
index 0c23e13..188be50 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/RichInputFormat.java
@@ -49,4 +49,24 @@ public abstract class RichInputFormat<OT, T extends 
InputSplit> implements Input
                        "it in one of the other life cycle methods.");
                }
        }
+
+       /**
+        * Opens this InputFormat instance. This method is called once per parallel instance.
+        * Resources (e.g. database connections, caches) should be allocated in this method.
+        * 
+        * @see InputFormat
+        */
+       public void openInputFormat() {
+               // no-op by default; subclasses override this to allocate resources
+       }
+
+       /**
+        * Closes this InputFormat instance. This method is called once per 
parallel instance.
+        * Resources allocated during {@link #openInputFormat()} should be 
closed in this method.
+        * 
+        * @see InputFormat
+        */
+       public void closeInputFormat() {
+               // no-op by default; subclasses override this to release resources
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
index 0794cc8..e80c99f 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/operators/GenericDataSourceBase.java
@@ -210,10 +210,13 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
        protected List<OUT> executeOnCollections(RuntimeContext ctx, 
ExecutionConfig executionConfig) throws Exception {
                @SuppressWarnings("unchecked")
                InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, 
InputSplit>) this.formatWrapper.getUserCodeObject();
+               //configure the input format
                inputFormat.configure(this.parameters);
 
-               if(inputFormat instanceof RichInputFormat){
+               //open the input format
+               if (inputFormat instanceof RichInputFormat) {
                        ((RichInputFormat) inputFormat).setRuntimeContext(ctx);
+                       ((RichInputFormat) inputFormat).openInputFormat();
                }
 
                List<OUT> result = new ArrayList<OUT>();
@@ -235,6 +238,11 @@ public class GenericDataSourceBase<OUT, T extends 
InputFormat<OUT, ?>> extends O
                        inputFormat.close();
                }
                
+               //close the input format
+               if (inputFormat instanceof RichInputFormat) {
+                       ((RichInputFormat) inputFormat).closeInputFormat();
+               }
+
                return result;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 5aac540..63cb966 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -387,6 +387,7 @@ public class FileInputFormatTest {
                inputFormat.setFilePath(tempFile.toURI().toString());
 
                inputFormat.configure(config);
+               inputFormat.openInputFormat();
 
                FileInputSplit[] inputSplits = inputFormat.createInputSplits(3);
 
@@ -399,6 +400,8 @@ public class FileInputFormatTest {
                                }
                        }
                }
+
+               inputFormat.closeInputFormat();
        }
        
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index bda2fb6..083039a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -81,11 +81,19 @@ public class GenericDataSourceBaseTest implements 
java.io.Serializable {
 
                        ExecutionConfig executionConfig = new ExecutionConfig();
                        executionConfig.disableObjectReuse();
+                       assertEquals(false, in.hasBeenClosed());
+                       assertEquals(false, in.hasBeenOpened());
                        List<String> resultMutableSafe = 
source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap), executionConfig);
+                       assertEquals(true, in.hasBeenClosed());
+                       assertEquals(true, in.hasBeenOpened());
 
                        in.reset();
                        executionConfig.enableObjectReuse();
+                       assertEquals(false, in.hasBeenClosed());
+                       assertEquals(false, in.hasBeenOpened());
                        List<String> resultRegular = 
source.executeOnCollections(new RuntimeUDFContext(taskInfo, null, 
executionConfig, cpTasks, accumulatorMap), executionConfig);
+                       assertEquals(true, in.hasBeenClosed());
+                       assertEquals(true, in.hasBeenOpened());
 
                        assertEquals(asList(TestIOData.RICH_NAMES), 
resultMutableSafe);
                        assertEquals(asList(TestIOData.RICH_NAMES), 
resultRegular);

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
index 0945391..0bcd018 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/operators/util/TestRichInputFormat.java
@@ -33,6 +33,8 @@ public class TestRichInputFormat extends 
GenericInputFormat<String> implements N
        private static final int NUM = 5;
        private static final String[] NAMES = TestIOData.NAMES;
        private int count = 0;
+       private boolean openCalled = false;
+       private boolean closeCalled = false;
 
        @Override
        public boolean reachedEnd() throws IOException {
@@ -46,7 +48,27 @@ public class TestRichInputFormat extends 
GenericInputFormat<String> implements N
                                
getRuntimeContext().getNumberOfParallelSubtasks();
        }
 
-       public void reset(){
+       public void reset() {
                count = 0;
+               openCalled = false;
+               closeCalled = false;
+       }
+
+       @Override
+       public void openInputFormat() {
+               openCalled = true;
+       }
+
+       @Override
+       public void closeInputFormat() {
+               closeCalled = true;
+       }
+
+       public boolean hasBeenOpened() {
+               return openCalled;
+       }
+
+       public boolean hasBeenClosed() {
+               return closeCalled;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 5eec40f..0c525ea 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -97,9 +97,11 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                // 
--------------------------------------------------------------------
                LOG.debug(getLogString("Starting data source operator"));
 
-               
if(RichInputFormat.class.isAssignableFrom(this.format.getClass())){
+               if 
(RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
                        ((RichInputFormat) 
this.format).setRuntimeContext(createRuntimeContext());
                        LOG.debug(getLogString("Rich Source detected. 
Initializing runtime context."));
+                       ((RichInputFormat) this.format).openInputFormat();
+                       LOG.debug(getLogString("Rich Source detected. Opening 
the InputFormat."));
                }
 
                ExecutionConfig executionConfig = getExecutionConfig();
@@ -192,6 +194,13 @@ public class DataSourceTask<OT> extends AbstractInvokable {
                        }
                } finally {
                        BatchTask.clearWriters(eventualOutputs);
+                       // 
--------------------------------------------------------------------
+                       // Closing
+                       // 
--------------------------------------------------------------------
+                       if (this.format != null && 
RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
+                               ((RichInputFormat) 
this.format).closeInputFormat();
+                               LOG.debug(getLogString("Rich Source detected. 
Closing the InputFormat."));
+                       }
                }
 
                if (!this.taskCanceled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ac2137cf/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 96ae700..8f0642e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -23,6 +23,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -94,7 +95,18 @@ public class DataSourceTaskTest extends TaskTestBase {
                        Assert.fail("Invoke method caused exception.");
                }
                
-               Assert.assertTrue("Invalid output size. Expected: 
"+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
+               try {
+                       Field formatField = 
DataSourceTask.class.getDeclaredField("format");
+                       formatField.setAccessible(true);
+                       MockInputFormat inputFormat = (MockInputFormat) 
formatField.get(testTask);
+                       Assert.assertTrue("Invalid status of the input format. 
Expected for opened: true, Actual: " + inputFormat.opened, inputFormat.opened);
+                       Assert.assertTrue("Invalid status of the input format. 
Expected for closed: true, Actual: " + inputFormat.closed, inputFormat.closed);
+               } catch (Exception e) {
+                       System.err.println(e);
+                       Assert.fail("Reflection error while trying to validate 
inputFormat status.");
+               }
+
+               Assert.assertTrue("Invalid output size. Expected: " + 
(keyCnt*valCnt) + " Actual: " + this.outList.size(),
                        this.outList.size() == keyCnt * valCnt);
                
                HashMap<Integer,HashSet<Integer>> keyValueCountMap = new 
HashMap<>(keyCnt);
@@ -237,6 +249,9 @@ public class DataSourceTaskTest extends TaskTestBase {
                
                private final IntValue key = new IntValue();
                private final IntValue value = new IntValue();
+
+               private boolean opened = false;
+               private boolean closed = false;
                
                @Override
                public Record readRecord(Record target, byte[] record, int 
offset, int numBytes) {
@@ -255,6 +270,18 @@ public class DataSourceTaskTest extends TaskTestBase {
                        target.setField(1, this.value);
                        return target;
                }
+
+               public void openInputFormat() {
+                       //ensure this is called only once
+                       Assert.assertFalse("Invalid status of the input format. 
Expected for opened: false, Actual: " + opened, opened);
+                       opened = true;
+               }
+
+               public void closeInputFormat() {
+                       //ensure this is called only once
+                       Assert.assertFalse("Invalid status of the input format. 
Expected for closed: false, Actual: " + closed, closed);
+                       closed = true;
+               }
        }
        
        public static class MockDelayingInputFormat extends 
DelimitedInputFormat<Record> {

Reply via email to