more updates
Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/5b37113b Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/5b37113b Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/5b37113b Branch: refs/heads/orc-72 Commit: 5b37113b73eb0e12744f2711326e11cd2ef6eaef Parents: 86628bc Author: Owen O'Malley <omal...@apache.org> Authored: Mon Oct 3 10:01:40 2016 -0700 Committer: Owen O'Malley <omal...@apache.org> Committed: Mon Oct 10 13:59:16 2016 -0700 ---------------------------------------------------------------------- java/bench/pom.xml | 4 + .../src/java/org/apache/orc/bench/AvroScan.java | 47 --- .../org/apache/orc/bench/AvroSchemaUtils.java | 190 ---------- .../java/org/apache/orc/bench/AvroWriter.java | 375 ------------------- .../java/org/apache/orc/bench/CsvReader.java | 175 --------- .../src/java/org/apache/orc/bench/CsvScan.java | 40 -- .../java/org/apache/orc/bench/GithubToAvro.java | 2 + .../java/org/apache/orc/bench/GithubToJson.java | 2 +- .../java/org/apache/orc/bench/GithubToOrc.java | 4 +- .../org/apache/orc/bench/GithubToParquet.java | 2 + .../java/org/apache/orc/bench/JsonReader.java | 278 -------------- .../src/java/org/apache/orc/bench/JsonScan.java | 61 --- .../src/java/org/apache/orc/bench/OrcScan.java | 46 --- .../java/org/apache/orc/bench/ParquetScan.java | 54 --- .../java/org/apache/orc/bench/SalesToAvro.java | 1 + .../org/apache/orc/bench/SalesToParquet.java | 1 + .../java/org/apache/orc/bench/TaxiToAvro.java | 2 + .../java/org/apache/orc/bench/TaxiToJson.java | 1 + .../java/org/apache/orc/bench/TaxiToOrc.java | 1 + .../org/apache/orc/bench/TaxiToParquet.java | 2 + .../org/apache/orc/bench/avro/AvroScan.java | 47 +++ .../apache/orc/bench/avro/AvroSchemaUtils.java | 190 ++++++++++ .../org/apache/orc/bench/avro/AvroWriter.java | 375 +++++++++++++++++++ .../org/apache/orc/bench/csv/CsvReader.java | 175 +++++++++ .../java/org/apache/orc/bench/csv/CsvScan.java | 41 ++ 
.../org/apache/orc/bench/json/JsonReader.java | 278 ++++++++++++++ .../org/apache/orc/bench/json/JsonScan.java | 61 +++ .../java/org/apache/orc/bench/orc/OrcScan.java | 46 +++ .../apache/orc/bench/parquet/ParquetScan.java | 54 +++ java/pom.xml | 15 +- 30 files changed, 1295 insertions(+), 1275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/pom.xml ---------------------------------------------------------------------- diff --git a/java/bench/pom.xml b/java/bench/pom.xml index f0bf55a..f40f21b 100644 --- a/java/bench/pom.xml +++ b/java/bench/pom.xml @@ -67,6 +67,10 @@ <artifactId>hive-storage-api</artifactId> </dependency> <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + </dependency> + <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/AvroScan.java deleted file mode 100644 index 61f6a62..0000000 --- a/java/bench/src/java/org/apache/orc/bench/AvroScan.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.FsInput; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; - -public class AvroScan { - public static void main(String[] args) throws Exception { - Configuration conf = new Configuration(); - long rowCount = 0; - for(String filename: args) { - FsInput file = new FsInput(new Path(filename), conf); - DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); - DataFileReader<GenericRecord> dataFileReader = - new DataFileReader<>(file, datumReader); - GenericRecord record = null; - while (dataFileReader.hasNext()) { - record = dataFileReader.next(record); - rowCount += 1; - } - } - System.out.println("Rows read: " + rowCount); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java deleted file mode 100644 index 02931c3..0000000 --- a/java/bench/src/java/org/apache/orc/bench/AvroSchemaUtils.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.bench; - -import org.apache.avro.Schema; -import org.apache.orc.TypeDescription; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -/** - * Convert Hive TypeInfo to an Avro Schema - */ -public class AvroSchemaUtils { - - private AvroSchemaUtils() { - // No instances - } - - public static Schema createAvroSchema(TypeDescription typeInfo) { - Schema schema; - switch (typeInfo.getCategory()) { - case STRING: - schema = Schema.create(Schema.Type.STRING); - break; - case CHAR: - schema = getSchemaFor("{" + - "\"type\":\"string\"," + - "\"logicalType\":\"char\"," + - "\"maxLength\":" + typeInfo.getMaxLength() + "}"); - break; - case VARCHAR: - schema = getSchemaFor("{" + - "\"type\":\"string\"," + - "\"logicalType\":\"varchar\"," + - "\"maxLength\":" + typeInfo.getMaxLength() + "}"); - break; - case BINARY: - schema = Schema.create(Schema.Type.BYTES); - break; - case BYTE: - schema = Schema.create(Schema.Type.INT); - break; - case SHORT: - schema = Schema.create(Schema.Type.INT); - break; - case INT: - schema = Schema.create(Schema.Type.INT); - break; - case LONG: - schema = Schema.create(Schema.Type.LONG); - break; - case FLOAT: - schema = Schema.create(Schema.Type.FLOAT); - break; - case DOUBLE: - schema = 
Schema.create(Schema.Type.DOUBLE); - break; - case BOOLEAN: - schema = Schema.create(Schema.Type.BOOLEAN); - break; - case DECIMAL: - String precision = String.valueOf(typeInfo.getPrecision()); - String scale = String.valueOf(typeInfo.getScale()); - schema = getSchemaFor("{" + - "\"type\":\"bytes\"," + - "\"logicalType\":\"decimal\"," + - "\"precision\":" + precision + "," + - "\"scale\":" + scale + "}"); - break; - case DATE: - schema = getSchemaFor("{" + - "\"type\":\"int\"," + - "\"logicalType\":\"date\"}"); - break; - case TIMESTAMP: - schema = getSchemaFor("{" + - "\"type\":\"long\"," + - "\"logicalType\":\"timestamp-millis\"}"); - break; - case LIST: - schema = createAvroArray(typeInfo); - break; - case MAP: - schema = createAvroMap(typeInfo); - break; - case STRUCT: - schema = createAvroRecord(typeInfo); - break; - case UNION: - schema = createAvroUnion(typeInfo); - break; - default: - throw new UnsupportedOperationException(typeInfo + " is not supported."); - } - - return wrapInUnionWithNull(schema); - } - - private static Schema createAvroUnion(TypeDescription typeInfo) { - List<Schema> childSchemas = new ArrayList<>(); - for (TypeDescription childTypeInfo : typeInfo.getChildren()) { - Schema childSchema = createAvroSchema(childTypeInfo); - if (childSchema.getType() == Schema.Type.UNION) { - for (Schema grandkid: childSchema.getTypes()) { - if (childSchema.getType() != Schema.Type.NULL) { - childSchemas.add(grandkid); - } - } - } else { - childSchemas.add(childSchema); - } - } - - return Schema.createUnion(childSchemas); - } - - private static Schema createAvroRecord(TypeDescription typeInfo) { - List<Schema.Field> childFields = new ArrayList<>(); - - List<String> fieldNames = typeInfo.getFieldNames(); - List<TypeDescription> fieldTypes = typeInfo.getChildren(); - - for (int i = 0; i < fieldNames.size(); ++i) { - TypeDescription childTypeInfo = fieldTypes.get(i); - Schema.Field field = new Schema.Field(fieldNames.get(i), - createAvroSchema(childTypeInfo), 
childTypeInfo.toString(), - (Object) null); - childFields.add(field); - } - - Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(), - typeInfo.toString(), null, false); - recordSchema.setFields(childFields); - return recordSchema; - } - - private static Schema createAvroMap(TypeDescription typeInfo) { - TypeDescription keyTypeInfo = typeInfo.getChildren().get(0); - if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) { - throw new UnsupportedOperationException("Avro only supports maps with string keys " - + typeInfo); - } - - Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1)); - - return Schema.createMap(valueSchema); - } - - private static Schema createAvroArray(TypeDescription typeInfo) { - Schema child = createAvroSchema(typeInfo.getChildren().get(0)); - return Schema.createArray(child); - } - - private static Schema wrapInUnionWithNull(Schema schema) { - Schema NULL = Schema.create(Schema.Type.NULL); - switch (schema.getType()) { - case NULL: - return schema; - case UNION: - List<Schema> kids = schema.getTypes(); - List<Schema> newKids = new ArrayList<>(kids.size() + 1); - newKids.add(NULL); - return Schema.createUnion(newKids); - default: - return Schema.createUnion(Arrays.asList(NULL, schema)); - } - } - - private static Schema getSchemaFor(String str) { - Schema.Parser parser = new Schema.Parser(); - return parser.parse(str); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/AvroWriter.java deleted file mode 100644 index 094d115..0000000 --- a/java/bench/src/java/org/apache/orc/bench/AvroWriter.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.avro.Schema; -import org.apache.avro.file.CodecFactory; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.TypeDescription; - -import java.io.IOException; -import java.nio.Buffer; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Properties; - -public class AvroWriter { - - static Properties setHiveSchema(TypeDescription schema) { - 
if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Assumes struct type as root, not " + - schema); - } - StringBuilder fieldNames = new StringBuilder(); - StringBuilder fieldTypes = new StringBuilder(); - List<String> childNames = schema.getFieldNames(); - List<TypeDescription> childTypes = schema.getChildren(); - for(int f=0; f < childNames.size(); ++f) { - if (f != 0) { - fieldNames.append(','); - fieldTypes.append(','); - } - fieldNames.append(childNames.get(f)); - fieldTypes.append(childTypes.get(f).toString()); - } - Properties properties = new Properties(); - properties.put("columns", fieldNames.toString()); - properties.put("columns.types", fieldTypes.toString()); - return properties; - } - - interface AvroConverter { - Object convert(ColumnVector vector, int row); - } - - private static class BooleanConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - LongColumnVector vector = (LongColumnVector) cv; - return vector.vector[row] != 0; - } else { - return null; - } - } - } - - private static class IntConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - LongColumnVector vector = (LongColumnVector) cv; - return (int) vector.vector[row]; - } else { - return null; - } - } - } - - private static class LongConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - LongColumnVector vector = (LongColumnVector) cv; - return vector.vector[row]; - } else { - return null; - } - } - } - - private static class FloatConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - 
DoubleColumnVector vector = (DoubleColumnVector) cv; - return (float) vector.vector[row]; - } else { - return null; - } - } - } - - private static class DoubleConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - DoubleColumnVector vector = (DoubleColumnVector) cv; - return vector.vector[row]; - } else { - return null; - } - } - } - - private static class StringConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - BytesColumnVector vector = (BytesColumnVector) cv; - return new String(vector.vector[row], vector.start[row], - vector.length[row]); - } else { - return null; - } - } - } - - private static class BinaryConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - BytesColumnVector vector = (BytesColumnVector) cv; - return ByteBuffer.wrap(vector.vector[row], vector.start[row], - vector.length[row]); - } else { - return null; - } - } - } - - private static class TimestampConverter implements AvroConverter { - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - TimestampColumnVector vector = (TimestampColumnVector) cv; - return vector.time[row]; - } else { - return null; - } - } - } - - private static class DecimalConverter implements AvroConverter { - final int scale; - DecimalConverter(int scale) { - this.scale = scale; - } - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - DecimalColumnVector vector = (DecimalColumnVector) cv; - return getBufferFromDecimal( - vector.vector[row].getHiveDecimal(), scale); - } else { - return null; - } - } - } - - private static class 
ListConverter implements AvroConverter { - final Schema avroSchema; - final AvroConverter childConverter; - - ListConverter(TypeDescription schema, Schema avroSchema) { - this.avroSchema = avroSchema; - childConverter = createConverter(schema.getChildren().get(0), - removeNullable(avroSchema.getElementType())); - } - - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - ListColumnVector vector = (ListColumnVector) cv; - int offset = (int) vector.offsets[row]; - int length = (int) vector.lengths[row]; - GenericData.Array result = new GenericData.Array(length, avroSchema); - for(int i=0; i < length; ++i) { - result.add(childConverter.convert(vector.child, offset + i)); - } - return result; - } else { - return null; - } - } - } - - private static class StructConverter implements AvroConverter { - final Schema avroSchema; - final AvroConverter[] childConverters; - - StructConverter(TypeDescription schema, Schema avroSchema) { - this.avroSchema = avroSchema; - List<TypeDescription> childrenTypes = schema.getChildren(); - childConverters = new AvroConverter[childrenTypes.size()]; - List<Schema.Field> fields = avroSchema.getFields(); - for(int f=0; f < childConverters.length; ++f) { - childConverters[f] = createConverter(childrenTypes.get(f), - removeNullable(fields.get(f).schema())); - } - } - - public Object convert(ColumnVector cv, int row) { - if (cv.isRepeating) { - row = 0; - } - if (cv.noNulls || !cv.isNull[row]) { - StructColumnVector vector = (StructColumnVector) cv; - GenericData.Record result = new GenericData.Record(avroSchema); - for(int f=0; f < childConverters.length; ++f) { - result.put(f, childConverters[f].convert(vector.fields[f], row)); - } - return result; - } else { - return null; - } - } - } - - static AvroConverter createConverter(TypeDescription types, - Schema avroSchema) { - switch (types.getCategory()) { - case BINARY: - return new BinaryConverter(); - case BOOLEAN: - 
return new BooleanConverter(); - case BYTE: - case SHORT: - case INT: - return new IntConverter(); - case LONG: - return new LongConverter(); - case FLOAT: - return new FloatConverter(); - case DOUBLE: - return new DoubleConverter(); - case CHAR: - case VARCHAR: - case STRING: - return new StringConverter(); - case TIMESTAMP: - return new TimestampConverter(); - case DECIMAL: - return new DecimalConverter(types.getScale()); - case LIST: - return new ListConverter(types, avroSchema); - case STRUCT: - return new StructConverter(types, avroSchema); - default: - throw new IllegalArgumentException("Unhandled type " + types); - } - } - - /** - * Remove the union(null, ...) wrapper around the schema. - * - * All of the types in Hive are nullable and in Avro those are represented - * by wrapping each type in a union type with the void type. - * @param avro The avro type - * @return The avro type with the nullable layer removed - */ - static Schema removeNullable(Schema avro) { - while (avro.getType() == Schema.Type.UNION) { - List<Schema> children = avro.getTypes(); - if (children.size() == 2 && - children.get(0).getType() == Schema.Type.NULL) { - avro = children.get(1); - } else { - break; - } - } - return avro; - } - - private final AvroConverter[] converters; - private final DataFileWriter writer; - private final GenericRecord record; - - public AvroWriter(Path path, TypeDescription schema, - Configuration conf, - String compression) throws IOException { - List<TypeDescription> childTypes = schema.getChildren(); - Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); - List<Schema.Field> avroFields = avroSchema.getFields(); - converters = new AvroConverter[childTypes.size()]; - for(int c=0; c < converters.length; ++c) { - converters[c] = createConverter(childTypes.get(c), - removeNullable(avroFields.get(c).schema())); - } - GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); - writer = new DataFileWriter(gdw); - if (compression != null & 
!"".equals(compression)) { - writer.setCodec(CodecFactory.fromString(compression)); - } - writer.create(avroSchema, path.getFileSystem(conf).create(path)); - record = new GenericData.Record(avroSchema); - } - - public void writeBatch(VectorizedRowBatch batch) throws IOException { - for(int r=0; r < batch.size; ++r) { - for(int f=0; f < batch.cols.length; ++f) { - record.put(f, converters[f].convert(batch.cols[f], r)); - } - writer.append(record); - } - } - - public void close() throws IOException { - writer.close(); - } - - static Buffer getBufferFromBytes(byte[] input) { - ByteBuffer bb = ByteBuffer.wrap(input); - return bb.rewind(); - } - - public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { - if (dec == null) { - return null; - } - - dec = dec.setScale(scale); - return getBufferFromBytes(dec.unscaledValue().toByteArray()); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/CsvReader.java deleted file mode 100644 index 5c86a89..0000000 --- a/java/bench/src/java/org/apache/orc/bench/CsvReader.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.bench; - -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.TypeDescription; - -import java.io.DataInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.sql.Timestamp; -import java.util.Iterator; -import java.util.List; -import java.util.zip.GZIPInputStream; - -public class CsvReader { - private final Iterator<CSVRecord> parser; - private final ColumnReader[] readers; - - interface ColumnReader { - void read(String value, ColumnVector vect, int row); - } - - static class LongColumnReader implements ColumnReader { - public void read(String value, ColumnVector vect, int row) { - if ("".equals(value)) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - LongColumnVector vector = 
(LongColumnVector) vect; - vector.vector[row] = Long.parseLong(value); - } - } - } - - static class DoubleColumnReader implements ColumnReader { - public void read(String value, ColumnVector vect, int row) { - if ("".equals(value)) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - DoubleColumnVector vector = (DoubleColumnVector) vect; - vector.vector[row] = Double.parseDouble(value); - } - } - } - - static class StringColumnReader implements ColumnReader { - public void read(String value, ColumnVector vect, int row) { - if ("".equals(value)) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - BytesColumnVector vector = (BytesColumnVector) vect; - byte[] bytes = value.getBytes(); - vector.setRef(row, bytes, 0, bytes.length); - } - } - } - - static class TimestampColumnReader implements ColumnReader { - public void read(String value, ColumnVector vect, int row) { - if ("".equals(value)) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - TimestampColumnVector vector = (TimestampColumnVector) vect; - vector.set(row, Timestamp.valueOf(value)); - } - } - } - - static class DecimalColumnReader implements ColumnReader { - public void read(String value, ColumnVector vect, int row) { - if ("".equals(value)) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - DecimalColumnVector vector = (DecimalColumnVector) vect; - vector.vector[row].set(HiveDecimal.create(value)); - } - } - } - - ColumnReader createReader(TypeDescription schema) { - switch (schema.getCategory()) { - case BYTE: - case SHORT: - case INT: - case LONG: - return new LongColumnReader(); - case FLOAT: - case DOUBLE: - return new DoubleColumnReader(); - case CHAR: - case VARCHAR: - case STRING: - return new StringColumnReader(); - case DECIMAL: - return new DecimalColumnReader(); - case TIMESTAMP: - return new TimestampColumnReader(); - default: - throw new IllegalArgumentException("Unhandled type " + schema); - } - } - - public CsvReader(Path path, - 
Configuration conf, - TypeDescription schema) throws IOException { - FileSystem fs = path.getFileSystem(conf); - FSDataInputStream raw = fs.open(path); - String name = path.getName(); - int lastDot = name.lastIndexOf("."); - InputStream input = raw; - if (lastDot >= 0) { - if (".gz".equals(name.substring(lastDot))) { - input = new DataInputStream(new GZIPInputStream(raw)); - } - } - parser = new CSVParser(new InputStreamReader(input), - CSVFormat.RFC4180.withHeader()).iterator(); - List<TypeDescription> columnTypes = schema.getChildren(); - readers = new ColumnReader[columnTypes.size()]; - int c = 0; - for(TypeDescription columnType: columnTypes) { - readers[c++] = createReader(columnType); - } - } - - public boolean nextBatch(VectorizedRowBatch batch) throws IOException { - batch.reset(); - int maxSize = batch.getMaxSize(); - while (parser.hasNext() && batch.size < maxSize) { - CSVRecord record = parser.next(); - int c = 0; - for(String val: record) { - readers[c].read(val, batch.cols[c], batch.size); - c += 1; - } - batch.size++; - } - return batch.size != 0; - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/CsvScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/CsvScan.java deleted file mode 100644 index f2ec61a..0000000 --- a/java/bench/src/java/org/apache/orc/bench/CsvScan.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.TypeDescription; - -public class CsvScan { - public static void main(String[] args) throws Exception { - Configuration conf = new Configuration(); - long rowCount = 0; - TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema"); - for(String filename: args) { - CsvReader reader = new CsvReader(new Path(filename), conf, schema); - VectorizedRowBatch batch = schema.createRowBatch(); - while (reader.nextBatch(batch)) { - rowCount += batch.size; - } - } - System.out.println("Rows read: " + rowCount); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java index 982db64..ee882e7 100644 --- a/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java +++ b/java/bench/src/java/org/apache/orc/bench/GithubToAvro.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; +import org.apache.orc.bench.json.JsonReader; public class GithubToAvro { 
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java index f5ae6b1..1dd23de 100644 --- a/java/bench/src/java/org/apache/orc/bench/GithubToJson.java +++ b/java/bench/src/java/org/apache/orc/bench/GithubToJson.java @@ -22,11 +22,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.json.JsonReader; import org.apache.orc.tools.FileDump; import java.io.OutputStreamWriter; import java.io.Writer; -import java.util.zip.GZIPOutputStream; public class GithubToJson { http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java index cbc1997..ebd6443 100644 --- a/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java +++ b/java/bench/src/java/org/apache/orc/bench/GithubToOrc.java @@ -24,9 +24,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; - -import java.io.IOException; -import java.io.InputStream; +import org.apache.orc.bench.json.JsonReader; public class GithubToOrc { http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java 
b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java index e1fafdc..db88c52 100644 --- a/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java +++ b/java/bench/src/java/org/apache/orc/bench/GithubToParquet.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; +import org.apache.orc.bench.json.JsonReader; import java.util.Properties; http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/JsonReader.java deleted file mode 100644 index 599c872..0000000 --- a/java/bench/src/java/org/apache/orc/bench/JsonReader.java +++ /dev/null @@ -1,278 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.orc.bench; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonStreamParser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.TypeDescription; - -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.sql.Timestamp; -import java.util.List; -import java.util.zip.GZIPInputStream; - -public class JsonReader { - private final TypeDescription schema; - private final JsonStreamParser parser; - private final JsonConverter[] converters; - - interface JsonConverter { - void convert(JsonElement value, ColumnVector vect, int row); - } - - static class BooleanColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - LongColumnVector vector = (LongColumnVector) vect; - vector.vector[row] = value.getAsBoolean() ? 
1 : 0; - } - } - } - - static class LongColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - LongColumnVector vector = (LongColumnVector) vect; - vector.vector[row] = value.getAsLong(); - } - } - } - - static class DoubleColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - DoubleColumnVector vector = (DoubleColumnVector) vect; - vector.vector[row] = value.getAsDouble(); - } - } - } - - static class StringColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - BytesColumnVector vector = (BytesColumnVector) vect; - byte[] bytes = value.getAsString().getBytes(); - vector.setRef(row, bytes, 0, bytes.length); - } - } - } - - static class BinaryColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - BytesColumnVector vector = (BytesColumnVector) vect; - String binStr = value.getAsString(); - byte[] bytes = new byte[binStr.length()/2]; - for(int i=0; i < bytes.length; ++i) { - bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16); - } - vector.setRef(row, bytes, 0, bytes.length); - } - } - } - - static class TimestampColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - TimestampColumnVector vector = (TimestampColumnVector) vect; 
- vector.set(row, Timestamp.valueOf(value.getAsString() - .replaceAll("[TZ]", " "))); - } - } - } - - static class DecimalColumnConverter implements JsonConverter { - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - DecimalColumnVector vector = (DecimalColumnVector) vect; - vector.vector[row].set(HiveDecimal.create(value.getAsString())); - } - } - } - - static class StructColumnConverter implements JsonConverter { - private JsonConverter[] childrenConverters; - private List<String> fieldNames; - - public StructColumnConverter(TypeDescription schema) { - List<TypeDescription> kids = schema.getChildren(); - childrenConverters = new JsonConverter[kids.size()]; - for(int c=0; c < childrenConverters.length; ++c) { - childrenConverters[c] = createConverter(kids.get(c)); - } - fieldNames = schema.getFieldNames(); - } - - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - StructColumnVector vector = (StructColumnVector) vect; - JsonObject obj = value.getAsJsonObject(); - for(int c=0; c < childrenConverters.length; ++c) { - JsonElement elem = obj.get(fieldNames.get(c)); - childrenConverters[c].convert(elem, vector.fields[c], row); - } - } - } - } - - static class ListColumnConverter implements JsonConverter { - private JsonConverter childrenConverter; - - public ListColumnConverter(TypeDescription schema) { - childrenConverter = createConverter(schema.getChildren().get(0)); - } - - public void convert(JsonElement value, ColumnVector vect, int row) { - if (value == null || value.isJsonNull()) { - vect.noNulls = false; - vect.isNull[row] = true; - } else { - ListColumnVector vector = (ListColumnVector) vect; - JsonArray obj = value.getAsJsonArray(); - vector.lengths[row] = obj.size(); - vector.offsets[row] = 
vector.childCount; - vector.childCount += vector.lengths[row]; - vector.child.ensureSize(vector.childCount, true); - for(int c=0; c < obj.size(); ++c) { - childrenConverter.convert(obj.get(c), vector.child, - (int) vector.offsets[row] + c); - } - } - } - } - - static JsonConverter createConverter(TypeDescription schema) { - switch (schema.getCategory()) { - case BYTE: - case SHORT: - case INT: - case LONG: - return new LongColumnConverter(); - case FLOAT: - case DOUBLE: - return new DoubleColumnConverter(); - case CHAR: - case VARCHAR: - case STRING: - return new StringColumnConverter(); - case DECIMAL: - return new DecimalColumnConverter(); - case TIMESTAMP: - return new TimestampColumnConverter(); - case BINARY: - return new BinaryColumnConverter(); - case BOOLEAN: - return new BooleanColumnConverter(); - case STRUCT: - return new StructColumnConverter(schema); - case LIST: - return new ListColumnConverter(schema); - default: - throw new IllegalArgumentException("Unhandled type " + schema); - } - } - - public JsonReader(Path path, - Configuration conf, - TypeDescription schema) throws IOException { - this.schema = schema; - FileSystem fs = path.getFileSystem(conf); - FSDataInputStream raw = fs.open(path); - String name = path.getName(); - int lastDot = name.lastIndexOf("."); - InputStream input = raw; - if (lastDot >= 0) { - if (".gz".equals(name.substring(lastDot))) { - input = new GZIPInputStream(raw); - } - } - parser = new JsonStreamParser(new InputStreamReader(input)); - if (schema.getCategory() != TypeDescription.Category.STRUCT) { - throw new IllegalArgumentException("Root must be struct - " + schema); - } - List<TypeDescription> fieldTypes = schema.getChildren(); - converters = new JsonConverter[fieldTypes.size()]; - for(int c = 0; c < converters.length; ++c) { - converters[c] = createConverter(fieldTypes.get(c)); - } - } - - public boolean nextBatch(VectorizedRowBatch batch) throws IOException { - batch.reset(); - int maxSize = batch.getMaxSize(); - 
List<String> fieldNames = schema.getFieldNames(); - while (parser.hasNext() && batch.size < maxSize) { - JsonObject elem = parser.next().getAsJsonObject(); - for(int c=0; c < converters.length; ++c) { - // look up each field to see if it is in the input, otherwise - // set it to null. - JsonElement field = elem.get(fieldNames.get(c)); - if (field == null) { - batch.cols[c].noNulls = false; - batch.cols[c].isNull[batch.size] = true; - } else { - converters[c].convert(field, batch.cols[c], batch.size); - } - } - batch.size++; - } - return batch.size != 0; - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/JsonScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/JsonScan.java deleted file mode 100644 index 1115ae6..0000000 --- a/java/bench/src/java/org/apache/orc/bench/JsonScan.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.orc.bench; - -import com.google.gson.JsonStreamParser; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; - -import java.io.InputStream; -import java.io.InputStreamReader; -import java.util.zip.GZIPInputStream; - -public class JsonScan { - public static void main(String[] args) throws Exception { - Configuration conf = new Configuration(); - OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); - long rowCount = 0; - for(String filename: args) { - Path path = new Path(filename); - FileSystem fs = path.getFileSystem(conf); - FSDataInputStream raw = fs.open(path); - int lastDot = filename.lastIndexOf("."); - InputStream input = raw; - if (lastDot >= 0) { - if (".gz".equals(filename.substring(lastDot))) { - input = new GZIPInputStream(raw); - } - } - JsonStreamParser parser = - new JsonStreamParser(new InputStreamReader(input)); - while (parser.hasNext()) { - parser.next(); - rowCount += 1; - } - } - System.out.println("Rows read: " + rowCount); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/OrcScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/OrcScan.java deleted file mode 100644 index 096f3fa..0000000 --- a/java/bench/src/java/org/apache/orc/bench/OrcScan.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.orc.OrcFile; -import org.apache.orc.Reader; -import org.apache.orc.RecordReader; -import org.apache.orc.TypeDescription; - -public class OrcScan { - public static void main(String[] args) throws Exception { - Configuration conf = new Configuration(); - OrcFile.ReaderOptions options = OrcFile.readerOptions(conf); - long rowCount = 0; - for(String filename: args) { - Reader reader = OrcFile.createReader(new Path(filename), options); - TypeDescription schema = reader.getSchema(); - RecordReader rows = reader.rows(); - VectorizedRowBatch batch = schema.createRowBatch(); - while (rows.nextBatch(batch)) { - rowCount += batch.size; - } - rows.close(); - } - System.out.println("Rows read: " + rowCount); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/ParquetScan.java deleted file mode 100644 index ccaaa2a..0000000 --- a/java/bench/src/java/org/apache/orc/bench/ParquetScan.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.bench; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; -import org.apache.parquet.hadoop.ParquetInputFormat; - -public class ParquetScan { - public static void main(String[] args) throws Exception { - JobConf conf = new JobConf(); - long rowCount = 0; - ParquetInputFormat<ArrayWritable> inputFormat = - new ParquetInputFormat<>(DataWritableReadSupport.class); - - NullWritable nada = NullWritable.get(); - for(String filename: args) { - FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE, - new String[]{}); - RecordReader<NullWritable,ArrayWritable> recordReader = - new ParquetRecordReaderWrapper(inputFormat, split, conf, - Reporter.NULL); - ArrayWritable value = recordReader.createValue(); - while (recordReader.next(nada, value)) { - rowCount += 1; - } - 
recordReader.close(); - } - System.out.println("Rows read: " + rowCount); - } -} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java index d4fd4a2..900be66 100644 --- a/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java +++ b/java/bench/src/java/org/apache/orc/bench/SalesToAvro.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; public class SalesToAvro { http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java index 985da90..3da900f 100644 --- a/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java +++ b/java/bench/src/java/org/apache/orc/bench/SalesToParquet.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; import java.util.Properties; http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java index b490a8a..2b14f50 100644 --- 
a/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java +++ b/java/bench/src/java/org/apache/orc/bench/TaxiToAvro.java @@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; +import org.apache.orc.bench.csv.CsvReader; public class TaxiToAvro { http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java index 98fbe17..4b8ca8c 100644 --- a/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java +++ b/java/bench/src/java/org/apache/orc/bench/TaxiToJson.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.csv.CsvReader; import org.apache.orc.tools.FileDump; import org.iq80.snappy.SnappyOutputStream; http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java index dee5da6..2588c72 100644 --- a/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java +++ b/java/bench/src/java/org/apache/orc/bench/TaxiToOrc.java @@ -25,6 +25,7 @@ import org.apache.orc.OrcFile; import org.apache.orc.CompressionKind; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; +import org.apache.orc.bench.csv.CsvReader; import java.io.IOException; import java.io.InputStream; 
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java index 3edce17..3eafc87 100644 --- a/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java +++ b/java/bench/src/java/org/apache/orc/bench/TaxiToParquet.java @@ -28,6 +28,8 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.bench.avro.AvroWriter; +import org.apache.orc.bench.csv.CsvReader; import java.util.Properties; public class TaxiToParquet { http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java new file mode 100644 index 0000000..1292c2b --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroScan.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.mapred.FsInput; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +public class AvroScan { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + long rowCount = 0; + for(String filename: args) { + FsInput file = new FsInput(new Path(filename), conf); + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); + DataFileReader<GenericRecord> dataFileReader = + new DataFileReader<>(file, datumReader); + GenericRecord record = null; + while (dataFileReader.hasNext()) { + record = dataFileReader.next(record); + rowCount += 1; + } + } + System.out.println("Rows read: " + rowCount); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java new file mode 100644 index 0000000..5df7b70 --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroSchemaUtils.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.bench.avro; + +import org.apache.avro.Schema; +import org.apache.orc.TypeDescription; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * Convert Hive TypeInfo to an Avro Schema + */ +public class AvroSchemaUtils { + + private AvroSchemaUtils() { + // No instances + } + + public static Schema createAvroSchema(TypeDescription typeInfo) { + Schema schema; + switch (typeInfo.getCategory()) { + case STRING: + schema = Schema.create(Schema.Type.STRING); + break; + case CHAR: + schema = getSchemaFor("{" + + "\"type\":\"string\"," + + "\"logicalType\":\"char\"," + + "\"maxLength\":" + typeInfo.getMaxLength() + "}"); + break; + case VARCHAR: + schema = getSchemaFor("{" + + "\"type\":\"string\"," + + "\"logicalType\":\"varchar\"," + + "\"maxLength\":" + typeInfo.getMaxLength() + "}"); + break; + case BINARY: + schema = Schema.create(Schema.Type.BYTES); + break; + case BYTE: + schema = Schema.create(Schema.Type.INT); + break; + case SHORT: + schema = Schema.create(Schema.Type.INT); + break; + case INT: + schema = Schema.create(Schema.Type.INT); + break; + case LONG: + schema = Schema.create(Schema.Type.LONG); + break; + case FLOAT: + schema = Schema.create(Schema.Type.FLOAT); + break; + case DOUBLE: + schema = 
Schema.create(Schema.Type.DOUBLE); + break; + case BOOLEAN: + schema = Schema.create(Schema.Type.BOOLEAN); + break; + case DECIMAL: + String precision = String.valueOf(typeInfo.getPrecision()); + String scale = String.valueOf(typeInfo.getScale()); + schema = getSchemaFor("{" + + "\"type\":\"bytes\"," + + "\"logicalType\":\"decimal\"," + + "\"precision\":" + precision + "," + + "\"scale\":" + scale + "}"); + break; + case DATE: + schema = getSchemaFor("{" + + "\"type\":\"int\"," + + "\"logicalType\":\"date\"}"); + break; + case TIMESTAMP: + schema = getSchemaFor("{" + + "\"type\":\"long\"," + + "\"logicalType\":\"timestamp-millis\"}"); + break; + case LIST: + schema = createAvroArray(typeInfo); + break; + case MAP: + schema = createAvroMap(typeInfo); + break; + case STRUCT: + schema = createAvroRecord(typeInfo); + break; + case UNION: + schema = createAvroUnion(typeInfo); + break; + default: + throw new UnsupportedOperationException(typeInfo + " is not supported."); + } + + return wrapInUnionWithNull(schema); + } + + private static Schema createAvroUnion(TypeDescription typeInfo) { + List<Schema> childSchemas = new ArrayList<>(); + for (TypeDescription childTypeInfo : typeInfo.getChildren()) { + Schema childSchema = createAvroSchema(childTypeInfo); + if (childSchema.getType() == Schema.Type.UNION) { + for (Schema grandkid: childSchema.getTypes()) { + if (childSchema.getType() != Schema.Type.NULL) { + childSchemas.add(grandkid); + } + } + } else { + childSchemas.add(childSchema); + } + } + + return Schema.createUnion(childSchemas); + } + + private static Schema createAvroRecord(TypeDescription typeInfo) { + List<Schema.Field> childFields = new ArrayList<>(); + + List<String> fieldNames = typeInfo.getFieldNames(); + List<TypeDescription> fieldTypes = typeInfo.getChildren(); + + for (int i = 0; i < fieldNames.size(); ++i) { + TypeDescription childTypeInfo = fieldTypes.get(i); + Schema.Field field = new Schema.Field(fieldNames.get(i), + createAvroSchema(childTypeInfo), 
childTypeInfo.toString(), + (Object) null); + childFields.add(field); + } + + Schema recordSchema = Schema.createRecord("record_" + typeInfo.getId(), + typeInfo.toString(), null, false); + recordSchema.setFields(childFields); + return recordSchema; + } + + private static Schema createAvroMap(TypeDescription typeInfo) { + TypeDescription keyTypeInfo = typeInfo.getChildren().get(0); + if (keyTypeInfo.getCategory() != TypeDescription.Category.STRING) { + throw new UnsupportedOperationException("Avro only supports maps with string keys " + + typeInfo); + } + + Schema valueSchema = createAvroSchema(typeInfo.getChildren().get(1)); + + return Schema.createMap(valueSchema); + } + + private static Schema createAvroArray(TypeDescription typeInfo) { + Schema child = createAvroSchema(typeInfo.getChildren().get(0)); + return Schema.createArray(child); + } + + private static Schema wrapInUnionWithNull(Schema schema) { + Schema NULL = Schema.create(Schema.Type.NULL); + switch (schema.getType()) { + case NULL: + return schema; + case UNION: + List<Schema> kids = schema.getTypes(); + List<Schema> newKids = new ArrayList<>(kids.size() + 1); + newKids.add(NULL); + return Schema.createUnion(newKids); + default: + return Schema.createUnion(Arrays.asList(NULL, schema)); + } + } + + private static Schema getSchemaFor(String str) { + Schema.Parser parser = new Schema.Parser(); + return parser.parse(str); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java new file mode 100644 index 0000000..f9d3bad --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/avro/AvroWriter.java @@ -0,0 +1,375 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.bench.avro; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.orc.TypeDescription; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Properties; + +public class AvroWriter { + + static Properties setHiveSchema(TypeDescription 
schema) { + if (schema.getCategory() != TypeDescription.Category.STRUCT) { + throw new IllegalArgumentException("Assumes struct type as root, not " + + schema); + } + StringBuilder fieldNames = new StringBuilder(); + StringBuilder fieldTypes = new StringBuilder(); + List<String> childNames = schema.getFieldNames(); + List<TypeDescription> childTypes = schema.getChildren(); + for(int f=0; f < childNames.size(); ++f) { + if (f != 0) { + fieldNames.append(','); + fieldTypes.append(','); + } + fieldNames.append(childNames.get(f)); + fieldTypes.append(childTypes.get(f).toString()); + } + Properties properties = new Properties(); + properties.put("columns", fieldNames.toString()); + properties.put("columns.types", fieldTypes.toString()); + return properties; + } + + interface AvroConverter { + Object convert(ColumnVector vector, int row); + } + + private static class BooleanConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row] != 0; + } else { + return null; + } + } + } + + private static class IntConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return (int) vector.vector[row]; + } else { + return null; + } + } + } + + private static class LongConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + LongColumnVector vector = (LongColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class FloatConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) 
{ + DoubleColumnVector vector = (DoubleColumnVector) cv; + return (float) vector.vector[row]; + } else { + return null; + } + } + } + + private static class DoubleConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DoubleColumnVector vector = (DoubleColumnVector) cv; + return vector.vector[row]; + } else { + return null; + } + } + } + + private static class StringConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return new String(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class BinaryConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + BytesColumnVector vector = (BytesColumnVector) cv; + return ByteBuffer.wrap(vector.vector[row], vector.start[row], + vector.length[row]); + } else { + return null; + } + } + } + + private static class TimestampConverter implements AvroConverter { + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + TimestampColumnVector vector = (TimestampColumnVector) cv; + return vector.time[row]; + } else { + return null; + } + } + } + + private static class DecimalConverter implements AvroConverter { + final int scale; + DecimalConverter(int scale) { + this.scale = scale; + } + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + DecimalColumnVector vector = (DecimalColumnVector) cv; + return getBufferFromDecimal( + vector.vector[row].getHiveDecimal(), scale); + } else { + return null; + } + } + } + + private static class 
ListConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter childConverter; + + ListConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + childConverter = createConverter(schema.getChildren().get(0), + removeNullable(avroSchema.getElementType())); + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + ListColumnVector vector = (ListColumnVector) cv; + int offset = (int) vector.offsets[row]; + int length = (int) vector.lengths[row]; + GenericData.Array result = new GenericData.Array(length, avroSchema); + for(int i=0; i < length; ++i) { + result.add(childConverter.convert(vector.child, offset + i)); + } + return result; + } else { + return null; + } + } + } + + private static class StructConverter implements AvroConverter { + final Schema avroSchema; + final AvroConverter[] childConverters; + + StructConverter(TypeDescription schema, Schema avroSchema) { + this.avroSchema = avroSchema; + List<TypeDescription> childrenTypes = schema.getChildren(); + childConverters = new AvroConverter[childrenTypes.size()]; + List<Schema.Field> fields = avroSchema.getFields(); + for(int f=0; f < childConverters.length; ++f) { + childConverters[f] = createConverter(childrenTypes.get(f), + removeNullable(fields.get(f).schema())); + } + } + + public Object convert(ColumnVector cv, int row) { + if (cv.isRepeating) { + row = 0; + } + if (cv.noNulls || !cv.isNull[row]) { + StructColumnVector vector = (StructColumnVector) cv; + GenericData.Record result = new GenericData.Record(avroSchema); + for(int f=0; f < childConverters.length; ++f) { + result.put(f, childConverters[f].convert(vector.fields[f], row)); + } + return result; + } else { + return null; + } + } + } + + static AvroConverter createConverter(TypeDescription types, + Schema avroSchema) { + switch (types.getCategory()) { + case BINARY: + return new BinaryConverter(); + case BOOLEAN: + 
return new BooleanConverter(); + case BYTE: + case SHORT: + case INT: + return new IntConverter(); + case LONG: + return new LongConverter(); + case FLOAT: + return new FloatConverter(); + case DOUBLE: + return new DoubleConverter(); + case CHAR: + case VARCHAR: + case STRING: + return new StringConverter(); + case TIMESTAMP: + return new TimestampConverter(); + case DECIMAL: + return new DecimalConverter(types.getScale()); + case LIST: + return new ListConverter(types, avroSchema); + case STRUCT: + return new StructConverter(types, avroSchema); + default: + throw new IllegalArgumentException("Unhandled type " + types); + } + } + + /** + * Remove the union(null, ...) wrapper around the schema. + * + * All of the types in Hive are nullable and in Avro those are represented + * by wrapping each type in a union type with the void type. + * @param avro The avro type + * @return The avro type with the nullable layer removed + */ + static Schema removeNullable(Schema avro) { + while (avro.getType() == Schema.Type.UNION) { + List<Schema> children = avro.getTypes(); + if (children.size() == 2 && + children.get(0).getType() == Schema.Type.NULL) { + avro = children.get(1); + } else { + break; + } + } + return avro; + } + + private final AvroConverter[] converters; + private final DataFileWriter writer; + private final GenericRecord record; + + public AvroWriter(Path path, TypeDescription schema, + Configuration conf, + String compression) throws IOException { + List<TypeDescription> childTypes = schema.getChildren(); + Schema avroSchema = AvroSchemaUtils.createAvroSchema(schema); + List<Schema.Field> avroFields = avroSchema.getFields(); + converters = new AvroConverter[childTypes.size()]; + for(int c=0; c < converters.length; ++c) { + converters[c] = createConverter(childTypes.get(c), + removeNullable(avroFields.get(c).schema())); + } + GenericDatumWriter gdw = new GenericDatumWriter(avroSchema); + writer = new DataFileWriter(gdw); + if (compression != null & 
!"".equals(compression)) { + writer.setCodec(CodecFactory.fromString(compression)); + } + writer.create(avroSchema, path.getFileSystem(conf).create(path)); + record = new GenericData.Record(avroSchema); + } + + public void writeBatch(VectorizedRowBatch batch) throws IOException { + for(int r=0; r < batch.size; ++r) { + for(int f=0; f < batch.cols.length; ++f) { + record.put(f, converters[f].convert(batch.cols[f], r)); + } + writer.append(record); + } + } + + public void close() throws IOException { + writer.close(); + } + + static Buffer getBufferFromBytes(byte[] input) { + ByteBuffer bb = ByteBuffer.wrap(input); + return bb.rewind(); + } + + public static Buffer getBufferFromDecimal(HiveDecimal dec, int scale) { + if (dec == null) { + return null; + } + + dec = dec.setScale(scale); + return getBufferFromBytes(dec.unscaledValue().toByteArray()); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java ---------------------------------------------------------------------- diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java new file mode 100644 index 0000000..e99ee8f --- /dev/null +++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvReader.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.bench.csv;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.zip.GZIPInputStream;

/**
 * Reads a (possibly gzip-compressed) CSV file with a header row into
 * Hive {@link VectorizedRowBatch}es, one per-column reader per field of the
 * given ORC struct schema. An empty CSV cell is treated as null.
 */
public class CsvReader {
  private final Iterator<CSVRecord> parser;
  private final ColumnReader[] readers;

  /** Parses one CSV cell into the given row of a column vector. */
  interface ColumnReader {
    void read(String value, ColumnVector vect, int row);
  }

  static class LongColumnReader implements ColumnReader {
    public void read(String value, ColumnVector vect, int row) {
      if ("".equals(value)) {
        vect.noNulls = false;
        vect.isNull[row] = true;
      } else {
        LongColumnVector vector = (LongColumnVector) vect;
        vector.vector[row] = Long.parseLong(value);
      }
    }
  }

  static class DoubleColumnReader implements ColumnReader {
    public void read(String value, ColumnVector vect, int row) {
      if ("".equals(value)) {
        vect.noNulls = false;
        vect.isNull[row] = true;
      } else {
        DoubleColumnVector vector = (DoubleColumnVector) vect;
        vector.vector[row] = Double.parseDouble(value);
      }
    }
  }

  static class StringColumnReader implements ColumnReader {
    public void read(String value, ColumnVector vect, int row) {
      if ("".equals(value)) {
        vect.noNulls = false;
        vect.isNull[row] = true;
      } else {
        BytesColumnVector vector = (BytesColumnVector) vect;
        // BUG FIX: was getBytes() with the platform default charset, which
        // varies by JVM; encode explicitly as UTF-8.
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        vector.setRef(row, bytes, 0, bytes.length);
      }
    }
  }

  static class TimestampColumnReader implements ColumnReader {
    public void read(String value, ColumnVector vect, int row) {
      if ("".equals(value)) {
        vect.noNulls = false;
        vect.isNull[row] = true;
      } else {
        TimestampColumnVector vector = (TimestampColumnVector) vect;
        vector.set(row, Timestamp.valueOf(value));
      }
    }
  }

  static class DecimalColumnReader implements ColumnReader {
    public void read(String value, ColumnVector vect, int row) {
      if ("".equals(value)) {
        vect.noNulls = false;
        vect.isNull[row] = true;
      } else {
        DecimalColumnVector vector = (DecimalColumnVector) vect;
        vector.vector[row].set(HiveDecimal.create(value));
      }
    }
  }

  /**
   * Pick the column reader for one ORC type.
   * @throws IllegalArgumentException for unhandled categories
   */
  ColumnReader createReader(TypeDescription schema) {
    switch (schema.getCategory()) {
      case BYTE:
      case SHORT:
      case INT:
      case LONG:
        return new LongColumnReader();
      case FLOAT:
      case DOUBLE:
        return new DoubleColumnReader();
      case CHAR:
      case VARCHAR:
      case STRING:
        return new StringColumnReader();
      case DECIMAL:
        return new DecimalColumnReader();
      case TIMESTAMP:
        return new TimestampColumnReader();
      default:
        throw new IllegalArgumentException("Unhandled type " + schema);
    }
  }

  /**
   * Open a CSV reader over the given file, transparently un-gzipping files
   * whose name ends in ".gz". The first CSV row is consumed as the header.
   * @param path input file
   * @param conf Hadoop configuration used to resolve the filesystem
   * @param schema struct schema describing the expected columns, in order
   */
  public CsvReader(Path path,
                   Configuration conf,
                   TypeDescription schema) throws IOException {
    FileSystem fs = path.getFileSystem(conf);
    FSDataInputStream raw = fs.open(path);
    String name = path.getName();
    int lastDot = name.lastIndexOf(".");
    InputStream input = raw;
    if (lastDot >= 0) {
      if (".gz".equals(name.substring(lastDot))) {
        input = new DataInputStream(new GZIPInputStream(raw));
      }
    }
    parser = new CSVParser(new InputStreamReader(input),
        CSVFormat.RFC4180.withHeader()).iterator();
    List<TypeDescription> columnTypes = schema.getChildren();
    readers = new ColumnReader[columnTypes.size()];
    int c = 0;
    for(TypeDescription columnType: columnTypes) {
      readers[c++] = createReader(columnType);
    }
  }

  /**
   * Fill the batch with up to getMaxSize() rows.
   * @return true if at least one row was read
   */
  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
    batch.reset();
    int maxSize = batch.getMaxSize();
    while (parser.hasNext() && batch.size < maxSize) {
      CSVRecord record = parser.next();
      int c = 0;
      for(String val: record) {
        readers[c].read(val, batch.cols[c], batch.size);
        c += 1;
      }
      batch.size++;
    }
    return batch.size != 0;
  }
}