[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17334114#comment-17334114 ]

Flink Jira Bot commented on FLINK-10134:

This issue was marked "stale-assigned" and has not received an update in 7 days. It is now automatically unassigned. If you are still working on it, you can assign it to yourself again. Please also give an update about the status of the work.

> UTF-16 support for TextInputFormat
> ----------------------------------
>
>                 Key: FLINK-10134
>                 URL: https://issues.apache.org/jira/browse/FLINK-10134
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataSet
>    Affects Versions: 1.4.2
>            Reporter: David Dreyfus
>            Assignee: Forward Xu
>            Priority: Major
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.13.0
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
>
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened and then using that BOM to modify the specified encoding to a BOM-specific one when the caller doesn't specify one, and to overwrite the caller's specification if the BOM is in conflict with the caller's specification. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
>
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
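Problem #1 in the report can be reproduced directly with Java's charset API: encoding the delimiter with the endianness-neutral "UTF-16" charset prepends a big-endian BOM, so the resulting four-byte pattern never matches an actual line ending inside the file. A minimal illustration (not Flink code, just the underlying JDK behavior):

```java
import java.nio.charset.StandardCharsets;

// Demonstrates why delimiterString.getBytes(getCharset()) misbehaves for "UTF-16":
// Java's "UTF-16" encoder emits a big-endian BOM (0xFE 0xFF) before the characters,
// so "\n" becomes FE FF 00 0A -- a byte pattern that never occurs at line endings.
public class DelimiterBomDemo {
    public static byte[] encodeDelimiter() {
        return "\n".getBytes(StandardCharsets.UTF_16);
    }
}
```

Encoding with the explicit "UTF-16BE" or "UTF-16LE" charsets would not add a BOM, which is one reason the fix has to special-case the plain "UTF-16" name.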
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17323488#comment-17323488 ]

Flink Jira Bot commented on FLINK-10134:

This issue is assigned but has not received an update in 7 days, so it has been labeled "stale-assigned". If you are still working on the issue, please give an update and remove the label. If you are no longer working on the issue, please unassign it so someone else may work on it. In 7 days the issue will be automatically unassigned.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228782#comment-17228782 ]

Stephan Ewen commented on FLINK-10134:
--------------------------------------

The new file source should handle this correctly, using the [TextLineFormat|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/reader/TextLineFormat.java]. That one simply uses Java's charset decoders, which handle UTF-16 correctly.
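The decoder behavior Stephan refers to can be seen with the JDK alone: Java's "UTF-16" charset consumes a leading BOM when decoding and only assumes big-endian when no BOM is present. A small sketch:

```java
import java.nio.charset.StandardCharsets;

// Java's built-in "UTF-16" charset inspects and strips a leading BOM when
// decoding; with no BOM it defaults to big-endian. A reader built on the
// plain CharsetDecoder therefore handles UTF-16 files of either endianness.
public class Utf16DecodeDemo {
    public static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_16);
    }
}
```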
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706598#comment-16706598 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157

As this is a pull request merged from a forked repository, GitHub hides the original diff on merge; it is supplied below for the sake of provenance:

```diff
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..e13560d4823 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
@@ -62,6 +64,23 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/**
+	 * The charset of bom in the file to process.
+	 */
+	private transient Charset bomIdentifiedCharset;
+	/**
+	 * This is the charset that is configured via setCharset().
+	 */
+	private transient Charset configuredCharset;
+	/**
+	 * The Map to record the BOM encoding of all files.
+	 */
+	private transient final Map fileBomCharsetMap;
+	/**
+	 * The bytes to BOM check.
+	 */
+	byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -184,6 +203,7 @@ protected DelimitedInputFormat(Path filePath, Configuration configuration) {
 			configuration = GlobalConfiguration.loadConfiguration();
 		}
 		loadConfigParameters(configuration);
+		this.fileBomCharsetMap = new LRUCache<>(1024);
 	}
 
@@ -195,12 +215,25 @@
 	 */
 	@PublicEvolving
 	public Charset getCharset() {
-		if (this.charset == null) {
+		if (this.configuredCharset != null) {
+			this.charset = this.configuredCharset;
+		} else if (this.bomIdentifiedCharset != null) {
+			this.charset = this.bomIdentifiedCharset;
+		} else {
 			this.charset = Charset.forName(charsetName);
 		}
 		return this.charset;
 	}
 
+	/**
+	 * get the charsetName.
+	 *
+	 * @return the charsetName
+	 */
+	public String getCharsetName() {
+		return charsetName;
+	}
+
 	/**
 	 * Set the name of the character set used for the row delimiter. This is
 	 * also used by subclasses to interpret field delimiters, comment strings,
@@ -214,7 +247,7 @@ public Charset getCharset() {
 	@PublicEvolving
 	public void setCharset(String charset) {
 		this.charsetName = Preconditions.checkNotNull(charset);
-		this.charset = null;
+		this.configuredCharset = getSpecialCharset(charset);
 
 		if (this.delimiterString != null) {
 			this.delimiter = delimiterString.getBytes(getCharset());
@@ -472,6 +505,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);
 			this.stream.seek(offset);
 			readLine();
 			// if the first partial record already pushes the stream over
 		} else {
 			fillBuffer(0);
+			setBomFileCharset(split);
 		}
 	}
@@ -536,6 +571,71 @@ public void close() throws IOException {
 		super.close();
 	}
 
+	/**
+	 * Special default processing for utf-16 and utf-32 is performed.
+	 *
+	 * @param charsetName
```
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16699989#comment-16699989 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #7157: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157#issuecomment-441947433

hi @fhueske Thank you very much for your help. If you can spare some time, I would like to ask you to continue reviewing this PR for me.

Best, qianjin

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16695482#comment-16695482 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars opened a new pull request #7157: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157

## What is the purpose of the change

This PR solves the problem of `TextInputFormat` failing to parse files with UTF-16 encoding.

## Brief change log

To fix this bug, I added a check of the file's BOM header to determine the current file encoding, and handling for the case where the user-configured encoding conflicts with the encoding indicated by the BOM header. Specific changes:

- Added a `getBomFileCharset` function to `DelimitedInputFormat.java` that detects the file's BOM header, distinguishing `UTF-16BE`, `UTF-16LE`, `UTF-8` with BOM header, `UTF-32BE`, and `UTF-32LE`, defaulting to `UTF-8`.
- Added the `bomBytes`, `fileBomCharsetMap`, `bomIdentifiedCharset`, and `configuredCharset` fields to `DelimitedInputFormat.java`, a `getBomFileCharset(split)` call to the `open` method, and a `setBomFileCharset` method to set the `bomIdentifiedCharset` and `fileBomCharsetMap` fields. The name of a file that has been parsed is used as the key, and the detected encoding is inserted as the value into `fileBomCharsetMap`.
- In `DelimitedInputFormat.getCharset()`, added logic to resolve the encoding of the current file from three private fields:
  1. `configuredCharset`: the charset that is configured via `setCharset()`.
  2. `bomIdentifiedCharset`: the charset that is set by `setBomFileCharset()`.
  3. `charset`: the charset that is returned by `getCharset()`. If not set before (i.e., null), it is set depending on the `configuredCharset` and `bomIdentifiedCharset`.
- The `getSpecialCharset` method in `DelimitedInputFormat.java` handles the two special cases of user input, UTF-16 and UTF-32.

## Verifying this change

This change added tests and can be verified as follows:

- I added `testFileCharset`, `testAllFileCharset`, `createUTFEncodedFile`, and `testFileCharsetReadByMultiSplits` to `TextInputFormatTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

## Discuss

The following are the modifications and discussions that have been made for this bug. Thank you to Fabian Hueske for the review. [jira link](https://issues.apache.org/jira/browse/FLINK-10134?focusedCommentId=16652877=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16652877) [pr6823](https://github.com/apache/flink/pull/6823) [pr6710](https://github.com/apache/flink/pull/6710)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
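The three-field resolution order the PR describes (configured charset wins over BOM-detected charset, which wins over the default charset name) can be sketched as follows. Field names mirror the PR description; this is an illustrative stand-in, not the actual Flink code:

```java
import java.nio.charset.Charset;

// Hedged sketch of the charset-resolution priority from the PR description:
// configuredCharset (setCharset) > bomIdentifiedCharset (BOM probe) > default name.
public class CharsetResolution {
    private Charset configuredCharset;     // set via setCharset()
    private Charset bomIdentifiedCharset;  // set via the BOM probe
    private String charsetName = "UTF-8";  // default, as in DelimitedInputFormat

    public void setConfiguredCharset(Charset c) { this.configuredCharset = c; }
    public void setBomIdentifiedCharset(Charset c) { this.bomIdentifiedCharset = c; }

    public Charset getCharset() {
        if (configuredCharset != null) {
            return configuredCharset;
        } else if (bomIdentifiedCharset != null) {
            return bomIdentifiedCharset;
        }
        return Charset.forName(charsetName);
    }
}
```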
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16694539#comment-16694539 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars closed pull request #7092: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7092

As this is a pull request merged from a forked repository, GitHub hides the original diff on merge; it is supplied below for the sake of provenance:

```diff
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..af99f0c9d54 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,21 +37,22 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter into records.
  * The parsing of the record bytes into the record has to be implemented in the
  * {@link #readRecord(Object, byte[], int, int)} method.
- *
+ *
  * The default delimiter is the newline character {@code '\n'}.
  */
 @Public
 public abstract class DelimitedInputFormat extends FileInputFormat implements CheckpointableInputFormat {
-
+
 	private static final long serialVersionUID = 1L;
 
 	// -- Constants ---
-
+
 	/**
 	 * The log.
 	 */
@@ -62,26 +64,46 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/**
+	 * The charset of bom in the file to process.
+	 */
+	private transient Charset bomIdentifiedCharset;
+
+	/**
+	 * This is the charset that is configured via setCharset().
+	 */
+	private transient Charset configuredCharset;
+
+	/**
+	 * The Map to record the BOM encoding of all files.
+	 */
+	private transient final Map fileBomCharsetMap;
+
+	/**
+	 * The bytes to BOM check.
+	 */
+	byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
 	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
-
+
 	/**
 	 * Indication that the number of samples has not been set by the configuration.
 	 */
 	private static final int NUM_SAMPLES_UNDEFINED = -1;
-
+
 	/**
 	 * The maximum number of line samples to be taken.
 	 */
 	private static int DEFAULT_MAX_NUM_SAMPLES;
-
+
 	/**
 	 * The minimum number of line samples to be taken.
 	 */
 	private static int DEFAULT_MIN_NUM_SAMPLES;
-
+
 	/**
 	 * The maximum size of a sample record before sampling is aborted. To catch cases where a wrong delimiter is given.
 	 */
@@ -98,7 +120,7 @@ protected static void loadGlobalConfigParams() {
 	protected static void loadConfigParameters(Configuration parameters) {
 		int maxSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES);
 		int minSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES);
-
+
 		if (maxSamples < 0) {
 			LOG.error("Invalid default maximum number of line samples: " + maxSamples + ". Using default value of " +
 				OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES.key());
@@ -109,17 +131,17 @@ protected static void loadConfigParameters(Configuration parameters) {
 				OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES.key());
 			minSamples = OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES.defaultValue();
 		}
-
+
 		DEFAULT_MAX_NUM_SAMPLES = maxSamples;
-
+
 		if (minSamples > maxSamples) {
 			LOG.error("Default minimum number of line samples cannot be greater the default maximum number " +
-				"of line samples: min=" + minSamples + ", max=" + maxSamples + ". Defaulting minimum to
```
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686178#comment-16686178 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars opened a new pull request #7092: [FLINK-10134] UTF-16 support for TextInputFormat bug
URL: https://github.com/apache/flink/pull/7092

## What is the purpose of the change

This PR solves the problem of `TextInputFormat` failing to parse files with UTF-16 encoding.

## Brief change log

To fix this bug, I added a check of the file's BOM header to determine the current file encoding, and handling for the case where the user-configured encoding conflicts with the encoding indicated by the BOM header. Specific changes:

- Added a `getBomFileCharset` function to `DelimitedInputFormat.java` that detects the file's BOM header, distinguishing `UTF-16BE`, `UTF-16LE`, `UTF-8` with BOM header, `UTF-32BE`, and `UTF-32LE`, defaulting to `UTF-8`.
- Added the `bomBytes`, `fileBomCharsetMap`, `bomIdentifiedCharset`, and `configuredCharset` fields to `DelimitedInputFormat.java`, a `getBomFileCharset(split)` call to the `open` method, and a `setBomFileCharset` method to set the `bomIdentifiedCharset` and `fileBomCharsetMap` fields. The name of a file that has been parsed is used as the key, and the detected encoding is inserted as the value into `fileBomCharsetMap`.
- In `DelimitedInputFormat.getCharset()`, added logic to resolve the encoding of the current file from three private fields:
  1. `configuredCharset`: the charset that is configured via `setCharset()`.
  2. `bomIdentifiedCharset`: the charset that is set by `setBomFileCharset()`.
  3. `charset`: the charset that is returned by `getCharset()`. If not set before (i.e., null), it is set depending on the `configuredCharset` and `bomIdentifiedCharset`.
- The `getSpecialCharset` method in `DelimitedInputFormat.java` handles the two special cases of user input, UTF-16 and UTF-32.

## Verifying this change

This change added tests and can be verified as follows:

- I added `testFileCharset`, `testAllFileCharset`, `createUTFEncodedFile`, and `testFileCharsetReadByMultiSplits` to `TextInputFormatTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

## Discuss

The following are the modifications and discussions that have been made for this bug. Thank you to Fabian Hueske for the review. [jira link](https://issues.apache.org/jira/browse/FLINK-10134?focusedCommentId=16652877=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16652877) [pr6823](https://github.com/apache/flink/pull/6823) [pr6710](https://github.com/apache/flink/pull/6710)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
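The `fileBomCharsetMap` described above caches the BOM-detected charset per file name; the PR bounds it with `org.apache.flink.util.LRUCache` at 1024 entries. A minimal stand-in for such a bounded cache, built on `LinkedHashMap`'s access-order eviction (illustrative only, not the Flink utility class):

```java
import java.nio.charset.Charset;
import java.util.LinkedHashMap;
import java.util.Map;

// Bounded file-name -> charset cache: once `capacity` entries are exceeded,
// the least recently accessed entry is evicted, so re-opened splits of the
// same file reuse the detected charset without the map growing unboundedly.
public class FileCharsetCache extends LinkedHashMap<String, Charset> {
    private final int capacity;

    public FileCharsetCache(int capacity) {
        super(16, 0.75f, true); // true = access-order iteration (LRU)
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Charset> eldest) {
        return size() > capacity;
    }
}
```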
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16686116#comment-16686116 ]

ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars closed pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16662108#comment-16662108 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r227739035

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: @fhueske Adding `FileInputFormat.readFileHeader()` to `FileInputFormat` would still need to read the 4 BOM header bytes from the stream, so I think it is okay to open the stream in `DelimitedInputFormat` and process it there. Also, for `InputStreamFSInputWrapper`'s stream, I would need to open it, read 4 bytes, and then close it again. But `fillBuffer(0)` also performs the open and close operations on the stream. This is my question.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656580#comment-16656580 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r226598262

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: `DelimitedInputFormat` fills its `readBuffer` when `fillBuffer(0)` is called, so we can copy 4 bytes from `readBuffer`; it should not have a big impact. As follows: `System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 4);`

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656456#comment-16656456 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r226574036

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: I am suggesting that we only do a BOM check if the user did not configure a charset or if the user explicitly configured a UTF charset. In case the user explicitly configured a different charset, we do not check for a BOM because it could also be valid data. `InputStreamFSInputWrapper`'s limitation in seeking makes it a bit tricky to read the first bytes of a file, since the stream is opened and seeked in `FileInputFormat.open()`. Maybe we can implement a protected method `FileInputFormat.readFileHeader()` that is called after the stream was opened. `DelimitedInputFormat` can then override the method to check for the BOM. By default, the method should not do anything.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
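fhueske's proposal above is a classic template-method hook. The sketch below uses simplified stand-in classes (not the real `FileInputFormat`/`DelimitedInputFormat` signatures): the base class calls a no-op `readFileHeader()` right after the stream becomes available, and the delimited subclass overrides it to sniff the first BOM bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Stand-in for FileInputFormat: opens the stream, then invokes the hook.
abstract class SketchFileInputFormat {
    protected InputStream stream;

    public void open(InputStream in) throws IOException {
        this.stream = in;     // in Flink, the stream is opened and seeked here
        readFileHeader();     // hook: no-op unless a subclass overrides it
    }

    protected void readFileHeader() throws IOException {
        // by default, do nothing
    }
}

// Stand-in for DelimitedInputFormat: overrides the hook to read the BOM.
class SketchDelimitedInputFormat extends SketchFileInputFormat {
    byte[] bom = new byte[4];
    int bomLen;

    @Override
    protected void readFileHeader() throws IOException {
        bomLen = stream.read(bom, 0, bom.length); // sniff up to 4 header bytes
    }
}
```

Keeping the default a no-op means formats that have no notion of a file header pay no cost, which matches the "by default, the method should not do anything" part of the suggestion.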
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656173#comment-16656173 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r226516362

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: I have two questions about this commit, as follows: For the first suggestion, I feel that users often cannot know the encoding of a file accurately. For example, a file encoded as `UTF-16LE` with a BOM header will report an error if the user specifies `UTF-16BE`. And I believe UTF files carrying a BOM will be the majority, so I think it is necessary to do BOM detection first, which is better for the user experience. For the fourth recommendation, `GenericCsvInputFormat` cannot seek to position 0: it calls the `seek` method of `InputStreamFSInputWrapper`, and that method currently cannot seek to position 0.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
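The conflict discussed here, a `UTF-16LE` file with a BOM read under a user-specified `UTF-16BE`, can be demonstrated with plain JDK decoding (an illustrative snippet of ours, not Flink code): the endianness-neutral "UTF-16" decoder honors the BOM, while an explicit wrong endianness misreads every code unit.

```java
import java.nio.charset.StandardCharsets;

public class BomPrecedenceDemo {
    public static void main(String[] args) {
        // Bytes of a UTF-16LE file that starts with a BOM: FF FE, then 'A'.
        byte[] file = {(byte) 0xFF, (byte) 0xFE, (byte) 0x41, (byte) 0x00};

        // "UTF-16" consumes the BOM and picks the right byte order.
        System.out.println(new String(file, StandardCharsets.UTF_16)); // A

        // An explicit (and wrong) "UTF-16BE" ignores the BOM and misreads
        // the data -- the conflict the thread says should be resolved in
        // favor of the BOM.
        System.out.println(new String(file, StandardCharsets.UTF_16BE).equals("A")); // false
    }
}
```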
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16656161#comment-16656161 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r226516362

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: I have two questions about this commit, as follows: For the first suggestion, I feel that users often cannot know the encoding of a file accurately. For example, a file encoded as `UTF-16LE` with a BOM header will report an error if the user specifies `UTF-16BE`. And I believe UTF files carrying a BOM will be the majority, so I think it is necessary to do BOM detection first, which is better for the user experience. For the fourth recommendation, `GenericCsvInputFormat` cannot seek to position 0: it calls the `seek` method of `InputStreamFSInputWrapper`, and that method currently cannot seek to position 0.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16654945#comment-16654945 ] xuqianjin commented on FLINK-10134: --- @[~till.rohrmann] I will take the time to submit the PR.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652877#comment-16652877 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on issue #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#issuecomment-430469738

@fhueske Thank you very much again for your review. I will modify the code according to your comments, then submit the reworked code to the master branch and remove the commits that were already submitted on the release-1.6 branch. I learn a lot from every one of your suggestions. Thank you very much again.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651871#comment-16651871 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225543780

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java ##
@@ -504,16 +533,119 @@ private void initBuffers() {
 		this.end = false;
 	}
 
+	/**
+	 * Set file bom encoding.
+	 *
+	 * @param split
+	 */
+	private void setBomFileCharset(FileInputSplit split) {
+		try {
+			String filePath = split.getPath().toString();
+			if (this.fileBomCharsetMap.containsKey(filePath)) {
+				this.bomCharset = this.fileBomCharsetMap.get(filePath);
+			} else {
+				byte[] checkBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+				byte[] bomBuffer = new byte[4];
+
+				if (this.splitStart != 0) {
+					this.stream.seek(0);
+					this.stream.read(bomBuffer, 0, bomBuffer.length);
+					this.stream.seek(split.getStart());
+				} else {
+					System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 3);
+				}
+
+				if ((bomBuffer[0] == checkBytes[0]) && (bomBuffer[1] == checkBytes[0])
+						&& (bomBuffer[2] == checkBytes[1]) && (bomBuffer[3] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-32BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1])
+						&& (bomBuffer[2] == checkBytes[0]) && (bomBuffer[3] == checkBytes[0])) {
+					this.bomCharset = Charset.forName("UTF-32LE");
+				} else if ((bomBuffer[0] == checkBytes[3]) && (bomBuffer[1] == checkBytes[4]) && (bomBuffer[2] == checkBytes[5])) {
+					this.bomCharset = Charset.forName("UTF-8");
+				} else if ((bomBuffer[0] == checkBytes[1]) && (bomBuffer[1] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-16BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1])) {
+					this.bomCharset = Charset.forName("UTF-16LE");
+				} else {
+					this.bomCharset = Charset.forName(charsetName);
+				}
+
+				this.fileBomCharsetMap.put(filePath, this.bomCharset);
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to get file bom encoding.");
+			this.bomCharset = Charset.forName(charsetName);
+		}
+
+		setParasByCharset(this.bomCharset);
+	}
+
+	/**
+	 * Set stepSize, delimiterNewLinePos, delimiterCarrageReturnPos by charset.
+	 *
+	 * @param charset
+	 */
+	private void setParasByCharset(Charset charset) {
+		int stepSize;
+		byte[] delimiterBytes = new byte[]{'\n'};
+		switch (charset.toString()) {
+			case "UTF-8":
+				stepSize = 1;
+				break;
+			case "UTF-16":
+				stepSize = 2;
+				this.delimiterNewLinePos = 1;
+				delimiterBytes = new byte[]{(byte) 0x00, '\n'};
+				break;
+			case "UTF-16LE":
+				stepSize = 2;
+				delimiterBytes = new byte[]{'\n', (byte) 0x00};
+				this.delimiterNewLinePos = 0;
+				break;
+			case "UTF-16BE":
+				stepSize = 2;
+				this.delimiterNewLinePos = 1;
+				delimiterBytes = new byte[]{(byte) 0x00, '\n'};
+				break;
+			case "UTF-32":

Review comment: We should not have "UTF-32" but only "UTF-32BE"
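For readability, the BOM checks in the diff above can be restated as a standalone lookup. This sketch uses our own helper name (not Flink's) and falls back to the configured charset when no BOM matches, mirroring the final `else` branch; note the 4-byte UTF-32LE pattern must be tested before the 2-byte UTF-16LE one, since both start with FF FE.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

final class BomSniffer {
    // Maps the first (up to four) bytes of a file to a charset, or returns
    // the caller-configured fallback when no BOM signature matches.
    static Charset detect(byte[] h, Charset fallback) {
        if (h.length >= 4 && h[0] == 0x00 && h[1] == 0x00
                && h[2] == (byte) 0xFE && h[3] == (byte) 0xFF) {
            return Charset.forName("UTF-32BE");          // 00 00 FE FF
        }
        if (h.length >= 4 && h[0] == (byte) 0xFF && h[1] == (byte) 0xFE
                && h[2] == 0x00 && h[3] == 0x00) {
            return Charset.forName("UTF-32LE");          // FF FE 00 00
        }
        if (h.length >= 3 && h[0] == (byte) 0xEF && h[1] == (byte) 0xBB
                && h[2] == (byte) 0xBF) {
            return StandardCharsets.UTF_8;               // EF BB BF
        }
        if (h.length >= 2 && h[0] == (byte) 0xFE && h[1] == (byte) 0xFF) {
            return StandardCharsets.UTF_16BE;            // FE FF
        }
        if (h.length >= 2 && h[0] == (byte) 0xFF && h[1] == (byte) 0xFE) {
            return StandardCharsets.UTF_16LE;            // FF FE
        }
        return fallback;
    }
}
```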
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651887#comment-16651887 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225573745

## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java ##
@@ -207,12 +207,212 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 			assertEquals(content, result);
 		}
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
 		}
-		catch (Throwable t) {
+	}
+
+	/**
+	 * Test different file encodings, for example: UTF-8, UTF-8 with bom, UTF-16LE, UTF-16BE, UTF-32LE, UTF-32BE.
+	 */
+	@Test
+	public void testFileCharset() {
+		String first = "First line";
+
+		// Test special different languages
+		for (final String data : new String[]{"Hello", "ハロー", "привет", "Bonjour", "Сайн байна уу", "안녕하세요."}) {
+			testAllFileCharsetNoDelimiter(data);
+		}
+
+		// Test special symbol
+		for (final String delimiterStr : new String[]{"\\", "^", "|", "[", ".", "*"}) {
+			first = "Fir" + delimiterStr + "st li" + delimiterStr + "ne";
+			testAllFileCharsetWithDelimiter(first, delimiterStr);
+		}
+	}
+
+	private void testAllFileCharsetNoDelimiter(String first) {
+		testAllFileCharsetWithDelimiter(first, "");
+	}
+
+	private void testAllFileCharsetWithDelimiter(String first, String delimiter) {
+		try {
+			final byte[] noBom = new byte[]{};
+			final byte[] utf8Bom = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+			final byte[] utf16LEBom = new byte[]{(byte) 0xFF, (byte) 0xFE};
+			final byte[] utf16BEBom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+			final byte[] utf32LEBom = new byte[]{(byte) 0xFF, (byte) 0xFE, (byte) 0x00, (byte) 0x00};
+			final byte[] utf32BEBom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+
+			// test UTF-8 have bom
+			testFileCharset(first, "UTF-8", "UTF-32", 1, utf8Bom, delimiter.getBytes("UTF-8"));
+			// test UTF-8 without bom
+			testFileCharset(first, "UTF-8", "UTF-8", 0, noBom, delimiter.getBytes("UTF-8"));
+			// test UTF-16LE without bom
+			testFileCharset(first, "UTF-16LE", "UTF-16LE", 0, noBom, delimiter.getBytes("UTF-16LE"));
+			// test UTF-16BE without bom
+			testFileCharset(first, "UTF-16BE", "UTF-16BE", 0, noBom, delimiter.getBytes("UTF-16BE"));
+			// test UTF-16LE have bom
+			testFileCharset(first, "UTF-16LE", "UTF-16LE", 1, utf16LEBom, delimiter.getBytes("UTF-16LE"));
+			// test UTF-16BE have bom
+			testFileCharset(first, "UTF-16BE", "UTF-16", 1, utf16BEBom, delimiter.getBytes("UTF-16BE"));
+			// test UTF-32LE without bom
+			testFileCharset(first, "UTF-32LE", "UTF-32LE", 0, noBom, delimiter.getBytes("UTF-32LE"));
+			// test UTF-32BE without bom
+			testFileCharset(first, "UTF-32BE", "UTF-32BE", 0, noBom, delimiter.getBytes("UTF-32BE"));
+			// test UTF-32LE have bom
+			testFileCharset(first, "UTF-32LE", "UTF-32LE", 0, utf32LEBom, delimiter.getBytes("UTF-32LE"));
+			// test UTF-32BE have bom
+			testFileCharset(first, "UTF-32BE", "UTF-32", 0, utf32BEBom, delimiter.getBytes("UTF-32BE"));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	/**
+	 * Test different file encodings.
+	 *
+	 * @param data
+	 * @param fileCharset File itself encoding
+	 * @param targetCharset User specified code
+	 * @param offsetReturn result offset
+	 * @param bom Bom content
+	 * @param delimiter
+	 */
+	private void testFileCharset(String data,
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651891#comment-16651891 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225578428
## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651888#comment-16651888 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225577966
## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
## @@ -207,12 +207,212 @@
+ // Test special different languages
+ for (final String data : new String[]{"Hello", "ハロー", "привет", "Bonjour", "Сайн байна уу", "안녕하세요."}) {
+ testAllFileCharsetNoDelimiter(data);
Review comment: Why is this tested without a delimiter?
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> UTF-16 support for TextInputFormat
> --
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened and then using that BOM to modify the specified encoding to a BOM specific one when the caller doesn't specify one, and to overwrite the caller's specification if the BOM is in conflict with the caller's specification. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
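Problem #1 from the report can be reproduced with the JDK alone (the class name is ours): Java's `"UTF-16"` encoder prepends a big-endian BOM to every `getBytes` result, so a delimiter encoded this way can never match the raw line endings inside a file.

```java
import java.nio.charset.StandardCharsets;

public class Utf16BomDemo {
    public static void main(String[] args) {
        // The "UTF-16" encoder emits a big-endian BOM (0xFE 0xFF) before
        // the payload bytes -- exactly the behavior the issue describes
        // for delimiterString.getBytes(getCharset()).
        byte[] delimiter = "\n".getBytes(StandardCharsets.UTF_16);
        for (byte b : delimiter) {
            System.out.printf("%02X ", b); // FE FF 00 0A
        }
        System.out.println();
    }
}
```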
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651880#comment-16651880 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225580979
## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651884#comment-16651884 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225553773
## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -62,26 +64,41 @@
 // Charset is not serializable
 private transient Charset charset;
+
+ /**
+ * The charset of bom in the file to process.
+ */
+ private transient Charset bomCharset;
+
+ /**
+ * The Map to record the BOM encoding of all files.
+ */
+ private transient final Map fileBomCharsetMap;
Review comment: I would bound the size of the map to something like 1024 entries. Once the map exceeds the size, we should start removing entries.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
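The bound the reviewer suggests can be implemented with the JDK alone. A minimal sketch (class name and the 1024 capacity are taken from the review suggestion; this is not the PR's code): an access-ordered `LinkedHashMap` whose `removeEldestEntry` evicts the least recently used entry once the cap is exceeded.

```java
import java.nio.charset.Charset;
import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCharsetCache {
    private static final int MAX_ENTRIES = 1024;

    // accessOrder=true makes iteration order least-recently-used first,
    // so removeEldestEntry drops the LRU entry once size exceeds the cap.
    private final Map<String, Charset> fileBomCharsetMap =
        new LinkedHashMap<String, Charset>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Charset> eldest) {
                return size() > MAX_ENTRIES;
            }
        };

    public void put(String filePath, Charset charset) {
        fileBomCharsetMap.put(filePath, charset);
    }

    public Charset get(String filePath) {
        return fileBomCharsetMap.get(filePath);
    }

    public int size() {
        return fileBomCharsetMap.size();
    }
}
```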
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651889#comment-16651889 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225553146
## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -62,26 +64,41 @@
+ /**
+ * The stepSize to record different encoding formats.
+ */
+ protected transient int charsetStepSize = 1;
Review comment: UTF-16 is variable length encoded. Hence, the number of bytes of a character depends on the character, not only on the charset. I'd remove this variable and move it to `TextInputFormat` which uses it in a specific way.
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
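The reviewer's point that UTF-16 is variable-length can be checked directly with the JDK (class name is ours): characters inside the Basic Multilingual Plane occupy 2 bytes, but supplementary characters are encoded as surrogate pairs and occupy 4, so a fixed per-charset step size cannot be correct in general.

```java
import java.nio.charset.StandardCharsets;

public class Utf16WidthDemo {
    public static void main(String[] args) {
        // A BMP character: 2 bytes in UTF-16BE.
        System.out.println("A".getBytes(StandardCharsets.UTF_16BE).length); // 2
        // U+1D11E (musical G clef), written as the surrogate pair
        // \uD834\uDD1E: 4 bytes in UTF-16BE.
        System.out.println("\uD834\uDD1E".getBytes(StandardCharsets.UTF_16BE).length); // 4
    }
}
```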
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651892#comment-16651892 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225581316
## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651885#comment-16651885 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225534279
## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -504,16 +533,119 @@ private void initBuffers() {
 this.end = false;
 }
+
+ /**
+ * Set file bom encoding.
+ *
+ * @param split
+ */
+ private void setBomFileCharset(FileInputSplit split) {
+ try {
+ String filePath = split.getPath().toString();
+ if (this.fileBomCharsetMap.containsKey(filePath)) {
+ this.bomCharset = this.fileBomCharsetMap.get(filePath);
+ } else {
+ byte[] checkBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+ byte[] bomBuffer = new byte[4];
+
+ if (this.splitStart != 0) {
+ this.stream.seek(0);
+ this.stream.read(bomBuffer, 0, bomBuffer.length);
+ this.stream.seek(split.getStart());
+ } else {
+ System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 3);
+ }
+
+ if ((bomBuffer[0] == checkBytes[0]) && (bomBuffer[1] == checkBytes[0]) && (bomBuffer[2] == checkBytes[1]) && (bomBuffer[3] == checkBytes[2])) {
+ this.bomCharset = Charset.forName("UTF-32BE");
+ } else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1]) && (bomBuffer[2] == checkBytes[0]) && (bomBuffer[3] == checkBytes[0])) {
+ this.bomCharset = Charset.forName("UTF-32LE");
+ } else if ((bomBuffer[0] == checkBytes[3]) && (bomBuffer[1] == checkBytes[4]) && (bomBuffer[2] == checkBytes[5])) {
+ this.bomCharset = Charset.forName("UTF-8");
+ } else if ((bomBuffer[0] == checkBytes[1]) && (bomBuffer[1] == checkBytes[2])) {
+ this.bomCharset = Charset.forName("UTF-16BE");
+ } else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1])) {
+ this.bomCharset = Charset.forName("UTF-16LE");
+ } else {
+ this.bomCharset = Charset.forName(charsetName);
+ }
+
+ this.fileBomCharsetMap.put(filePath, this.bomCharset);
Review comment: Only set a `bomCharset` if we identified it correctly. Otherwise, set it to `null`
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
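The reviewer's suggestion, sketched outside Flink (class and method names are ours, not the PR's): return the charset indicated by the BOM, or `null` when the prefix matches none, letting the caller fall back to the configured charset. Ordering matters: UTF-32LE must be tested before UTF-16LE, because its BOM (`FF FE 00 00`) begins with the UTF-16LE BOM (`FF FE`).

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class BomSniffer {
    /** Returns the charset indicated by a BOM at the start of buf, or null if no BOM is found. */
    public static Charset detect(byte[] buf) {
        if (buf.length >= 4 && buf[0] == 0x00 && buf[1] == 0x00
                && buf[2] == (byte) 0xFE && buf[3] == (byte) 0xFF) {
            return Charset.forName("UTF-32BE");
        }
        // Must precede the UTF-16LE check: FF FE 00 00 starts with FF FE.
        if (buf.length >= 4 && buf[0] == (byte) 0xFF && buf[1] == (byte) 0xFE
                && buf[2] == 0x00 && buf[3] == 0x00) {
            return Charset.forName("UTF-32LE");
        }
        if (buf.length >= 3 && buf[0] == (byte) 0xEF && buf[1] == (byte) 0xBB
                && buf[2] == (byte) 0xBF) {
            return StandardCharsets.UTF_8;
        }
        if (buf.length >= 2 && buf[0] == (byte) 0xFE && buf[1] == (byte) 0xFF) {
            return StandardCharsets.UTF_16BE;
        }
        if (buf.length >= 2 && buf[0] == (byte) 0xFF && buf[1] == (byte) 0xFE) {
            return StandardCharsets.UTF_16LE;
        }
        return null; // no recognizable BOM: caller uses the configured charset
    }
}
```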
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651875#comment-16651875 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823#discussion_r225577009
## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
## @@ -207,12 +207,212 @@
+ // test UTF-8 have bom
+ testFileCharset(first, "UTF-8", "UTF-32", 1, utf8Bom, delimiter.getBytes("UTF-8"));
Review comment: we should have the following tests (fileCharset, hasBom, specifiedCharset):
* UTF-8, no, UTF-8
* UTF-8, yes, UTF-8
* UTF-16BE, no, UTF-16
* UTF-16BE, yes, UTF-16
* UTF-16LE, yes, UTF-16
* UTF-16LE, no, UTF-16LE
* UTF-16BE, no, UTF-16BE
* UTF-16BE, yes, UTF-16LE
* UTF-16LE, yes, UTF-16BE
* UTF-32BE, no, UTF-32
* UTF-32BE, yes, UTF-32
* UTF-32LE, yes, UTF-32
* UTF-32LE, no, UTF-32LE
* UTF-32BE, no, UTF-32BE
* UTF-32BE, yes, UTF-32LE
* UTF-32LE, yes, UTF-32BE
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
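The mismatch cases in that matrix (e.g. a UTF-16LE file read with a UTF-16BE charset) are worth testing because decoding with the wrong endianness does not fail, it silently produces different characters. A JDK-only illustration (class name is ours):

```java
import java.nio.charset.StandardCharsets;

public class EndiannessMismatchDemo {
    public static void main(String[] args) {
        // "Hi" in UTF-16LE: 48 00 69 00
        byte[] littleEndian = "Hi".getBytes(StandardCharsets.UTF_16LE);
        // Decoding those bytes as big-endian yields the code points
        // U+4800 and U+6900 instead of U+0048 and U+0069 -- no error,
        // just wrong text.
        String wrong = new String(littleEndian, StandardCharsets.UTF_16BE);
        System.out.println(wrong.equals("Hi")); // false
    }
}
```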
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651876#comment-16651876 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225543679

File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java

@@ -504,16 +533,119 @@ private void initBuffers() {
 		this.end = false;
 	}

+	/**
+	 * Set file bom encoding.
+	 *
+	 * @param split
+	 */
+	private void setBomFileCharset(FileInputSplit split) {
+		try {
+			String filePath = split.getPath().toString();
+			if (this.fileBomCharsetMap.containsKey(filePath)) {
+				this.bomCharset = this.fileBomCharsetMap.get(filePath);
+			} else {
+				byte[] checkBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+				byte[] bomBuffer = new byte[4];
+
+				if (this.splitStart != 0) {
+					this.stream.seek(0);
+					this.stream.read(bomBuffer, 0, bomBuffer.length);
+					this.stream.seek(split.getStart());
+				} else {
+					System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 3);
+				}
+
+				if ((bomBuffer[0] == checkBytes[0]) && (bomBuffer[1] == checkBytes[0]) && (bomBuffer[2] == checkBytes[1])
+						&& (bomBuffer[3] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-32BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1]) && (bomBuffer[2] == checkBytes[0])
+						&& (bomBuffer[3] == checkBytes[0])) {
+					this.bomCharset = Charset.forName("UTF-32LE");
+				} else if ((bomBuffer[0] == checkBytes[3]) && (bomBuffer[1] == checkBytes[4]) && (bomBuffer[2] == checkBytes[5])) {
+					this.bomCharset = Charset.forName("UTF-8");
+				} else if ((bomBuffer[0] == checkBytes[1]) && (bomBuffer[1] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-16BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1])) {
+					this.bomCharset = Charset.forName("UTF-16LE");
+				} else {
+					this.bomCharset = Charset.forName(charsetName);
+				}
+
+				this.fileBomCharsetMap.put(filePath, this.bomCharset);
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to get file bom encoding.");
+			this.bomCharset = Charset.forName(charsetName);
+		}
+
+		setParasByCharset(this.bomCharset);
+	}
+
+	/**
+	 * Set stepSize, delimiterNewLinePos, delimiterCarrageReturnPos by charset.
+	 *
+	 * @param charset
+	 */
+	private void setParasByCharset(Charset charset) {
+		int stepSize;
+		byte[] delimiterBytes = new byte[]{'\n'};
+		switch (charset.toString()) {
+			case "UTF-8":
+				stepSize = 1;
+				break;
+			case "UTF-16":

Review comment: We should not have "UTF-16" here but only "UTF-16BE".

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
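For readers following the review, the BOM table under discussion can be condensed into a small standalone sketch. `BomSniffer` and `detectBomCharset` are illustrative names, not code from the PR. Note the ordering: the four-byte UTF-32 marks must be tested before the two-byte UTF-16 marks, because the UTF-32LE BOM begins with the UTF-16LE BOM.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative sketch of BOM sniffing as discussed in the review; not the PR's actual code.
public class BomSniffer {

    // Longest BOMs first: the UTF-32LE BOM (FF FE 00 00) starts with the UTF-16LE BOM (FF FE).
    private static final byte[] UTF32_BE = {0x00, 0x00, (byte) 0xFE, (byte) 0xFF};
    private static final byte[] UTF32_LE = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x00};
    private static final byte[] UTF8_BOM = {(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
    private static final byte[] UTF16_BE = {(byte) 0xFE, (byte) 0xFF};
    private static final byte[] UTF16_LE = {(byte) 0xFF, (byte) 0xFE};

    /** Returns the charset indicated by a leading BOM, or null if no BOM is present. */
    public static Charset detectBomCharset(byte[] head) {
        if (startsWith(head, UTF32_BE)) return Charset.forName("UTF-32BE");
        if (startsWith(head, UTF32_LE)) return Charset.forName("UTF-32LE");
        if (startsWith(head, UTF8_BOM)) return StandardCharsets.UTF_8;
        if (startsWith(head, UTF16_BE)) return StandardCharsets.UTF_16BE;
        if (startsWith(head, UTF16_LE)) return StandardCharsets.UTF_16LE;
        return null; // no BOM: the caller falls back to the configured charset
    }

    private static boolean startsWith(byte[] data, byte[] prefix) {
        return data.length >= prefix.length
            && Arrays.equals(Arrays.copyOf(data, prefix.length), prefix);
    }
}
```

Returning null for "no BOM" also matches the reviewer's later point that a `bomCharset` should only be set when it was actually identified.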
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651890#comment-16651890 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225579332

File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java

@@ -207,12 +207,212 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 				assertEquals(content, result);
 			}
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
 		}
-		catch (Throwable t) {
+	}
+
+	/**
+	 * Test different file encodings, for example: UTF-8, UTF-8 with bom, UTF-16LE, UTF-16BE, UTF-32LE, UTF-32BE.
+	 */
+	@Test
+	public void testFileCharset() {
+		String first = "First line";
+
+		// Test special different languages
+		for (final String data : new String[]{"Hello", "ハロー", "привет", "Bonjour", "Сайн байна уу", "안녕하세요."}) {
+			testAllFileCharsetNoDelimiter(data);
+		}
+
+		// Test special symbol
+		for (final String delimiterStr : new String[]{"\\", "^", "|", "[", ".", "*"}) {
+			first = "Fir" + delimiterStr + "st li" + delimiterStr + "ne";
+			testAllFileCharsetWithDelimiter(first, delimiterStr);
+		}
+	}
+
+	private void testAllFileCharsetNoDelimiter(String first) {
+		testAllFileCharsetWithDelimiter(first, "");
+	}
+
+	private void testAllFileCharsetWithDelimiter(String first, String delimiter) {
+		try {
+			final byte[] noBom = new byte[]{};
+			final byte[] utf8Bom = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+			final byte[] utf16LEBom = new byte[]{(byte) 0xFF, (byte) 0xFE};
+			final byte[] utf16BEBom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+			final byte[] utf32LEBom = new byte[]{(byte) 0xFF, (byte) 0xFE, (byte) 0x00, (byte) 0x00};
+			final byte[] utf32BEBom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+
+			// test UTF-8 have bom
+			testFileCharset(first, "UTF-8", "UTF-32", 1, utf8Bom, delimiter.getBytes("UTF-8"));
+			// test UTF-8 without bom
+			testFileCharset(first, "UTF-8", "UTF-8", 0, noBom, delimiter.getBytes("UTF-8"));
+			// test UTF-16LE without bom
+			testFileCharset(first, "UTF-16LE", "UTF-16LE", 0, noBom, delimiter.getBytes("UTF-16LE"));
+			// test UTF-16BE without bom
+			testFileCharset(first, "UTF-16BE", "UTF-16BE", 0, noBom, delimiter.getBytes("UTF-16BE"));
+			// test UTF-16LE have bom
+			testFileCharset(first, "UTF-16LE", "UTF-16LE", 1, utf16LEBom, delimiter.getBytes("UTF-16LE"));
+			// test UTF-16BE have bom
+			testFileCharset(first, "UTF-16BE", "UTF-16", 1, utf16BEBom, delimiter.getBytes("UTF-16BE"));
+			// test UTF-32LE without bom
+			testFileCharset(first, "UTF-32LE", "UTF-32LE", 0, noBom, delimiter.getBytes("UTF-32LE"));
+			// test UTF-32BE without bom
+			testFileCharset(first, "UTF-32BE", "UTF-32BE", 0, noBom, delimiter.getBytes("UTF-32BE"));
+			// test UTF-32LE have bom
+			testFileCharset(first, "UTF-32LE", "UTF-32LE", 0, utf32LEBom, delimiter.getBytes("UTF-32LE"));
+			// test UTF-32BE have bom
+			testFileCharset(first, "UTF-32BE", "UTF-32", 0, utf32BEBom, delimiter.getBytes("UTF-32BE"));
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	/**
+	 * Test different file encodings.
+	 *
+	 * @param data
+	 * @param fileCharset File itself encoding
+	 * @param targetCharset User specified code
+	 * @param offsetReturn result offset
+	 * @param bom Bom content
+	 * @param delimiter
+	 */
+	private void testFileCharset(String data,
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651867#comment-16651867 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225533378

File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
[quoted diff snipped; same setBomFileCharset() hunk as quoted in the earlier review comment on this file]

+				if (this.splitStart != 0) {

Review comment: I don't think it is necessary to optimize for special cases here. We can just reset the stream to pos 0 and read the first four bytes.

> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1]
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened and then using that BOM to modify the specified encoding to a BOM-specific one when the caller doesn't specify one, and to overwrite the caller's specification if the BOM is in conflict with the caller's specification. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
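The resolution rule proposed in the issue description (a BOM found in the file overrides a conflicting caller-specified charset, e.g. configured UTF-16BE versus an LE BOM) could be sketched as follows. `CharsetResolver` is a hypothetical helper for illustration, not part of Flink:

```java
import java.nio.charset.Charset;

// Hypothetical helper illustrating the rule proposed in the issue description:
// a BOM detected in the file wins over the caller-configured charset, because
// it reflects the file's actual byte order.
public class CharsetResolver {
    public static Charset resolveCharset(Charset configured, Charset bomCharset) {
        if (bomCharset == null) {
            // No BOM detected: trust the caller's configuration.
            return configured;
        }
        // BOM present: it overrides a conflicting (or endianness-ambiguous) configuration.
        return bomCharset;
    }
}
```

With this rule, a caller who asks for "UTF-16BE" on a file carrying an FF FE mark would still read it as UTF-16LE, which is exactly the rewrite the reporter asks for.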
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651869#comment-16651869 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225534152

File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
[quoted diff snipped; same setBomFileCharset() hunk as quoted in the earlier review comment on this file]

+				} else {
+					this.bomCharset = Charset.forName(charsetName);

Review comment: Only set a `bomCharset` if we identified it correctly.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651886#comment-16651886 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225577541

File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[quoted diff snipped; same testFileCharset hunk as quoted in the earlier review comment on this file]

+		for (final String delimiterStr : new String[]{"\\", "^", "|", "[", ".", "*"}) {

Review comment: Testing for multiple similar delimiters does not add much value. Add a multi-character delimiter as well, like `"|<>|"`.
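The multi-character delimiter case the reviewer suggests (e.g. `"|<>|"`) can be exercised outside Flink with a minimal sketch. `DelimiterSketch` is an illustrative stand-in for what the `TextInputFormat` test would verify, not the PR's test code:

```java
import java.nio.charset.Charset;
import java.util.regex.Pattern;

// Standalone sketch of the suggested multi-character delimiter case ("|<>|").
// It mimics what the TextInputFormat test checks: decode the file's bytes with
// the file's charset, then split records on the literal delimiter string.
public class DelimiterSketch {
    public static String[] readRecords(byte[] fileBytes, String charsetName, String delimiter) {
        Charset cs = Charset.forName(charsetName);
        String content = new String(fileBytes, cs);
        // Pattern.quote treats the delimiter literally, so "|<>|" is not a regex.
        return content.split(Pattern.quote(delimiter));
    }
}
```

Note that `String.getBytes("UTF-16LE")` emits no BOM (only the endianness-unspecified `"UTF-16"` encoder prepends one), so a round trip through the matching decoder recovers the records exactly.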
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651868#comment-16651868 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225532403

File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
[quoted diff snipped; same setBomFileCharset() hunk as quoted in the earlier review comment on this file]

+				byte[] checkBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};

Review comment: This can be a static field of the class. Rename to `bomBytes`.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651883#comment-16651883 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225574255

File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[quoted diff snipped; same testFileCharset hunk as quoted in the earlier review comment on this file]

+			final byte[] noBom = new byte[]{};

Review comment: Move the bom variables to `testFileCharset` and change its `bom` parameter to `boolean hasBom`.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651873#comment-16651873 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225567131

File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[quoted diff snipped; same testFileCharset hunk as quoted in the earlier review comment on this file]
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651881#comment-16651881 ]

ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225568282

File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
[quoted diff snipped; same testFileCharset hunk as quoted in the earlier review comment on this file]
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651879#comment-16651879 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225539582

## File path: flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
## @@ -68,6 +68,7 @@ public void setCharsetName(String charsetName) { }

Review comment: I would remove the `charsetName` field from `TextInputFormat` and add a `getCharsetName()` method to `DelimitedInputFormat`. Remove all logic from `setCharsetName()` except passing the `charsetName` on to `DelimitedInputFormat.setCharset()`.

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1]
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened and then using that BOM to modify the specified encoding to a BOM-specific one when the caller doesn't specify one, and to overwrite the caller's specification if the BOM is in conflict with the caller's specification. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
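Problem #1 from the issue report is easy to reproduce in plain Java, independent of Flink: the endianness-agnostic "UTF-16" charset prepends a BOM (0xFE 0xFF) and defaults to big-endian when encoding, while the explicit UTF-16BE/UTF-16LE charsets emit no BOM. A small demonstration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Shows why delimiterString.getBytes("UTF-16") can never match file content:
// the encoder inserts a BOM before the delimiter's code unit.
public class DelimiterBomDemo {
    public static void main(String[] args) {
        byte[] utf16   = "\n".getBytes(StandardCharsets.UTF_16);    // BOM + big-endian
        byte[] utf16be = "\n".getBytes(StandardCharsets.UTF_16BE);  // no BOM
        byte[] utf16le = "\n".getBytes(StandardCharsets.UTF_16LE);  // no BOM

        System.out.println(Arrays.toString(utf16));   // [-2, -1, 0, 10] = FE FF 00 0A
        System.out.println(Arrays.toString(utf16be)); // [0, 10]
        System.out.println(Arrays.toString(utf16le)); // [10, 0]
    }
}
```

The 4-byte "UTF-16" encoding of a bare newline can never occur at a line ending inside a UTF-16 file, which is exactly why the delimiter is never found.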
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651882#comment-16651882 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225548099

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -161,15 +178,17 @@ protected static void loadConfigParameters(Configuration parameters) {
 	// The delimiter may be set with a byte-sequence or a String. In the latter
 	// case the byte representation is updated consistent with current charset.
 	private byte[] delimiter = new byte[] {'\n'};
+	protected int delimiterNewLinePos = 0;

Review comment: These variables are only used in `TextInputFormat`. I would initialize and move them there.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651874#comment-16651874 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225479771

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -215,6 +240,7 @@ public Charset getCharset() {
 	public void setCharset(String charset) {
 		this.charsetName = Preconditions.checkNotNull(charset);
 		this.charset = null;

Review comment: I would handle the different charsets with three `private` fields.
1. `configuredCharset`: This is the charset that is configured via `setCharset()`
2. `bomIdentifiedCharset`: This is the charset that is set by `setBomFileCharset()`
3. `charset`: This is the charset that is returned by `getCharset()`. If not set before (i.e., `null`), it is set first depending on the `configuredCharset` and `bomIdentifiedCharset`.
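The reviewer's three-field proposal can be sketched roughly as follows. This is a simplified, hypothetical class, not the actual Flink implementation; the field and method names follow the review comment:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// The configured charset and the BOM-detected charset are kept separately;
// getCharset() lazily resolves them, letting the BOM-detected one win.
public class CharsetResolution {
    private Charset configuredCharset;    // set via setCharset()
    private Charset bomIdentifiedCharset; // set via setBomFileCharset()
    private Charset charset;              // lazily resolved result

    public void setCharset(Charset c) {
        this.configuredCharset = c;
        this.charset = null; // force re-resolution on next getCharset()
    }

    public void setBomFileCharset(Charset c) {
        this.bomIdentifiedCharset = c;
        this.charset = null;
    }

    public Charset getCharset() {
        if (charset == null) {
            if (bomIdentifiedCharset != null) {
                charset = bomIdentifiedCharset;   // BOM overrides the config
            } else if (configuredCharset != null) {
                charset = configuredCharset;
            } else {
                charset = StandardCharsets.UTF_8; // assumed default
            }
        }
        return charset;
    }

    public static void main(String[] args) {
        CharsetResolution r = new CharsetResolution();
        r.setCharset(StandardCharsets.UTF_16BE);
        r.setBomFileCharset(StandardCharsets.UTF_16LE);
        System.out.println(r.getCharset()); // UTF-16LE: the BOM wins
    }
}
```

Resetting `charset` to `null` in both setters is what makes the resolution order-independent: whichever setter runs last, the next `getCharset()` re-evaluates both inputs.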
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651877#comment-16651877 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225547062

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -215,6 +240,7 @@ public Charset getCharset() {
 	public void setCharset(String charset) {
 		this.charsetName = Preconditions.checkNotNull(charset);
 		this.charset = null;
+		this.bomCharset = null;
 		if (this.delimiterString != null) {
 			this.delimiter = delimiterString.getBytes(getCharset());

Review comment: We can remove this here. We can only configure the delimiter once we know the definitive charset.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651872#comment-16651872 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225547565

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -504,16 +533,119 @@ private void initBuffers() {
 		this.end = false;
 	}

+	/**
+	 * Set file bom encoding.
+	 *
+	 * @param split
+	 */
+	private void setBomFileCharset(FileInputSplit split) {
+		try {
+			String filePath = split.getPath().toString();
+			if (this.fileBomCharsetMap.containsKey(filePath)) {
+				this.bomCharset = this.fileBomCharsetMap.get(filePath);
+			} else {
+				byte[] checkBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+				byte[] bomBuffer = new byte[4];
+
+				if (this.splitStart != 0) {
+					this.stream.seek(0);
+					this.stream.read(bomBuffer, 0, bomBuffer.length);
+					this.stream.seek(split.getStart());
+				} else {
+					System.arraycopy(this.readBuffer, 0, bomBuffer, 0, 3);
+				}
+
+				if ((bomBuffer[0] == checkBytes[0]) && (bomBuffer[1] == checkBytes[0]) && (bomBuffer[2] == checkBytes[1])
+					&& (bomBuffer[3] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-32BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1]) && (bomBuffer[2] == checkBytes[0])
+					&& (bomBuffer[3] == checkBytes[0])) {
+					this.bomCharset = Charset.forName("UTF-32LE");
+				} else if ((bomBuffer[0] == checkBytes[3]) && (bomBuffer[1] == checkBytes[4]) && (bomBuffer[2] == checkBytes[5])) {
+					this.bomCharset = Charset.forName("UTF-8");
+				} else if ((bomBuffer[0] == checkBytes[1]) && (bomBuffer[1] == checkBytes[2])) {
+					this.bomCharset = Charset.forName("UTF-16BE");
+				} else if ((bomBuffer[0] == checkBytes[2]) && (bomBuffer[1] == checkBytes[1])) {
+					this.bomCharset = Charset.forName("UTF-16LE");
+				} else {
+					this.bomCharset = Charset.forName(charsetName);
+				}
+
+				this.fileBomCharsetMap.put(filePath, this.bomCharset);
+			}
+		} catch (Exception e) {
+			LOG.warn("Failed to get file bom encoding.");
+			this.bomCharset = Charset.forName(charsetName);
+		}
+
+		setParasByCharset(this.bomCharset);
+	}
+
+	/**
+	 * Set stepSize, delimiterNewLinePos, delimiterCarrageReturnPos by charset.
+	 *
+	 * @param charset
+	 */
+	private void setParasByCharset(Charset charset) {

Review comment: This is very special logic for the default delimiter. We need to get the correct `delimiter` bytes also for custom delimiter strings.
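For reference, the BOM sniffing in the diff above can also be written as a small standalone function; the following is an illustrative sketch, not the PR's code. One subtlety it makes explicit: check order matters, because the UTF-32LE BOM (FF FE 00 00) begins with the UTF-16LE BOM (FF FE), so the 4-byte patterns must be tested before the 2-byte ones:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Sniffs a BOM in the first bytes of a file and returns the matching
// charset, or the caller-supplied fallback when no BOM is present.
public class BomSniffer {
    static Charset detect(byte[] b, Charset fallback) {
        // 4-byte BOMs first: FF FE 00 00 would otherwise match UTF-16LE
        if (b.length >= 4 && b[0] == 0 && b[1] == 0
                && b[2] == (byte) 0xFE && b[3] == (byte) 0xFF) {
            return Charset.forName("UTF-32BE");
        }
        if (b.length >= 4 && b[0] == (byte) 0xFF && b[1] == (byte) 0xFE
                && b[2] == 0 && b[3] == 0) {
            return Charset.forName("UTF-32LE");
        }
        if (b.length >= 3 && b[0] == (byte) 0xEF && b[1] == (byte) 0xBB && b[2] == (byte) 0xBF) {
            return StandardCharsets.UTF_8;
        }
        if (b.length >= 2 && b[0] == (byte) 0xFE && b[1] == (byte) 0xFF) {
            return StandardCharsets.UTF_16BE;
        }
        if (b.length >= 2 && b[0] == (byte) 0xFF && b[1] == (byte) 0xFE) {
            return StandardCharsets.UTF_16LE;
        }
        return fallback;
    }

    public static void main(String[] args) {
        byte[] utf16le = {(byte) 0xFF, (byte) 0xFE, 0x41, 0x00}; // BOM + 'A'
        byte[] utf32le = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x00}; // BOM only
        System.out.println(detect(utf16le, StandardCharsets.UTF_8)); // UTF-16LE
        System.out.println(detect(utf32le, StandardCharsets.UTF_8)); // UTF-32LE
    }
}
```

Note the corner case the second call shows: FF FE 00 00 is ambiguous, since it is also a valid UTF-16LE BOM followed by a NUL character; a BOM-only classifier has to pick one interpretation.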
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651878#comment-16651878 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225549931

## File path: flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
## @@ -86,13 +87,13 @@ public void configure(Configuration parameters) {
 	@Override
 	public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException {
 		//Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line
-		if (this.getDelimiter() != null && this.getDelimiter().length == 1
-			&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1
-			&& bytes[offset + numBytes - 1] == CARRIAGE_RETURN){
-			numBytes -= 1;
+		if (this.getDelimiter() != null && this.getDelimiter().length >= 1

Review comment: I think this should be refactored a bit. Move the logic into a separate (private static) method and try to simplify it. We can check outside of `readRecord()` if the delimiter is equal to NEW_LINE or not. The whole index arithmetic is also not very easy to understand.
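The generalization this diff is after can be sketched as a separate helper, as the reviewer suggests. The following is a hypothetical illustration, not the PR's exact code: instead of comparing a single trailing byte against `'\r'`, it encodes `"\r"` with the resolved charset (two bytes, 00 0D, in UTF-16BE) and compares the record's trailing bytes against that sequence:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Strips a trailing carriage return from a record, where the CR may span
// multiple bytes depending on the charset. Returns the adjusted byte count.
public class TrailingCr {
    static int stripTrailingCr(byte[] bytes, int offset, int numBytes, Charset cs) {
        byte[] cr = "\r".getBytes(cs); // e.g. {0x00, 0x0D} for UTF-16BE
        if (numBytes >= cr.length) {
            for (int i = 0; i < cr.length; i++) {
                if (bytes[offset + numBytes - cr.length + i] != cr[i]) {
                    return numBytes; // trailing bytes are not a CR
                }
            }
            return numBytes - cr.length; // drop the encoded CR
        }
        return numBytes;
    }

    public static void main(String[] args) {
        byte[] line = "abc\r".getBytes(StandardCharsets.UTF_16BE); // 8 bytes
        int n = stripTrailingCr(line, 0, line.length, StandardCharsets.UTF_16BE);
        System.out.println(n); // 6: only "abc" (3 chars x 2 bytes) remains
    }
}
```

Note this assumes an explicit-endianness charset: encoding `"\r"` with plain "UTF-16" would again prepend a BOM and never match, which is the same pitfall as problem #1 in the issue.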
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651870#comment-16651870 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#discussion_r225531993

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
## @@ -472,6 +498,7 @@ public void open(FileInputSplit split) throws IOException {
 		this.offset = splitStart;
 		if (this.splitStart != 0) {
+			setBomFileCharset(split);

Review comment: We can move the BOM configuration out of the condition. The following logic should be applied:
1. Check if a UTF or no charset was configured (by default, we assume UTF-8). If a different charset is explicitly configured, skip the BOM check.
2. Seek to position 0.
3. Try to fetch a BOM.
4. If `(splitStart != 0)`, seek to the beginning of the split.

We configure the charset depending on the BOM.
* If we find a BOM, we configure the corresponding UTF charset.
* If we don't find a BOM, UTF-16 and UTF-32 are converted into UTF-16BE / UTF-32BE (BE is the assumed default).
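The open()-time flow the reviewer describes might look roughly like this. The names are illustrative and a byte array stands in for the split's seekable stream; this is a sketch of the proposed decision logic, not Flink's actual code:

```java
import java.nio.charset.Charset;

// Resolves the effective charset when a split is opened: skip BOM sniffing
// for explicit non-UTF charsets, let a found BOM decide the endianness, and
// normalize BOM-less "UTF-16"/"UTF-32" to the big-endian default.
public class OpenFlow {
    static Charset resolveOnOpen(String configured, byte[] fileStart) {
        // step 1: only UTF (or unset) charsets get the BOM check
        if (configured != null && !configured.toUpperCase().startsWith("UTF")) {
            return Charset.forName(configured);
        }
        // steps 2-3: "seek to 0" and try to fetch a BOM (inspect fileStart)
        if (fileStart.length >= 2 && fileStart[0] == (byte) 0xFE && fileStart[1] == (byte) 0xFF) {
            return Charset.forName("UTF-16BE"); // BOM found: BOM wins
        }
        if (fileStart.length >= 2 && fileStart[0] == (byte) 0xFF && fileStart[1] == (byte) 0xFE) {
            return Charset.forName("UTF-16LE");
        }
        // no BOM: normalize endianness-agnostic names to the BE default
        if ("UTF-16".equalsIgnoreCase(configured)) {
            return Charset.forName("UTF-16BE");
        }
        if ("UTF-32".equalsIgnoreCase(configured)) {
            return Charset.forName("UTF-32BE");
        }
        return Charset.forName(configured == null ? "UTF-8" : configured);
    }

    public static void main(String[] args) {
        byte[] le = {(byte) 0xFF, (byte) 0xFE, 0x48, 0x00};
        System.out.println(resolveOnOpen("UTF-16", le));               // UTF-16LE
        System.out.println(resolveOnOpen("UTF-16", new byte[]{0x48})); // UTF-16BE
        System.out.println(resolveOnOpen("ISO-8859-1", le));           // ISO-8859-1
    }
}
```

Step 4 (seeking back to the split start) is omitted here because the byte array stands in for the stream; in the real format it matters so that record reading resumes at the split boundary after the BOM probe.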
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16648957#comment-16648957 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed
URL: https://github.com/apache/flink/pull/6823#issuecomment-429544772

@fhueske Thank you very much, and I hope you can spare your precious time to review this PR again. The code logic passed the tests on travis-ci, and this time I also improved the test cases.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646221#comment-16646221 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r224388251

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 		if (unsplittable) {
 			int splitNum = 0;
 			for (final FileStatus file : files) {
+				String bomCharsetName = getBomCharset(file);

Review comment: @fhueske I resubmitted a PR at the following link: https://github.com/apache/flink/pull/6823 Thank you very much, I hope you can help me review this PR again.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646214#comment-16646214 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars opened a new pull request #6823: [FLINK-10134] UTF-16 support for TextInputFormat bug refixed URL: https://github.com/apache/flink/pull/6823

## What is the purpose of the change

This PR fixes the parsing of UTF-16-encoded files by `TextInputFormat`.

## Brief change log

To fix this bug, I added a check of the file's BOM header to determine the file's actual encoding, along with handling for conflicts between the user-specified encoding and the encoding indicated by the BOM. Specific changes:

- Added a `getBomFileCharset` function to `DelimitedInputFormat.java` that detects the file's encoding from its BOM header, distinguishing `UTF-16BE`, `UTF-16LE`, `UTF-8` with a BOM header, `UTF-32BE`, and `UTF-32LE`, and defaulting to `UTF-8`.
- Added the `bomCharset`, `fileBomCharsetMap`, and `charsetStepSize` fields to `DelimitedInputFormat.java`, a `getBomFileCharset(split)` call to the `open` method, and a `getBomCharsetName` accessor for the `bomCharset` value. The name of each parsed file is used as the key, and its detected encoding is inserted as the value into `fileBomCharsetMap`.
- In the `DelimitedInputFormat.java` method `getCharset()`, added logic to obtain the encoding of the current file: if `bomCharset` has a value, it overrides the charset of the current class; otherwise the class's own `charset` is used.
- Modified the `readRecord` method logic in `TextInputFormat.java` to use `this.getCharset()`, `this.delimiterNewLinePos`, and `this.delimiterCarrageReturnPos`, supporting correct handling of the end-of-line delimiter for multiple encoding formats.
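The PR's actual `getBomFileCharset` implementation is not shown in this thread. As an illustration of the change log above, a minimal standalone BOM check covering the encodings it lists (hypothetical class and method names, not the PR code) might look like:

```java
import java.io.IOException;
import java.io.InputStream;

public class BomDetector {

    /**
     * Returns the charset name implied by a BOM in the first {@code len} bytes
     * of {@code head}, defaulting to UTF-8 when no BOM is present.
     * The 4-byte UTF-32LE pattern (FF FE 00 00) must be tested before the
     * 2-byte UTF-16LE pattern (FF FE), because the latter is its prefix.
     */
    static String detectBomCharset(byte[] head, int len) {
        if (len >= 4 && (head[0] & 0xFF) == 0x00 && (head[1] & 0xFF) == 0x00
                && (head[2] & 0xFF) == 0xFE && (head[3] & 0xFF) == 0xFF) {
            return "UTF-32BE";
        }
        if (len >= 4 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE
                && (head[2] & 0xFF) == 0x00 && (head[3] & 0xFF) == 0x00) {
            return "UTF-32LE";
        }
        if (len >= 3 && (head[0] & 0xFF) == 0xEF && (head[1] & 0xFF) == 0xBB
                && (head[2] & 0xFF) == 0xBF) {
            return "UTF-8"; // UTF-8 with a BOM header
        }
        if (len >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
            return "UTF-16BE";
        }
        if (len >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
            return "UTF-16LE";
        }
        return "UTF-8"; // no BOM: fall back to the default
    }

    /** Convenience overload: reads at most four bytes from the stream head. */
    static String detectBomCharset(InputStream in) throws IOException {
        byte[] head = new byte[4];
        int len = in.readNBytes(head, 0, 4);
        return detectBomCharset(head, len);
    }
}
```

In the real format, such a check would run against the start of the file (not of each split), which is what motivates the per-file cache discussed later in this thread.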
## Verifying this change

This change added tests and can be verified as follows:

- I added `testFileCharset` and `testFileCharsetReadByMultiSplits` to `TextInputFormatTest`.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16646118#comment-16646118 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r224359511

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: @fhueske I have finished the code logic. Should I resubmit the PR or reopen this one?
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644565#comment-16644565 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223963818

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: I don't think we should do too much magic here. If the specified encoding does not match the actual encoding of a file, we could reject it (either logging the error and continuing, or failing), since we cannot infer a different encoding with 100% certainty. We can make that more liberal with a flag later on, IMO.
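The strict behavior fhueske suggests above (reject a file whose BOM contradicts the configured charset, rather than silently guessing) could be sketched like this; the class, method, and exception names are hypothetical, not Flink API:

```java
public class CharsetConflictCheck {

    /** Thrown when a file's BOM contradicts the user-configured charset. */
    public static class CharsetMismatchException extends RuntimeException {
        public CharsetMismatchException(String msg) {
            super(msg);
        }
    }

    /**
     * Rejects a file whose BOM-derived charset contradicts the configured one,
     * rather than silently overriding, since the real encoding cannot be
     * inferred with 100% certainty. A bomCharset of null means "no BOM found",
     * in which case the configured charset is trusted as-is.
     */
    static void validate(String configuredCharset, String bomCharset, String file) {
        if (bomCharset != null && !bomCharset.equalsIgnoreCase(configuredCharset)) {
            throw new CharsetMismatchException(
                    "File " + file + " has a " + bomCharset
                    + " BOM but charset " + configuredCharset + " was configured");
        }
    }
}
```

A later flag, as suggested, could downgrade the exception to a logged warning that continues with the BOM-derived charset.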
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16644406#comment-16644406 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223929236

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: Whether we check the encoding of a file or of a byte stream (with no BOM), the following scenario is possible: the file is actually encoded as UTF-8, but the user-specified encoding type is UTF-16 or UTF-32, which will still produce garbled output when parsing. On the other hand, it is a nuisance to determine the encoding of a file or byte stream that has no BOM.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16643069#comment-16643069 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223633778

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: Yes, I think that makes sense.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16642687#comment-16642687 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223541560

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: Well, I will first move the BOM-checking logic from FileInputFormat into DelimitedInputFormat. I want to use a Map to cache the BOM encoding of each file, using the filename as the key and the BOM encoding as the value. If the key exists in the Map, the corresponding value is read; if it does not, the BOM encoding of the file is read.
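The per-file caching scheme described in this comment can be sketched as follows. The map name mirrors the discussion (`fileBomCharsetMap`), but the surrounding class and the injected detector function are hypothetical stand-ins for the actual "read the first bytes of the file" logic:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class BomCharsetCache {

    // filename -> detected BOM charset, shared by all splits of the same file,
    // so each file's header is inspected only once.
    private final Map<String, String> fileBomCharsetMap = new ConcurrentHashMap<>();

    // Stand-in for the actual BOM-detection logic (e.g. reading the file head).
    private final Function<String, String> detector;

    public BomCharsetCache(Function<String, String> detector) {
        this.detector = detector;
    }

    public String getBomCharset(String filePath) {
        // computeIfAbsent runs the detector only on the first lookup for a
        // given file; later splits of the same file hit the cached value.
        return fileBomCharsetMap.computeIfAbsent(filePath, detector);
    }
}
```

This addresses fhueske's earlier concern that, without a cache, the BOM would be re-read for every split of the same file.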
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641470#comment-16641470 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223268822

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment: Yes, I'm aware of that. It would also be required for every split unless we cache the BOM per file. OTOH, if we do it in the JM, the job cannot start until a single thread has had a look at the first bytes of each file.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16641419#comment-16641419 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r223259155

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

## @@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits if (unsplittable) { int splitNum = 0; for (final FileStatus file : files) { + String bomCharsetName = getBomCharset(file);

Review comment:
> I'm not sure if we want to check the BOM during split generation.
>
> 1. This might become a bottleneck, since splits are generated by the JobManager. OTOH, there is currently an effort to parallelize split generation.
> 2. FileInputFormat is currently not handling any charset issues.
>
> An alternative would be to check the BOM in `DelimitedInputFormat` when a split is opened.

@fhueske Hi, if we check the BOM in DelimitedInputFormat when opening the split, I think the following should be considered: 1. A file may be split across different TaskManagers, so the file's BOM would have to be checked on each TaskManager.
In > particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) > to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. > > TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), > which sets TextInputFormat.charsetName and then modifies the previously set > delimiterString to construct the proper byte string encoding of the > delimiter. This same charsetName is also used in TextInputFormat.readRecord() > to interpret the bytes read from the file. > > There are two problems that this implementation would seem to have when using > UTF-16. > # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will > return a Big Endian byte sequence including the Byte Order Mark (BOM). The > actual text file will not contain a BOM at each line ending, so the delimiter > will never be read. Moreover, if the actual byte encoding of the file is > Little Endian, the bytes will be interpreted incorrectly. > # TextInputFormat.readRecord() will not see a BOM each time it decodes a > byte sequence with the String(bytes, offset, numBytes, charset) call. > Therefore, it will assume Big Endian, which may not always be correct. [1] > [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95] > > While there are likely many solutions, I would think that all of them would > have to start by reading the BOM from the file when a Split is opened and > then using that BOM to modify the specified encoding to a BOM-specific one > when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. That > is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, > Flink should rewrite the charsetName as UTF-16LE. > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > I've verified the problem on version 1.4.2. 
I believe the problem exists on > all versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
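Problem (1) above can be reproduced with the plain JDK, independent of Flink: encoding a delimiter with the unmarked "UTF-16" charset prepends a big-endian BOM to every encode call, while the endian-specific charsets emit no BOM. A minimal sketch (not Flink code):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class DelimiterBytesDemo {
    public static void main(String[] args) {
        // Unmarked "UTF-16" prepends the big-endian BOM (FE FF) on every encode:
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16)));    // [-2, -1, 0, 10]
        // The endian-specific charsets encode just the character, no BOM:
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16BE)));  // [0, 10]
        System.out.println(Arrays.toString("\n".getBytes(StandardCharsets.UTF_16LE)));  // [10, 0]
    }
}
```

A four-byte delimiter `FE FF 00 0A` can never occur at a line boundary of a file that carries its BOM only once, at offset 0, which is exactly why the delimiter is never matched.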
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16636362#comment-16636362 ] ASF GitHub Bot commented on FLINK-10134: XuQianJin-Stars commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r222166893 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java ## @@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) { assertEquals(content, result); } + } catch (Throwable t) { + System.err.println("test failed with exception: " + t.getMessage()); + t.printStackTrace(System.err); + fail("Test erroneous"); + } + } + + @Test + public void testUTF16Read() { + final String first = "First line"; + final String second = "Second line"; + + try { + // create input file + File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + PrintStream ps = new PrintStream(tempFile, "UTF-16"); + ps.println(first); + ps.println(second); + ps.close(); + + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); + inputFormat.setCharsetName("UTF-32"); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + +// inputFormat.setDelimiter("\r"); Review comment: @fhueske I am sorry to reply to you so late. Thank you very much for your inspection and suggestions. I will modify and adjust them one by one according to your suggestions. 
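As background for the test above: a Java UTF-16 encoder emits a single BOM at the start of the stream, never at line boundaries, consistent with the issue description. A JDK-only illustration (not the PR's test code):

```java
import java.nio.charset.StandardCharsets;

public class BomOnceDemo {
    public static void main(String[] args) {
        // Two lines encoded in one pass with the unmarked "UTF-16" charset.
        byte[] out = "First line\nSecond line\n".getBytes(StandardCharsets.UTF_16);
        // Exactly one big-endian BOM, at offset 0:
        System.out.println(out[0] == (byte) 0xFE && out[1] == (byte) 0xFF);  // true
        // The '\n' after "First line" (10 chars -> bytes 2..21) is a bare 00 0A,
        // with no BOM in front of it:
        System.out.println(out[22] == 0x00 && out[23] == 0x0A);              // true
    }
}
```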
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635417#comment-16635417 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221933563 ## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java ## @@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) { assertEquals(content, result); } + } catch (Throwable t) { + System.err.println("test failed with exception: " + t.getMessage()); + t.printStackTrace(System.err); + fail("Test erroneous"); + } + } + + @Test + public void testUTF16Read() { + final String first = "First line"; + final String second = "Second line"; + + try { + // create input file + File tempFile = File.createTempFile("TextInputFormatTest", "tmp"); + tempFile.deleteOnExit(); + tempFile.setWritable(true); + + PrintStream ps = new PrintStream(tempFile, "UTF-16"); + ps.println(first); + ps.println(second); + ps.close(); + + TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString())); + inputFormat.setCharsetName("UTF-32"); + + Configuration parameters = new Configuration(); + inputFormat.configure(parameters); + +// inputFormat.setDelimiter("\r"); Review comment: please add tests to check if the `'\'` case is correctly handled for the different encodings. 
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635422#comment-16635422 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221900610 ## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ## @@ -862,35 +865,87 @@ public void close() throws IOException { stream = null; } } - + /** * Override this method to supports multiple paths. * When this method will be removed, all FileInputFormats have to support multiple paths. * * @return True if the FileInputFormat supports multiple paths, false otherwise. -* * @deprecated Will be removed for Flink 2.0. */ @Deprecated public boolean supportsMultiPaths() { return false; } + @Override public String toString() { return getFilePaths() == null || getFilePaths().length == 0 ? "File Input (unknown file)" : - "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + } + + /** +* Get file bom encoding +* +* @param fs +* @return +*/ + public String getBomCharset(FileStatus fs) { + FSDataInputStream inStream = null; + String charset, testFileSystem = "TestFileSystem"; + byte[] bom = new byte[4]; + byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF}; + try { Review comment: Hmm, `FileInputFormat` is charset agnostic, which makes sense. Maybe we should move this logic to `DelimitedInputFormat` instead? 
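For reference, the BOM-sniffing logic under discussion can be sketched as a small standalone helper; `sniffBomCharset` is a hypothetical name, and the byte patterns are the standard Unicode BOMs (the UTF-32 checks must come first, because the UTF-32LE BOM starts with the UTF-16LE BOM):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BomSniffer {

    /**
     * Reads up to four bytes from the head of the stream and maps a leading
     * BOM to a charset name; returns null when no BOM is present.
     * Hypothetical helper sketching the logic under review, not the PR's method.
     */
    public static String sniffBomCharset(InputStream in) throws IOException {
        byte[] bom = new byte[4];
        int n = 0;
        for (int r; n < 4 && (r = in.read(bom, n, 4 - n)) != -1; ) {
            n += r;
        }
        // UTF-32 first: its little-endian BOM shares a prefix with UTF-16LE's.
        if (n >= 4 && bom[0] == 0x00 && bom[1] == 0x00 && bom[2] == (byte) 0xFE && bom[3] == (byte) 0xFF) {
            return "UTF-32BE";
        }
        if (n >= 4 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE && bom[2] == 0x00 && bom[3] == 0x00) {
            return "UTF-32LE";
        }
        if (n >= 3 && bom[0] == (byte) 0xEF && bom[1] == (byte) 0xBB && bom[2] == (byte) 0xBF) {
            return "UTF-8";
        }
        if (n >= 2 && bom[0] == (byte) 0xFE && bom[1] == (byte) 0xFF) {
            return "UTF-16BE";
        }
        if (n >= 2 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE) {
            return "UTF-16LE";
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        byte[] utf16le = {(byte) 0xFF, (byte) 0xFE, 0x41, 0x00};
        System.out.println(sniffBomCharset(new ByteArrayInputStream(utf16le)));  // UTF-16LE
    }
}
```

A real implementation would also have to skip or push back the consumed BOM bytes before record reading starts, which is part of what makes doing this when the split is opened attractive.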
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635424#comment-16635424 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221897008 ## File path: flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java ## @@ -39,6 +39,9 @@ /** The number of bytes in the file to process. */ private final long length; + /** The charset of bom in the file to process. */ + private String bomCharsetName; Review comment: make `final` and set to `null` in the other constructors 
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635421#comment-16635421 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221891446 ## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ## @@ -862,35 +865,87 @@ public void close() throws IOException { stream = null; } } - + /** * Override this method to supports multiple paths. * When this method will be removed, all FileInputFormats have to support multiple paths. * * @return True if the FileInputFormat supports multiple paths, false otherwise. -* * @deprecated Will be removed for Flink 2.0. */ @Deprecated public boolean supportsMultiPaths() { return false; } + @Override public String toString() { return getFilePaths() == null || getFilePaths().length == 0 ? "File Input (unknown file)" : - "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + } + + /** +* Get file bom encoding +* +* @param fs +* @return +*/ + public String getBomCharset(FileStatus fs) { + FSDataInputStream inStream = null; + String charset, testFileSystem = "TestFileSystem"; Review comment: I don't think we should add test-related code here. 
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635415#comment-16635415 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221901134 ## File path: flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java ## @@ -85,14 +90,26 @@ public void configure(Configuration parameters) { @Override public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException { + String utf8 = "UTF-8"; Review comment: This method must be as lightweight as possible. The step size can be determined when a split is opened. 
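The reviewer's suggestion, computing the step size once per split instead of on every record, could look roughly like this; `codeUnitWidth` is a hypothetical name, and the mapping mirrors the step sizes in the patch (prefix matching also covers "UTF-16LE"/"UTF-16BE" and the UTF-32 variants):

```java
public class DelimiterStepSize {

    /**
     * Bytes per code unit for the given charset name; intended to be computed
     * once when a split is opened, then reused by every readRecord() call.
     */
    static int codeUnitWidth(String charsetName) {
        if (charsetName.startsWith("UTF-16")) {
            return 2;
        }
        if (charsetName.startsWith("UTF-32")) {
            return 4;
        }
        return 1; // UTF-8 and single-byte charsets
    }

    public static void main(String[] args) {
        System.out.println(codeUnitWidth("UTF-16LE"));  // 2
    }
}
```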
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635423#comment-16635423 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221890824 ## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java ## @@ -862,35 +865,87 @@ public void close() throws IOException { stream = null; } } - + /** * Override this method to supports multiple paths. * When this method will be removed, all FileInputFormats have to support multiple paths. * * @return True if the FileInputFormat supports multiple paths, false otherwise. -* * @deprecated Will be removed for Flink 2.0. */ @Deprecated public boolean supportsMultiPaths() { return false; } + @Override public String toString() { return getFilePaths() == null || getFilePaths().length == 0 ? "File Input (unknown file)" : - "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + "File Input (" + Arrays.toString(this.getFilePaths()) + ')'; + } + + /** +* Get file bom encoding +* +* @param fs Review comment: please add descriptive JavaDocs 
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635425#comment-16635425 ] ASF GitHub Bot commented on FLINK-10134: fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed URL: https://github.com/apache/flink/pull/6710#discussion_r221932785 ## File path: flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java ## @@ -85,14 +90,26 @@ public void configure(Configuration parameters) { @Override public String readRecord(String reusable, byte[] bytes, int offset, int numBytes) throws IOException { + String utf8 = "UTF-8"; + String utf16 = "UTF-16"; + String utf32 = "UTF-32"; + int stepSize = 0; + String charsetName = this.getCharsetName(); + if (charsetName.contains(utf8)) { + stepSize = 1; + } else if (charsetName.contains(utf16)) { + stepSize = 2; + } else if (charsetName.contains(utf32)) { + stepSize = 4; + } //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line if (this.getDelimiter() != null && this.getDelimiter().length == 1 - && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= 1 - && bytes[offset + numBytes - 1] == CARRIAGE_RETURN){ - numBytes -= 1; + && this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize Review comment: We only check the first byte of a character. Are these checks actually compatible with all encodings (LE and BE)? 
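The reviewer's endianness concern can be demonstrated without Flink: '\r' encodes as 00 0D in UTF-16BE but as 0D 00 in UTF-16LE, so testing only the record's final byte against 0x0D matches one byte order and silently misses the other. A JDK-only sketch:

```java
import java.nio.charset.StandardCharsets;

public class TrailingCrDemo {
    private static final byte CARRIAGE_RETURN = (byte) '\r';  // 0x0D

    public static void main(String[] args) {
        byte[] be = "\r".getBytes(StandardCharsets.UTF_16BE);  // 00 0D
        byte[] le = "\r".getBytes(StandardCharsets.UTF_16LE);  // 0D 00
        // A single-byte check against the record's final byte:
        System.out.println(be[be.length - 1] == CARRIAGE_RETURN);  // true
        System.out.println(le[le.length - 1] == CARRIAGE_RETURN);  // false
    }
}
```

Trimming a trailing '\r' therefore has to compare all `stepSize` bytes of the encoded character in the file's actual byte order, not just the last byte.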
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> UTF-16 support for TextInputFormat
> --
>
> Key: FLINK-10134
> URL: https://issues.apache.org/jira/browse/FLINK-10134
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.4.2
> Reporter: David Dreyfus
> Priority: Blocker
> Labels: pull-request-available
>
> It does not appear that Flink supports a charset encoding of "UTF-16". In particular, it doesn't appear that Flink consumes the Byte Order Mark (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE.
>
> TextInputFormat.setCharset("UTF-16") calls DelimitedInputFormat.setCharset(), which sets TextInputFormat.charsetName and then modifies the previously set delimiterString to construct the proper byte string encoding of the delimiter. This same charsetName is also used in TextInputFormat.readRecord() to interpret the bytes read from the file.
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct.
> [1] [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened, using that BOM to change the specified encoding to a BOM-specific one when the caller doesn't specify one, and overwriting the caller's specification if the BOM is in conflict with it. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
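The reviewer's question about LE/BE compatibility, and problem #1 above, both come down to how Java encodes the delimiter string. A minimal JDK-only sketch (independent of Flink; class and method names are illustrative): the generic "UTF-16" charset prepends a BOM and defaults to big-endian, so the resulting delimiter bytes can never match the line endings actually present in the file:

```java
import java.nio.charset.Charset;
import java.util.Arrays;

public class DelimiterBytesDemo {
    // Encode a delimiter string with the given charset name and return its bytes.
    static byte[] delimiterBytes(String delimiter, String charsetName) {
        return delimiter.getBytes(Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        // "UTF-16" prepends a BOM (0xFE 0xFF) and encodes big-endian by default:
        System.out.println(Arrays.toString(delimiterBytes("\n", "UTF-16")));   // [-2, -1, 0, 10]
        // The endianness-specific charsets emit no BOM and differ in byte order:
        System.out.println(Arrays.toString(delimiterBytes("\n", "UTF-16BE"))); // [0, 10]
        System.out.println(Arrays.toString(delimiterBytes("\n", "UTF-16LE"))); // [10, 0]
    }
}
```

A 4-byte delimiter cannot match the 2-byte line endings inside a UTF-16LE or UTF-16BE file, which is exactly why "the delimiter will never be read".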
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635420#comment-16635420 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221892225

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

@@ -862,35 +865,87 @@ public void close() throws IOException {
 			stream = null;
 		}
 	}
-
+
 	/**
 	 * Override this method to supports multiple paths.
 	 * When this method will be removed, all FileInputFormats have to support multiple paths.
 	 *
 	 * @return True if the FileInputFormat supports multiple paths, false otherwise.
-	 *
 	 * @deprecated Will be removed for Flink 2.0.
 	 */
 	@Deprecated
 	public boolean supportsMultiPaths() {
 		return false;
 	}

+	@Override
 	public String toString() {
 		return getFilePaths() == null || getFilePaths().length == 0 ?
 			"File Input (unknown file)" :
-			"File Input (" + Arrays.toString(this.getFilePaths()) + ')';
+			"File Input (" + Arrays.toString(this.getFilePaths()) + ')';
+	}
+
+	/**
+	 * Get file bom encoding
+	 *
+	 * @param fs
+	 * @return
+	 */
+	public String getBomCharset(FileStatus fs) {
+		FSDataInputStream inStream = null;
+		String charset, testFileSystem = "TestFileSystem";
+		byte[] bom = new byte[4];
+		byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+		try {
+			/*
+			 * int read() Reads a data byte from this input stream. Int read(byte[] b) will be at most b.length from this input stream
+			 * Bytes of data are read into a byte array. Int read(byte[] b, int off, int len)
+			 * Reads up to len bytes of data from this input stream into a byte array.
+			 */
+			FileSystem fileSystem = fs.getPath().getFileSystem();
+			if (testFileSystem.equals(fileSystem.getClass().getSimpleName())) {
+				fileSystem = new LocalFileSystem();
+			}
+
+			inStream = fileSystem.open(fs.getPath());
+			inStream.read(bom, 0, bom.length);
+
+			if ((bom[0] == bytes[0]) && (bom[1] == bytes[0]) && (bom[2] == bytes[1]) && (bom[3] == bytes[2])) {
+				charset = "UTF-32BE";
+			} else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1]) && (bom[2] == bytes[0]) && (bom[3] == bytes[0])) {
+				charset = "UTF-32LE";
+			} else if ((bom[0] == bytes[3]) && (bom[1] == bytes[4]) && (bom[2] == bytes[5])) {
+				charset = "UTF-8";
+			} else if ((bom[0] == bytes[1]) && (bom[1] == bytes[2])) {
+				charset = "UTF-16BE";
+			} else if ((bom[0] == bytes[2]) && (bom[1] == bytes[1])) {
+				charset = "UTF-16LE";
+			} else {
+				charset = null;
+			}
+		} catch (Exception e) {
+			throw new IllegalArgumentException("Failed to get file bom encoding.");

Review comment: We should not fail if there is an empty file in a directory.
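The empty-file concern can be addressed by keying the detection off the number of bytes actually read. This is a hedged, JDK-only sketch of the same BOM table (the class name, method name, and use of a plain `InputStream` are mine, not the patch's); a short or empty file simply falls through to "no BOM" instead of throwing:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class BomSniffer {
    // Returns the charset name implied by a leading BOM, or null if none was found.
    public static String detectBomCharset(InputStream in) throws IOException {
        byte[] bom = new byte[4];
        int n = 0;
        while (n < bom.length) {                // read() may return fewer bytes than requested
            int r = in.read(bom, n, bom.length - n);
            if (r < 0) {
                break;                          // EOF: empty or very short file, not an error
            }
            n += r;
        }
        // Longest signatures first, so UTF-32LE (FF FE 00 00) is not misread as UTF-16LE (FF FE).
        if (n >= 4 && bom[0] == 0x00 && bom[1] == 0x00 && bom[2] == (byte) 0xFE && bom[3] == (byte) 0xFF) {
            return "UTF-32BE";
        } else if (n >= 4 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE && bom[2] == 0x00 && bom[3] == 0x00) {
            return "UTF-32LE";
        } else if (n >= 3 && bom[0] == (byte) 0xEF && bom[1] == (byte) 0xBB && bom[2] == (byte) 0xBF) {
            return "UTF-8";
        } else if (n >= 2 && bom[0] == (byte) 0xFE && bom[1] == (byte) 0xFF) {
            return "UTF-16BE";
        } else if (n >= 2 && bom[0] == (byte) 0xFF && bom[1] == (byte) 0xFE) {
            return "UTF-16LE";
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(detectBomCharset(new ByteArrayInputStream(new byte[]{(byte) 0xFF, (byte) 0xFE, 'a', 0}))); // UTF-16LE
        System.out.println(detectBomCharset(new ByteArrayInputStream(new byte[0])));                                   // null
    }
}
```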
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635412#comment-16635412 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221933227

## File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java

@@ -207,8 +206,110 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) {
 				assertEquals(content, result);
 			}
+		} catch (Throwable t) {
+			System.err.println("test failed with exception: " + t.getMessage());
+			t.printStackTrace(System.err);
+			fail("Test erroneous");
+		}
+	}
+
+	@Test
+	public void testUTF16Read() {

Review comment: I think we need more tests, including tests that check correct behavior for LE and BE.
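The LE/BE coverage requested above can be sketched without Flink: the same text, encoded in both byte orders with their respective BOMs, must decode back to identical content once the BOM has been consumed and has selected the charset. Class and method names here are illustrative, not from the PR; a real test would wrap this in TextInputFormatTest with one temp file per encoding:

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class BomRoundTripDemo {
    // Prefix a body of encoded text with its BOM bytes.
    static byte[] withBom(byte[] bom, byte[] body) {
        byte[] out = new byte[bom.length + body.length];
        System.arraycopy(bom, 0, out, 0, bom.length);
        System.arraycopy(body, 0, out, bom.length, body.length);
        return out;
    }

    // Decode a UTF-16 byte stream, skipping its 2-byte BOM, with the charset the BOM names.
    static String decodeSkippingBom(byte[] data, Charset charset) {
        return new String(data, 2, data.length - 2, charset);
    }

    public static void main(String[] args) {
        String content = "line1\nline2";
        byte[] le = withBom(new byte[]{(byte) 0xFF, (byte) 0xFE}, content.getBytes(StandardCharsets.UTF_16LE));
        byte[] be = withBom(new byte[]{(byte) 0xFE, (byte) 0xFF}, content.getBytes(StandardCharsets.UTF_16BE));
        // Both byte orders must round-trip to the same logical lines.
        System.out.println(decodeSkippingBom(le, StandardCharsets.UTF_16LE).equals(content)); // true
        System.out.println(decodeSkippingBom(be, StandardCharsets.UTF_16BE).equals(content)); // true
    }
}
```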
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635418#comment-16635418 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221891681

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

@@ -862,35 +865,87 @@
+	public String getBomCharset(FileStatus fs) {
+		FSDataInputStream inStream = null;
+		String charset, testFileSystem = "TestFileSystem";
+		byte[] bom = new byte[4];
+		byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+		try {
+			/*
+			 * int read() Reads a data byte from this input stream. Int read(byte[] b) will be at most b.length from this input stream

Review comment: please move comment to the right line of code or remove it

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635426#comment-16635426 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221896574

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java

@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();

Review comment: We need to make sure that the `this.delimiter` is correctly defined, i.e., that the delimiter string is converted to the correct byte sequence (depending on the BOM, if defined).
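The point about `this.delimiter` can be made concrete with a JDK-only sketch (class and variable names are hypothetical): once the BOM has fixed the real charset of a split, the delimiter must be re-encoded with that endianness-specific charset, not with the caller's "UTF-16", which would otherwise embed a BOM inside the delimiter bytes:

```java
import java.nio.charset.Charset;
import java.util.Arrays;

public class DelimiterReencode {
    // Pick the BOM-derived charset when one was detected, else the configured one,
    // then encode the delimiter with that effective charset.
    static byte[] encodeDelimiter(String delimiterString, String configuredCharset, String bomCharset) {
        String effective = (bomCharset != null) ? bomCharset : configuredCharset;
        return delimiterString.getBytes(Charset.forName(effective));
    }

    public static void main(String[] args) {
        // Configured "UTF-16" alone would give [-2, -1, 0, 10] (BOM + big-endian newline);
        // with the BOM saying little-endian, the delimiter becomes a matchable [10, 0].
        System.out.println(Arrays.toString(encodeDelimiter("\n", "UTF-16", "UTF-16LE"))); // [10, 0]
    }
}
```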
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635416#comment-16635416 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221892538

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

@@ -862,35 +865,87 @@
+	public String getBomCharset(FileStatus fs) {
+		FSDataInputStream inStream = null;
+		String charset, testFileSystem = "TestFileSystem";
+		byte[] bom = new byte[4];
+		byte[] bytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, (byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+		try {

Review comment: I think we should only try to read a BOM if the user specified a UTF-16 or UTF-32 charset. Otherwise, we might interpret the bytes incorrectly.
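The suggested guard is essentially a one-liner. A hedged sketch (class and method names are mine, not the PR's): sniff the BOM only for charsets where byte order is actually ambiguous, so that ordinary leading data bytes in, say, an ISO-8859-1 file are never misinterpreted as a BOM:

```java
import java.util.Locale;

public class BomGuard {
    // Only UTF-16/UTF-32 family charsets warrant inspecting the first bytes of
    // the file; this also covers explicit BE/LE suffixes so conflicts between
    // the configured charset and the BOM can be detected.
    static boolean shouldCheckBom(String charsetName) {
        String upper = charsetName.toUpperCase(Locale.ROOT);
        return upper.startsWith("UTF-16") || upper.startsWith("UTF-32");
    }

    public static void main(String[] args) {
        System.out.println(shouldCheckBom("UTF-16"));     // true
        System.out.println(shouldCheckBom("UTF-8"));      // false
        System.out.println(shouldCheckBom("ISO-8859-1")); // false
    }
}
```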
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635413#comment-16635413 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221900961

## File path: flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java

@@ -59,7 +59,12 @@ public TextInputFormat(Path filePath) {
 	//
 	public String getCharsetName() {
-		return charsetName;
+		String bomCharsetName = getBomCharsetName();

Review comment: I think this logic should go into `DelimitedInputFormat`
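Wherever the override logic ends up living, whether in `TextInputFormat.getCharsetName()` or in `DelimitedInputFormat`, it reduces to one precedence rule from the issue description: a detected BOM wins over the configured charset when the two conflict. A hedged sketch with hypothetical names:

```java
public class CharsetResolution {
    // The BOM-derived charset, when present, overrides the user's configuration;
    // e.g. configured UTF-16BE plus a little-endian BOM resolves to UTF-16LE.
    static String resolveCharset(String configuredCharset, String bomCharset) {
        return (bomCharset != null) ? bomCharset : configuredCharset;
    }

    public static void main(String[] args) {
        System.out.println(resolveCharset("UTF-16BE", "UTF-16LE")); // UTF-16LE
        System.out.println(resolveCharset("UTF-16", null));         // UTF-16
    }
}
```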
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635419#comment-16635419 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221898165

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java

@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}

+	@PublicEvolving
+	public String getBomCharsetName() {

Review comment: I don't think we should add a new method here, because we would need to update all IFs that extend `DelimitedInputFormat` to use the correct method. Can we change the `getCharset()` method to return the BOM-corrected charset?
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635414#comment-16635414 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221890642

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

@@ -601,41 +602,44 @@ public LocatableInputSplitAssigner getInputSplitAssigner(FileInputSplit[] splits
 		if (unsplittable) {
 			int splitNum = 0;
 			for (final FileStatus file : files) {
+				String bomCharsetName = getBomCharset(file);

Review comment: I'm not sure if we want to check the BOM during split generation.
1. This might become a bottleneck, since splits are generated by the JobManager. OTOH, there is currently an effort to parallelize split generation.
2. FileInputFormat is currently not handling any charset issues.
An alternative would be to check the BOM in `DelimitedInputFormat` when a split is opened.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16635235#comment-16635235 ] ASF GitHub Bot commented on FLINK-10134:

fhueske commented on a change in pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#discussion_r221889992

## File path: flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java

@@ -55,20 +55,20 @@
  * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
  * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
  * change the life cycle behavior.
- *

Review comment: Can you move the whitespace and checkstyle changes into a separate commit? This would make it much easier to review the PR. Thank you!
>
> There are two problems that this implementation would seem to have when using UTF-16.
> # delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java will return a Big Endian byte sequence including the Byte Order Mark (BOM). The actual text file will not contain a BOM at each line ending, so the delimiter will never be read. Moreover, if the actual byte encoding of the file is Little Endian, the bytes will be interpreted incorrectly.
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a byte sequence with the String(bytes, offset, numBytes, charset) call. Therefore, it will assume Big Endian, which may not always be correct. [1]
> [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95]
>
> While there are likely many solutions, I would think that all of them would have to start by reading the BOM from the file when a Split is opened, then using that BOM to select a BOM-specific encoding when the caller doesn't specify one, and overriding the caller's specification if the BOM conflicts with it. That is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, Flink should rewrite the charsetName as UTF-16LE.
> I hope this makes sense and that I haven't been testing incorrectly or misreading the code.
>
> I've verified the problem on version 1.4.2. I believe the problem exists on all versions.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
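Problem #1 above can be reproduced in plain Java: encoding the delimiter with the endianness-unspecified "UTF-16" charset prepends a big-endian BOM, so the resulting byte pattern can never match a bare line ending inside the file. A minimal demonstration (the class name is illustrative, not Flink code):

```java
import java.nio.charset.StandardCharsets;

public class DelimiterBomDemo {

    // Encodes a delimiter the way DelimitedInputFormat effectively does:
    // delimiterString.getBytes(charset). With "UTF-16", Java's encoder
    // prepends the big-endian BOM (0xFE 0xFF) to the output.
    public static byte[] encode(String delimiter) {
        return delimiter.getBytes(StandardCharsets.UTF_16);
    }

    public static void main(String[] args) {
        byte[] withBom = encode("\n");
        // 4 bytes: FE FF 00 0A -- the BOM plus '\n' in UTF-16BE.
        // A file's line endings contain only 00 0A (or 0A 00 for LE),
        // so this delimiter is never found in the data.
        for (byte b : withBom) {
            System.out.printf("%02X ", b);
        }
        System.out.println();

        // Specifying the endianness explicitly yields a BOM-free delimiter.
        byte[] be = "\n".getBytes(StandardCharsets.UTF_16BE); // 00 0A, 2 bytes
        System.out.println(be.length);
    }
}
```

Using UTF_16BE or UTF_16LE directly avoids the BOM, which is why the issue proposes rewriting "UTF-16" to an endianness-qualified name.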
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16631870#comment-16631870 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-425442916

@fhueske Hi, Stephan mentioned you in his email reply to me and suggested I ask for your help. I don't know whether you have time to discuss this JIRA, but I look forward to your reply.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16628183#comment-16628183 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424565714

@StephanEwen Hello, regarding the two questions you raised yesterday, I have some thoughts of my own, though I am not sure whether they are right.

1. Where should the BOM be read? I think we still need BOM-processing logic at the beginning of the file, plus an attribute that records the BOM-derived encoding of the file. For example, this could go in `createInputSplits`.

2. Regarding the performance problem: we can use the previously detected BOM to distinguish UTF-8, UTF-16, and UTF-32 with BOM, and control the byte step size used to process the end of each line. I found that the garbling in the earlier bug was really an encoding problem, partly caused by improper handling of the trailing bytes of each line. I handle it as follows:

```java
String utf8 = "UTF-8";
String utf16 = "UTF-16";
String utf32 = "UTF-32";
int stepSize = 0;
String charsetName = this.getCharsetName();
// Derive the width of one code unit from the charset family, so that a
// trailing '\r' is stripped as a whole code unit rather than a single byte.
if (charsetName.contains(utf8)) {
	stepSize = 1;
} else if (charsetName.contains(utf16)) {
	stepSize = 2;
} else if (charsetName.contains(utf32)) {
	stepSize = 4;
}
// Check if \n is used as delimiter and the end of this line is a \r,
// then remove \r from the line.
if (this.getDelimiter() != null && this.getDelimiter().length == 1
		&& this.getDelimiter()[0] == NEW_LINE && offset + numBytes >= stepSize
		&& bytes[offset + numBytes - stepSize] == CARRIAGE_RETURN) {
	numBytes -= stepSize;
}
numBytes = numBytes - stepSize + 1;
return new String(bytes, offset, numBytes, this.getCharsetName());
```

These are some of my own ideas.
I hope you can give some better suggestions so we can handle this JIRA well. Thank you.
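The charset-dependent step size discussed above presupposes that the file's real encoding is known. The BOM probe the issue proposes — read a few bytes once, at the start of the file, and derive a concrete endianness-qualified charset name — might look like the following sketch. This is not Flink's actual code; note that the UTF-32 patterns must be tested before UTF-16, since FF FE is a prefix of the UTF-32LE BOM:

```java
public class BomSniffer {

    /**
     * Maps the first bytes of a file to a concrete charset name, falling back
     * to the caller-configured charset when no BOM is present.
     *
     * @param head       the first bytes of the file (at least 4 if available)
     * @param len        how many bytes of {@code head} are valid
     * @param configured the charset name the caller configured
     */
    public static String detectCharset(byte[] head, int len, String configured) {
        // UTF-32 BOMs are 4 bytes and must be checked before UTF-16,
        // because FF FE 00 00 starts with the UTF-16LE BOM FF FE.
        if (len >= 4 && head[0] == 0 && head[1] == 0
                && (head[2] & 0xFF) == 0xFE && (head[3] & 0xFF) == 0xFF) {
            return "UTF-32BE";
        }
        if (len >= 4 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE
                && head[2] == 0 && head[3] == 0) {
            return "UTF-32LE";
        }
        if (len >= 3 && (head[0] & 0xFF) == 0xEF && (head[1] & 0xFF) == 0xBB
                && (head[2] & 0xFF) == 0xBF) {
            return "UTF-8";
        }
        if (len >= 2 && (head[0] & 0xFF) == 0xFE && (head[1] & 0xFF) == 0xFF) {
            return "UTF-16BE";
        }
        if (len >= 2 && (head[0] & 0xFF) == 0xFF && (head[1] & 0xFF) == 0xFE) {
            return "UTF-16LE";
        }
        return configured; // no BOM: trust the caller's setting
    }
}
```

A BOM-free file keeps the configured charset, which matches the issue's proposal of only overriding the caller when the BOM actually conflicts.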
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626693#comment-16626693 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954

> I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first.
>
> (1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation.
>
> (2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact.

@StephanEwen Thank you for your suggestion. I will close the PR first, and then we can discuss how to fix this bug.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626690#comment-16626690 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars closed pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

```diff
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..8eb43424264 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -62,6 +62,9 @@
 	// Charset is not serializable
 	private transient Charset charset;
 
+	/** The charset of bom in the file to process. */
+	private String bomCharsetName;
+
 	/**
 	 * The default read buffer size = 1MB.
 	 */
@@ -221,6 +224,11 @@ public void setCharset(String charset) {
 		}
 	}
 
+	@PublicEvolving
+	public String getBomCharsetName() {
+		return this.bomCharsetName;
+	}
+
 	public byte[] getDelimiter() {
 		return delimiter;
 	}
@@ -341,7 +349,7 @@ public void configure(Configuration parameters) {
 	@Override
 	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileBaseStatistics ?
+		final FileBaseStatistics cachedFileStats = cachedStats instanceof FileInputFormat.FileBaseStatistics ?
 			(FileBaseStatistics) cachedStats : null;
 
 		// store properties
@@ -408,7 +416,9 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 			while (samplesTaken < numSamples && fileNum < allFiles.size()) {
 				// make a split for the sample and use it to read a record
 				FileStatus file = allFiles.get(fileNum);
-				FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
+				String bomCharsetName = getBomCharset(file);
+
+				FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null, bomCharsetName);
 
 				// we open the split, read one line, and take its length
 				try {
@@ -467,6 +477,7 @@ public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOExc
 	 */
 	@Override
 	public void open(FileInputSplit split) throws IOException {
+		this.bomCharsetName = split.getBomCharsetName();
 		super.open(split);
 		initBuffers();
 
@@ -736,7 +747,7 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
 		Preconditions.checkNotNull(split, "reopen() cannot be called on a null split.");
 		Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state.");
 		Preconditions.checkArgument(state == -1 || state >= split.getStart(),
-			" Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
+			" Illegal offset " + state + ", smaller than the splits start=" + split.getStart());
 
 		try {
 			this.open(split);
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 14cf647cd24..c58344fba6c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -34,7 +34,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,20 +55,20 @@
 * {@link #nextRecord(Object)} and {@link #reachedEnd()} methods need to be implemented.
 * Additionally, one may override {@link #open(FileInputSplit)} and {@link #close()} to
 * change the life cycle behavior.
- *
+ *
 * After the {@link
```
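The diff above passes a `bomCharsetName` through the split constructor and reads it back via `split.getBomCharsetName()`. A simplified, hypothetical stand-in for that split-side change (this is not Flink's actual FileInputSplit class; it only mirrors the constructor and getter the diff calls):

```java
public class BomAwareFileInputSplit {

    private final int splitNumber;
    private final String path;
    private final long start;
    private final long length;
    private final String[] hosts;
    private final String bomCharsetName; // may be null when the file has no BOM

    public BomAwareFileInputSplit(int splitNumber, String path, long start,
                                  long length, String[] hosts, String bomCharsetName) {
        this.splitNumber = splitNumber;
        this.path = path;
        this.start = start;
        this.length = length;
        this.hosts = hosts;
        // resolved once at split-generation time, then carried to the reader
        this.bomCharsetName = bomCharsetName;
    }

    public String getBomCharsetName() {
        return bomCharsetName;
    }

    public long getStart() {
        return start;
    }
}
```

Carrying the resolved charset on the split means the reader's `open()` can pick it up once, without touching the file header again.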
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16626689#comment-16626689 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-424188954

> I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first.
>
> (1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation.
>
> (2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact.

@StephanEwen Thank you for your suggestion. I will close the PR first, and then we can discuss how to fix this bug.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1662#comment-1662 ] ASF GitHub Bot commented on FLINK-10134:

StephanEwen commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-423914033

I think we need to revisit the way this fix works. At this point, it probably makes sense to close the PR and go back to a design discussion first. I would suggest to have that design discussion on the JIRA issue first.

(1) Where should the BOM be read? Beginning of the file only? Then it should most likely be part of split generation.

(2) Any design cannot have additional logic in the method that is called per record, otherwise it will have too much performance impact.
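Stephan's point (2) — no extra logic in the per-record method — suggests resolving the charset once per split and caching the `Charset` object, so that `readRecord()` only decodes. A hedged sketch of that shape (class and method names are illustrative, not Flink's API):

```java
import java.nio.charset.Charset;

public class SplitLocalDecoder {

    private Charset cachedCharset;

    // Called once per split: resolve the (possibly BOM-derived) charset name
    // into a Charset object up front, outside the hot path.
    public void open(String resolvedCharsetName) {
        this.cachedCharset = Charset.forName(resolvedCharsetName);
    }

    // Hot path, called once per record: a plain decode with no BOM
    // inspection and no charset lookup by name.
    public String readRecord(byte[] bytes, int offset, int numBytes) {
        return new String(bytes, offset, numBytes, cachedCharset);
    }
}
```

Because the endianness was fixed at open time ("UTF-16LE" rather than "UTF-16"), the decoder never has to guess byte order per record.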
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16622111#comment-16622111 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-423198922

@tillrohrmann @vinoyang @hequn8128 Hello, I have submitted a PR and would appreciate your review and valuable guidance.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620859#comment-16620859 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422881621

> Hi @XuQianJin-Stars , thanks for your PR. Could you rebase your commits?

@Clark Thank you. I have resubmitted the PR. Please check it and give me your advice.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620249#comment-16620249 ] ASF GitHub Bot commented on FLINK-10134:

Clark commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422694446

Hi @XuQianJin-Stars , thanks for your PR. Could you rebase your commits?
> # TextInputFormat.readRecord() will not see a BOM each time it decodes a > byte sequence with the String(bytes, offset, numBytes, charset) call. > Therefore, it will assume Big Endian, which may not always be correct. [1] > [https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95] > > While there are likely many solutions, I would think that all of them would > have to start by reading the BOM from the file when a Split is opened and > then using that BOM to modify the specified encoding to a BOM specific one > when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. That > is, if the BOM indicates Little Endian and the caller indicates UTF-16BE, > Flink should rewrite the charsetName as UTF-16LE. > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > I've verified the problem on version 1.4.2. I believe the problem exists on > all versions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
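The first problem in the description can be checked directly: Java's generic "UTF-16" charset prepends a big-endian byte-order mark on every encode call, while the endianness-specific charset names do not. A minimal sketch (the class and method names here are ours, for illustration only; they mirror what delimiterString.getBytes(getCharset()) produces, not Flink's actual code):

```java
import java.nio.charset.Charset;

public class DelimiterBomDemo {
    // Encodes a delimiter string the same way DelimitedInputFormat derives
    // its delimiter byte sequence: String.getBytes(charset).
    static byte[] delimiterBytes(String delimiter, String charsetName) {
        return delimiter.getBytes(Charset.forName(charsetName));
    }

    public static void main(String[] args) {
        // "UTF-16" prepends a big-endian BOM (FE FF) to the encoded text,
        // so a "\n" delimiter becomes FE FF 00 0A — 4 bytes that will
        // never match a line ending inside the file:
        System.out.println(delimiterBytes("\n", "UTF-16").length);   // 4
        // The endianness-specific names emit no BOM:
        System.out.println(delimiterBytes("\n", "UTF-16BE").length); // 2: 00 0A
        System.out.println(delimiterBytes("\n", "UTF-16LE").length); // 2: 0A 00
    }
}
```

This is why a file encoded as UTF-16LE with plain `0A 00` line endings is never split correctly when the caller configures "UTF-16".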
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16620130#comment-16620130 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars removed a comment on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422263671

1. I added a file BOM charset check in FileInputFormat.java.
2. I added the bomCharsetName variable in DelimitedInputFormat.java.
3. In TextInputFormat, if the configured encoding conflicts with the BOM encoding, the BOM encoding takes precedence; if there is no conflict, the configured encoding is used.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619992#comment-16619992 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars edited a comment on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422626468

> Hi @XuQianJin-Stars, thanks for your PR. Would you fill in the PR template to give reviewers enough contextual information for the review.

@hequn8128 I have filled in the PR description. I hope you can review the PR I submitted, thank you.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619991#comment-16619991 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422626468

> Hi @XuQianJin-Stars, thanks for your PR. Would you fill in the PR template to give reviewers enough contextual information for the review.

@hequn8128 I have filled in the PR description; I hope you can review it, thank you.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619948#comment-16619948 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422613308

> Hi @XuQianJin-Stars, thanks for your PR. Would you fill in the PR template to give reviewers enough contextual information for the review.

@hequn8128 Thank you. This is the first time I have submitted a PR, and some of the format conventions are unfamiliar to me. I will read the guidelines and fill in the description.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16619946#comment-16619946 ] ASF GitHub Bot commented on FLINK-10134:

hequn8128 commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422612596

Hi @XuQianJin-Stars, thanks for your PR. Would you fill in the PR template to give reviewers enough contextual information for the review.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618515#comment-16618515 ] xuqianjin commented on FLINK-10134:
---
I'm trying to fix this bug. Please let me know if anything is wrong with the approach, and give your comments. Thank you. I have made the following three major modifications:
# I added a file BOM charset check in FileInputFormat.java.
# I added the bomCharsetName variable in DelimitedInputFormat.java.
# In TextInputFormat, if the configured encoding conflicts with the BOM encoding, the BOM encoding takes precedence; if there is no conflict, the configured encoding is used.
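The BOM check described in the first modification above could look roughly like the following. This is a hedged sketch, not the code from the PR: the class name (BomCharsetDetector), the method name (resolveCharset), and the use of PushbackInputStream are our assumptions; the actual change wires equivalent logic into the input format when a split is opened.

```java
import java.io.IOException;
import java.io.PushbackInputStream;

public class BomCharsetDetector {
    /**
     * Peeks at the first two bytes of a stream. If they are a UTF-16 BOM,
     * the BOM is consumed and the endianness-specific charset name it
     * implies is returned (overriding the configured one, as the comment
     * above proposes). Otherwise the peeked bytes are pushed back and the
     * configured charset is kept.
     */
    static String resolveCharset(PushbackInputStream in, String configured) throws IOException {
        byte[] bom = new byte[2];
        int n = in.read(bom); // sketch: assumes 2 bytes arrive in one read
        if (n == 2) {
            if ((bom[0] & 0xFF) == 0xFE && (bom[1] & 0xFF) == 0xFF) {
                return "UTF-16BE"; // BOM consumed; big-endian content follows
            }
            if ((bom[0] & 0xFF) == 0xFF && (bom[1] & 0xFF) == 0xFE) {
                return "UTF-16LE"; // BOM consumed; little-endian content follows
            }
        }
        // No BOM: restore the peeked bytes and trust the configured charset.
        if (n > 0) {
            in.unread(bom, 0, n);
        }
        return configured;
    }
}
```

With this shape, a caller who configures "UTF-16BE" against a file whose BOM is `FF FE` would end up decoding with "UTF-16LE", matching the behavior the issue reporter asked for.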
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618513#comment-16618513 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars commented on issue #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710#issuecomment-422263671

1. I added a file BOM charset check in FileInputFormat.java.
2. I added the bomCharsetName variable in DelimitedInputFormat.java.
3. In TextInputFormat, if the configured encoding conflicts with the BOM encoding, the BOM encoding takes precedence; if there is no conflict, the configured encoding is used.
[jira] [Commented] (FLINK-10134) UTF-16 support for TextInputFormat
[ https://issues.apache.org/jira/browse/FLINK-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16618506#comment-16618506 ] ASF GitHub Bot commented on FLINK-10134:

XuQianJin-Stars opened a new pull request #6710: [FLINK-10134] UTF-16 support for TextInputFormat bug fixed
URL: https://github.com/apache/flink/pull/6710

*Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*

## Contribution Checklist

- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.
- Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
- Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues.
- Each commit in the pull request has a meaningful commit message (including the JIRA id).
- Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below.

**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)*

## Brief change log

*(for example:)*
- *The TaskInfo is stored in the blob store on job creation time as a persistent artifact*
- *Deployments RPC transmits only the blob storage reference*
- *TaskManagers retrieve the TaskInfo from the blob cache*

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Added test that validates that TaskInfo is transferred only once across recoveries*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no)
- The serializers: (yes / no / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
- The S3 file system connector: (yes / no / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)