XuQianJin-Stars closed pull request #7157: [FLINK-10134] UTF-16 support for 
TextInputFormat bug
URL: https://github.com/apache/flink/pull/7157
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index c1ef344175b..e13560d4823 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -28,6 +28,7 @@
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.parser.FieldParser;
+import org.apache.flink.util.LRUCache;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,6 +37,7 @@
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Map;
 
 /**
  * Base implementation for input formats that split the input at a delimiter 
into records.
@@ -62,6 +64,23 @@
        // Charset is not serializable
        private transient Charset charset;
 
+       /**
+        * The charset identified from the byte order mark (BOM) of the file being processed.
+        */
+       private transient Charset bomIdentifiedCharset;
+       /**
+        * This is the charset that is configured via setCharset().
+        */
+       private transient Charset configuredCharset;
+       /**
+        * Cache mapping each file path to the charset determined for that file by the BOM check.
+        */
+       private transient final Map<String, Charset> fileBomCharsetMap;
+       /**
+        * The distinct byte values that occur in the UTF-8, UTF-16 and UTF-32 byte order marks.
+        */
+       byte[] bomBytes = new byte[]{(byte) 0x00, (byte) 0xFE, (byte) 0xFF, 
(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+
        /**
         * The default read buffer size = 1MB.
         */
@@ -184,6 +203,7 @@ protected DelimitedInputFormat(Path filePath, Configuration 
configuration) {
                        configuration = GlobalConfiguration.loadConfiguration();
                }
                loadConfigParameters(configuration);
+               this.fileBomCharsetMap = new LRUCache<>(1024);
        }
 
        /**
@@ -195,12 +215,25 @@ protected DelimitedInputFormat(Path filePath, 
Configuration configuration) {
         */
        @PublicEvolving
        public Charset getCharset() {
-               if (this.charset == null) {
+               if (this.configuredCharset != null) {
+                       this.charset = this.configuredCharset;
+               } else if (this.bomIdentifiedCharset != null) {
+                       this.charset = this.bomIdentifiedCharset;
+               } else {
                        this.charset = Charset.forName(charsetName);
                }
                return this.charset;
        }
 
+       /**
+        * Returns the charset name stored in this input format.
+        *
+        * <p>Unlike {@link #getCharset()}, the returned value is the plain {@code charsetName}
+        * field and is not replaced by a charset identified from a file's BOM.
+        *
+        * @return the charset name
+        */
+       public String getCharsetName() {
+               return charsetName;
+       }
+
        /**
         * Set the name of the character set used for the row delimiter. This is
         * also used by subclasses to interpret field delimiters, comment 
strings,
@@ -214,7 +247,7 @@ public Charset getCharset() {
        @PublicEvolving
        public void setCharset(String charset) {
                this.charsetName = Preconditions.checkNotNull(charset);
-               this.charset = null;
+               this.configuredCharset = getSpecialCharset(charset);
 
                if (this.delimiterString != null) {
                        this.delimiter = delimiterString.getBytes(getCharset());
@@ -472,6 +505,7 @@ public void open(FileInputSplit split) throws IOException {
 
                this.offset = splitStart;
                if (this.splitStart != 0) {
+                       setBomFileCharset(split);
                        this.stream.seek(offset);
                        readLine();
                        // if the first partial record already pushes the 
stream over
@@ -481,6 +515,7 @@ public void open(FileInputSplit split) throws IOException {
                        }
                } else {
                        fillBuffer(0);
+                       setBomFileCharset(split);
                }
        }
 
@@ -536,6 +571,71 @@ public void close() throws IOException {
                super.close();
        }
 
+       /**
+        * Resolves a charset name, substituting the big-endian variant for the BOM-dependent
+        * charsets UTF-16 and UTF-32.
+        *
+        * <p>The comparison is made on the canonical name of the resolved charset, so aliases
+        * and different spellings of the name (for example {@code "utf16"}) are treated the same
+        * way as {@code "UTF-16"}. Every other charset is returned as resolved.
+        *
+        * @param charsetName the charset name to resolve, must not be null
+        * @return UTF-16BE for UTF-16, UTF-32BE for UTF-32, otherwise the named charset
+        * @throws java.nio.charset.UnsupportedCharsetException if the named charset is not available
+        */
+       private Charset getSpecialCharset(String charsetName) {
+               // Charset.forName is case-insensitive and alias-aware; name() yields the canonical name.
+               final Charset resolved = Charset.forName(charsetName);
+               switch (resolved.name()) {
+                       case "UTF-16":
+                               return Charset.forName("UTF-16BE");
+                       case "UTF-32":
+                               return Charset.forName("UTF-32BE");
+                       default:
+                               return resolved;
+               }
+       }
+       /**
+        * Determines the charset of the split's file from its byte order mark (BOM), if any.
+        *
+        * <p>Nothing is done when a charset was explicitly configured. Otherwise the result is taken
+        * from (or added to) the per-path cache and stored in {@code bomIdentifiedCharset}. A file
+        * without a recognized BOM, or whose head cannot be read, falls back to the charset named
+        * by {@code charsetName}.
+        *
+        * @param split the split whose file is inspected
+        */
+       private void setBomFileCharset(FileInputSplit split) {
+               if (configuredCharset != null) {
+                       return;
+               }
+               try {
+                       String filePath = split.getPath().toString();
+                       // Only non-null charsets are ever stored, so a null result means "not cached".
+                       Charset cached = this.fileBomCharsetMap.get(filePath);
+                       if (cached != null) {
+                               this.bomIdentifiedCharset = cached;
+                               return;
+                       }
+
+                       byte[] bomBuffer = new byte[4];
+                       int available;
+                       if (this.splitStart != 0) {
+                               // The split does not start at the file head: read the head, then restore the position.
+                               this.stream.seek(0);
+                               available = 0;
+                               // A single read() may return fewer bytes than requested; loop until full or EOF.
+                               while (available < bomBuffer.length) {
+                                       int read = this.stream.read(bomBuffer, available, bomBuffer.length - available);
+                                       if (read < 0) {
+                                               break;
+                                       }
+                                       available += read;
+                               }
+                               this.stream.seek(split.getStart());
+                       } else {
+                               // open() fills the read buffer from offset 0 before calling this method.
+                               // NOTE(review): the number of valid bytes in readBuffer is not known here; for a
+                               // file shorter than four bytes the tail of bomBuffer may not be file content.
+                               available = Math.min(bomBuffer.length, this.readBuffer.length);
+                               System.arraycopy(this.readBuffer, 0, bomBuffer, 0, available);
+                       }
+
+                       Charset detected = detectBomCharset(bomBuffer, available);
+                       this.bomIdentifiedCharset = detected != null ? detected : Charset.forName(charsetName);
+                       this.fileBomCharsetMap.put(filePath, this.bomIdentifiedCharset);
+               } catch (Exception e) {
+                       // Best effort: continue with the default charset, but keep the cause in the log.
+                       LOG.warn("Failed to get file bom encoding.", e);
+                       this.bomIdentifiedCharset = Charset.forName(charsetName);
+               }
+       }
+
+       /**
+        * Matches the leading bytes of a file against the UTF-32, UTF-8 and UTF-16 BOM signatures.
+        *
+        * <p>The four-byte UTF-32 signatures are tested first because the UTF-32LE BOM
+        * ({@code FF FE 00 00}) starts with the UTF-16LE BOM ({@code FF FE}).
+        *
+        * @param head the leading bytes of the file
+        * @param length the number of valid bytes in {@code head}
+        * @return the charset announced by the BOM, or null if no BOM was recognized
+        */
+       private static Charset detectBomCharset(byte[] head, int length) {
+               final byte x00 = (byte) 0x00;
+               final byte xFE = (byte) 0xFE;
+               final byte xFF = (byte) 0xFF;
+
+               if (length >= 4) {
+                       if (head[0] == x00 && head[1] == x00 && head[2] == xFE && head[3] == xFF) {
+                               return Charset.forName("UTF-32BE");
+                       }
+                       if (head[0] == xFF && head[1] == xFE && head[2] == x00 && head[3] == x00) {
+                               return Charset.forName("UTF-32LE");
+                       }
+               }
+               if (length >= 3 && head[0] == (byte) 0xEF && head[1] == (byte) 0xBB && head[2] == (byte) 0xBF) {
+                       return Charset.forName("UTF-8");
+               }
+               if (length >= 2) {
+                       if (head[0] == xFE && head[1] == xFF) {
+                               return Charset.forName("UTF-16BE");
+                       }
+                       if (head[0] == xFF && head[1] == xFE) {
+                               return Charset.forName("UTF-16LE");
+                       }
+               }
+               return null;
+       }
+
+
        // 
--------------------------------------------------------------------------------------------
 
        protected final boolean readLine() throws IOException {
diff --git a/flink-core/src/main/java/org/apache/flink/util/LRUCache.java 
b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
new file mode 100644
index 00000000000..ee7aa2516ca
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/LRUCache.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A fixed-capacity least-recently-used cache built on an access-ordered {@link LinkedHashMap}.
+ *
+ * <p>When an insertion leaves more than {@code cacheSize} entries in the map, the least recently
+ * accessed entry is evicted. Like {@link LinkedHashMap}, this class is not synchronized.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class LRUCache<K, V> extends LinkedHashMap<K, V> implements java.io.Serializable {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Maximum number of entries retained before the eldest entry is evicted. */
+       private final int maxCacheSize;
+
+       /**
+        * Creates a cache that retains at most {@code cacheSize} entries.
+        *
+        * @param cacheSize the maximum number of entries to keep
+        */
+       public LRUCache(int cacheSize) {
+               // Capacity is sized so cacheSize entries fit at load factor 0.75 without rehashing;
+               // the final 'true' selects access order instead of insertion order.
+               super((int) Math.ceil(cacheSize / 0.75) + 1, 0.75f, true);
+               maxCacheSize = cacheSize;
+       }
+
+       @Override
+       protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+               return size() > maxCacheSize;
+       }
+
+       /**
+        * Renders the entries as {@code key:value} pairs, each followed by a single space, in
+        * iteration order (least recently accessed first).
+        */
+       @Override
+       public String toString() {
+               StringBuilder sb = new StringBuilder();
+               for (Map.Entry<K, V> entry : entrySet()) {
+                       sb.append(entry.getKey()).append(':').append(entry.getValue()).append(' ');
+               }
+               return sb.toString();
+       }
+}
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java 
b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index 82793adc137..6c21743e859 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -21,6 +21,7 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 
 import java.io.IOException;
@@ -58,16 +59,30 @@ public TextInputFormat(Path filePath) {
 
        // 
--------------------------------------------------------------------------------------------
 
-       public String getCharsetName() {
-               return charsetName;
-       }
-
        public void setCharsetName(String charsetName) {
                if (charsetName == null) {
                        throw new IllegalArgumentException("Charset must not be 
null.");
                }
 
                this.charsetName = charsetName;
+               this.setCharset(charsetName);
+       }
+
+       /**
+        * Re-applies the newline delimiter through the string-based setter when the current
+        * delimiter is exactly the single byte {@code '\n'}.
+        *
+        * <p>NOTE(review): presumably {@code setDelimiter(String)} encodes the string with the
+        * charset in effect, so the one-byte default becomes the newline sequence of e.g. UTF-16
+        * -- confirm against DelimitedInputFormat.
+        */
+       private void setSpecialDelimiter() {
+               final byte[] current = this.getDelimiter();
+               final boolean isSingleNewline =
+                       current != null && current.length == 1 && current[0] == (byte) '\n';
+               if (isSingleNewline) {
+                       this.setDelimiter("\n");
+               }
+       }
+
+       /**
+        * Opens the given split through the superclass and afterwards adjusts the delimiter via
+        * {@code setSpecialDelimiter()}.
+        *
+        * <p>NOTE(review): the delimiter is adjusted after {@code super.open(split)}, presumably so
+        * that a charset identified while opening the file is already in effect -- confirm.
+        *
+        * @param split the split to open
+        * @throws IOException if the superclass fails to open the split
+        */
+       @Override
+       public void open(FileInputSplit split) throws IOException {
+               super.open(split);
+               setSpecialDelimiter();
+       }
 
        // 
--------------------------------------------------------------------------------------------
@@ -92,7 +107,7 @@ public String readRecord(String reusable, byte[] bytes, int 
offset, int numBytes
                        numBytes -= 1;
                }
 
-               return new String(bytes, offset, numBytes, this.charsetName);
+               return new String(bytes, offset, numBytes, this.getCharset());
        }
 
        // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index e78232ac1e5..6de946b8585 100644
--- 
a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ 
b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -25,16 +25,20 @@
 
 import org.junit.Test;
 
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -215,4 +219,243 @@ private void testRemovingTrailingCR(String lineBreaker, 
String delimiter) {
                }
        }
 
+       /**
+        * Tests reading files in different encodings (UTF-8, UTF-16LE, UTF-16BE, UTF-32LE and
+        * UTF-32BE, with and without a BOM), once with the default delimiter and once with a
+        * custom delimiter.
+        */
+       @Test
+       public void testFileCharset() {
+               String data = "Hello|ハロー|при\\вет|Bon^*|\\|<>|jour|Сайн. байна 
уу|안녕*하세요.";
+               // Default separator
+               testAllFileCharset(data);
+               // Specified separator
+               testAllFileCharset(data, "^*|\\|<>|");
+       }
+
+       /**
+        * Runs every charset combination with an empty delimiter argument, which the per-case
+        * test replaces with the platform line separator.
+        *
+        * @param data the text written to each test file
+        */
+       private void testAllFileCharset(String data) {
+               testAllFileCharset(data, "");
+       }
+
+       /**
+        * Runs the charset test for a fixed list of combinations.
+        *
+        * <p>The comment above each call lists, in order: the charset the file is written in,
+        * whether a BOM is written, and the charset configured on the input format. The second
+        * group of calls configures no charset at all.
+        *
+        * @param data the text written to each test file
+        * @param delimiter the record delimiter; empty means the platform line separator
+        */
+       private void testAllFileCharset(String data, String delimiter) {
+               try {
+                       // test UTF-8, no bom, UTF-8
+                       testFileCharset(data, "UTF-8", false, "UTF-8", 
delimiter);
+                       // test UTF-8, have bom, UTF-8
+                       testFileCharset(data, "UTF-8", true, "UTF-8", 
delimiter);
+                       // test UTF-16BE, no, UTF-16
+                       testFileCharset(data, "UTF-16BE", false, "UTF-16", 
delimiter);
+                       // test UTF-16BE, yes, UTF-16
+                       testFileCharset(data, "UTF-16BE", true, "UTF-16", 
delimiter);
+                       // test UTF-16LE, no, UTF-16LE
+                       testFileCharset(data, "UTF-16LE", false, "UTF-16LE", 
delimiter);
+                       // test UTF-16LE, yes, UTF-16
+                       testFileCharset(data, "UTF-16LE", true, "UTF-16", 
delimiter);
+                       // test UTF-16BE, no, UTF-16BE
+                       testFileCharset(data, "UTF-16BE", false, "UTF-16BE", 
delimiter);
+                       // test UTF-16BE, yes, UTF-16LE
+                       testFileCharset(data, "UTF-16BE", true, "UTF-16LE", 
delimiter);
+                       // test UTF-16LE, yes, UTF-16BE
+                       testFileCharset(data, "UTF-16LE", true, "UTF-16BE", 
delimiter);
+                       // test UTF-32BE, no, UTF-32
+                       testFileCharset(data, "UTF-32BE", false, "UTF-32", 
delimiter);
+                       // test UTF-32BE, yes, UTF-32
+                       testFileCharset(data, "UTF-32BE", true, "UTF-32", 
delimiter);
+                       // test UTF-32LE, yes, UTF-32
+                       testFileCharset(data, "UTF-32LE", true, "UTF-32", 
delimiter);
+                       // test UTF-32LE, no, UTF-32LE
+                       testFileCharset(data, "UTF-32LE", false, "UTF-32LE", 
delimiter);
+                       // test UTF-32BE, no, UTF-32BE
+                       testFileCharset(data, "UTF-32BE", false, "UTF-32BE", 
delimiter);
+                       // test UTF-32BE, yes, UTF-32LE
+                       testFileCharset(data, "UTF-32BE", true, "UTF-32LE", 
delimiter);
+                       // test UTF-32LE, yes, UTF-32BE
+                       testFileCharset(data, "UTF-32LE", true, "UTF-32BE", 
delimiter);
+                       // ------------------ Don't set the charset ------------------
+                       // test UTF-8, have bom, Don't set the charset
+                       testFileCharset(data, "UTF-8", true, delimiter);
+                       // test UTF-8, no bom, Don't set the charset
+                       testFileCharset(data, "UTF-8", false, delimiter);
+                       // test UTF-16BE, no bom, Don't set the charset
+                       testFileCharset(data, "UTF-16BE", false, delimiter);
+                       // test UTF-16BE, have bom, Don't set the charset
+                       testFileCharset(data, "UTF-16BE", true, delimiter);
+                       // test UTF-16LE, have bom, Don't set the charset
+                       testFileCharset(data, "UTF-16LE", true, delimiter);
+                       // test UTF-32BE, no bom, Don't set the charset
+                       testFileCharset(data, "UTF-32BE", false, delimiter);
+                       // test UTF-32BE, have bom, Don't set the charset
+                       testFileCharset(data, "UTF-32BE", true, delimiter);
+                       // test UTF-32LE, have bom, Don't set the charset
+                       testFileCharset(data, "UTF-32LE", true, delimiter);
+               } catch (Throwable t) {
+                       System.err.println("test failed with exception: " + 
t.getMessage());
+                       t.printStackTrace(System.err);
+                       fail("Test erroneous");
+               }
+       }
+
+       /**
+        * Creates a temporary file holding {@code data} followed by a line separator, encoded in
+        * {@code fileCharset} and optionally preceded by the byte order mark of that charset.
+        *
+        * @param data the text to write
+        * @param fileCharset the name of the charset used to encode the text
+        * @param hasBom whether to write the BOM bytes for {@code fileCharset} ahead of the text
+        * @return the created file, registered for deletion on JVM exit
+        * @throws IllegalArgumentException if a BOM is requested for a charset without a known BOM
+        * @throws Exception if the file cannot be created or written
+        */
+       private File createUTFEncodedFile(String data, String fileCharset, boolean hasBom) throws Exception {
+               byte[] bom = new byte[]{};
+               if (hasBom) {
+                       switch (fileCharset) {
+                               case "UTF-8":
+                                       bom = new byte[]{(byte) 0xEF, (byte) 0xBB, (byte) 0xBF};
+                                       break;
+                               case "UTF-16":
+                                       bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+                                       break;
+                               case "UTF-16LE":
+                                       bom = new byte[]{(byte) 0xFF, (byte) 0xFE};
+                                       break;
+                               case "UTF-16BE":
+                                       bom = new byte[]{(byte) 0xFE, (byte) 0xFF};
+                                       break;
+                               case "UTF-32":
+                                       bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+                                       break;
+                               case "UTF-32LE":
+                                       bom = new byte[]{(byte) 0xFF, (byte) 0xFE, (byte) 0x00, (byte) 0x00};
+                                       break;
+                               case "UTF-32BE":
+                                       bom = new byte[]{(byte) 0x00, (byte) 0x00, (byte) 0xFE, (byte) 0xFF};
+                                       break;
+                               default:
+                                       throw new IllegalArgumentException("can not find the utf code");
+                       }
+               }
+
+               // create input file
+               File tempFile = File.createTempFile("TextInputFormatTest", "tmp");
+               tempFile.deleteOnExit();
+               tempFile.setWritable(true);
+
+               // try-with-resources closes the stream and the writer chain even when a write fails
+               try (FileOutputStream fos = new FileOutputStream(tempFile, true)) {
+                       // the raw BOM bytes go first; the array is empty when no BOM was requested
+                       fos.write(bom);
+                       try (BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fos, fileCharset))) {
+                               bw.write(data);
+                               bw.newLine();
+                       }
+               }
+
+               return tempFile;
+       }
+
+       /**
+        * Runs the charset test without configuring a charset on the input format: {@code null}
+        * is passed as the specified charset.
+        */
+       private void testFileCharset(String data, String fileCharset, Boolean 
hasBom, String delimiter) {
+               testFileCharset(data, fileCharset, hasBom, null, delimiter);
+       }
+
+       /**
+        * Writes {@code data} to a temporary file encoded in {@code fileCharset}, reads it back
+        * through a {@link TextInputFormat} and compares the records with the expected pieces of
+        * {@code data}.
+        *
+        * <p>Records are expected to match only when the charset reported by the input format
+        * equals {@code fileCharset}; otherwise they are expected to differ.
+        *
+        * @param data the text written to the file
+        * @param fileCharset the charset the file is encoded in
+        * @param hasBom whether the file starts with a BOM
+        * @param specifiedCharset the charset to configure on the input format, or null for none
+        * @param delimiter the record delimiter; empty means the platform line separator
+        */
+       private void testFileCharset(String data, String fileCharset, Boolean hasBom, String specifiedCharset, String delimiter) {
+               try {
+                       int offset = 0;
+                       // System.lineSeparator() is the supported accessor for "line.separator"; the
+                       // JDK-internal sun.security.action.GetPropertyAction is not accessible on current JDKs.
+                       String carriageReturn = System.lineSeparator();
+                       String delimiterString = delimiter.isEmpty() ? carriageReturn : delimiter;
+                       byte[] delimiterBytes = delimiterString.getBytes(fileCharset);
+                       String[] utfArray = {"UTF-8", "UTF-16", "UTF-16LE", "UTF-16BE"};
+                       if (hasBom) {
+                               if (Arrays.asList(utfArray).contains(fileCharset)) {
+                                       // the first record is compared without its leading character
+                                       offset = 1;
+                               }
+                       }
+
+                       File tempFile = createUTFEncodedFile(data, fileCharset, hasBom);
+
+                       TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+                       if (specifiedCharset != null) {
+                               inputFormat.setCharsetName(specifiedCharset);
+                       }
+                       if (delimiterBytes.length > 0) {
+                               inputFormat.setDelimiter(delimiterBytes);
+                       }
+
+                       Configuration parameters = new Configuration();
+                       inputFormat.configure(parameters);
+
+                       FileInputSplit[] splits = inputFormat.createInputSplits(1);
+                       assertTrue("expected at least one input split", splits.length >= 1);
+                       inputFormat.open(splits[0]);
+
+                       String result = "";
+                       int i = 0;
+                       data = data + carriageReturn;
+                       String delimiterStr = new String(delimiterBytes, 0, delimiterBytes.length, fileCharset);
+                       // Pattern.quote makes the whole delimiter a literal, whatever regex characters it contains.
+                       String[] strArr = data.split(java.util.regex.Pattern.quote(delimiterStr));
+
+                       while (!inputFormat.reachedEnd()) {
+                               if (i < strArr.length) {
+                                       result = inputFormat.nextRecord("");
+                                       if (i == 0) {
+                                               result = result.substring(offset);
+                                       }
+                                       // compare charsets by value rather than by object identity
+                                       if (!Charset.forName(fileCharset).equals(inputFormat.getCharset())) {
+                                               assertNotEquals(strArr[i], result);
+                                       } else {
+                                               assertEquals(strArr[i], result);
+                                       }
+                                       i++;
+                               } else {
+                                       inputFormat.nextRecord("");
+                               }
+                       }
+                       assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+               } catch (Throwable t) {
+                       System.err.println("test failed with exception: " + t.getMessage());
+                       t.printStackTrace(System.err);
+                       fail("Test erroneous");
+               }
+       }
+
+       /**
+        * Reads a two-line file through several input splits with a configured charset and checks
+        * that the input format ends up at the end of its input.
+        */
+       @Test
+       public void testFileCharsetReadByMultiSplits() {
+               // System.lineSeparator() replaces the JDK-internal sun.security.action.GetPropertyAction,
+               // which is not accessible on current JDKs.
+               String carriageReturn = System.lineSeparator();
+               final String data = "First line" + carriageReturn + "Second line";
+               try {
+                       File tempFile = createUTFEncodedFile(data, "UTF-16", false);
+
+                       TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+                       inputFormat.setCharsetName("UTF-32");
+
+                       Configuration parameters = new Configuration();
+                       inputFormat.configure(parameters);
+
+                       FileInputSplit[] splits = inputFormat.createInputSplits(3);
+                       assertTrue("expected at least one input split", splits.length >= 1);
+                       String result = "";
+                       for (FileInputSplit split : splits) {
+                               inputFormat.open(split);
+                               result = inputFormat.nextRecord("");
+                       }
+                       assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+               } catch (Throwable t) {
+                       System.err.println("test failed with exception: " + t.getMessage());
+                       t.printStackTrace(System.err);
+                       fail("Test erroneous");
+               }
+       }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to