>From <[email protected]>:
[email protected] has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408 )
Change subject: [ASTERIXDB-3392] Support empty spaces, "=" in field names in
COPY TO parquet
......................................................................
[ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
The parquet-java SDK's MessageTypeParser.parseMessageType() does not support
spaces or "=" in field names, so a serialised schema string cannot be passed
on to the ParquetExternalFilePrinterFactory. The schema is therefore built
twice: once during compilation to catch errors early, and a second time when
the printer is created, to build the schema that is actually used.
Ext-ref: MB-65167
Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
---
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
M
asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
M
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
M
asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
11 files changed, 89 insertions(+), 21 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/08/19408/1
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index a39bc06..6813e84 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -607,6 +607,7 @@
private final boolean autogenerated;
private final ARecordType itemType;
+ private final ARecordType parquetSchema;
public CompiledCopyToStatement(CopyToStatement copyToStatement) {
this.query = copyToStatement.getQuery();
@@ -623,6 +624,7 @@
this.keyExpressions = copyToStatement.getKeyExpressions();
this.autogenerated = copyToStatement.isAutogenerated();
this.itemType = eddDecl.getItemType();
+ this.parquetSchema = eddDecl.getParquetSchema();
}
@Override
@@ -650,6 +652,10 @@
return itemType;
}
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public List<Expression> getPathExpressions() {
return pathExpressions;
}
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 978997c..f8d0ff6 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -467,7 +467,7 @@
// Write adapter configuration
WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(),
copyTo.getProperties(),
- copyTo.getItemType(), expr.getSourceLocation());
+ copyTo.getItemType(), copyTo.getParquetSchema(),
expr.getSourceLocation());
// writeOperator
WriteOperator writeOperator = new WriteOperator(sourceExprRef, new
MutableObject<>(fullPathExpr),
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8ba9a02..8def9c8 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4139,8 +4139,8 @@
DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
IAType iaType =
translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
ExternalDataConstants.DUMMY_TYPE_NAME,
copyTo.getType(), mdTxnCtx);
-
edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
-
SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+ edd.setParquetSchema((ARecordType) iaType);
+
SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType);
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
index d754068..5d9ab7f 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -53,7 +53,7 @@
public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode
schemaNode) throws HyracksDataException {
MessageType schema = generateSchema(schemaNode);
- printerFactory.setParquetSchemaString(schema.toString());
+ printerFactory.setParquetSchema(schema);
IExternalFileWriter writer = writerFactory.createWriter(ctx,
printerFactory);
return new ExternalFileWriter(resolver, writer, maxResult);
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ba7a1ee..681e25e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -34,23 +34,22 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;
public class ParquetExternalFilePrinter implements IExternalPrinter {
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private MessageType schema;
private ParquetOutputFile parquetOutputFile;
- private String parquetSchemaString;
+ // private String parquetSchemaString;
private ParquetWriter<IValueReference> writer;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, String parquetSchemaString,
+ public ParquetExternalFilePrinter(CompressionCodecName
compressionCodecName, MessageType parquetSchemaString,
IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.schema = parquetSchemaString;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -59,7 +58,12 @@
@Override
public void open() throws HyracksDataException {
- schema = MessageTypeParser.parseMessageType(parquetSchemaString);
+ // try {
+ // LogicalSchema logicalSchema =
Utils.parseSchema(parquetSchemaString);
+ // SchemaElement schemaElement =
ParquetSchemaConverter.convert(logicalSchema);
+ // } catch (Exception e) {
+ // throw new RuntimeException(e);
+ // }
}
@Override
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index b6ad34e..a951188 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -18,25 +18,31 @@
*/
package org.apache.asterix.external.writer.printer;
+import org.apache.asterix.common.exceptions.CompilationException;
+import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
+import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.runtime.writer.IExternalPrinter;
import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
public class ParquetExternalFilePrinterFactory implements
IExternalPrinterFactory {
private static final long serialVersionUID = 8971234908711235L;
- private String parquetSchemaString;
+ private transient MessageType parquetInferSchema;
+ private ARecordType parquetprovidedSchema;
private final IAType typeInfo;
private final CompressionCodecName compressionCodecName;
private final long rowGroupSize;
private final int pageSize;
private final ParquetProperties.WriterVersion writerVersion;
- public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName, String parquetSchemaString,
- IAType typeInfo, long rowGroupSize, int pageSize,
ParquetProperties.WriterVersion writerVersion) {
+ public ParquetExternalFilePrinterFactory(CompressionCodecName
compressionCodecName,
+ ARecordType parquetprovidedSchema, IAType typeInfo, long
rowGroupSize, int pageSize,
+ ParquetProperties.WriterVersion writerVersion) {
this.compressionCodecName = compressionCodecName;
- this.parquetSchemaString = parquetSchemaString;
+ this.parquetprovidedSchema = parquetprovidedSchema;
this.typeInfo = typeInfo;
this.rowGroupSize = rowGroupSize;
this.pageSize = pageSize;
@@ -52,13 +58,24 @@
this.writerVersion = writerVersion;
}
- public void setParquetSchemaString(String parquetSchemaString) {
- this.parquetSchemaString = parquetSchemaString;
+ public void setParquetSchema(MessageType parquetInferSchema) {
+ this.parquetInferSchema = parquetInferSchema;
}
@Override
public IExternalPrinter createPrinter() {
- return new ParquetExternalFilePrinter(compressionCodecName,
parquetSchemaString, typeInfo, rowGroupSize,
- pageSize, writerVersion);
+ if (parquetInferSchema != null) {
+ return new ParquetExternalFilePrinter(compressionCodecName,
parquetInferSchema, typeInfo, rowGroupSize,
+ pageSize, writerVersion);
+ }
+
+ MessageType schema;
+ try {
+ schema =
SchemaConverterVisitor.convertToParquetSchema(parquetprovidedSchema);
+ } catch (CompilationException e) {
+ throw new RuntimeException(e);
+ }
+ return new ParquetExternalFilePrinter(compressionCodecName, schema,
typeInfo, rowGroupSize, pageSize,
+ writerVersion);
}
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index a6ea115..3c556dc 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -51,6 +51,11 @@
return schemaConverterVisitor.getParquetSchema().toString();
}
+ public static MessageType convertToParquetSchema(ARecordType schemaType)
throws CompilationException {
+ SchemaConverterVisitor schemaConverterVisitor = new
SchemaConverterVisitor(schemaType);
+ return schemaConverterVisitor.getParquetSchema();
+ }
+
private MessageType getParquetSchema() throws CompilationException {
Types.MessageTypeBuilder builder = Types.buildMessage();
diff --git
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index e1c978a..126f3ba 100644
---
a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++
b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -26,6 +26,7 @@
private Map<String, String> properties;
private String adapter;
private ARecordType itemType;
+ private ARecordType parquetSchema;
public void setAdapter(String adapter) {
this.adapter = adapter;
@@ -43,6 +44,14 @@
return itemType;
}
+ public void setParquetSchema(ARecordType parquetSchema) {
+ this.parquetSchema = parquetSchema;
+ }
+
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
public String getAdapter() {
return adapter;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
index 64f8d6d..1168ba1 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
@@ -26,5 +26,7 @@
public interface IExternalWriteDataSink extends IWriteDataSink {
ARecordType getItemType();
+ ARecordType getParquetSchema();
+
SourceLocation getSourceLoc();
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
index d1667bf..c4f584c 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -29,13 +29,15 @@
private final String adapterName;
private final Map<String, String> configuration;
private ARecordType itemType;
+ private ARecordType parquetSchema;
private SourceLocation sourceLoc;
public WriteDataSink(String adapterName, Map<String, String>
configuration, ARecordType itemType,
- SourceLocation sourceLoc) {
+ ARecordType parquetSchema, SourceLocation sourceLoc) {
this.adapterName = adapterName;
this.configuration = configuration;
this.itemType = itemType;
+ this.parquetSchema = parquetSchema;
this.sourceLoc = sourceLoc;
}
@@ -52,6 +54,11 @@
}
@Override
+ public ARecordType getParquetSchema() {
+ return parquetSchema;
+ }
+
+ @Override
public SourceLocation getSourceLoc() {
return sourceLoc;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716df..955ce69 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -218,10 +218,11 @@
int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
ParquetProperties.WriterVersion writerVersion =
getParquetWriterVersion(configuration);
- if
(configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
- String parquetSchemaString =
configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+ ARecordType parquetSchema = ((IExternalWriteDataSink)
sink).getParquetSchema();
+
+ if (parquetSchema != null) {
ParquetExternalFilePrinterFactory parquetPrinterFactory =
- new
ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+ new
ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema,
(IAType) sourceType, rowGroupSize,
pageSize, writerVersion);
ExternalFileWriterFactory parquetWriterFactory = new
ExternalFileWriterFactory(fileWriterFactory,
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Gerrit-Change-Number: 19408
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange