Hi David, Thanks for digging into the code! I had a quick look into the classes as well. As far as I can see, your analysis is correct and the BOM handling in DelimitedInputFormat and TextInputFormat (and other text-based IFs such as CsvInputFormat) is broken. In fact, its obvious that nobody paid attention to this yet.
It would be great if you could open a Jira issue and copy your analysis and solution proposal into it. While on it, we could also deprecated the (duplicated) setCharsetName() method from TextInputFormat and redirect it to DelimitedInputFormat.setCharset(). Would you also be interested in contributing a fix for this problem? Best, Fabian  https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java#L95 2018-08-09 14:55 GMT+02:00 David Dreyfus <dddrey...@gmail.com>: > Hi Fabian, > > Thank you for taking my email. > TextInputFormat.setCharsetName("UTF-16") appears to set the private > variable TextInputFormat.charsetName. > It doesn't appear to cause additional behavior that would help interpret > UTF-16 data. > > The method I've tested is calling DelimitedInputFormat.setCharset("UTF-16"), > which then sets TextInputFormat.charsetName and then modifies the > previously set delimiterString to construct the proper byte string encoding > of the the delimiter. This same charsetName is also used in > TextInputFormat.readRecord() to interpret the bytes read from the file. > > There are two problems that this implementation would seem to have when > using UTF-16. > > 1. delimiterString.getBytes(getCharset()) in DelimitedInputFormat.java > will return a Big Endian byte sequence including the Byte Order Mark (BOM). > The actual text file will not contain a BOM at each line ending, so the > delimiter will never be read. Moreover, if the actual byte encoding of the > file is Little Endian, the bytes will be interpreted incorrectly. > 2. TextInputFormat.readRecord() will not see a BOM each time it > decodes a byte sequence with the String(bytes, offset, numBytes, charset) > call. Therefore, it will assume Big Endian, which may not always be > correct. > > While there are likely many solutions, I would think that all of them > would have to start by reading the BOM from the file when a Split is opened > and then using that BOM to modify the specified encoding to a BOM specific > one when the caller doesn't specify one, and to overwrite the caller's > specification if the BOM is in conflict with the caller's specification. > That is, if the BOM indicates Little Endian and the caller indicates > UTF-16BE, Flink should rewrite the charsetName as UTF-16LE. > > I hope this makes sense and that I haven't been testing incorrectly or > misreading the code. > > Thank you, > David > > On Thu, Aug 9, 2018 at 4:04 AM Fabian Hueske <fhue...@gmail.com> wrote: > >> Hi David, >> >> Did you try to set the encoding on the TextInputFormat with >> >> TextInputFormat tif = ... >> tif.setCharsetName("UTF-16"); >> >> Best, Fabian >> >> 2018-08-08 17:45 GMT+02:00 David Dreyfus <dddrey...@gmail.com>: >> >>> Hello - >>> >>> It does not appear that Flink supports a charset encoding of "UTF-16". >>> It particular, it doesn't appear that Flink consumes the Byte Order Mark >>> (BOM) to establish whether a UTF-16 file is UTF-16LE or UTF-16BE. Are there >>> any plans to enhance Flink to handle UTF-16 with BOM? >>> >>> Thank you, >>> David >>> >> >>