[FLINK-3224] [DataStream] Call setInputType() on output formats that implement 
InputTypeConfigurable

This closes #1497


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a726da6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a726da6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a726da6d

Branch: refs/heads/release-0.10
Commit: a726da6d134448fd8ae6df87ec4070c390d7e4ce
Parents: e6e0fd9
Author: Nick Dimiduk <ndimi...@apache.org>
Authored: Mon Jan 11 14:02:28 2016 -0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Jan 13 00:25:43 2016 +0100

----------------------------------------------------------------------
 .../api/functions/sink/FileSinkFunction.java          | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a726da6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 504bc39..837f2d2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -20,9 +20,12 @@ package org.apache.flink.streaming.api.functions.sink;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
@@ -37,7 +40,8 @@ import org.slf4j.LoggerFactory;
  * @param <IN>
  *            Input type
  */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> 
implements
+       InputTypeConfigurable {
        
        private static final long serialVersionUID = 1L;
        
@@ -63,6 +67,14 @@ public abstract class FileSinkFunction<IN> extends 
RichSinkFunction<IN> {
        }
 
        @Override
+       public void setInputType(TypeInformation<?> type, ExecutionConfig 
executionConfig) {
+               if (format instanceof InputTypeConfigurable) {
+                       InputTypeConfigurable itc = (InputTypeConfigurable) 
format;
+                       itc.setInputType(type, executionConfig);
+               }
+       }
+
+       @Override
        public void invoke(IN record) throws Exception {
                tupleList.add(record);
                if (updateCondition()) {

Reply via email to