Repository: flink Updated Branches: refs/heads/master 290a566cd -> b3231acaa
[FLINK-3786] [core] [api-extending] Add BigDecimal and BigInteger as Basic types This closes #1928. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3231aca Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3231aca Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3231aca Branch: refs/heads/master Commit: b3231acaa445994a4bd234482364bb74b4bffae3 Parents: 290a566 Author: twalthr <twal...@apache.org> Authored: Wed Apr 20 15:57:39 2016 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon May 2 10:54:55 2016 +0200 ---------------------------------------------------------------------- docs/internals/types_serialization.md | 2 +- .../api/common/typeinfo/BasicTypeInfo.java | 13 +- .../common/typeutils/base/BigDecComparator.java | 118 +++++++++++++++ .../common/typeutils/base/BigDecSerializer.java | 141 ++++++++++++++++++ .../common/typeutils/base/BigIntComparator.java | 116 +++++++++++++++ .../common/typeutils/base/BigIntSerializer.java | 145 +++++++++++++++++++ .../api/java/typeutils/TypeInfoParser.java | 30 ++++ .../common/typeutils/ComparatorTestBase.java | 2 +- .../typeutils/base/BigDecComparatorTest.java | 63 ++++++++ .../typeutils/base/BigDecSerializerTest.java | 56 +++++++ .../typeutils/base/BigIntComparatorTest.java | 57 ++++++++ .../typeutils/base/BigIntSerializerTest.java | 55 +++++++ .../typeutils/base/DoubleSerializerTest.java | 2 +- .../typeutils/base/FloatSerializerTest.java | 2 +- .../typeutils/base/IntSerializerTest.java | 2 +- .../typeutils/base/LongSerializerTest.java | 2 +- .../api/java/typeutils/TypeExtractorTest.java | 34 +++++ .../api/java/typeutils/TypeInfoParserTest.java | 4 + .../apache/flink/types/BasicTypeInfoTest.java | 4 +- 19 files changed, 840 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/docs/internals/types_serialization.md 
---------------------------------------------------------------------- diff --git a/docs/internals/types_serialization.md b/docs/internals/types_serialization.md index 7ff21f2..a657d9f 100644 --- a/docs/internals/types_serialization.md +++ b/docs/internals/types_serialization.md @@ -64,7 +64,7 @@ and, in specializations, comparators for the types. Internally, Flink makes the following distinctions between types: -* Basic types: All Java primitives and their boxed form, plus `void`, `String`, and `Date`. +* Basic types: All Java primitives and their boxed form, plus `void`, `String`, `Date`, `BigDecimal`, and `BigInteger`. * Primitive arrays and Object arrays http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java index c1d5605..e2fd74e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java @@ -19,6 +19,8 @@ package org.apache.flink.api.common.typeinfo; import java.lang.reflect.Constructor; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.Arrays; import java.util.Date; import java.util.HashMap; @@ -31,6 +33,10 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BigDecComparator; +import org.apache.flink.api.common.typeutils.base.BigDecSerializer; +import org.apache.flink.api.common.typeutils.base.BigIntComparator; +import 
org.apache.flink.api.common.typeutils.base.BigIntSerializer; import org.apache.flink.api.common.typeutils.base.BooleanComparator; import org.apache.flink.api.common.typeutils.base.BooleanSerializer; import org.apache.flink.api.common.typeutils.base.ByteComparator; @@ -56,7 +62,8 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * Type information for primitive types (int, long, double, byte, ...), String, Date, and Void. + * Type information for primitive types (int, long, double, byte, ...), String, Date, Void, + * BigInteger, and BigDecimal. */ @Public public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> { @@ -74,6 +81,8 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new BasicTypeInfo<Character>(Character.class, new Class<?>[]{}, CharSerializer.INSTANCE, CharComparator.class); public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new BasicTypeInfo<Date>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, DateComparator.class); public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new BasicTypeInfo<Void>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, null); + public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new BasicTypeInfo<BigInteger>(BigInteger.class, new Class<?>[]{}, BigIntSerializer.INSTANCE, BigIntComparator.class); + public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new BasicTypeInfo<BigDecimal>(BigDecimal.class, new Class<?>[]{}, BigDecSerializer.INSTANCE, BigDecComparator.class); // -------------------------------------------------------------------------------------------- @@ -240,5 +249,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T TYPES.put(Date.class, DATE_TYPE_INFO); TYPES.put(Void.class, VOID_TYPE_INFO); TYPES.put(void.class, VOID_TYPE_INFO); + 
TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO); + TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO); } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java new file mode 100644 index 0000000..90be11d --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.io.IOException; +import java.math.BigDecimal; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; + +/** + * Comparator for comparing BigDecimal values. Does not support null values. 
+ */ +@Internal +public final class BigDecComparator extends BasicTypeComparator<BigDecimal> { + + private static final long serialVersionUID = 1L; + + private static final long SMALLEST_MAGNITUDE = Integer.MAX_VALUE; + + private static final long LARGEST_MAGNITUDE = ((long) Integer.MIN_VALUE) - Integer.MAX_VALUE + 1; + + public BigDecComparator(boolean ascending) { + super(ascending); + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + BigDecimal bd1 = BigDecSerializer.readBigDecimal(firstSource); + BigDecimal bd2 = BigDecSerializer.readBigDecimal(secondSource); + int comp = bd1.compareTo(bd2); // null is not supported + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return 5; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + /** + * Adds a normalized key containing a normalized order of magnitude of the given record. + * 2 bits determine the sign (negative, zero, positive), 33 bits determine the magnitude. + * This method adds at most 5 bytes that contain information. 
+ */ + @Override + public void putNormalizedKey(BigDecimal record, MemorySegment target, int offset, int len) { + final long signum = record.signum(); + + // order of magnitude + // smallest: + // scale = Integer.MAX, precision = 1 => SMALLEST_MAGNITUDE + // largest: + // scale = Integer.MIN, precision = Integer.MAX => LARGEST_MAGNITUDE + final long mag = ((long) record.scale()) - ((long) record.precision()) + 1; + + // normalize value range: from 0 to (SMALLEST_MAGNITUDE + -1*LARGEST_MAGNITUDE) + final long normMag = -1L * LARGEST_MAGNITUDE + mag; + + // normalize value range dependent on sign: + // 0 to (SMALLEST_MAGNITUDE + -1*LARGEST_MAGNITUDE) + // OR (SMALLEST_MAGNITUDE + -1*LARGEST_MAGNITUDE) to 0 + // --> uses at most 33 bit (5 least-significant bytes) + long signNormMag = signum < 0 ? normMag : (SMALLEST_MAGNITUDE + -1L * LARGEST_MAGNITUDE - normMag); + + // zero has no magnitude + // set 34th bit to flag zero + if (signum == 0) { + signNormMag = 0L; + signNormMag |= (1L << 34); + } + // set 35th bit to flag positive sign + else if (signum > 0) { + signNormMag |= (1L << 35); + } + + // add 5 least-significant bytes that contain value to target + for (int i = 0; i < 5 && len > 0; i++, len--) { + final byte b = (byte) (signNormMag >>> (8 * (4 - i))); + target.put(offset++, b); + } + } + + @Override + public BigDecComparator duplicate() { + return new BigDecComparator(ascendingComparison); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java new file mode 100644 index 0000000..ebfdb40 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java @@ 
-0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * Serializer for serializing/deserializing BigDecimal values including null values. 
+ */ +@Internal +public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal> { + + private static final long serialVersionUID = 1L; + + public static final BigDecSerializer INSTANCE = new BigDecSerializer(); + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public BigDecimal createInstance() { + return BigDecimal.ZERO; + } + + @Override + public BigDecimal copy(BigDecimal from) { + return from; + } + + @Override + public BigDecimal copy(BigDecimal from, BigDecimal reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(BigDecimal record, DataOutputView target) throws IOException { + // null value support + if (record == null) { + BigIntSerializer.writeBigInteger(null, target); + return; + } + // fast paths for 0, 1, 10 + // only reference equality is checked because equals would be too expensive + else if (record == BigDecimal.ZERO) { + BigIntSerializer.writeBigInteger(BigInteger.ZERO, target); + target.writeInt(0); + return; + } + else if (record == BigDecimal.ONE) { + BigIntSerializer.writeBigInteger(BigInteger.ONE, target); + target.writeInt(0); + return; + } + else if (record == BigDecimal.TEN) { + BigIntSerializer.writeBigInteger(BigInteger.TEN, target); + target.writeInt(0); + return; + } + // default + BigIntSerializer.writeBigInteger(record.unscaledValue(), target); + target.writeInt(record.scale()); + } + + @Override + public BigDecimal deserialize(DataInputView source) throws IOException { + return readBigDecimal(source); + } + + @Override + public BigDecimal deserialize(BigDecimal reuse, DataInputView source) throws IOException { + return readBigDecimal(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final boolean isNull = BigIntSerializer.copyBigInteger(source, target); + if (!isNull) { + final int scale = source.readInt(); + target.writeInt(scale); + } + } + + 
@Override + public boolean canEqual(Object obj) { + return obj instanceof BigDecSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Static Helpers for BigDecimal Serialization + // -------------------------------------------------------------------------------------------- + + public static BigDecimal readBigDecimal(DataInputView source) throws IOException { + final BigInteger unscaledValue = BigIntSerializer.readBigInteger(source); + if (unscaledValue == null) { + return null; + } + final int scale = source.readInt(); + // fast-path for 0, 1, 10 + if (scale == 0) { + if (unscaledValue == BigInteger.ZERO) { + return BigDecimal.ZERO; + } + else if (unscaledValue == BigInteger.ONE) { + return BigDecimal.ONE; + } + else if (unscaledValue == BigInteger.TEN) { + return BigDecimal.TEN; + } + } + // default + return new BigDecimal(unscaledValue, scale); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java new file mode 100644 index 0000000..50c5eaa --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.io.IOException; +import java.math.BigInteger; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.MemorySegment; + +/** + * Comparator for comparing BigInteger values. Does not support null values. + */ +@Internal +public final class BigIntComparator extends BasicTypeComparator<BigInteger> { + + private static final long serialVersionUID = 1L; + + public BigIntComparator(boolean ascending) { + super(ascending); + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + BigInteger bi1 = BigIntSerializer.readBigInteger(firstSource); + BigInteger bi2 = BigIntSerializer.readBigInteger(secondSource); + int comp = bi1.compareTo(bi2); // null is not supported + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return true; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return Integer.MAX_VALUE; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return true; + } + + /** + * Adds a normalized key containing the normalized number of bits and MSBs of the given record. + * 1 bit determines the sign (negative, zero/positive), 31 bit the bit length of the record. + * Remaining bytes contain the most significant bits of the record. 
+ */ + @Override + public void putNormalizedKey(BigInteger record, MemorySegment target, int offset, int len) { + // add normalized bit length (the larger the length, the larger the value) + int bitLen = 0; + if (len > 0) { + final int signum = record.signum(); + bitLen = record.bitLength(); + + // normalize dependent on sign + // from 0 to Integer.MAX + // OR from Integer.MAX to 0 + int normBitLen = signum < 0 ? Integer.MAX_VALUE - bitLen : bitLen; + + // add sign + if (signum >= 0) { + normBitLen |= (1 << 31); + } + + for (int i = 0; i < 4 && len > 0; i++, len--) { + final byte b = (byte) (normBitLen >>> (8 * (3 - i))); + target.put(offset++, b); + } + } + + // fill remaining bytes with most significant bits + int bitPos = bitLen - 1; + for (; len > 0; len--) { + byte b = 0; + for (int bytePos = 0; bytePos < 8 && bitPos >= 0; bytePos++, bitPos--) { + b <<= 1; + if (record.testBit(bitPos)) { + b |= 1; + } + } + // the last byte might be partially filled, but that's ok within an equal bit length. + // no need for padding bits. + target.put(offset++, b); + } + } + + @Override + public BigIntComparator duplicate() { + return new BigIntComparator(ascendingComparison); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java new file mode 100644 index 0000000..73b2f54 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.io.IOException; +import java.math.BigInteger; +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * Serializer for serializing/deserializing BigInteger values including null values. 
+ */ +@Internal +public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger> { + + private static final long serialVersionUID = 1L; + + public static final BigIntSerializer INSTANCE = new BigIntSerializer(); + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public BigInteger createInstance() { + return BigInteger.ZERO; + } + + @Override + public BigInteger copy(BigInteger from) { + return from; + } + + @Override + public BigInteger copy(BigInteger from, BigInteger reuse) { + return from; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(BigInteger record, DataOutputView target) throws IOException { + writeBigInteger(record, target); + } + + @Override + public BigInteger deserialize(DataInputView source) throws IOException { + return readBigInteger(source); + } + + @Override + public BigInteger deserialize(BigInteger reuse, DataInputView source) throws IOException { + return readBigInteger(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + copyBigInteger(source, target); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof BigIntSerializer; + } + + // -------------------------------------------------------------------------------------------- + // Static Helpers for BigInteger Serialization + // -------------------------------------------------------------------------------------------- + + public static void writeBigInteger(BigInteger record, DataOutputView target) throws IOException { + // null value support + if (record == null) { + target.writeInt(0); + return; + } + // fast paths for 0, 1, 10 + // only reference equality is checked because equals would be too expensive + else if (record == BigInteger.ZERO) { + target.writeInt(1); + return; + } + else if (record == BigInteger.ONE) { + target.writeInt(2); + return; + } + else if (record == BigInteger.TEN) { + 
target.writeInt(3); + return; + } + // default + final byte[] bytes = record.toByteArray(); + // the length we write is offset by four, because null and short-paths for ZERO, ONE, and TEN + target.writeInt(bytes.length + 4); + target.write(bytes); + } + + public static BigInteger readBigInteger(DataInputView source) throws IOException { + final int len = source.readInt(); + if (len < 4) { + switch (len) { + case 0: + return null; + case 1: + return BigInteger.ZERO; + case 2: + return BigInteger.ONE; + case 3: + return BigInteger.TEN; + } + } + final byte[] bytes = new byte[len - 4]; + source.read(bytes); + return new BigInteger(bytes); + } + + public static boolean copyBigInteger(DataInputView source, DataOutputView target) throws IOException { + final int len = source.readInt(); + target.writeInt(len); + if (len > 4) { + target.write(source, len - 4); + } + return len == 0; // returns true if the copied record was null + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java index 33d0b69..d20c658 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java @@ -43,6 +43,8 @@ public class TypeInfoParser { private static final Pattern basicTypePattern = Pattern .compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)"); private static final Pattern basicTypeDatePattern = Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)"); + private static final Pattern basicTypeBigIntPattern = Pattern.compile("^((java\\.math\\.)?BigInteger)(,|>|$|\\[)"); + private static final 
Pattern basicTypeBigDecPattern = Pattern.compile("^((java\\.math\\.)?BigDecimal)(,|>|$|\\[)"); private static final Pattern primitiveTypePattern = Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)"); private static final Pattern valueTypePattern = Pattern.compile("^((" + VALUE_PACKAGE.replaceAll("\\.", "\\\\.") + "\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)"); @@ -109,6 +111,8 @@ public class TypeInfoParser { final Matcher basicTypeMatcher = basicTypePattern.matcher(infoString); final Matcher basicTypeDateMatcher = basicTypeDatePattern.matcher(infoString); + final Matcher basicTypeBigIntMatcher = basicTypeBigIntPattern.matcher(infoString); + final Matcher basicTypeBigDecMatcher = basicTypeBigDecPattern.matcher(infoString); final Matcher primitiveTypeMatcher = primitiveTypePattern.matcher(infoString); @@ -200,6 +204,32 @@ public class TypeInfoParser { } returnType = BasicTypeInfo.getInfoFor(clazz); } + // special basic type "BigInteger" + else if (basicTypeBigIntMatcher.find()) { + String className = basicTypeBigIntMatcher.group(1); + sb.delete(0, className.length()); + Class<?> clazz; + // check if fully qualified + if (className.startsWith("java.math")) { + clazz = loadClass(className); + } else { + clazz = loadClass("java.math." + className); + } + returnType = BasicTypeInfo.getInfoFor(clazz); + } + // special basic type "BigDecimal" + else if (basicTypeBigDecMatcher.find()) { + String className = basicTypeBigDecMatcher.group(1); + sb.delete(0, className.length()); + Class<?> clazz; + // check if fully qualified + if (className.startsWith("java.math")) { + clazz = loadClass(className); + } else { + clazz = loadClass("java.math." 
+ className); + } + returnType = BasicTypeInfo.getInfoFor(clazz); + } // primitive types else if (primitiveTypeMatcher.find()) { String keyword = primitiveTypeMatcher.group(1); http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java index 793688d..401603e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java @@ -334,7 +334,7 @@ public abstract class ComparatorTestBase<T> extends TestLogger { // Get the normKeyLen on which we are testing int normKeyLen = getNormKeyLen(halfLength, data, comparator); - // Write the data into different 2 memory segements + // Write the data into different 2 memory segments MemorySegment memSegLow = setupNormalizedKeysMemSegment(data, normKeyLen, comparator); MemorySegment memSegHigh = setupNormalizedKeysMemSegment(data, normKeyLen, comparator); http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java new file mode 100644 index 0000000..ca5819e --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.math.BigDecimal; +import java.math.BigInteger; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +public class BigDecComparatorTest extends ComparatorTestBase<BigDecimal> { + + @Override + protected TypeComparator<BigDecimal> createComparator(boolean ascending) { + return new BigDecComparator(ascending); + } + + @Override + protected TypeSerializer<BigDecimal> createSerializer() { + return new BigDecSerializer(); + } + + @Override + protected BigDecimal[] getSortedTestData() { + return new BigDecimal[] { + new BigDecimal("-12.5E1000"), + new BigDecimal("-12.5E100"), + BigDecimal.valueOf(-12E100), + BigDecimal.valueOf(-10000), + BigDecimal.valueOf(-1.1), + BigDecimal.valueOf(-1), + BigDecimal.valueOf(-0.44), + BigDecimal.ZERO, + new BigDecimal("0.000000000000000000000000001"), + new BigDecimal("0.0000001"), + new BigDecimal("0.1234123413478523984729447"), + BigDecimal.valueOf(1), + BigDecimal.valueOf(1.1), + BigDecimal.TEN, + new BigDecimal("10000"), + BigDecimal.valueOf(12E100), + new BigDecimal("12.5E100"), + new BigDecimal("10E100000"), + new 
BigDecimal("10E1000000000") + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java new file mode 100644 index 0000000..fd3cbd5 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Random; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * A test for the {@link BigDecSerializer}. 
+ */ +public class BigDecSerializerTest extends SerializerTestBase<BigDecimal> { + + @Override + protected TypeSerializer<BigDecimal> createSerializer() { + return new BigDecSerializer(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<BigDecimal> getTypeClass() { + return BigDecimal.class; + } + + @Override + protected BigDecimal[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new BigDecimal[] { + BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.TEN, + new BigDecimal(rnd.nextDouble()), new BigDecimal("874597969.1234123413478523984729447"), + BigDecimal.valueOf(-1.444), BigDecimal.valueOf(-10000.888)}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java new file mode 100644 index 0000000..ccc31bd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.math.BigInteger; +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +public class BigIntComparatorTest extends ComparatorTestBase<BigInteger> { + + @Override + protected TypeComparator<BigInteger> createComparator(boolean ascending) { + return new BigIntComparator(ascending); + } + + @Override + protected TypeSerializer<BigInteger> createSerializer() { + return new BigIntSerializer(); + } + + @Override + protected BigInteger[] getSortedTestData() { + return new BigInteger[] { + new BigInteger("-8745979691234123413478523984729447"), + BigInteger.valueOf(-10000), + BigInteger.valueOf(-1), + BigInteger.ZERO, + BigInteger.ONE, + BigInteger.TEN, + new BigInteger("127"), + new BigInteger("128"), + new BigInteger("129"), + new BigInteger("130"), + BigInteger.valueOf(0b10000000_00000000_00000000_00000000L), + BigInteger.valueOf(0b10000000_00000000_00000000_00000001L), + BigInteger.valueOf(0b10000000_00000000_10000000_00000000L), + new BigInteger("8745979691234123413478523984729447") + }; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java new file mode 100644 index 0000000..ada9bd6 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import java.math.BigInteger; +import java.util.Random; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * A test for the {@link BigIntSerializer}. 
+ */ +public class BigIntSerializerTest extends SerializerTestBase<BigInteger> { + + @Override + protected TypeSerializer<BigInteger> createSerializer() { + return new BigIntSerializer(); + } + + @Override + protected int getLength() { + return -1; + } + + @Override + protected Class<BigInteger> getTypeClass() { + return BigInteger.class; + } + + @Override + protected BigInteger[] getTestData() { + Random rnd = new Random(874597969123412341L); + + return new BigInteger[] { + BigInteger.ZERO, BigInteger.ONE, BigInteger.TEN, + new BigInteger(1000, rnd), new BigInteger("8745979691234123413478523984729447"), + BigInteger.valueOf(-1), BigInteger.valueOf(-10000)}; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java index b1f46b7..543c0e9 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.DoubleSerializer; /** - * A test for the {@link DoubleSerializerTest}. + * A test for the {@link DoubleSerializer}. 
*/ public class DoubleSerializerTest extends SerializerTestBase<Double> { http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java index 33e8fbe..b3b17fe 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.FloatSerializer; /** - * A test for the {@link FloatSerializerTest}. + * A test for the {@link FloatSerializer}. 
*/ public class FloatSerializerTest extends SerializerTestBase<Float> { http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java index ce0d249..e829a4b 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.IntSerializer; /** - * A test for the {@link IntSerializerTest}. + * A test for the {@link IntSerializer}. */ public class IntSerializerTest extends SerializerTestBase<Integer> { http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java index 1d4a301..4587f84 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.SerializerTestBase; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; /** - * A test for the {@link LongSerializerTest}. 
+ * A test for the {@link LongSerializer}. */ public class LongSerializerTest extends SerializerTestBase<Long> { http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java index a8f5ded..74c01ef 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java @@ -21,6 +21,8 @@ package org.apache.flink.api.java.typeutils; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -39,6 +41,7 @@ import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -2016,4 +2019,35 @@ public class TypeExtractorTest { TypeExtractor.getMapReturnTypes(function, (TypeInformation) new TupleTypeInfo<Tuple1<Object>>(customTypeInfo)); } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void testBigBasicTypes() { + MapFunction<?, ?> function = new MapFunction<Tuple2<BigInteger, BigDecimal>, Tuple2<BigInteger, BigDecimal>>() { + @Override + public Tuple2<BigInteger, BigDecimal> map(Tuple2<BigInteger, BigDecimal> 
value) throws Exception { + return null; + } + }; + + TypeInformation<?> ti = TypeExtractor.getMapReturnTypes( + function, + (TypeInformation) TypeInformation.of(new TypeHint<Tuple2<BigInteger, BigDecimal>>() { + })); + + Assert.assertTrue(ti.isTupleType()); + TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti; + Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, tti.getTypeAt(0)); + Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, tti.getTypeAt(1)); + + // use getForClass() + Assert.assertTrue(TypeExtractor.getForClass(BigInteger.class).isBasicType()); + Assert.assertTrue(TypeExtractor.getForClass(BigDecimal.class).isBasicType()); + Assert.assertEquals(tti.getTypeAt(0), TypeExtractor.getForClass(BigInteger.class)); + Assert.assertEquals(tti.getTypeAt(1), TypeExtractor.getForClass(BigDecimal.class)); + + // use getForObject() + Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeExtractor.getForObject(new BigInteger("42"))); + Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeExtractor.getForObject(new BigDecimal("42.42"))); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java index e225460..f5fdae4 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java @@ -58,6 +58,8 @@ public class TypeInfoParserTest { Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("Boolean")); Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("Void")); Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("Date")); + 
Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("BigInteger")); + Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("BigDecimal")); Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("java.lang.Integer")); Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("java.lang.Double")); @@ -70,6 +72,8 @@ public class TypeInfoParserTest { Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeInfoParser.parse("java.lang.Boolean")); Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, TypeInfoParser.parse("java.lang.Void")); Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, TypeInfoParser.parse("java.util.Date")); + Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, TypeInfoParser.parse("java.math.BigInteger")); + Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, TypeInfoParser.parse("java.math.BigDecimal")); Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, TypeInfoParser.parse("int")); Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, TypeInfoParser.parse("double")); http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java ---------------------------------------------------------------------- diff --git a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java index cdd06d0..c090b76 100644 --- a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java +++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java @@ -18,6 +18,8 @@ package org.apache.flink.types; +import java.math.BigDecimal; +import java.math.BigInteger; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -30,7 +32,7 @@ public class BasicTypeInfoTest extends TestLogger { static Class<?>[] classes = {String.class, Integer.class, Boolean.class, Byte.class, 
Short.class, Long.class, Float.class, Double.class, Character.class, Date.class, - Void.class}; + Void.class, BigInteger.class, BigDecimal.class}; @Test public void testBasicTypeInfoEquality() {