I'm attaching the patch that I put together for decimal.
This includes:
* Decimal schema translation from Avro to Parquet
- Need to add date, time, timestamp
- Need to add Parquet to Avro support
* Read-side support for any Avro logical type
* Special write-side support for decimal
- This was added to fixed and bytes
- We should try to make this more generic
This was based on a branch of Avro with logical type support, so where
Avro classes are missing we would need to copy them into Parquet. The
only other fix is that Schema doesn't have a getLogicalType accessor
until 1.8.0, so we will need to use this instead:
LogicalTypes.fromSchemaIgnoreInvalid(schema)
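For example, a minimal sketch (not part of the patch; it assumes the copied
LogicalTypes class keeps the 1.8-style API) of looking up a field's logical
type without that accessor:

import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// equivalent to schema.getLogicalType() in Avro 1.8.0+; re-derives the
// logical type from the schema's properties and returns null if the
// annotation is missing or invalid
static LogicalType logicalTypeOf(Schema schema) {
  return LogicalTypes.fromSchemaIgnoreInvalid(schema);
}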
I think it is pretty straightforward, but if you have questions let me
know.
rb
On 10/15/2015 10:24 AM, Chris Mathews wrote:
Hi Ryan
Thanks for this - it sounds just what we need.
How do we go about doing a trial of the local copies with our code?
It would be good to check this all out now if 1.8.0 is going to be delayed for a while.
Contact me via https://drillers.slack.com/messages/dev/team/cmathews/ to discuss.
Cheers — Chris
On 15 Oct 2015, at 17:59, Julien Le Dem <[email protected]> wrote:
thanks Ryan!
(cc parquet dev list as well)
On Thu, Oct 15, 2015 at 9:46 AM, Ryan Blue <[email protected]> wrote:
Hi Chris,
Avro does have support for dates, but it hasn't been released yet because
1.8.0 was blocked on license issues (AVRO-1722). I have a branch with
preliminary parquet-avro support for Decimal (which uses the same Avro
construct) if you would like to take a look at it.
What we can do in the mean time, before Avro's logical type support is
released, is to copy the logical types and conversions into parquet-avro
and use local copies until we can use the correct upstream ones. If you're
interested, ping me and we can get it working.
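As a rough sketch of what that looks like once the local copies are in place
(this mirrors the test in the attached patch), the decimal conversion just gets
registered on the data model that is handed to the writer/reader:

import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;

// add the decimal conversion to a generic data model so BigDecimal values
// are converted to/from bytes (or fixed) when writing and reading
GenericData decimalSupport = new GenericData();
decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());

// the model is then passed to the writer/reader builders via
// .withDataModel(decimalSupport)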
rb
On 10/15/2015 09:24 AM, Julien Le Dem wrote:
Hi Chris,
You could probably contribute some sort of type annotation to
parquet-avro so that it produces the data type in the Parquet schema.
This class generates a Parquet schema from the Avro schema:
https://github.com/apache/parquet-mr/blob/master/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java
It looks like Avro has some annotation support but I have never used it.
http://avro.apache.org/docs/1.4.1/idl.html#minutiae_annotations
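For what it's worth, with the newer (still unreleased) logical-types API the
annotation lives on the Avro schema itself rather than in IDL; a small
illustrative sketch:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

// attach a decimal(9,2) annotation to a bytes schema
Schema decimalBytes = LogicalTypes.decimal(9, 2)
    .addToSchema(Schema.create(Schema.Type.BYTES));

// decimalBytes.toString() serializes as:
// {"type":"bytes","logicalType":"decimal","precision":9,"scale":2}

A converter like AvroSchemaConverter can then translate that annotation into
the matching Parquet DECIMAL annotation.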
(CC'ing Ryan, who knows the avro integration much better)
On Thu, Oct 15, 2015 at 8:45 AM, Chris Mathews <[email protected]> wrote:
Thank you Jacques - yes this is exactly the issue I am having.
We are currently using Avro to define schemas for our Parquet files,
and as you correctly point out there is no way of defining date
types in Avro. Due to the volumes of data we are dealing with,
using CTAS is not an option for us as we need to create the Parquet
files on the fly rather than create intermediate files for use as
input to the CTAS.
Since Avro is the restriction here we need to come up with some way
of defining our Parquet schemas so the date types can be defined
natively; and as you say there will be no need for any casting again.
What we really need is some way of creating a schema for use in a
Java app, similar to the way Avro works using JSON, for ease of
implementation but without the restrictions.
We are still investigating this approach, but for now we are casting!
Cheers — Chris
On 15 Oct 2015, at 16:19, Jacques Nadeau <[email protected]> wrote:
A little clarification here:
Parquet has native support for date types. Drill does too. However, since
Avro does not, there is no way that I know of to write a Parquet file via
the Avro adapter that will not require a cast. If you did a CTAS in Drill
and cast the data types correctly in the CTAS, then Drill will output
Parquet files that never need casting again.
--
Jacques Nadeau
CTO and Co-Founder, Dremio
On Thu, Oct 15, 2015 at 7:31 AM, Stefán Baxter <[email protected]> wrote:
Hi Chris,
I understand now, thank you.
What threw me off was that, in our standard use case, we are not using CAST
for our TIMESTAMP_MILLIS fields; I thought we were getting them directly
formatted from Parquet, but then I overlooked our UDF that is handling the
casting... sorry :).
Thank you for taking the time!
Regards,
-Stefan
On Thu, Oct 15, 2015 at 2:21 PM, Chris Mathews <[email protected]> wrote:
Hi Stefan
I am not sure I fully understand your question 'why you don't seem to be
storing your dates in Parquet Date files.'
As far as I am aware, all date types in Parquet (i.e. DATE, TIME_MILLIS,
TIMESTAMP_MILLIS) are stored as annotated int32 or int64 types. The only
other option is to store them as strings (or VARCHAR) and interpret them as
required when selecting from the Parquet files.
Please let me know if I have understood your question correctly or not.
What I have not achieved yet is to use Avro schema definitions (via JSON) to
define a TIMESTAMP type, which is why we have gone the route of defining a
VIEW for each Parquet file. By doing this we reduce the amount of casting we
have to do when building the query, since the VIEW effectively does all the
casting for us behind the scenes.
We are currently looking at the possibility of defining Parquet schemas
directly (using Java) without going the Avro route; in other words, producing
a parser from JSON to Parquet, similar to the Avro parser but supporting
some other logical types.
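As a rough illustration of that direction (a sketch only; it assumes a
parquet-mr version whose OriginalType already includes DATE and
TIMESTAMP_MILLIS), a Parquet schema can be built directly in Java with the
Types builder, with no Avro schema involved:

import parquet.schema.MessageType;
import parquet.schema.OriginalType;
import parquet.schema.Types;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

// build the schema programmatically instead of parsing an Avro JSON definition
MessageType schema = Types.buildMessage()
    .required(INT64).as(OriginalType.TIMESTAMP_MILLIS).named("z_timestamp")
    .required(INT64).as(OriginalType.TIMESTAMP_MILLIS).named("datetime")
    .required(INT32).as(OriginalType.DATE).named("date")
    .required(INT64).named("granularity")
    .named("timestamp_test");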
Some background to our Drill trials:
We are generating billions of columns from machine-generated data every day.
There are quite a number of different types of machine generating this data,
and the format of the data varies between machines. Some produce string-format
dates/timestamps and others numeric (Unix epoch style), and we have to
normalise this data to a common format; for dates this format is the Parquet
TIMESTAMP_MILLIS type, because we need to use millisecond granularity when
available.
Quote from the Parquet docs:
"TIMESTAMP_MILLIS Logical date and time. Annotates an int64 that stores the
number of milliseconds from the Unix epoch, 00:00:00.000 on 1 January 1970,
UTC."
This type corresponds nicely to the SQL type TIMESTAMP (which is why we cast).
Again, hope this helps.
Cheers -- Chris
On 15 Oct 2015, at 14:46, Stefán Baxter <[email protected]> wrote:
Thank you Chris, this clarifies a whole lot :).
I wanted to try to avoid the cast in the CTAS on the way from Avro to Parquet
(not possible) and then avoid casting as much as possible when selecting from
the Parquet files.
What is still unclear to me is why you don't seem to be storing your dates in
Parquet Date files.
Can you please elaborate a bit on the pros/cons?
Regards,
-Stefan
On Thu, Oct 15, 2015 at 10:59 AM, Chris Mathews <[email protected]> wrote:
Hello Stefan
We use Avro to define our schemas for Parquet files, and we find that using
long for dates and converting the dates to long using milliseconds works.
We then CAST the long to a TIMESTAMP on the way out during the SELECT
statement (or by using a VIEW).
example java snippet:

// various date and time formats
public static final String FORMAT_Z_TIMESTAMP = "yyyy-MM-dd HH:mm:ss.SSS";
public static final String FORMAT_DATETIME    = "yyyy-MM-dd HH:mm:ss";
public static final String FORMAT_DATE        = "yyyy-MM-dd";
…
// parser for each format
public final SimpleDateFormat sdf_z_timestamp = new SimpleDateFormat( FORMAT_Z_TIMESTAMP );
public final SimpleDateFormat sdf_datetime    = new SimpleDateFormat( FORMAT_DATETIME );
public final SimpleDateFormat sdf_date        = new SimpleDateFormat( FORMAT_DATE );
…
// choose parser based on column name / string format
public SimpleDateFormat sdf = (NAME_Z_TIMESTAMP.equals(name())) ? sdf_z_timestamp
                            : (NAME_DATETIME.equals(name()))    ? sdf_datetime
                            : (NAME_DATE.equals(name()))        ? sdf_date
                            : null;
…
Date date = sdf.parse(str);
long millis = date.getTime();
Object value = new java.lang.Long(millis);

We then use something like

AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<>(hdfs_path, schema);
GenericRecord data = new GenericData.Record(schema);
data.put( name(), value);
writer.write(data);

to write the records out directly to a Parquet file.
example schema:
{
"type": "record",
"name": "timestamp_test",
"doc": "Avro -> Parquet long to timestamp test",
"fields":
[
{ "name": "z_timestamp", "type": "long" }
,{ "name": "datetime", "type": "long" }
,{ "name": "date", "type": "long" }
,{ "name": "granularity", "type": "long" }
]
}
Then to get the data back we either define a VIEW, or cast during the SELECT
statement.
example view:
use hdfs.cjm;
create or replace view TIMESTAMP_TEST_VIEW as
SELECT
CAST(`z_timestamp` AS TIMESTAMP) AS `z_timestamp`
,CAST(`datetime` AS TIMESTAMP) AS `datetime`
,CAST(`date` AS DATE) AS `date`
,CAST(`granularity` AS BIGINT) AS `granularity`
FROM hdfs.cjm.TIMESTAMP_TEST;
Then execute the following to get results:
0: jdbc:drill:> select z_timestamp, `datetime`, `date`, granularity from TIMESTAMP_TEST limit 1;
+----------------+----------------+----------------+--------------+
|  z_timestamp   |    datetime    |      date      | granularity  |
+----------------+----------------+----------------+--------------+
| 1429592511991  | 1429520400000  | 1421625600000  | 3600         |
+----------------+----------------+----------------+--------------+
1 row selected (2.593 seconds)
0: jdbc:drill:> select z_timestamp, `datetime`, `date`, granularity from TIMESTAMP_TEST_VIEW limit 1;
+--------------------------+------------------------+-------------+--------------+
|       z_timestamp        |        datetime        |    date     | granularity  |
+--------------------------+------------------------+-------------+--------------+
| 2015-04-22 05:16:22.173  | 2015-04-21 12:00:00.0  | 2015-01-20  | 3600         |
+--------------------------+------------------------+-------------+--------------+
1 row selected (3.282 seconds)
0: jdbc:drill:> SELECT
. . . . . . . >      CAST(`z_timestamp` AS TIMESTAMP) AS `z_timestamp`
. . . . . . . >     ,CAST(`datetime` AS TIMESTAMP) AS `datetime`
. . . . . . . >     ,CAST(`date` AS DATE) AS `date`
. . . . . . . >     ,CAST(`granularity` AS BIGINT) AS `granularity`
. . . . . . . > from TIMESTAMP_TEST limit 1;
+--------------------------+------------------------+-------------+--------------+
|       z_timestamp        |        datetime        |    date     | granularity  |
+--------------------------+------------------------+-------------+--------------+
| 2015-04-22 05:16:22.173  | 2015-04-21 09:00:00.0  | 2015-01-20  | 3600         |
+--------------------------+------------------------+-------------+--------------+
1 row selected (3.071 seconds)
Hope this helps.
Cheers — Chris
On 14 Oct 2015, at 16:07, Stefán Baxter <[email protected]> wrote:
Hi,
What is the best practice when working with dates in an Avro/Parquet scenario?
Avro does not support dates directly (only longs) and I'm wondering how they
get persisted in Parquet.
Perhaps Parquet does not distinguish between long and date-long in any
significant way.
Regards,
-Stefan
--
Julien
--
Ryan Blue
Software Engineer
Cloudera, Inc.
From 6261e26e107d539f815ac9f5e28b3feb32572599 Mon Sep 17 00:00:00 2001
From: Ryan Blue <[email protected]>
Date: Tue, 6 Oct 2015 10:18:46 -0700
Subject: [PATCH] Add simple decimal support to parquet-avro.
This doesn't fully implement logical types. The write side only has
support for logical types that are stored as bytes or fixed.
---
.../java/parquet/avro/AvroRecordConverter.java | 10 +-
.../java/parquet/avro/AvroSchemaConverter.java | 70 +++++----
.../main/java/parquet/avro/AvroWriteSupport.java | 21 ++-
.../java/parquet/avro/ParentValueContainer.java | 175 +++++++++++++++++++++
.../java/parquet/avro/TestAvroSchemaConverter.java | 29 ++++
.../src/test/java/parquet/avro/TestReadWrite.java | 65 ++++++++
.../src/main/java/parquet/schema/Types.java | 12 +-
7 files changed, 343 insertions(+), 39 deletions(-)
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroRecordConverter.java
index 9f53bc7..c6d4748 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroRecordConverter.java
@@ -165,7 +165,10 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
}
private static Converter newConverter(Schema schema, Type type,
- GenericData model, ParentValueContainer parent) {
+ GenericData model, ParentValueContainer setter) {
+ ParentValueContainer parent = ParentValueContainer
+ .getConversionContainer(setter, schema, model);
+
if (schema.getType().equals(Schema.Type.BOOLEAN)) {
return new AvroConverters.FieldBooleanConverter(parent);
} else if (schema.getType().equals(Schema.Type.INT)) {
@@ -267,6 +270,11 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
@SuppressWarnings("unchecked")
private static <T> Class<T> getDatumClass(Schema schema, GenericData model) {
+ if (model.getConversionFor(schema.getLogicalType()) != null) {
+ // use generic classes to pass data to conversions
+ return null;
+ }
+
if (model instanceof SpecificData) {
// this works for reflect as well
return ((SpecificData) model).getClass(schema);
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
index 62425a3..30ff800 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroSchemaConverter.java
@@ -20,17 +20,21 @@ package parquet.avro;
import java.util.*;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.node.NullNode;
import parquet.schema.ConversionPatterns;
+import parquet.schema.DecimalMetadata;
import parquet.schema.GroupType;
import parquet.schema.MessageType;
import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType;
import parquet.schema.Type;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Types;
import static parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import static parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT;
@@ -114,25 +118,26 @@ public class AvroSchemaConverter {
}
private Type convertField(String fieldName, Schema schema, Type.Repetition repetition) {
+ Types.PrimitiveBuilder<PrimitiveType> builder;
Schema.Type type = schema.getType();
if (type.equals(Schema.Type.BOOLEAN)) {
- return primitive(fieldName, BOOLEAN, repetition);
+ builder = Types.primitive(BOOLEAN, repetition);
} else if (type.equals(Schema.Type.INT)) {
- return primitive(fieldName, INT32, repetition);
+ builder = Types.primitive(INT32, repetition);
} else if (type.equals(Schema.Type.LONG)) {
- return primitive(fieldName, INT64, repetition);
+ builder = Types.primitive(INT64, repetition);
} else if (type.equals(Schema.Type.FLOAT)) {
- return primitive(fieldName, FLOAT, repetition);
+ builder = Types.primitive(FLOAT, repetition);
} else if (type.equals(Schema.Type.DOUBLE)) {
- return primitive(fieldName, DOUBLE, repetition);
+ builder = Types.primitive(DOUBLE, repetition);
} else if (type.equals(Schema.Type.BYTES)) {
- return primitive(fieldName, BINARY, repetition);
+ builder = Types.primitive(BINARY, repetition);
} else if (type.equals(Schema.Type.STRING)) {
- return primitive(fieldName, BINARY, repetition, UTF8);
+ builder = Types.primitive(BINARY, repetition).as(UTF8);
} else if (type.equals(Schema.Type.RECORD)) {
return new GroupType(repetition, fieldName, convertFields(schema.getFields()));
} else if (type.equals(Schema.Type.ENUM)) {
- return primitive(fieldName, BINARY, repetition, ENUM);
+ builder = Types.primitive(BINARY, repetition).as(ENUM);
} else if (type.equals(Schema.Type.ARRAY)) {
if (writeOldListStructure) {
return ConversionPatterns.listType(repetition, fieldName,
@@ -146,12 +151,24 @@ public class AvroSchemaConverter {
// avro map key type is always string
return ConversionPatterns.stringKeyMapType(repetition, fieldName, valType);
} else if (type.equals(Schema.Type.FIXED)) {
- return primitive(fieldName, FIXED_LEN_BYTE_ARRAY, repetition,
- schema.getFixedSize(), null);
+ builder = Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+ .length(schema.getFixedSize());
} else if (type.equals(Schema.Type.UNION)) {
return convertUnion(fieldName, schema, repetition);
+ } else {
+ throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+ }
+
+ LogicalType logicalType = schema.getLogicalType();
+ if (logicalType != null) {
+ if (logicalType instanceof LogicalTypes.Decimal) {
+ builder = builder.as(DECIMAL)
+ .precision(((LogicalTypes.Decimal) logicalType).getPrecision())
+ .scale(((LogicalTypes.Decimal) logicalType).getScale());
+ }
}
- throw new UnsupportedOperationException("Cannot convert Avro type " + type);
+
+ return builder.named(fieldName);
}
private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
@@ -188,24 +205,6 @@ public class AvroSchemaConverter {
return convertField(field.name(), field.schema());
}
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- int typeLength, OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, typeLength, name,
- originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition,
- OriginalType originalType) {
- return new PrimitiveType(repetition, primitive, name, originalType);
- }
-
- private PrimitiveType primitive(String name,
- PrimitiveType.PrimitiveTypeName primitive, Type.Repetition repetition) {
- return new PrimitiveType(repetition, primitive, name, null);
- }
-
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getName(), parquetSchema.getFields());
}
@@ -233,7 +232,7 @@ public class AvroSchemaConverter {
final PrimitiveTypeName parquetPrimitiveTypeName =
parquetType.asPrimitiveType().getPrimitiveTypeName();
final OriginalType originalType = parquetType.getOriginalType();
- return parquetPrimitiveTypeName.convert(
+ Schema schema = parquetPrimitiveTypeName.convert(
new PrimitiveType.PrimitiveTypeNameConverter<Schema, RuntimeException>() {
@Override
public Schema convertBOOLEAN(PrimitiveTypeName primitiveTypeName) {
@@ -273,6 +272,17 @@ public class AvroSchemaConverter {
}
}
});
+
+ if (originalType == DECIMAL && (
+ parquetPrimitiveTypeName == BINARY ||
+ parquetPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY)) {
+ // avro only supports fixed and binary decimals
+ DecimalMetadata meta = parquetType.asPrimitiveType().getDecimalMetadata();
+ schema = LogicalTypes.decimal(meta.getPrecision(), meta.getScale())
+ .addToSchema(schema);
+ }
+
+ return schema;
} else {
GroupType parquetGroupType = parquetType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
diff --git a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
index b452a57..f01fbf2 100644
--- a/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
+++ b/parquet-avro/src/main/java/parquet/avro/AvroWriteSupport.java
@@ -23,6 +23,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -254,7 +256,14 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
if (value instanceof byte[]) {
recordConsumer.addBinary(Binary.fromReusedByteArray((byte[]) value));
} else {
- recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+ LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+ if (value != null && logicalType != null) {
+ Conversion conversion = model.getConversionByClass(value.getClass(), logicalType);
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer(
+ conversion.toBytes(value, nonNullAvroSchema, logicalType)));
+ } else {
+ recordConsumer.addBinary(Binary.fromReusedByteBuffer((ByteBuffer) value));
+ }
}
} else if (avroType.equals(Schema.Type.STRING)) {
recordConsumer.addBinary(fromAvroString(value));
@@ -269,7 +278,15 @@ public class AvroWriteSupport<T> extends WriteSupport<T> {
} else if (avroType.equals(Schema.Type.UNION)) {
writeUnion(type.asGroupType(), nonNullAvroSchema, value);
} else if (avroType.equals(Schema.Type.FIXED)) {
- recordConsumer.addBinary(Binary.fromReusedByteArray(((GenericFixed) value).bytes()));
+ LogicalType logicalType = nonNullAvroSchema.getLogicalType();
+ if (value != null && logicalType != null) {
+ Conversion conversion = model.getConversionByClass(value.getClass(), logicalType);
+ recordConsumer.addBinary(Binary.fromReusedByteArray(
+ conversion.toFixed(value, nonNullAvroSchema, logicalType).bytes()));
+ } else {
+ recordConsumer.addBinary(Binary.fromReusedByteArray(
+ ((GenericFixed) value).bytes()));
+ }
}
}
diff --git a/parquet-avro/src/main/java/parquet/avro/ParentValueContainer.java b/parquet-avro/src/main/java/parquet/avro/ParentValueContainer.java
index 6891b3d..c2ccbc4 100644
--- a/parquet-avro/src/main/java/parquet/avro/ParentValueContainer.java
+++ b/parquet-avro/src/main/java/parquet/avro/ParentValueContainer.java
@@ -18,6 +18,16 @@
*/
package parquet.avro;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.IndexedRecord;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+
abstract class ParentValueContainer {
/**
@@ -60,4 +70,169 @@ abstract class ParentValueContainer {
add(value);
}
+ static class LogicalTypePrimitiveContainer extends ParentValueContainer {
+ private final ParentValueContainer wrapped;
+ private final Schema schema;
+ private final LogicalType logicalType;
+ private final Conversion conversion;
+
+ public LogicalTypePrimitiveContainer(ParentValueContainer wrapped,
+ Schema schema, Conversion conversion) {
+ this.wrapped = wrapped;
+ this.schema = schema;
+ this.logicalType = schema.getLogicalType();
+ this.conversion = conversion;
+ }
+
+ @Override
+ public void addDouble(double value) {
+ wrapped.add(conversion.fromDouble(value, schema, logicalType));
+ }
+
+ @Override
+ public void addFloat(float value) {
+ wrapped.add(conversion.fromFloat(value, schema, logicalType));
+ }
+
+ @Override
+ public void addLong(long value) {
+ wrapped.add(conversion.fromLong(value, schema, logicalType));
+ }
+
+ @Override
+ public void addInt(int value) {
+ wrapped.add(conversion.fromInt(value, schema, logicalType));
+ }
+
+ @Override
+ public void addShort(short value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addChar(char value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addByte(byte value) {
+ wrapped.add(conversion.fromInt((int) value, schema, logicalType));
+ }
+
+ @Override
+ public void addBoolean(boolean value) {
+ wrapped.add(conversion.fromBoolean(value, schema, logicalType));
+ }
+ }
+
+ static ParentValueContainer getConversionContainer(
+ final ParentValueContainer parent, final Schema schema,
+ GenericData model) {
+ final LogicalType logicalType = schema.getLogicalType();
+ final Conversion conversion = model.getConversionFor(logicalType);
+ if (conversion == null) {
+ return parent;
+ }
+
+ switch (schema.getType()) {
+ case STRING:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromCharSequence(
+ (CharSequence) value, schema, logicalType));
+ }
+ };
+ case BOOLEAN:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBoolean(
+ (Boolean) value, schema, logicalType));
+ }
+ };
+ case INT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromInt(
+ (Integer) value, schema, logicalType));
+ }
+ };
+ case LONG:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromLong(
+ (Long) value, schema, logicalType));
+ }
+ };
+ case FLOAT:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFloat(
+ (Float) value, schema, logicalType));
+ }
+ };
+ case DOUBLE:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion) {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromDouble(
+ (Double) value, schema, logicalType));
+ }
+ };
+ case BYTES:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromBytes(
+ (ByteBuffer) value, schema, logicalType));
+ }
+ };
+ case FIXED:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromFixed(
+ (GenericData.Fixed) value, schema, logicalType));
+ }
+ };
+ case RECORD:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromRecord(
+ (IndexedRecord) value, schema, logicalType));
+ }
+ };
+ case ARRAY:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromArray(
+ (Collection<?>) value, schema, logicalType));
+ }
+ };
+ case MAP:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromMap(
+ (Map<?, ?>) value, schema, logicalType));
+ }
+ };
+ case ENUM:
+ return new ParentValueContainer() {
+ @Override
+ public void add(Object value) {
+ parent.add(conversion.fromEnumSymbol(
+ (GenericEnumSymbol) value, schema, logicalType));
+ }
+ };
+ default:
+ return new LogicalTypePrimitiveContainer(parent, schema, conversion);
+ }
+ }
}
diff --git a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
index d6efc2f..62e6c07 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestAvroSchemaConverter.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.util.Arrays;
import java.util.Collections;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.codehaus.jackson.node.NullNode;
@@ -504,6 +505,34 @@ public class TestAvroSchemaConverter {
testParquetToAvroConversion(NEW_BEHAVIOR, schema, parquetSchema);
}
+ @Test
+ public void testDecimalBytesType() throws Exception {
+ Schema schema = Schema.createRecord("myrecord", null, null, false);
+ Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+ Schema.create(Schema.Type.BYTES));
+ schema.setFields(Collections.singletonList(
+ new Schema.Field("dec", decimal, null, null)));
+
+ testRoundTripConversion(schema,
+ "message myrecord {\n" +
+ " required binary dec (DECIMAL(9,2));\n" +
+ "}\n");
+ }
+
+ @Test
+ public void testDecimalFixedType() throws Exception {
+ Schema schema = Schema.createRecord("myrecord", null, null, false);
+ Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+ Schema.createFixed("d", null, null, 8));
+ schema.setFields(Collections.singletonList(
+ new Schema.Field("dec", decimal, null, null)));
+
+ testRoundTripConversion(schema,
+ "message myrecord {\n" +
+ " required fixed_len_byte_array(8) dec (DECIMAL(9,2));\n" +
+ "}\n");
+ }
+
public static Schema optional(Schema original) {
return Schema.createUnion(Lists.newArrayList(
Schema.create(Schema.Type.NULL),
diff --git a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
index e93dd36..01a6964 100644
--- a/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
+++ b/parquet-avro/src/test/java/parquet/avro/TestReadWrite.java
@@ -24,13 +24,19 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import java.io.File;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
@@ -39,9 +45,13 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import parquet.hadoop.ParquetReader;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;
import parquet.io.api.Binary;
@@ -225,6 +235,61 @@ public class TestReadWrite {
assertEquals(ImmutableMap.of(str("a"), 1, str("b"), 2), nextRecord.get("mymap"));
}
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ @Test
+ public void testDecimalValues() throws Exception {
+ Schema decimalSchema = Schema.createRecord("myrecord", null, null, false);
+ Schema decimal = LogicalTypes.decimal(9, 2).addToSchema(
+ Schema.create(Schema.Type.BYTES));
+ decimalSchema.setFields(Collections.singletonList(
+ new Schema.Field("dec", decimal, null, null)));
+
+ // add the decimal conversion to a generic data model
+ GenericData decimalSupport = new GenericData();
+ decimalSupport.addLogicalTypeConversion(new Conversions.DecimalConversion());
+
+ File file = temp.newFile("decimal.parquet");
+ file.delete();
+ Path path = new Path(file.toString());
+
+ ParquetWriter<GenericRecord> writer = AvroParquetWriter
+ .<GenericRecord>builder(path)
+ .withDataModel(decimalSupport)
+ .withSchema(decimalSchema)
+ .build();
+
+ Random random = new Random(34L);
+ GenericRecordBuilder builder = new GenericRecordBuilder(decimalSchema);
+ List<GenericRecord> expected = Lists.newArrayList();
+ for (int i = 0; i < 1000; i += 1) {
+ BigDecimal dec = new BigDecimal(new BigInteger(64, random), 2);
+ builder.set("dec", dec);
+
+ GenericRecord rec = builder.build();
+ expected.add(rec);
+ writer.write(builder.build());
+ }
+ writer.close();
+
+ ParquetReader<GenericRecord> reader = AvroParquetReader
+ .<GenericRecord>builder(path)
+ .withDataModel(decimalSupport)
+ .disableCompatibility()
+ .build();
+ List<GenericRecord> records = Lists.newArrayList();
+ GenericRecord rec;
+ while ((rec = reader.read()) != null) {
+ records.add(rec);
+ }
+ reader.close();
+
+ Assert.assertTrue("dec field should be a BigDecimal instance",
+ records.get(0).get("dec") instanceof BigDecimal);
+ Assert.assertEquals("Content should match", expected, records);
+ }
+
@Test
public void testAll() throws Exception {
Schema schema = new Schema.Parser().parse(
diff --git a/parquet-column/src/main/java/parquet/schema/Types.java b/parquet-column/src/main/java/parquet/schema/Types.java
index 754d823..8406cb2 100644
--- a/parquet-column/src/main/java/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/parquet/schema/Types.java
@@ -331,9 +331,9 @@ public class Types {
* @param length an int length
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> length(int length) {
+ public THIS length(int length) {
this.length = length;
- return this;
+ return self();
}
/**
@@ -346,9 +346,9 @@ public class Types {
* @param precision an int precision value for the DECIMAL
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> precision(int precision) {
+ public THIS precision(int precision) {
this.precision = precision;
- return this;
+ return self();
}
/**
@@ -364,9 +364,9 @@ public class Types {
* @param scale an int scale value for the DECIMAL
* @return this builder for method chaining
*/
- public BasePrimitiveBuilder<P, THIS> scale(int scale) {
+ public THIS scale(int scale) {
this.scale = scale;
- return this;
+ return self();
}
@Override
--
2.1.4