[FLINK-4268] [core] Add a parsers for BigDecimal/BigInteger

This closes #2304.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c02988b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c02988b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c02988b

Branch: refs/heads/flip-6
Commit: 5c02988b05c56f524fc8c65b15e16b0c24278a5e
Parents: e58fd6e
Author: twalthr <twal...@apache.org>
Authored: Wed Jul 27 15:48:48 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Tue Sep 20 16:43:59 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/types/parser/BigDecParser.java | 145 +++++++++++++++++++
 .../apache/flink/types/parser/BigIntParser.java | 126 ++++++++++++++++
 .../apache/flink/types/parser/FieldParser.java  |   4 +
 .../flink/types/parser/BigDecParserTest.java    |  79 ++++++++++
 .../flink/types/parser/BigIntParserTest.java    |  68 +++++++++
 5 files changed, 422 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
new file mode 100644
index 0000000..46a07fa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigDecParser.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.math.BigDecimal;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.math.BigDecimal}.
+ */
+@PublicEvolving
+public class BigDecParser extends FieldParser<BigDecimal> {
+
+       private static final BigDecimal BIG_DECIMAL_INSTANCE = BigDecimal.ZERO;
+
+       private BigDecimal result;
+       private char[] reuse = null;
+
+       @Override
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, BigDecimal reusable) {
+               int i = startPos;
+
+               final int delimLimit = limit - delimiter.length + 1;
+
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
+                               break;
+                       }
+                       i++;
+               }
+
+               if (i > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(i - 1)]))) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+                       return -1;
+               }
+
+               try {
+                       final int length = i - startPos;
+                       if (reuse == null || reuse.length < length) {
+                               reuse = new char[length];
+                       }
+                       for (int j = 0; j < length; j++) {
+                               final byte b = bytes[startPos + j];
+                               if ((b < '0' || b > '9') && b != '-' && b != 
'+' && b != '.' && b != 'E' && b != 'e') {
+                                       throw new NumberFormatException();
+                               }
+                               reuse[j] = (char) bytes[startPos + j];
+                       }
+
+                       this.result = new BigDecimal(reuse, 0, length);
+                       return (i == limit) ? limit : i + delimiter.length;
+               } catch (NumberFormatException e) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+                       return -1;
+               }
+       }
+
+       @Override
+       public BigDecimal createValue() {
+               return BIG_DECIMAL_INSTANCE;
+       }
+
+       @Override
+       public BigDecimal getLastResult() {
+               return this.result;
+       }
+
+       /**
+        * Static utility to parse a field of type BigDecimal from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
+        * @param startPos The offset to start the parsing.
+        * @param length   The length of the byte sequence (counting from the 
offset).
+        * @return The parsed value.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
+        */
+       public static final BigDecimal parseField(byte[] bytes, int startPos, 
int length) {
+               return parseField(bytes, startPos, length, (char) 0xffff);
+       }
+
+       /**
+        * Static utility to parse a field of type BigDecimal from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
+        * @param delimiter The delimiter that terminates the field.
+        * @return The parsed value.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
+        */
+       public static final BigDecimal parseField(byte[] bytes, int startPos, 
int length, char delimiter) {
+               if (length <= 0) {
+                       throw new NumberFormatException("Invalid input: Empty 
string");
+               }
+               int i = 0;
+               final byte delByte = (byte) delimiter;
+
+               while (i < length && bytes[startPos + i] != delByte) {
+                       i++;
+               }
+
+               if (i > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + i - 1]))) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
+               }
+
+               final char[] chars = new char[i];
+               for (int j = 0; j < i; j++) {
+                       final byte b = bytes[startPos + j];
+                       if ((b < '0' || b > '9') && b != '-' && b != '+' && b 
!= '.' && b != 'E' && b != 'e') {
+                               throw new NumberFormatException();
+                       }
+                       chars[j] = (char) bytes[startPos + j];
+               }
+               return new BigDecimal(chars);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
new file mode 100644
index 0000000..13361c1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/BigIntParser.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+import java.math.BigInteger;
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * Parses a text field into a {@link java.math.BigInteger}.
+ */
+@PublicEvolving
+public class BigIntParser extends FieldParser<BigInteger> {
+
+       private static final BigInteger BIG_INTEGER_INSTANCE = BigInteger.ZERO;
+
+       private BigInteger result;
+
+       @Override
+       public int parseField(byte[] bytes, int startPos, int limit, byte[] 
delimiter, BigInteger reusable) {
+               int i = startPos;
+
+               final int delimLimit = limit - delimiter.length + 1;
+
+               while (i < limit) {
+                       if (i < delimLimit && delimiterNext(bytes, i, 
delimiter)) {
+                               if (i == startPos) {
+                                       
setErrorState(ParseErrorState.EMPTY_COLUMN);
+                                       return -1;
+                               }
+                               break;
+                       }
+                       i++;
+               }
+
+               if (i > startPos &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[(i - 1)]))) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_ILLEGAL_CHARACTER);
+                       return -1;
+               }
+
+               String str = new String(bytes, startPos, i - startPos);
+               try {
+                       this.result = new BigInteger(str);
+                       return (i == limit) ? limit : i + delimiter.length;
+               } catch (NumberFormatException e) {
+                       
setErrorState(ParseErrorState.NUMERIC_VALUE_FORMAT_ERROR);
+                       return -1;
+               }
+       }
+
+       @Override
+       public BigInteger createValue() {
+               return BIG_INTEGER_INSTANCE;
+       }
+
+       @Override
+       public BigInteger getLastResult() {
+               return this.result;
+       }
+
+       /**
+        * Static utility to parse a field of type BigInteger from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes    The bytes containing the text data that should be 
parsed.
+        * @param startPos The offset to start the parsing.
+        * @param length   The length of the byte sequence (counting from the 
offset).
+        * @return The parsed value.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
+        */
+       public static final BigInteger parseField(byte[] bytes, int startPos, 
int length) {
+               return parseField(bytes, startPos, length, (char) 0xffff);
+       }
+
+       /**
+        * Static utility to parse a field of type BigInteger from a byte 
sequence that represents text
+        * characters
+        * (such as when read from a file stream).
+        *
+        * @param bytes     The bytes containing the text data that should be 
parsed.
+        * @param startPos  The offset to start the parsing.
+        * @param length    The length of the byte sequence (counting from the 
offset).
+        * @param delimiter The delimiter that terminates the field.
+        * @return The parsed value.
+        * @throws NumberFormatException Thrown when the value cannot be parsed 
because the text 
+        * represents not a correct number.
+        */
+       public static final BigInteger parseField(byte[] bytes, int startPos, 
int length, char delimiter) {
+               if (length <= 0) {
+                       throw new NumberFormatException("Invalid input: Empty 
string");
+               }
+               int i = 0;
+               final byte delByte = (byte) delimiter;
+
+               while (i < length && bytes[startPos + i] != delByte) {
+                       i++;
+               }
+
+               if (i > 0 &&
+                               (Character.isWhitespace(bytes[startPos]) || 
Character.isWhitespace(bytes[startPos + i - 1]))) {
+                       throw new NumberFormatException("There is leading or 
trailing whitespace in the numeric field.");
+               }
+
+               String str = new String(bytes, startPos, i);
+               return new BigInteger(str);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java 
b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
index 5f17840..a1b9c5f 100644
--- a/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
+++ b/flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.types.parser;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -208,6 +210,8 @@ public abstract class FieldParser<T> {
                PARSERS.put(Float.class, FloatParser.class);
                PARSERS.put(Double.class, DoubleParser.class);
                PARSERS.put(Boolean.class, BooleanParser.class);
+               PARSERS.put(BigDecimal.class, BigDecParser.class);
+               PARSERS.put(BigInteger.class, BigIntParser.class);
 
                // value types
                PARSERS.put(ByteValue.class, ByteValueParser.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
new file mode 100644
index 0000000..9e17565
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/BigDecParserTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.math.BigDecimal;
+
+public class BigDecParserTest extends ParserTestBase<BigDecimal> {
+
+       @Override
+       public String[] getValidTestValues() {
+               return new String[] {
+                       "-12.5E1000", "-12.5E100", "-10000", "-1.1", "-1", 
"-0.44",
+                       "0", "0000000", "0e0", "0.000000000000000000000000001", 
"0.0000001",
+                       "0.1234123413478523984729447", "1", "10000", 
"10E100000", "10E1000000000"
+               };
+       }
+
+       @Override
+       public BigDecimal[] getValidTestResults() {
+               return new BigDecimal[] {
+                       new BigDecimal("-12.5E1000"),
+                       new BigDecimal("-12.5E100"),
+                       new BigDecimal("-10000"),
+                       new BigDecimal("-1.1"),
+                       new BigDecimal("-1"),
+                       new BigDecimal("-0.44"),
+                       new BigDecimal("0"),
+                       new BigDecimal("0"),
+                       new BigDecimal("0e0"),
+                       new BigDecimal("0.000000000000000000000000001"),
+                       new BigDecimal("0.0000001"),
+                       new BigDecimal("0.1234123413478523984729447"),
+                       new BigDecimal("1"),
+                       new BigDecimal("10000"),
+                       new BigDecimal("10E100000"),
+                       new BigDecimal("10E1000000000"),
+               };
+       }
+
+       @Override
+       public String[] getInvalidTestValues() {
+               return new String[] {
+                       "a", "123abc4", "-57-6", "7-877678", " 1", "2 ", " ", 
"\t"
+               };
+       }
+
+       @Override
+       public boolean allowsEmptyField() {
+               return false;
+       }
+
+       @Override
+       public FieldParser<BigDecimal> getParser() {
+               return new BigDecParser();
+       }
+
+       @Override
+       public Class<BigDecimal> getTypeClass() {
+               return BigDecimal.class;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5c02988b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java 
b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
new file mode 100644
index 0000000..473cb55
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/types/parser/BigIntParserTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.types.parser;
+
+
+import java.math.BigInteger;
+
+public class BigIntParserTest extends ParserTestBase<BigInteger> {
+
+       @Override
+       public String[] getValidTestValues() {
+               return new String[] {
+                       "-8745979691234123413478523984729447", "-10000", "-1", 
"0",
+                       "0000000", "8745979691234123413478523984729447"
+               };
+       }
+
+       @Override
+       public BigInteger[] getValidTestResults() {
+               return new BigInteger[] {
+                       new BigInteger("-8745979691234123413478523984729447"),
+                       new BigInteger("-10000"),
+                       new BigInteger("-1"),
+                       new BigInteger("0"),
+                       new BigInteger("0"),
+                       new BigInteger("8745979691234123413478523984729447")
+               };
+       }
+
+       @Override
+       public String[] getInvalidTestValues() {
+               return new String[] {
+                       "1.1" ,"a", "123abc4", "-57-6", "7-877678", " 1", "2 ", 
" ", "\t"
+               };
+       }
+
+       @Override
+       public boolean allowsEmptyField() {
+               return false;
+       }
+
+       @Override
+       public FieldParser<BigInteger> getParser() {
+               return new BigIntParser();
+       }
+
+       @Override
+       public Class<BigInteger> getTypeClass() {
+               return BigInteger.class;
+       }
+}

Reply via email to