Repository: flink
Updated Branches:
  refs/heads/master 290a566cd -> b3231acaa


[FLINK-3786] [core] [api-extending] Add BigDecimal and BigInteger as Basic types

This closes #1928.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3231aca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3231aca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3231aca

Branch: refs/heads/master
Commit: b3231acaa445994a4bd234482364bb74b4bffae3
Parents: 290a566
Author: twalthr <twal...@apache.org>
Authored: Wed Apr 20 15:57:39 2016 +0200
Committer: twalthr <twal...@apache.org>
Committed: Mon May 2 10:54:55 2016 +0200

----------------------------------------------------------------------
 docs/internals/types_serialization.md           |   2 +-
 .../api/common/typeinfo/BasicTypeInfo.java      |  13 +-
 .../common/typeutils/base/BigDecComparator.java | 118 +++++++++++++++
 .../common/typeutils/base/BigDecSerializer.java | 141 ++++++++++++++++++
 .../common/typeutils/base/BigIntComparator.java | 116 +++++++++++++++
 .../common/typeutils/base/BigIntSerializer.java | 145 +++++++++++++++++++
 .../api/java/typeutils/TypeInfoParser.java      |  30 ++++
 .../common/typeutils/ComparatorTestBase.java    |   2 +-
 .../typeutils/base/BigDecComparatorTest.java    |  63 ++++++++
 .../typeutils/base/BigDecSerializerTest.java    |  56 +++++++
 .../typeutils/base/BigIntComparatorTest.java    |  57 ++++++++
 .../typeutils/base/BigIntSerializerTest.java    |  55 +++++++
 .../typeutils/base/DoubleSerializerTest.java    |   2 +-
 .../typeutils/base/FloatSerializerTest.java     |   2 +-
 .../typeutils/base/IntSerializerTest.java       |   2 +-
 .../typeutils/base/LongSerializerTest.java      |   2 +-
 .../api/java/typeutils/TypeExtractorTest.java   |  34 +++++
 .../api/java/typeutils/TypeInfoParserTest.java  |   4 +
 .../apache/flink/types/BasicTypeInfoTest.java   |   4 +-
 19 files changed, 840 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/docs/internals/types_serialization.md
----------------------------------------------------------------------
diff --git a/docs/internals/types_serialization.md 
b/docs/internals/types_serialization.md
index 7ff21f2..a657d9f 100644
--- a/docs/internals/types_serialization.md
+++ b/docs/internals/types_serialization.md
@@ -64,7 +64,7 @@ and, in specializations, comparators for the types.
 
 Internally, Flink makes the following distinctions between types:
 
-* Basic types: All Java primitives and their boxed form, plus `void`, 
`String`, and `Date`.
+* Basic types: All Java primitives and their boxed form, plus `void`, 
`String`, `Date`, `BigDecimal`, and `BigInteger`.
 
 * Primitive arrays and Object arrays
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
index c1d5605..e2fd74e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/BasicTypeInfo.java
@@ -19,6 +19,8 @@
 package org.apache.flink.api.common.typeinfo;
 
 import java.lang.reflect.Constructor;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
@@ -31,6 +33,10 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.InvalidTypesException;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BigDecComparator;
+import org.apache.flink.api.common.typeutils.base.BigDecSerializer;
+import org.apache.flink.api.common.typeutils.base.BigIntComparator;
+import org.apache.flink.api.common.typeutils.base.BigIntSerializer;
 import org.apache.flink.api.common.typeutils.base.BooleanComparator;
 import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
 import org.apache.flink.api.common.typeutils.base.ByteComparator;
@@ -56,7 +62,8 @@ import 
org.apache.flink.api.common.typeutils.base.VoidSerializer;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * Type information for primitive types (int, long, double, byte, ...), 
String, Date, and Void.
+ * Type information for primitive types (int, long, double, byte, ...), 
String, Date, Void,
+ * BigInteger, and BigDecimal.
  */
 @Public
 public class BasicTypeInfo<T> extends TypeInformation<T> implements 
AtomicType<T> {
@@ -74,6 +81,8 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
        public static final BasicTypeInfo<Character> CHAR_TYPE_INFO = new 
BasicTypeInfo<Character>(Character.class, new Class<?>[]{}, 
CharSerializer.INSTANCE, CharComparator.class);
        public static final BasicTypeInfo<Date> DATE_TYPE_INFO = new 
BasicTypeInfo<Date>(Date.class, new Class<?>[]{}, DateSerializer.INSTANCE, 
DateComparator.class);
        public static final BasicTypeInfo<Void> VOID_TYPE_INFO = new 
BasicTypeInfo<Void>(Void.class, new Class<?>[]{}, VoidSerializer.INSTANCE, 
null);
+       public static final BasicTypeInfo<BigInteger> BIG_INT_TYPE_INFO = new 
BasicTypeInfo<BigInteger>(BigInteger.class, new Class<?>[]{}, 
BigIntSerializer.INSTANCE, BigIntComparator.class);
+       public static final BasicTypeInfo<BigDecimal> BIG_DEC_TYPE_INFO = new 
BasicTypeInfo<BigDecimal>(BigDecimal.class, new Class<?>[]{}, 
BigDecSerializer.INSTANCE, BigDecComparator.class);
 
        // 
--------------------------------------------------------------------------------------------
 
@@ -240,5 +249,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> 
implements AtomicType<T
                TYPES.put(Date.class, DATE_TYPE_INFO);
                TYPES.put(Void.class, VOID_TYPE_INFO);
                TYPES.put(void.class, VOID_TYPE_INFO);
+               TYPES.put(BigInteger.class, BIG_INT_TYPE_INFO);
+               TYPES.put(BigDecimal.class, BIG_DEC_TYPE_INFO);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java
new file mode 100644
index 0000000..90be11d
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecComparator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Comparator for {@link BigDecimal} values. Null values are not supported.
+ *
+ * <p>Serialized records are compared by deserializing both sides. The normalized key is a
+ * 5-byte prefix derived from the sign and the order of magnitude only, and it is always
+ * reported as a prefix (see {@link #isNormalizedKeyPrefixOnly(int)}).
+ */
+@Internal
+public final class BigDecComparator extends BasicTypeComparator<BigDecimal> {
+
+       private static final long serialVersionUID = 1L;
+
+       // magnitude of the smallest absolute value: scale = Integer.MAX_VALUE, precision = 1
+       private static final long SMALLEST_MAGNITUDE = Integer.MAX_VALUE;
+
+       // magnitude of the largest absolute value: scale = Integer.MIN_VALUE, precision = Integer.MAX_VALUE
+       private static final long LARGEST_MAGNITUDE = ((long) Integer.MIN_VALUE) - Integer.MAX_VALUE + 1;
+
+       public BigDecComparator(boolean ascending) {
+               super(ascending);
+       }
+
+       /**
+        * Reads one BigDecimal from each view and compares them numerically, flipping the
+        * result for descending order. A null first value is not supported and fails here.
+        */
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               final BigDecimal first = BigDecSerializer.readBigDecimal(firstSource);
+               final BigDecimal second = BigDecSerializer.readBigDecimal(secondSource);
+               final int result = first.compareTo(second);
+               if (ascendingComparison) {
+                       return result;
+               }
+               return -result;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /** The key carries at most 5 bytes of information. */
+       @Override
+       public int getNormalizeKeyLen() {
+               return 5;
+       }
+
+       /** The key never encodes the full value, regardless of how many bytes are available. */
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return true;
+       }
+
+       /**
+        * Writes up to 5 bytes (big-endian) that order records by sign and order of magnitude.
+        *
+        * <p>Layout of the 40-bit key: bits 0-32 hold the normalized magnitude, bit 34 is set
+        * for zero and bit 35 is set for positive values, so that negative keys sort before
+        * zero and zero sorts before positive keys.
+        *
+        * @param record value to derive the key from
+        * @param target memory segment receiving the key bytes
+        * @param offset position in the segment of the first key byte
+        * @param len number of key bytes requested; fewer than 5 truncates the key
+        */
+       @Override
+       public void putNormalizedKey(BigDecimal record, MemorySegment target, int offset, int len) {
+               final int sign = record.signum();
+
+               // order of magnitude; it grows as the absolute value shrinks
+               final long magnitude = ((long) record.scale()) - ((long) record.precision()) + 1;
+
+               // shift into the non-negative range [0, SMALLEST_MAGNITUDE - LARGEST_MAGNITUDE]
+               final long shifted = -1L * LARGEST_MAGNITUDE + magnitude;
+
+               long key;
+               if (sign < 0) {
+                       // negative: a larger absolute value (smaller magnitude) must sort first
+                       key = shifted;
+               }
+               else if (sign == 0) {
+                       // zero has no magnitude, only its flag bit
+                       key = 1L << 34;
+               }
+               else {
+                       // positive: invert the range so larger values get larger keys, then flag the sign
+                       key = SMALLEST_MAGNITUDE + -1L * LARGEST_MAGNITUDE - shifted;
+                       key |= (1L << 35);
+               }
+
+               // emit the 5 least-significant bytes, most significant of them first
+               int shift = 32;
+               while (shift >= 0 && len > 0) {
+                       target.put(offset++, (byte) (key >>> shift));
+                       shift -= 8;
+                       len--;
+               }
+       }
+
+       @Override
+       public BigDecComparator duplicate() {
+               return new BigDecComparator(ascendingComparison);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
new file mode 100644
index 0000000..ebfdb40
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigDecSerializer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Serializer for {@link BigDecimal} values, including null.
+ *
+ * <p>A value is written as its unscaled {@link BigInteger} (using the
+ * {@link BigIntSerializer} format) followed by its scale as an int. A null value is written
+ * as a null BigInteger with no scale.
+ */
+@Internal
+public final class BigDecSerializer extends TypeSerializerSingleton<BigDecimal> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final BigDecSerializer INSTANCE = new BigDecSerializer();
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public BigDecimal createInstance() {
+               return BigDecimal.ZERO;
+       }
+
+       /** Returns the same instance; the type is reported as immutable. */
+       @Override
+       public BigDecimal copy(BigDecimal from) {
+               return from;
+       }
+
+       /** Returns {@code from}; the reuse instance is ignored. */
+       @Override
+       public BigDecimal copy(BigDecimal from, BigDecimal reuse) {
+               return from;
+       }
+
+       /** Returns -1 because the serialized form has variable length. */
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       /**
+        * Writes the record as unscaled value plus scale, or as a lone null marker.
+        *
+        * @param record value to write, may be null
+        * @param target view to write to
+        */
+       @Override
+       public void serialize(BigDecimal record, DataOutputView target) throws IOException {
+               if (record == null) {
+                       // null is encoded by the BigInteger null marker alone, no scale follows
+                       BigIntSerializer.writeBigInteger(null, target);
+                       return;
+               }
+
+               final BigInteger unscaled;
+               final int scale;
+               // fast paths for the shared constants 0, 1 and 10; identity comparison is
+               // deliberate because equals would be too expensive
+               if (record == BigDecimal.ZERO) {
+                       unscaled = BigInteger.ZERO;
+                       scale = 0;
+               }
+               else if (record == BigDecimal.ONE) {
+                       unscaled = BigInteger.ONE;
+                       scale = 0;
+               }
+               else if (record == BigDecimal.TEN) {
+                       unscaled = BigInteger.TEN;
+                       scale = 0;
+               }
+               else {
+                       unscaled = record.unscaledValue();
+                       scale = record.scale();
+               }
+
+               BigIntSerializer.writeBigInteger(unscaled, target);
+               target.writeInt(scale);
+       }
+
+       @Override
+       public BigDecimal deserialize(DataInputView source) throws IOException {
+               return readBigDecimal(source);
+       }
+
+       /** Reads a new value; the reuse instance is ignored. */
+       @Override
+       public BigDecimal deserialize(BigDecimal reuse, DataInputView source) throws IOException {
+               return readBigDecimal(source);
+       }
+
+       /** Copies one serialized record from source to target without materializing it. */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               final boolean wasNull = BigIntSerializer.copyBigInteger(source, target);
+               if (wasNull) {
+                       // a null record carries no scale
+                       return;
+               }
+               target.writeInt(source.readInt());
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BigDecSerializer;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                           Static Helpers for BigDecimal Serialization
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Reads one BigDecimal in the format produced by {@link #serialize}.
+        *
+        * @param source view to read from
+        * @return the value read, or null if a null record was stored
+        */
+       public static BigDecimal readBigDecimal(DataInputView source) throws IOException {
+               final BigInteger unscaled = BigIntSerializer.readBigInteger(source);
+               if (unscaled == null) {
+                       return null;
+               }
+
+               final int scale = source.readInt();
+               if (scale == 0) {
+                       // map the shared BigInteger constants back to the shared BigDecimal constants;
+                       // identity comparison matches the constants returned by readBigInteger
+                       if (unscaled == BigInteger.ZERO) {
+                               return BigDecimal.ZERO;
+                       }
+                       if (unscaled == BigInteger.ONE) {
+                               return BigDecimal.ONE;
+                       }
+                       if (unscaled == BigInteger.TEN) {
+                               return BigDecimal.TEN;
+                       }
+               }
+               return new BigDecimal(unscaled, scale);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java
new file mode 100644
index 0000000..50c5eaa
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntComparator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Comparator for {@link BigInteger} values. Null values are not supported.
+ *
+ * <p>Serialized records are compared by deserializing both sides. The normalized key starts
+ * with a 4-byte header (sign and bit length) followed by the most significant bits of the
+ * value, and it is always reported as a prefix.
+ */
+@Internal
+public final class BigIntComparator extends BasicTypeComparator<BigInteger> {
+
+       private static final long serialVersionUID = 1L;
+
+       public BigIntComparator(boolean ascending) {
+               super(ascending);
+       }
+
+       /**
+        * Reads one BigInteger from each view and compares them numerically, flipping the
+        * result for descending order. A null first value is not supported and fails here.
+        */
+       @Override
+       public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
+               final BigInteger first = BigIntSerializer.readBigInteger(firstSource);
+               final BigInteger second = BigIntSerializer.readBigInteger(secondSource);
+               final int result = first.compareTo(second);
+               if (ascendingComparison) {
+                       return result;
+               }
+               return -result;
+       }
+
+       @Override
+       public boolean supportsNormalizedKey() {
+               return true;
+       }
+
+       @Override
+       public boolean supportsSerializationWithKeyNormalization() {
+               return false;
+       }
+
+       /** The key can use as many bytes as are offered. */
+       @Override
+       public int getNormalizeKeyLen() {
+               return Integer.MAX_VALUE;
+       }
+
+       @Override
+       public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+               return true;
+       }
+
+       /**
+        * Writes {@code len} key bytes: a 4-byte big-endian header followed by value bits.
+        *
+        * <p>The header's top bit is set for zero and positive values; the lower 31 bits hold
+        * the bit length, inverted for negative values so that longer negative numbers sort
+        * first. All bytes after the header are filled with the record's bits from the most
+        * significant one downwards, and with zeros once the bits are exhausted.
+        *
+        * @param record value to derive the key from
+        * @param target memory segment receiving the key bytes
+        * @param offset position in the segment of the first key byte
+        * @param len number of key bytes to write; nothing is written if not positive
+        */
+       @Override
+       public void putNormalizedKey(BigInteger record, MemorySegment target, int offset, int len) {
+               if (len <= 0) {
+                       return;
+               }
+
+               final int bitLength = record.bitLength();
+
+               // the larger the bit length, the larger the absolute value
+               int header;
+               if (record.signum() < 0) {
+                       // negative: invert the range and leave the sign bit cleared
+                       header = Integer.MAX_VALUE - bitLength;
+               }
+               else {
+                       // zero/positive: keep the range and set the sign bit
+                       header = bitLength | (1 << 31);
+               }
+
+               // header bytes, most significant first; may be truncated by len
+               int shift = 24;
+               while (shift >= 0 && len > 0) {
+                       target.put(offset++, (byte) (header >>> shift));
+                       shift -= 8;
+                       len--;
+               }
+
+               // remaining bytes: pack the value bits starting at the most significant one
+               int nextBit = bitLength - 1;
+               while (len > 0) {
+                       int packed = 0;
+                       for (int n = 0; n < 8 && nextBit >= 0; n++, nextBit--) {
+                               packed <<= 1;
+                               if (record.testBit(nextBit)) {
+                                       packed |= 1;
+                               }
+                       }
+                       // the last byte with value bits may be only partially filled (no padding
+                       // bits); that is fine because only keys of equal bit length get this far
+                       target.put(offset++, (byte) packed);
+                       len--;
+               }
+       }
+
+       @Override
+       public BigIntComparator duplicate() {
+               return new BigIntComparator(ascendingComparison);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
new file mode 100644
index 0000000..73b2f54
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Serializer for {@link BigInteger} values, including null.
+ *
+ * <p>Wire format: an int header followed by an optional payload. Header values 0 to 3 are
+ * markers without payload (0 = null, 1 = ZERO, 2 = ONE, 3 = TEN). Any other header is the
+ * payload length plus 4, followed by the bytes of {@link BigInteger#toByteArray()}.
+ */
+@Internal
+public final class BigIntSerializer extends TypeSerializerSingleton<BigInteger> {
+
+       private static final long serialVersionUID = 1L;
+
+       public static final BigIntSerializer INSTANCE = new BigIntSerializer();
+
+       @Override
+       public boolean isImmutableType() {
+               return true;
+       }
+
+       @Override
+       public BigInteger createInstance() {
+               return BigInteger.ZERO;
+       }
+
+       /** Returns the same instance; the type is reported as immutable. */
+       @Override
+       public BigInteger copy(BigInteger from) {
+               return from;
+       }
+
+       /** Returns {@code from}; the reuse instance is ignored. */
+       @Override
+       public BigInteger copy(BigInteger from, BigInteger reuse) {
+               return from;
+       }
+
+       /** Returns -1 because the serialized form has variable length. */
+       @Override
+       public int getLength() {
+               return -1;
+       }
+
+       @Override
+       public void serialize(BigInteger record, DataOutputView target) throws IOException {
+               writeBigInteger(record, target);
+       }
+
+       @Override
+       public BigInteger deserialize(DataInputView source) throws IOException {
+               return readBigInteger(source);
+       }
+
+       /** Reads a new value; the reuse instance is ignored. */
+       @Override
+       public BigInteger deserialize(BigInteger reuse, DataInputView source) throws IOException {
+               return readBigInteger(source);
+       }
+
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               copyBigInteger(source, target);
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return obj instanceof BigIntSerializer;
+       }
+
+       // --------------------------------------------------------------------------------------------
+       //                           Static Helpers for BigInteger Serialization
+       // --------------------------------------------------------------------------------------------
+
+       /**
+        * Writes a BigInteger (or null) in the format described on this class.
+        *
+        * @param record value to write, may be null
+        * @param target view to write to
+        */
+       public static void writeBigInteger(BigInteger record, DataOutputView target) throws IOException {
+               // null value support
+               if (record == null) {
+                       target.writeInt(0);
+                       return;
+               }
+               // fast paths for 0, 1, 10
+               // only reference equality is checked because equals would be too expensive
+               else if (record == BigInteger.ZERO) {
+                       target.writeInt(1);
+                       return;
+               }
+               else if (record == BigInteger.ONE) {
+                       target.writeInt(2);
+                       return;
+               }
+               else if (record == BigInteger.TEN) {
+                       target.writeInt(3);
+                       return;
+               }
+               // default
+               final byte[] bytes = record.toByteArray();
+               // the length we write is offset by four, because of the markers for null and
+               // the short-paths for ZERO, ONE, and TEN
+               target.writeInt(bytes.length + 4);
+               target.write(bytes);
+       }
+
+       /**
+        * Reads a BigInteger written by {@link #writeBigInteger}.
+        *
+        * @param source view to read from
+        * @return the value read, the shared ZERO/ONE/TEN constant for a fast-path marker, or
+        *         null if a null record was stored
+        * @throws IOException if the header or the full payload cannot be read
+        */
+       public static BigInteger readBigInteger(DataInputView source) throws IOException {
+               final int len = source.readInt();
+               if (len < 4) {
+                       switch (len) {
+                               case 0:
+                                       return null;
+                               case 1:
+                                       return BigInteger.ZERO;
+                               case 2:
+                                       return BigInteger.ONE;
+                               case 3:
+                                       return BigInteger.TEN;
+                       }
+               }
+               final byte[] bytes = new byte[len - 4];
+               // readFully either fills the whole array or throws; the previous read(bytes) call
+               // returned a byte count that was ignored, so a short read produced a wrong value
+               source.readFully(bytes);
+               return new BigInteger(bytes);
+       }
+
+       /**
+        * Copies one serialized BigInteger from source to target without materializing it.
+        *
+        * @param source view to read from
+        * @param target view to write to
+        * @return true if the copied record was null
+        */
+       public static boolean copyBigInteger(DataInputView source, DataOutputView target) throws IOException {
+               final int len = source.readInt();
+               target.writeInt(len);
+               // headers up to 4 carry no payload bytes
+               if (len > 4) {
+                       target.write(source, len - 4);
+               }
+               return len == 0; // returns true if the copied record was null
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
index 33d0b69..d20c658 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeInfoParser.java
@@ -43,6 +43,8 @@ public class TypeInfoParser {
        private static final Pattern basicTypePattern = Pattern
                        
.compile("^((java\\.lang\\.)?(String|Integer|Byte|Short|Character|Double|Float|Long|Boolean|Void))(,|>|$|\\[)");
        private static final Pattern basicTypeDatePattern = 
Pattern.compile("^((java\\.util\\.)?Date)(,|>|$|\\[)");
+       private static final Pattern basicTypeBigIntPattern = 
Pattern.compile("^((java\\.math\\.)?BigInteger)(,|>|$|\\[)");
+       private static final Pattern basicTypeBigDecPattern = 
Pattern.compile("^((java\\.math\\.)?BigDecimal)(,|>|$|\\[)");
        private static final Pattern primitiveTypePattern = 
Pattern.compile("^(int|byte|short|char|double|float|long|boolean|void)(,|>|$|\\[)");
        private static final Pattern valueTypePattern = Pattern.compile("^((" + 
VALUE_PACKAGE.replaceAll("\\.", "\\\\.")
                        + 
"\\.)?(String|Int|Byte|Short|Char|Double|Float|Long|Boolean|List|Map|Null))Value(,|>|$|\\[)");
@@ -109,6 +111,8 @@ public class TypeInfoParser {
 
                final Matcher basicTypeMatcher = 
basicTypePattern.matcher(infoString);
                final Matcher basicTypeDateMatcher = 
basicTypeDatePattern.matcher(infoString);
+               final Matcher basicTypeBigIntMatcher = 
basicTypeBigIntPattern.matcher(infoString);
+               final Matcher basicTypeBigDecMatcher = 
basicTypeBigDecPattern.matcher(infoString);
 
                final Matcher primitiveTypeMatcher = 
primitiveTypePattern.matcher(infoString);
 
@@ -200,6 +204,32 @@ public class TypeInfoParser {
                        }
                        returnType = BasicTypeInfo.getInfoFor(clazz);
                }
+               // special basic type "BigInteger"
+               else if (basicTypeBigIntMatcher.find()) {
+                       String className = basicTypeBigIntMatcher.group(1);
+                       sb.delete(0, className.length());
+                       Class<?> clazz;
+                       // check if fully qualified
+                       if (className.startsWith("java.math")) {
+                               clazz = loadClass(className);
+                       } else {
+                               clazz = loadClass("java.math." + className);
+                       }
+                       returnType = BasicTypeInfo.getInfoFor(clazz);
+               }
+               // special basic type "BigDecimal"
+               else if (basicTypeBigDecMatcher.find()) {
+                       String className = basicTypeBigDecMatcher.group(1);
+                       sb.delete(0, className.length());
+                       Class<?> clazz;
+                       // check if fully qualified
+                       if (className.startsWith("java.math")) {
+                               clazz = loadClass(className);
+                       } else {
+                               clazz = loadClass("java.math." + className);
+                       }
+                       returnType = BasicTypeInfo.getInfoFor(clazz);
+               }
                // primitive types
                else if (primitiveTypeMatcher.find()) {
                        String keyword = primitiveTypeMatcher.group(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
index 793688d..401603e 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/ComparatorTestBase.java
@@ -334,7 +334,7 @@ public abstract class ComparatorTestBase<T> extends 
TestLogger {
                        // Get the normKeyLen on which we are testing
                        int normKeyLen = getNormKeyLen(halfLength, data, 
comparator);
 
-                       // Write the data into different 2 memory segements
+                       // Write the data into 2 different memory segments
                        MemorySegment memSegLow = 
setupNormalizedKeysMemSegment(data, normKeyLen, comparator);
                        MemorySegment memSegHigh = 
setupNormalizedKeysMemSegment(data, normKeyLen, comparator);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
new file mode 100644
index 0000000..ca5819e
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecComparatorTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class BigDecComparatorTest extends ComparatorTestBase<BigDecimal> {
+
+       @Override
+       protected TypeComparator<BigDecimal> createComparator(boolean 
ascending) {
+               return new BigDecComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<BigDecimal> createSerializer() {
+               return new BigDecSerializer();
+       }
+
+       @Override
+       protected BigDecimal[] getSortedTestData() {
+               return new BigDecimal[] {
+                       new BigDecimal("-12.5E1000"),
+                       new BigDecimal("-12.5E100"),
+                       BigDecimal.valueOf(-12E100),
+                       BigDecimal.valueOf(-10000),
+                       BigDecimal.valueOf(-1.1),
+                       BigDecimal.valueOf(-1),
+                       BigDecimal.valueOf(-0.44),
+                       BigDecimal.ZERO,
+                       new BigDecimal("0.000000000000000000000000001"),
+                       new BigDecimal("0.0000001"),
+                       new BigDecimal("0.1234123413478523984729447"),
+                       BigDecimal.valueOf(1),
+                       BigDecimal.valueOf(1.1),
+                       BigDecimal.TEN,
+                       new BigDecimal("10000"),
+                       BigDecimal.valueOf(12E100),
+                       new BigDecimal("12.5E100"),
+                       new BigDecimal("10E100000"),
+                       new BigDecimal("10E1000000000")
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
new file mode 100644
index 0000000..fd3cbd5
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigDecSerializerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.Random;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A test for the {@link BigDecSerializer}.
+ */
+public class BigDecSerializerTest extends SerializerTestBase<BigDecimal> {
+
+       @Override
+       protected TypeSerializer<BigDecimal> createSerializer() {
+               return new BigDecSerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<BigDecimal> getTypeClass() {
+               return BigDecimal.class;
+       }
+
+       @Override
+       protected BigDecimal[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new BigDecimal[] {
+                       BigDecimal.ZERO, BigDecimal.ONE, BigDecimal.TEN,
+                       new BigDecimal(rnd.nextDouble()), new 
BigDecimal("874597969.1234123413478523984729447"),
+                       BigDecimal.valueOf(-1.444), 
BigDecimal.valueOf(-10000.888)};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java
new file mode 100644
index 0000000..ccc31bd
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntComparatorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.math.BigInteger;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+public class BigIntComparatorTest extends ComparatorTestBase<BigInteger> {
+
+       @Override
+       protected TypeComparator<BigInteger> createComparator(boolean 
ascending) {
+               return new BigIntComparator(ascending);
+       }
+
+       @Override
+       protected TypeSerializer<BigInteger> createSerializer() {
+               return new BigIntSerializer();
+       }
+
+       @Override
+       protected BigInteger[] getSortedTestData() {
+               return new BigInteger[] {
+                       new BigInteger("-8745979691234123413478523984729447"),
+                       BigInteger.valueOf(-10000),
+                       BigInteger.valueOf(-1),
+                       BigInteger.ZERO,
+                       BigInteger.ONE,
+                       BigInteger.TEN,
+                       new BigInteger("127"),
+                       new BigInteger("128"),
+                       new BigInteger("129"),
+                       new BigInteger("130"),
+                       
BigInteger.valueOf(0b10000000_00000000_00000000_00000000L),
+                       
BigInteger.valueOf(0b10000000_00000000_00000000_00000001L),
+                       
BigInteger.valueOf(0b10000000_00000000_10000000_00000000L),
+                       new BigInteger("8745979691234123413478523984729447")
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java
new file mode 100644
index 0000000..ada9bd6
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BigIntSerializerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeutils.base;
+
+import java.math.BigInteger;
+import java.util.Random;
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * A test for the {@link BigIntSerializer}.
+ */
+public class BigIntSerializerTest extends SerializerTestBase<BigInteger> {
+
+       @Override
+       protected TypeSerializer<BigInteger> createSerializer() {
+               return new BigIntSerializer();
+       }
+
+       @Override
+       protected int getLength() {
+               return -1;
+       }
+
+       @Override
+       protected Class<BigInteger> getTypeClass() {
+               return BigInteger.class;
+       }
+
+       @Override
+       protected BigInteger[] getTestData() {
+               Random rnd = new Random(874597969123412341L);
+
+               return new BigInteger[] {
+                       BigInteger.ZERO, BigInteger.ONE, BigInteger.TEN,
+                       new BigInteger(1000, rnd), new 
BigInteger("8745979691234123413478523984729447"),
+                       BigInteger.valueOf(-1), BigInteger.valueOf(-10000)};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
index b1f46b7..543c0e9 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleSerializerTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
 /**
- * A test for the {@link DoubleSerializerTest}.
+ * A test for the {@link DoubleSerializer}.
  */
 public class DoubleSerializerTest extends SerializerTestBase<Double> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java
index 33e8fbe..b3b17fe 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatSerializerTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 /**
- * A test for the {@link FloatSerializerTest}.
+ * A test for the {@link FloatSerializer}.
  */
 public class FloatSerializerTest extends SerializerTestBase<Float> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java
index ce0d249..e829a4b 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntSerializerTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 /**
- * A test for the {@link IntSerializerTest}.
+ * A test for the {@link IntSerializer}.
  */
 public class IntSerializerTest extends SerializerTestBase<Integer> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java
index 1d4a301..4587f84 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongSerializerTest.java
@@ -24,7 +24,7 @@ import 
org.apache.flink.api.common.typeutils.SerializerTestBase;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 /**
- * A test for the {@link LongSerializerTest}.
+ * A test for the {@link LongSerializer}.
  */
 public class LongSerializerTest extends SerializerTestBase<Long> {
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
index a8f5ded..74c01ef 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeExtractorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.api.java.typeutils;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -39,6 +41,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -2016,4 +2019,35 @@ public class TypeExtractorTest {
                TypeExtractor.getMapReturnTypes(function,
                        (TypeInformation) new 
TupleTypeInfo<Tuple1<Object>>(customTypeInfo));
        }
+
+       @SuppressWarnings({ "rawtypes", "unchecked" })
+       @Test
+       public void testBigBasicTypes() {
+               MapFunction<?, ?> function = new MapFunction<Tuple2<BigInteger, 
BigDecimal>, Tuple2<BigInteger, BigDecimal>>() {
+                       @Override
+                       public Tuple2<BigInteger, BigDecimal> 
map(Tuple2<BigInteger, BigDecimal> value) throws Exception {
+                               return null;
+                       }
+               };
+
+               TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(
+                       function,
+                       (TypeInformation) TypeInformation.of(new 
TypeHint<Tuple2<BigInteger, BigDecimal>>() {
+               }));
+
+               Assert.assertTrue(ti.isTupleType());
+               TupleTypeInfo<?> tti = (TupleTypeInfo<?>) ti;
+               Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, 
tti.getTypeAt(0));
+               Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
tti.getTypeAt(1));
+
+               // use getForClass()
+               
Assert.assertTrue(TypeExtractor.getForClass(BigInteger.class).isBasicType());
+               
Assert.assertTrue(TypeExtractor.getForClass(BigDecimal.class).isBasicType());
+               Assert.assertEquals(tti.getTypeAt(0), 
TypeExtractor.getForClass(BigInteger.class));
+               Assert.assertEquals(tti.getTypeAt(1), 
TypeExtractor.getForClass(BigDecimal.class));
+
+               // use getForObject()
+               Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, 
TypeExtractor.getForObject(new BigInteger("42")));
+               Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
TypeExtractor.getForObject(new BigDecimal("42.42")));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
index e225460..f5fdae4 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoParserTest.java
@@ -58,6 +58,8 @@ public class TypeInfoParserTest {
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
TypeInfoParser.parse("Boolean"));
                Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, 
TypeInfoParser.parse("Void"));
                Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, 
TypeInfoParser.parse("Date"));
+               Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, 
TypeInfoParser.parse("BigInteger"));
+               Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
TypeInfoParser.parse("BigDecimal"));
                
                Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
TypeInfoParser.parse("java.lang.Integer"));
                Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
TypeInfoParser.parse("java.lang.Double"));
@@ -70,6 +72,8 @@ public class TypeInfoParserTest {
                Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, 
TypeInfoParser.parse("java.lang.Boolean"));
                Assert.assertEquals(BasicTypeInfo.VOID_TYPE_INFO, 
TypeInfoParser.parse("java.lang.Void"));
                Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, 
TypeInfoParser.parse("java.util.Date"));
+               Assert.assertEquals(BasicTypeInfo.BIG_INT_TYPE_INFO, 
TypeInfoParser.parse("java.math.BigInteger"));
+               Assert.assertEquals(BasicTypeInfo.BIG_DEC_TYPE_INFO, 
TypeInfoParser.parse("java.math.BigDecimal"));
                
                Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, 
TypeInfoParser.parse("int"));
                Assert.assertEquals(BasicTypeInfo.DOUBLE_TYPE_INFO, 
TypeInfoParser.parse("double"));

http://git-wip-us.apache.org/repos/asf/flink/blob/b3231aca/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java 
b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
index cdd06d0..c090b76 100644
--- a/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/BasicTypeInfoTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.types;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -30,7 +32,7 @@ public class BasicTypeInfoTest extends TestLogger {
 
        static Class<?>[] classes = {String.class, Integer.class, 
Boolean.class, Byte.class,
                Short.class, Long.class, Float.class, Double.class, 
Character.class, Date.class,
-               Void.class};
+               Void.class, BigInteger.class, BigDecimal.class};
 
        @Test
        public void testBasicTypeInfoEquality() {

Reply via email to