From Hussain Towaileb <[email protected]>: Hussain Towaileb has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408 )
Change subject: [ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet
......................................................................

[ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
parquet-java SDK doesn't support empty spaces,"=" in their MessageTypeParser.parseMessageType().
Serialised Schema can't be passed onto the ParquetExternalFilePrinterFactory due to this.
So the schema is built twice: first time to catch errors during compilation, second time to build the schema.

Ext-ref: MB-65167
Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408
Integration-Tests: Jenkins <[email protected]>
Integration-Tests: Hussain Towaileb <[email protected]>
Reviewed-by: Hussain Towaileb <[email protected]>
Reviewed-by: <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
A asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java
A
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm M asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp A asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java 30 files changed, 376 insertions(+), 27 deletions(-) Approvals: Hussain Towaileb: Looks good to me, approved; Verified; Verified [email protected]: Looks good to me, but someone else must approve Anon. E. Moose #1000171: Jenkins: Verified diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java index a39bc06..6813e84 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java @@ -607,6 +607,7 @@ private final boolean autogenerated; private final ARecordType itemType; + private final ARecordType parquetSchema; public CompiledCopyToStatement(CopyToStatement copyToStatement) { this.query = copyToStatement.getQuery(); @@ -623,6 +624,7 @@ this.keyExpressions = copyToStatement.getKeyExpressions(); this.autogenerated = copyToStatement.isAutogenerated(); this.itemType = eddDecl.getItemType(); + this.parquetSchema = eddDecl.getParquetSchema(); } @Override @@ -650,6 +652,10 @@ return itemType; } + public ARecordType getParquetSchema() { + return parquetSchema; + } + public List<Expression> getPathExpressions() { return pathExpressions; } diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java index 978997c..f8d0ff6 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java @@ -467,7 +467,7 @@ // Write adapter configuration WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties(), - copyTo.getItemType(), expr.getSourceLocation()); + copyTo.getItemType(), copyTo.getParquetSchema(), expr.getSourceLocation()); // writeOperator WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr), diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 334c074..e1b4bb0 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4212,8 +4212,8 @@ DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME); IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse, ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx); - edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY, - SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType)); + edd.setParquetSchema((ARecordType) iaType); + SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType); } } diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp new file mode 100644 index 0000000..8f9bb53 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.01.ddl.sqlpp @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +DROP DATAVERSE test if exists; +CREATE DATAVERSE test; +USE test; + + +CREATE TYPE ColumnType1 AS { + id: integer + }; + +CREATE COLLECTION TestCollection(ColumnType1) PRIMARY KEY id; \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp new file mode 100644 index 0000000..9022632 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.02.update.sqlpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/* + * Description : create a dataset using year-month-duration as the primary key + * Expected Res : Success + * Date : 7 May 2013 + * Issue : 363 + */ + +use test; +/* +insert into TestCollection({"id":`year-month-duration`("P16Y"), "name": "John"}); +insert into TestCollection({"id":`day-time-duration`("-P3829H849.392S"), "name": "Alex"}); +*/ + +insert into TestCollection({"id":18, "Director=name": "SS Rajamouli", "Director.Age" : 51 ,"Films Made" : ["RRR", "Eega", "Baahubali"] }); + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp new file mode 100644 index 0000000..3da960b --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.03.update.sqlpp @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE test; + +COPY ( + select c.* from TestCollection c +) toWriter +TO %adapter% +PATH (%pathprefix% "copy-to-result", "parquet-field-names1") +TYPE ( { id:int, `Director=name` : string, `Director.Age` : int ,`Films Made` : [string] } ) +WITH { + %template_colons%, + %additionalProperties% + "format":"parquet" +}; + + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp new file mode 100644 index 0000000..38acb88 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.04.ddl.sqlpp @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE test; + + +CREATE TYPE ColumnType2 AS { +}; + + + +CREATE EXTERNAL DATASET TestDataset1(ColumnType2) USING %adapter% +( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%copy-to-result/parquet-field-names1/"), + ("include"="*.parquet"), + ("requireVersionChangeDetection"="false"), + ("format" = "parquet") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp new file mode 100644 index 0000000..86d344c --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.05.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +USE test; + + +SELECT c.* +FROM TestDataset1 c +ORDER BY c.id; + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp new file mode 100644 index 0000000..f72a1f5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.06.update.sqlpp @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE test; + +COPY ( +select c.* from TestCollection c + ) toWriter +TO %adapter% +PATH (%pathprefix% "copy-to-result", "parquet-field-names2") +WITH { + %template_colons%, + %additionalProperties% + "format":"parquet" + }; + + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp new file mode 100644 index 0000000..17003c5 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.07.ddl.sqlpp @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE test; + + +CREATE EXTERNAL DATASET TestDataset2(ColumnType2) USING %adapter% +( + %template%, + %additional_Properties%, + ("definition"="%path_prefix%copy-to-result/parquet-field-names2/"), + ("include"="*.parquet"), + ("requireVersionChangeDetection"="false"), + ("format" = "parquet") +); \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp new file mode 100644 index 0000000..ce09a4a --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/copy-to/parquet-field-names/parquet-field-names.08.query.sqlpp @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +USE test; + + +SELECT c.* +FROM TestDataset2 c +ORDER BY c.id; + + diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm new file mode 100644 index 0000000..4566d53 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.05.adm @@ -0,0 +1 @@ +{ "id": 18, "Director=name": "SS Rajamouli", "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ] } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm new file mode 100644 index 0000000..fa88a74 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/runtimets/results/copy-to/parquet-field-names/parquet-field-names.08.adm @@ -0,0 +1 @@ +{ "Director.Age": 51, "Films Made": [ "RRR", "Eega", "Baahubali" ], "Director=name": "SS Rajamouli", "id": 18 } \ No newline at end of file diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml index 1a05334..04e8d1e 100644 --- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml +++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml @@ -115,6 +115,16 @@ </compilation-unit> </test-case> <test-case FilePath="copy-to"> + <compilation-unit name="parquet-field-names"> + <placeholder name="adapter" value="S3" /> + <placeholder name="pathprefix" value="" /> + <placeholder name="path_prefix" value="" /> + <placeholder name="additionalProperties" value='"container":"playground",' /> + <placeholder 
name="additional_Properties" value='("container"="playground")' /> + <output-dir compare="Text">parquet-field-names</output-dir> + </compilation-unit> + </test-case> + <test-case FilePath="copy-to"> <compilation-unit name="parquet-empty-array"> <placeholder name="adapter" value="S3" /> <placeholder name="pathprefix" value="" /> diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java index d754068..5d9ab7f 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java @@ -53,7 +53,7 @@ public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException { MessageType schema = generateSchema(schemaNode); - printerFactory.setParquetSchemaString(schema.toString()); + printerFactory.setParquetSchema(schema); IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory); return new ExternalFileWriter(resolver, writer, maxResult); } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java index b500cbe..25fbdc8 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSchemaInferPoolWriter.java @@ -30,6 +30,7 @@ import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; +// Maintains a pool of Parquet writers holding a file, each with its own schema , and writes values to the appropriate writer based on schema. 
public class ParquetSchemaInferPoolWriter { private final ParquetExternalWriterFactory writerFactory; @@ -57,6 +58,7 @@ if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.EQUIVALENT)) { return; } else if (schemaComparisonType.equals(ISchemaChecker.SchemaComparisonType.GROWING)) { + // If the schema is growing, close the existing writer and create a new one with the new schema. schemaNodes.set(i, schemaLazyVisitor.inferSchema(value)); closeWriter(i); return; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java index 3dbd4d3..7c1c03b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetSinkExternalWriterRuntime.java @@ -76,6 +76,7 @@ } + // Schema Inference is done frame wise, i.e., we infer the schema for all the records in frame and write the values with schema inferred until now. 
@Override public void nextFrame(ByteBuffer buffer) throws HyracksDataException { tupleAccessor.reset(buffer); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java index ba7a1ee..046c03f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java @@ -34,23 +34,22 @@ import org.apache.parquet.hadoop.ParquetWriter; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.MessageTypeParser; public class ParquetExternalFilePrinter implements IExternalPrinter { private final IAType typeInfo; private final CompressionCodecName compressionCodecName; - private MessageType schema; + private final MessageType schema; private ParquetOutputFile parquetOutputFile; - private String parquetSchemaString; + // private String parquetSchemaString; private ParquetWriter<IValueReference> writer; private final long rowGroupSize; private final int pageSize; private final ParquetProperties.WriterVersion writerVersion; - public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String parquetSchemaString, + public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, MessageType parquetSchemaString, IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) { this.compressionCodecName = compressionCodecName; - this.parquetSchemaString = parquetSchemaString; + this.schema = parquetSchemaString; this.typeInfo = typeInfo; this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; @@ -59,7 +58,6 @@ @Override public void open() throws 
HyracksDataException { - schema = MessageTypeParser.parseMessageType(parquetSchemaString); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java index b6ad34e..035e49a 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java @@ -18,25 +18,34 @@ */ package org.apache.asterix.external.writer.printer; +import org.apache.asterix.common.exceptions.CompilationException; +import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor; +import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.runtime.writer.IExternalPrinter; import org.apache.asterix.runtime.writer.IExternalPrinterFactory; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory { private static final long serialVersionUID = 8971234908711235L; - private String parquetSchemaString; + // parquetInferSchema is for the case when the schema is inferred from the data, not provided by the user + // set During the runtime + private transient MessageType parquetInferSchema; + // parquetProvidedSchema is for the case when the schema is provided by the user + private ARecordType parquetProvidedSchema; private final IAType typeInfo; private final CompressionCodecName compressionCodecName; private final long rowGroupSize; private final int pageSize; private final ParquetProperties.WriterVersion writerVersion; - public 
ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String parquetSchemaString, - IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) { + public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, + ARecordType parquetprovidedSchema, IAType typeInfo, long rowGroupSize, int pageSize, + ParquetProperties.WriterVersion writerVersion) { this.compressionCodecName = compressionCodecName; - this.parquetSchemaString = parquetSchemaString; + this.parquetProvidedSchema = parquetprovidedSchema; this.typeInfo = typeInfo; this.rowGroupSize = rowGroupSize; this.pageSize = pageSize; @@ -52,13 +61,25 @@ this.writerVersion = writerVersion; } - public void setParquetSchemaString(String parquetSchemaString) { - this.parquetSchemaString = parquetSchemaString; + public void setParquetSchema(MessageType parquetInferSchema) { + this.parquetInferSchema = parquetInferSchema; } @Override public IExternalPrinter createPrinter() { - return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize, - pageSize, writerVersion); + if (parquetInferSchema != null) { + return new ParquetExternalFilePrinter(compressionCodecName, parquetInferSchema, typeInfo, rowGroupSize, + pageSize, writerVersion); + } + + MessageType schema; + try { + schema = SchemaConverterVisitor.convertToParquetSchema(parquetProvidedSchema); + } catch (CompilationException e) { + // This should not happen, Compilation Exception should be caught at the query-compile time + throw new RuntimeException(e); + } + return new ParquetExternalFilePrinter(compressionCodecName, schema, typeInfo, rowGroupSize, pageSize, + writerVersion); } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java index 
7058bf6..cdf24c6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/FieldNamesDictionary.java @@ -26,6 +26,8 @@ import org.apache.hyracks.data.std.api.IValueReference; import org.apache.hyracks.util.string.UTF8StringUtil; +// The Field Names Dictionary will cache the mapping between field name bytes and their corresponding string representations, +// minimizing the creation of new string objects during field name deserialization while writing to parquet files. public class FieldNamesDictionary { private final FieldNamesTrieDictionary trie; private final List<String> fieldNames; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java index 99b9736..dfa6e4f 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ISchemaChecker.java @@ -22,6 +22,10 @@ import org.apache.hyracks.data.std.api.IValueReference; public interface ISchemaChecker { + + // EQUIVALENT: Example: { name: string, age: int } -> { name: string, age: int } + // GROWING: equivalent types but having extra fields, Example: { name: string, age: int } -> { name: string, age: int , address: string } + // CONFLICTING: conflict in types, Example: { name: string, age: int } -> { name: {first:string, last:string}, age: int } enum SchemaComparisonType { EQUIVALENT, GROWING, diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
index 373bfe4..2a03bfd 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetRecordLazyVisitor.java
@@ -44,6 +44,8 @@
     private final MessageType schema;
     private final RecordLazyVisitablePointable rec;
+    // The Record Consumer is responsible for traversing the record tree,
+    // using recordConsumer.startField() to navigate into a child node and endField() to move back to the parent node.
     private RecordConsumer recordConsumer;
     private final FieldNamesDictionary fieldNamesDictionary;

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
index b591175..70872bb 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetSchemaLazyVisitor.java
@@ -39,6 +39,7 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Types;

+// This class is used to infer the schema of a record into SchemaNode, which is an internal tree representation of the schema.
 public class ParquetSchemaLazyVisitor implements ILazyVisitablePointableVisitor<Void, ParquetSchemaTree.SchemaNode> {
     private final RecordLazyVisitablePointable rec;
     private final FieldNamesDictionary fieldNamesDictionary;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
index 04e11f7..38d12f9 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/ParquetValueWriter.java
@@ -44,6 +44,7 @@
 import org.apache.parquet.io.api.RecordConsumer;
 import org.apache.parquet.schema.PrimitiveType;

+//This class reduces the number of Java objects created each time a column is written to a Parquet file by reusing the same VoidPointable for all columns within the file.
 public class ParquetValueWriter {
     public static final String LIST_FIELD = "list";
     public static final String ELEMENT_FIELD = "element";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
index 44cd5b2..fc43c89 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaCheckerLazyVisitor.java
@@ -30,6 +30,8 @@
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.api.IValueReference;

+// This class is used to check the schema of a record against a schema that has been inferred so far.
+// By checking, we can determine if the record is equivalent to the schema, if the record is growing, or if there is a conflict.
 public class SchemaCheckerLazyVisitor implements ISchemaChecker,
         ILazyVisitablePointableVisitor<ISchemaChecker.SchemaComparisonType, ParquetSchemaTree.SchemaNode> {
     private final FieldNamesDictionary fieldNamesDictionary;
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index a6ea115..9f5d02f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -36,6 +36,7 @@
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;

+// Traverses the RecordType tree and converts it to a Parquet schema.
 public class SchemaConverterVisitor implements IATypeVisitor<Void, Pair<Types.Builder, String>> {
     public static String MESSAGE_NAME = "asterix_schema";
     private final ARecordType schemaType;
@@ -46,9 +47,9 @@
         this.unsupportedType = null;
     }

-    public static String convertToParquetSchemaString(ARecordType schemaType) throws CompilationException {
+    public static MessageType convertToParquetSchema(ARecordType schemaType) throws CompilationException {
         SchemaConverterVisitor schemaConverterVisitor = new SchemaConverterVisitor(schemaType);
-        return schemaConverterVisitor.getParquetSchema().toString();
+        return schemaConverterVisitor.getParquetSchema();
     }

     private MessageType getParquetSchema() throws CompilationException {
diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index e1c978a..126f3ba 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -26,6 +26,7 @@
     private Map<String, String> properties;
     private String adapter;
     private ARecordType itemType;
+    private ARecordType parquetSchema;

     public void setAdapter(String adapter) {
         this.adapter = adapter;
@@ -43,6 +44,14 @@
         return itemType;
     }

+    public void setParquetSchema(ARecordType parquetSchema) {
+        this.parquetSchema = parquetSchema;
+    }
+
+    public ARecordType getParquetSchema() {
+        return parquetSchema;
+    }
+
     public String getAdapter() {
         return adapter;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
index 64f8d6d..1168ba1 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
@@ -26,5 +26,7 @@
 public interface IExternalWriteDataSink extends IWriteDataSink {
     ARecordType getItemType();

+    ARecordType getParquetSchema();
+
     SourceLocation getSourceLoc();
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
index d1667bf..4a10f7f 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -28,14 +28,16 @@
 public class WriteDataSink implements IExternalWriteDataSink {
     private final String adapterName;
     private final Map<String, String> configuration;
-    private ARecordType itemType;
-    private SourceLocation sourceLoc;
+    private final ARecordType itemType;
+    private final ARecordType parquetSchema;
+    private final SourceLocation sourceLoc;

     public WriteDataSink(String adapterName, Map<String, String> configuration, ARecordType itemType,
-            SourceLocation sourceLoc) {
+            ARecordType parquetSchema, SourceLocation sourceLoc) {
         this.adapterName = adapterName;
         this.configuration = configuration;
         this.itemType = itemType;
+        this.parquetSchema = parquetSchema;
         this.sourceLoc = sourceLoc;
     }

@@ -43,6 +45,7 @@
         this.adapterName = writeDataSink.getAdapterName();
         this.configuration = new HashMap<>(writeDataSink.configuration);
         this.itemType = writeDataSink.itemType;
+        this.parquetSchema = writeDataSink.parquetSchema;
         this.sourceLoc = writeDataSink.sourceLoc;
     }

@@ -52,6 +55,11 @@
     }

     @Override
+    public ARecordType getParquetSchema() {
+        return parquetSchema;
+    }
+
+    @Override
     public SourceLocation getSourceLoc() {
         return sourceLoc;
     }
diff
--git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716df..e88b1de 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -205,7 +205,7 @@

             case ExternalDataConstants.FORMAT_PARQUET:
                 CompressionCodecName compressionCodecName;
-                if (compression == null || compression.equals("") || compression.equals("none")) {
+                if (compression == null || compression.isEmpty() || compression.equals("none")) {
                     compressionCodecName = CompressionCodecName.UNCOMPRESSED;
                 } else {
                     compressionCodecName = CompressionCodecName.valueOf(compression.toUpperCase());
@@ -218,10 +218,11 @@
                 int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
                 ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);

-                if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
-                    String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+                ARecordType parquetSchema = ((IExternalWriteDataSink) sink).getParquetSchema();
+
+                if (parquetSchema != null) {
                     ParquetExternalFilePrinterFactory parquetPrinterFactory =
-                            new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+                            new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema,
                                     (IAType) sourceType, rowGroupSize, pageSize, writerVersion);
                     ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory,
@@ -230,6 +231,7 @@
                         partitionComparatorFactories, inputDesc, parquetWriterFactory);
                 }

+                // Parquet Writing with Schema Inference
                 int maxSchemas = ExternalWriterProvider.getMaxParquetSchema(configuration);
                 ParquetExternalFilePrinterFactoryProvider
printerFactoryProvider =
                         new ParquetExternalFilePrinterFactoryProvider(compressionCodecName, (IAType) sourceType,

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Gerrit-Change-Number: 19408
Gerrit-PatchSet: 4
Gerrit-Owner: [email protected]
Gerrit-Reviewer: Ali Alsuliman <[email protected]>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Hussain Towaileb <[email protected]>
Gerrit-Reviewer: Jenkins <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-MessageType: merged
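
Reader's note on the SchemaComparisonType comments added to ISchemaChecker in this change: the three outcomes (EQUIVALENT, GROWING, CONFLICTING) can be illustrated with a small standalone sketch. This is not the AsterixDB implementation; SchemaCheckerLazyVisitor works on lazy visitable pointables and ParquetSchemaTree.SchemaNode. Everything below (the class name SchemaComparisonSketch, the Comparison enum, the helpers record and compare, and the modelling of a schema as nested maps) is a hypothetical stand-in. It also assumes that a record carrying fewer fields than the inferred schema counts as EQUIVALENT, which the comments in the change do not state.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in AsterixDB.
// A schema is modelled as a Map from field name to either a type-name String
// (a primitive field) or a nested Map (a record field).
final class SchemaComparisonSketch {

    enum Comparison { EQUIVALENT, GROWING, CONFLICTING }

    // Compares an incoming record shape against the schema inferred so far.
    static Comparison compare(Map<String, Object> inferred, Map<String, Object> incoming) {
        Comparison result = Comparison.EQUIVALENT;
        for (Map.Entry<String, Object> field : incoming.entrySet()) {
            Object known = inferred.get(field.getKey());
            Object seen = field.getValue();
            if (known == null) {
                // A field the inferred schema has not seen yet: the schema must grow.
                result = Comparison.GROWING;
            } else if (known instanceof Map && seen instanceof Map) {
                // Both sides are records: compare their fields recursively.
                Comparison nested = compare(asRecord(known), asRecord(seen));
                if (nested == Comparison.CONFLICTING) {
                    return Comparison.CONFLICTING;
                }
                if (nested == Comparison.GROWING) {
                    result = Comparison.GROWING;
                }
            } else if (!known.equals(seen)) {
                // Different primitive types, or a primitive versus a record.
                return Comparison.CONFLICTING;
            }
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> asRecord(Object node) {
        return (Map<String, Object>) node;
    }

    // Builds a record shape from alternating field names and types.
    static Map<String, Object> record(Object... namesAndTypes) {
        Map<String, Object> fields = new LinkedHashMap<>();
        for (int i = 0; i + 1 < namesAndTypes.length; i += 2) {
            fields.put((String) namesAndTypes[i], namesAndTypes[i + 1]);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Object> inferred = record("name", "string", "age", "int");

        // Same fields and types as the inferred schema.
        System.out.println(compare(inferred, record("name", "string", "age", "int"))); // EQUIVALENT

        // One extra field, all shared fields agree.
        System.out.println(compare(inferred,
                record("name", "string", "age", "int", "address", "string"))); // GROWING

        // "name" was a string and now arrives as a record.
        System.out.println(compare(inferred,
                record("name", record("first", "string", "last", "string"), "age", "int"))); // CONFLICTING
    }
}
```

The three calls in main mirror the three examples given in the ISchemaChecker comment block.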
