[FLINK-3224] [DataStream] Call setInputType() on output formats that implement InputTypeConfigurable
This closes #1497

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a726da6d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a726da6d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a726da6d

Branch: refs/heads/release-0.10
Commit: a726da6d134448fd8ae6df87ec4070c390d7e4ce
Parents: e6e0fd9
Author: Nick Dimiduk <ndimi...@apache.org>
Authored: Mon Jan 11 14:02:28 2016 -0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Jan 13 00:25:43 2016 +0100

----------------------------------------------------------------------
 .../api/functions/sink/FileSinkFunction.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a726da6d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
index 504bc39..837f2d2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/FileSinkFunction.java
@@ -20,9 +20,12 @@ package org.apache.flink.streaming.api.functions.sink;
 import java.io.IOException;
 import java.util.ArrayList;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.configuration.Configuration;
 import org.slf4j.Logger;
@@ -37,7 +40,8 @@ import org.slf4j.LoggerFactory;
  * @param <IN>
  *            Input type
  */
-public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
+public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> implements
+		InputTypeConfigurable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -63,6 +67,14 @@ public abstract class FileSinkFunction<IN> extends RichSinkFunction<IN> {
 	}
 
 	@Override
+	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
+		if (format instanceof InputTypeConfigurable) {
+			InputTypeConfigurable itc = (InputTypeConfigurable) format;
+			itc.setInputType(type, executionConfig);
+		}
+	}
+
+	@Override
 	public void invoke(IN record) throws Exception {
 		tupleList.add(record);
 		if (updateCondition()) {