From <[email protected]>:

[email protected] has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408 )


Change subject: [ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet
......................................................................

[ASTERIXDB-3392] Support empty spaces, "=" in field names in COPY TO parquet

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
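MessageTypeParser.parseMessageType() in the parquet-java SDK does not accept
field names that contain spaces or "=". For this reason, the schema can no
longer be serialized to a string and re-parsed after it is handed to
ParquetExternalFilePrinterFactory. The schema is now built twice instead:
once during compilation, so that schema errors are reported early, and once
more when the printer is created, to produce the Parquet schema that is
actually used.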
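The following minimal Java sketch illustrates why a string round trip fails. It assumes the schema parser tokenizes its input by splitting on whitespace, on the punctuation "{}();," and on "=". That delimiter set is an assumption meant to approximate MessageTypeParser, not a copy of it. SchemaTokenizerDemo and tokenize are hypothetical names. Under this assumption, a field name such as "unit price" or "a=b" is no longer a single token, so a schema string containing such a name cannot be parsed back into the same schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical illustration only: a schema-string tokenizer that treats
// whitespace, punctuation and '=' as delimiters (an assumed approximation
// of how a text schema parser splits its input).
public class SchemaTokenizerDemo {
    private static final String DELIMITERS = " ,;{}()\n\t=";

    // Split one schema line into tokens, dropping whitespace delimiters
    // but keeping punctuation and '=' as separate tokens.
    public static List<String> tokenize(String schemaLine) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(schemaLine, DELIMITERS, true);
        while (st.hasMoreTokens()) {
            String token = st.nextToken();
            if (!token.isBlank()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    public static void main(String[] args) {
        // A plain field name survives as one token.
        System.out.println(tokenize("optional int64 price;"));      // [optional, int64, price, ;]
        // A name with a space is split in two, so the parser sees an extra token.
        System.out.println(tokenize("optional int64 unit price;")); // [optional, int64, unit, price, ;]
        // '=' is itself a delimiter, so "a=b" becomes three tokens.
        System.out.println(tokenize("optional int64 a=b;"));        // [optional, int64, a, =, b, ;]
    }
}
```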

Ext-ref: MB-65167
Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
M asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
M asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
M asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
11 files changed, 89 insertions(+), 21 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/08/19408/1

diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
index a39bc06..6813e84 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/CompiledStatements.java
@@ -607,6 +607,7 @@
         private final boolean autogenerated;

         private final ARecordType itemType;
+        private final ARecordType parquetSchema;

         public CompiledCopyToStatement(CopyToStatement copyToStatement) {
             this.query = copyToStatement.getQuery();
@@ -623,6 +624,7 @@
             this.keyExpressions = copyToStatement.getKeyExpressions();
             this.autogenerated = copyToStatement.isAutogenerated();
             this.itemType = eddDecl.getItemType();
+            this.parquetSchema = eddDecl.getParquetSchema();
         }

         @Override
@@ -650,6 +652,10 @@
             return itemType;
         }

+        public ARecordType getParquetSchema() {
+            return parquetSchema;
+        }
+
         public List<Expression> getPathExpressions() {
             return pathExpressions;
         }
diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
index 978997c..f8d0ff6 100644
--- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
+++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/LangExpressionToPlanTranslator.java
@@ -467,7 +467,7 @@

         // Write adapter configuration
         WriteDataSink writeDataSink = new WriteDataSink(copyTo.getAdapter(), copyTo.getProperties(),
-                copyTo.getItemType(), expr.getSourceLocation());
+                copyTo.getItemType(), copyTo.getParquetSchema(), expr.getSourceLocation());

         // writeOperator
         WriteOperator writeOperator = new WriteOperator(sourceExprRef, new MutableObject<>(fullPathExpr),
diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 8ba9a02..8def9c8 100644
--- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4139,8 +4139,8 @@
                                 DataverseName.createFromCanonicalForm(ExternalDataConstants.DUMMY_DATAVERSE_NAME);
                         IAType iaType = translateType(ExternalDataConstants.DUMMY_DATABASE_NAME, dummyDataverse,
                                 ExternalDataConstants.DUMMY_TYPE_NAME, copyTo.getType(), mdTxnCtx);
-                        edd.getProperties().put(ExternalDataConstants.PARQUET_SCHEMA_KEY,
-                                SchemaConverterVisitor.convertToParquetSchemaString((ARecordType) iaType));
+                        edd.setParquetSchema((ARecordType) iaType);
+                        SchemaConverterVisitor.convertToParquetSchema((ARecordType) iaType);
                     }
                 }

diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
index d754068..5d9ab7f 100644
--- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
+++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/parquet/ParquetExternalWriterFactory.java
@@ -53,7 +53,7 @@

     public IExternalWriter createWriter(ParquetSchemaTree.SchemaNode schemaNode) throws HyracksDataException {
         MessageType schema = generateSchema(schemaNode);
-        printerFactory.setParquetSchemaString(schema.toString());
+        printerFactory.setParquetSchema(schema);
         IExternalFileWriter writer = writerFactory.createWriter(ctx, printerFactory);
         return new ExternalFileWriter(resolver, writer, maxResult);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
index ba7a1ee..681e25e 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinter.java
@@ -34,23 +34,22 @@
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.MessageTypeParser;

 public class ParquetExternalFilePrinter implements IExternalPrinter {
     private final IAType typeInfo;
     private final CompressionCodecName compressionCodecName;
     private MessageType schema;
     private ParquetOutputFile parquetOutputFile;
-    private String parquetSchemaString;
+    //    private String parquetSchemaString;
     private ParquetWriter<IValueReference> writer;
     private final long rowGroupSize;
     private final int pageSize;
     private final ParquetProperties.WriterVersion writerVersion;

-    public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, String parquetSchemaString,
+    public ParquetExternalFilePrinter(CompressionCodecName compressionCodecName, MessageType parquetSchema,
             IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
         this.compressionCodecName = compressionCodecName;
-        this.parquetSchemaString = parquetSchemaString;
+        this.schema = parquetSchema;
         this.typeInfo = typeInfo;
         this.rowGroupSize = rowGroupSize;
         this.pageSize = pageSize;
@@ -59,7 +58,12 @@

     @Override
     public void open() throws HyracksDataException {
-        schema = MessageTypeParser.parseMessageType(parquetSchemaString);
+        //        try {
+        //            LogicalSchema logicalSchema = Utils.parseSchema(parquetSchemaString);
+        //            SchemaElement schemaElement = ParquetSchemaConverter.convert(logicalSchema);
+        //        } catch (Exception e) {
+        //            throw new RuntimeException(e);
+        //        }
     }

     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
index b6ad34e..a951188 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/ParquetExternalFilePrinterFactory.java
@@ -18,25 +18,31 @@
  */
 package org.apache.asterix.external.writer.printer;

+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
+import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.runtime.writer.IExternalPrinter;
 import org.apache.asterix.runtime.writer.IExternalPrinterFactory;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;

 public class ParquetExternalFilePrinterFactory implements IExternalPrinterFactory {
     private static final long serialVersionUID = 8971234908711235L;
-    private String parquetSchemaString;
+    private transient MessageType parquetInferSchema;
+    private ARecordType parquetprovidedSchema;
     private final IAType typeInfo;
     private final CompressionCodecName compressionCodecName;
     private final long rowGroupSize;
     private final int pageSize;
     private final ParquetProperties.WriterVersion writerVersion;

-    public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName, String parquetSchemaString,
-            IAType typeInfo, long rowGroupSize, int pageSize, ParquetProperties.WriterVersion writerVersion) {
+    public ParquetExternalFilePrinterFactory(CompressionCodecName compressionCodecName,
+            ARecordType parquetprovidedSchema, IAType typeInfo, long rowGroupSize, int pageSize,
+            ParquetProperties.WriterVersion writerVersion) {
         this.compressionCodecName = compressionCodecName;
-        this.parquetSchemaString = parquetSchemaString;
+        this.parquetprovidedSchema = parquetprovidedSchema;
         this.typeInfo = typeInfo;
         this.rowGroupSize = rowGroupSize;
         this.pageSize = pageSize;
@@ -52,13 +58,24 @@
         this.writerVersion = writerVersion;
     }

-    public void setParquetSchemaString(String parquetSchemaString) {
-        this.parquetSchemaString = parquetSchemaString;
+    public void setParquetSchema(MessageType parquetInferSchema) {
+        this.parquetInferSchema = parquetInferSchema;
     }

     @Override
     public IExternalPrinter createPrinter() {
-        return new ParquetExternalFilePrinter(compressionCodecName, parquetSchemaString, typeInfo, rowGroupSize,
-                pageSize, writerVersion);
+        if (parquetInferSchema != null) {
+            return new ParquetExternalFilePrinter(compressionCodecName, parquetInferSchema, typeInfo, rowGroupSize,
+                    pageSize, writerVersion);
+        }
+
+        MessageType schema;
+        try {
+            schema = SchemaConverterVisitor.convertToParquetSchema(parquetprovidedSchema);
+        } catch (CompilationException e) {
+            throw new RuntimeException(e);
+        }
+        return new ParquetExternalFilePrinter(compressionCodecName, schema, typeInfo, rowGroupSize, pageSize,
+                writerVersion);
     }
 }
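The fallback in createPrinter() relies on parquetInferSchema being transient, presumably because MessageType is not Java-serializable. Any value set before the factory is shipped is therefore lost, and the printer has to rebuild the schema from the serializable ARecordType. In this change, ParquetExternalWriterFactory calls setParquetSchema() on the runtime side, so the cached value is present only when the schema was inferred there. Below is a minimal plain-Java sketch of that pattern. BuiltSchema stands in for MessageType, SourceType stands in for ARecordType, and all class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical sketch: keep a serializable schema description and rebuild the
// non-serializable schema object when no cached copy survived serialization.
public class RebuildOnDeserializeDemo {

    // Stand-in for a non-serializable schema object (e.g. parquet's MessageType).
    static final class BuiltSchema {
        final String description;
        BuiltSchema(String description) { this.description = description; }
    }

    // Stand-in for the serializable source type (e.g. ARecordType).
    static final class SourceType implements Serializable {
        private static final long serialVersionUID = 1L;
        final String fieldName;
        SourceType(String fieldName) { this.fieldName = fieldName; }
    }

    static final class PrinterFactory implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient BuiltSchema cachedSchema; // not serialized; null after deserialization
        private final SourceType providedSchema;    // serialized with the factory

        PrinterFactory(SourceType providedSchema) { this.providedSchema = providedSchema; }

        void setSchema(BuiltSchema schema) { this.cachedSchema = schema; }

        // Prefer the cached schema; otherwise rebuild it from the source type.
        BuiltSchema resolveSchema() {
            if (cachedSchema != null) {
                return cachedSchema;
            }
            return new BuiltSchema("rebuilt:" + providedSchema.fieldName);
        }
    }

    // Serialize and deserialize an object, as happens when a job is shipped.
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        PrinterFactory factory = new PrinterFactory(new SourceType("unit price"));
        factory.setSchema(new BuiltSchema("cached"));
        System.out.println(factory.resolveSchema().description); // cached
        PrinterFactory shipped = roundTrip(factory);
        System.out.println(shipped.resolveSchema().description); // rebuilt:unit price
    }
}
```

The design keeps the expensive, non-serializable object out of the serialized state entirely. The cost is a second schema conversion on the receiving side, which matches the "built twice" approach described in the commit message.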
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
index a6ea115..3c556dc 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/writer/printer/parquet/SchemaConverterVisitor.java
@@ -51,6 +51,11 @@
         return schemaConverterVisitor.getParquetSchema().toString();
     }

+    public static MessageType convertToParquetSchema(ARecordType schemaType) throws CompilationException {
+        SchemaConverterVisitor schemaConverterVisitor = new SchemaConverterVisitor(schemaType);
+        return schemaConverterVisitor.getParquetSchema();
+    }
+
     private MessageType getParquetSchema() throws CompilationException {
         Types.MessageTypeBuilder builder = Types.buildMessage();

diff --git a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
index e1c978a..126f3ba 100644
--- a/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
+++ b/asterixdb/asterix-lang-common/src/main/java/org/apache/asterix/lang/common/statement/ExternalDetailsDecl.java
@@ -26,6 +26,7 @@
     private Map<String, String> properties;
     private String adapter;
     private ARecordType itemType;
+    private ARecordType parquetSchema;

     public void setAdapter(String adapter) {
         this.adapter = adapter;
@@ -43,6 +44,14 @@
         return itemType;
     }

+    public void setParquetSchema(ARecordType parquetSchema) {
+        this.parquetSchema = parquetSchema;
+    }
+
+    public ARecordType getParquetSchema() {
+        return parquetSchema;
+    }
+
     public String getAdapter() {
         return adapter;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
index 64f8d6d..1168ba1 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/IExternalWriteDataSink.java
@@ -26,5 +26,7 @@
 public interface IExternalWriteDataSink extends IWriteDataSink {
     ARecordType getItemType();

+    ARecordType getParquetSchema();
+
     SourceLocation getSourceLoc();
 }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
index d1667bf..c4f584c 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/WriteDataSink.java
@@ -29,13 +29,15 @@
     private final String adapterName;
     private final Map<String, String> configuration;
     private ARecordType itemType;
+    private ARecordType parquetSchema;
     private SourceLocation sourceLoc;

     public WriteDataSink(String adapterName, Map<String, String> configuration, ARecordType itemType,
-            SourceLocation sourceLoc) {
+            ARecordType parquetSchema, SourceLocation sourceLoc) {
         this.adapterName = adapterName;
         this.configuration = configuration;
         this.itemType = itemType;
+        this.parquetSchema = parquetSchema;
         this.sourceLoc = sourceLoc;
     }

@@ -52,6 +54,11 @@
     }

     @Override
+    public ARecordType getParquetSchema() {
+        return parquetSchema;
+    }
+
+    @Override
     public SourceLocation getSourceLoc() {
         return sourceLoc;
     }
diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716df..955ce69 100644
--- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -218,10 +218,11 @@
                 int pageSize = (int) StorageUtil.getByteValue(pageSizeString);
                 ParquetProperties.WriterVersion writerVersion = getParquetWriterVersion(configuration);

-                if (configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY) != null) {
-                    String parquetSchemaString = configuration.get(ExternalDataConstants.PARQUET_SCHEMA_KEY);
+                ARecordType parquetSchema = ((IExternalWriteDataSink) sink).getParquetSchema();
+
+                if (parquetSchema != null) {
                     ParquetExternalFilePrinterFactory parquetPrinterFactory =
-                            new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchemaString,
+                            new ParquetExternalFilePrinterFactory(compressionCodecName, parquetSchema,
                                     (IAType) sourceType, rowGroupSize, pageSize, writerVersion);

                     ExternalFileWriterFactory parquetWriterFactory = new ExternalFileWriterFactory(fileWriterFactory,

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19408

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I9dd788909512bf18cb8de26a78a0787e15b11492
Gerrit-Change-Number: 19408
Gerrit-PatchSet: 1
Gerrit-Owner: [email protected]
Gerrit-MessageType: newchange
