[FLINK-2351] [core] Remove IOFormat ConfigBuilders This closes #1420
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6bac9214 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6bac9214 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6bac9214 Branch: refs/heads/master Commit: 6bac921445d8fdecc4951bd4b9342bce0994c5ba Parents: b80ecfd Author: zentol <ches...@apache.org> Authored: Sat Nov 28 14:36:06 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Mon Nov 30 17:44:12 2015 +0100 ---------------------------------------------------------------------- .../api/common/io/DelimitedInputFormat.java | 95 -------------------- .../flink/api/common/io/FileInputFormat.java | 66 -------------- .../flink/api/common/io/FileOutputFormat.java | 50 ----------- .../operators/base/FileDataSourceBase.java | 28 ------ 4 files changed, 239 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6bac9214/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index 78c6705..cb32fc3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.operators.base.FileDataSourceBase; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -616,98 +615,4 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> { * The configuration key to set the number of samples to take for the statistics. */ private static final String NUM_STATISTICS_SAMPLES = "delimited-format.numSamples"; - - // ----------------------------------- Config Builder ----------------------------------------- - - /** - * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent - * fashion. - * - * @return A config builder for setting parameters. - */ - public static ConfigBuilder configureDelimitedFormat(FileDataSourceBase<?> target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * Abstract builder used to set parameters to the input format's configuration in a fluent way. - */ - protected static class AbstractConfigBuilder<T> extends FileInputFormat.AbstractConfigBuilder<T> { - - private static final String NEWLINE_DELIMITER = "\n"; - - // -------------------------------------------------------------------- - - /** - * Creates a new builder for the given configuration. - * - * @param config The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Configuration config) { - super(config); - } - - // -------------------------------------------------------------------- - - /** - * Sets the delimiter to be a single character, namely the given one. The character must be within - * the value range <code>0</code> to <code>127</code>. - * - * @param delimiter The delimiter character. - * @return The builder itself. - */ - public T recordDelimiter(char delimiter) { - if (delimiter == '\n') { - this.config.setString(RECORD_DELIMITER, NEWLINE_DELIMITER); - } else { - this.config.setString(RECORD_DELIMITER, String.valueOf(delimiter)); - } - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the delimiter to be the given string. The string will be converted to bytes for more efficient - * comparison during input parsing. The conversion will be done using the platforms default charset. - * - * @param delimiter The delimiter string. - * @return The builder itself. - */ - public T recordDelimiter(String delimiter) { - this.config.setString(RECORD_DELIMITER, delimiter); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the number of line samples to take in order to estimate the base statistics for the - * input format. - * - * @param numSamples The number of line samples to take. - * @return The builder itself. - */ - public T numSamplesForStatistics(int numSamples) { - this.config.setInteger(NUM_STATISTICS_SAMPLES, numSamples); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - */ - public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bac9214/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 4dee9c7..37e8749 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -34,7 +34,6 @@ import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.api.common.operators.GenericDataSourceBase; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -922,69 +921,4 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS * The config parameter which defines whether input directories are recursively traversed. */ public static final String ENUMERATE_NESTED_FILES_FLAG = "recursive.file.enumeration"; - - - // ----------------------------------- Config Builder ----------------------------------------- - - /** - * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent - * fashion. - * - * @return A config builder for setting parameters. - */ - public static ConfigBuilder configureFileFormat(GenericDataSourceBase<?, ?> target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * Abstract builder used to set parameters to the input format's configuration in a fluent way. - */ - protected static abstract class AbstractConfigBuilder<T> { - /** - * The configuration into which the parameters will be written. - */ - protected final Configuration config; - - // -------------------------------------------------------------------- - - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Configuration targetConfig) { - this.config = targetConfig; - } - - // -------------------------------------------------------------------- - - /** - * Sets the path to the file or directory to be read by this file input format. - * - * @param filePath The path to the file or directory. - * @return The builder itself. - */ - public T filePath(String filePath) { - this.config.setString(FILE_PARAMETER_KEY, filePath); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - */ - public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bac9214/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 219877d..a5515c8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -23,7 +23,6 @@ import java.io.IOException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.flink.api.common.operators.base.FileDataSinkBase; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; @@ -312,53 +311,4 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen } } } - - // ============================================================================================ - - /** - * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent - * fashion. - * - * @return A config builder for setting parameters. - */ - public static ConfigBuilder configureFileFormat(FileDataSinkBase<?> target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * A builder used to set parameters to the output format's configuration in a fluent way. - */ - public static abstract class AbstractConfigBuilder<T> { - - /** - * The configuration into which the parameters will be written. - */ - protected final Configuration config; - - // -------------------------------------------------------------------- - - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Configuration targetConfig) { - this.config = targetConfig; - } - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - */ - public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6bac9214/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSourceBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSourceBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSourceBase.java index 32ad380..907819f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSourceBase.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/FileDataSourceBase.java @@ -62,34 +62,6 @@ public class FileDataSourceBase<OUT> extends GenericDataSourceBase<OUT, FileInpu this(f, operatorInfo, Preconditions.checkNotNull(filePath, "The file path may not be null."), "File " + filePath); } - /** - * Creates a new instance for the given file using the given file input format. - * - * @param f The {@link org.apache.flink.api.common.io.FileInputFormat} implementation used to read the data. - * @param operatorInfo The type information for the output type. - * @param filePath The file location. The file path must be a fully qualified URI, including the address schema. - * @param name The given name for the Pact, used in plans, logs and progress messages. - */ - public FileDataSourceBase(Class<? extends FileInputFormat<OUT>> f, OperatorInformation<OUT> operatorInfo, String filePath, String name) { - super(f, operatorInfo, name); - - Preconditions.checkNotNull(filePath, "The file path may not be null."); - - this.filePath = filePath; - FileInputFormat.configureFileFormat(this).filePath(filePath); - } - - /** - * Creates a new instance for the given file using the given input format. The contract has the default name. - * - * @param f The {@link org.apache.flink.api.common.io.FileInputFormat} implementation used to read the data. - * @param operatorInfo The type information for the output type. - * @param filePath The file location. The file path must be a fully qualified URI, including the address schema. - */ - public FileDataSourceBase(Class<? extends FileInputFormat<OUT>> f, OperatorInformation<OUT> operatorInfo, String filePath) { - this(f, operatorInfo, Preconditions.checkNotNull(filePath, "The file path may not be null."), "File " + filePath); - } - // -------------------------------------------------------------------------------------------- /**