[FLINK-3224] [DataStream] Call setInputType() on output formats that implement InputTypeConfigurable
This closes #1497 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b474da62 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b474da62 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b474da62 Branch: refs/heads/master Commit: b474da622925ecdd6d342f193fdd201477780f59 Parents: ee0bc13 Author: Nick Dimiduk <ndimi...@apache.org> Authored: Mon Jan 11 14:02:28 2016 -0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Tue Jan 12 22:25:18 2016 +0100 ---------------------------------------------------------------------- .../api/functions/sink/FileSinkFunction.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b474da62/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java index 504bc39..837f2d2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java @@ -20,9 +20,12 @@ package org.apache.flink.streaming.api.functions.sink; import java.io.IOException; import java.util.ArrayList; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.io.CleanupWhenUnsuccessful; import org.apache.flink.api.common.io.OutputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; @@ -37,7 +40,8 @@ import org.slf4j.LoggerFactory; * @param <IN> * Input type */ -public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> { +public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> implements + InputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -63,6 +67,14 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> { } @Override + public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) { + if (format instanceof InputTypeConfigurable) { + InputTypeConfigurable itc = (InputTypeConfigurable) format; + itc.setInputType(type, executionConfig); + } + } + + @Override public void invoke(IN record) throws Exception { tupleList.add(record); if (updateCondition()) {