From Utsav Singh <[email protected]>:

Utsav Singh has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19405 )


Change subject: [ASTERIXDB-3509]: COPY TO CSV - Raise Warning on TYPE Schema 
Mismatch
......................................................................

[ASTERIXDB-3509]: COPY TO CSV - Raise Warning on TYPE Schema Mismatch

Change-Id: I71d9e315be78a246bba733ea704f2d50ae71507c
---
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/AObjectPrinterFactory.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
M 
asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/CSVPrinterFactoryProvider.java
M 
hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
M 
asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
M asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
M 
asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
11 files changed, 66 insertions(+), 34 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/05/19405/1

diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d06a9e5..4bbbd27 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -320,6 +320,7 @@
     FAILED_EXTERNAL_CROSS_ACCOUNT_AUTHENTICATION(1213),
     LONG_LIVED_CREDENTIALS_NEEDED_TO_ASSUME_ROLE(1214),
     TEMPORARY_CREDENTIALS_CANNOT_BE_USED_TO_ASSUME_ROLE(1215),
+    COPY_TO_SCHEMA_MISMATCH(1216),

     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index c1ffd13..606736e 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -322,6 +322,7 @@
 1213 = Failed to perform cross-account authentication. Encountered error : 
'%1$s'
 1214 = Long-lived credentials are required to assume a role
 1215 = Temporary credentials cannot be used to assume a role
+1216 = Type Schema does not match with the expected output schema.

 # Feed Errors
 3001 = Illegal state.
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 20299d5..165f256 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -788,11 +788,11 @@
     public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(int sourceColumn,
             int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
             IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression 
staticPathExpr,
-            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType)
-            throws AlgebricksException {
+            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType,
+            JobGenContext context) throws AlgebricksException {
         IPushRuntimeFactory runtime = 
ExternalWriterProvider.getWriteFileRuntime(appCtx, sink, sourceType,
                 staticPathExpr, pathSourceLocation, dynamicPathEvalFactory, 
inputDesc, sourceColumn, partitionColumns,
-                partitionComparatorFactories);
+                partitionComparatorFactories, context.getWarningCollector());
         return new Pair<>(runtime, null);
     }

diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
index e6716df..a50117b 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/provider/ExternalWriterProvider.java
@@ -61,6 +61,7 @@
 import 
org.apache.hyracks.algebricks.runtime.operators.writer.WriterPartitionerFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.util.StorageUtil;
@@ -143,8 +144,8 @@
     public static IPushRuntimeFactory 
getWriteFileRuntime(ICcApplicationContext appCtx, IWriteDataSink sink,
             Object sourceType, ILogicalExpression staticPathExpr, 
SourceLocation pathSourceLocation,
             IScalarEvaluatorFactory dynamicPathEvalFactory, RecordDescriptor 
inputDesc, int sourceColumn,
-            int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories)
-            throws AlgebricksException {
+            int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
+            IWarningCollector warningCollector) throws AlgebricksException {
         String staticPath = staticPathExpr != null ? 
ConstantExpressionUtil.getStringConstant(staticPathExpr) : null;
         IExternalFileWriterFactory fileWriterFactory =
                 ExternalWriterProvider.createWriterFactory(appCtx, sink, 
staticPath, pathSourceLocation);
@@ -184,11 +185,10 @@
                 if (sink instanceof IExternalWriteDataSink) {
                     ARecordType itemType = ((IExternalWriteDataSink) 
sink).getItemType();
                     if (itemType != null) {
-                        printerFactory =
-                                CSVPrinterFactoryProvider
-                                        .createInstance(itemType, 
sink.getConfiguration(),
-                                                ((IExternalWriteDataSink) 
sink).getSourceLoc())
-                                        .getPrinterFactory(sourceType);
+                        printerFactory = CSVPrinterFactoryProvider
+                                .createInstance(itemType, 
sink.getConfiguration(),
+                                        ((IExternalWriteDataSink) 
sink).getSourceLoc(), warningCollector)
+                                .getPrinterFactory(sourceType);
                         externalPrinterFactory =
                                 new 
CsvExternalFilePrinterFactory(printerFactory, compressStreamFactory);
                         writerFactory = new 
ExternalFileWriterFactory(fileWriterFactory, externalPrinterFactory,
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/AObjectPrinterFactory.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/AObjectPrinterFactory.java
index fdde82c..193fe4f 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/AObjectPrinterFactory.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/AObjectPrinterFactory.java
@@ -39,6 +39,7 @@
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;

 public class AObjectPrinterFactory implements IPrinterFactory {
     private static final long serialVersionUID = 1L;
@@ -47,10 +48,13 @@
     private ARecordType itemType;
     private Map<String, String> configuration;
     private boolean emptyFieldAsNull;
+    private IWarningCollector warningCollector;

-    private AObjectPrinterFactory(ARecordType itemType, Map<String, String> 
configuration) {
+    private AObjectPrinterFactory(ARecordType itemType, Map<String, String> 
configuration,
+            IWarningCollector warningCollector) {
         this.itemType = itemType;
         this.configuration = configuration;
+        this.warningCollector = warningCollector;
         String emptyFieldAsNullStr = 
configuration.get(KEY_EMPTY_FIELD_AS_NULL);
         this.emptyFieldAsNull = emptyFieldAsNullStr != null && 
Boolean.parseBoolean(emptyFieldAsNullStr);
         this.nullPrinter = 
ANullPrinterFactory.createInstance(configuration.get(KEY_NULL)).createPrinter();
@@ -60,8 +64,9 @@

     }

-    public static AObjectPrinterFactory createInstance(ARecordType itemType, 
Map<String, String> configuration) {
-        return new AObjectPrinterFactory(itemType, configuration);
+    public static AObjectPrinterFactory createInstance(ARecordType itemType, 
Map<String, String> configuration,
+            IWarningCollector warningCollector) {
+        return new AObjectPrinterFactory(itemType, configuration, 
warningCollector);
     }

     public boolean printFlatValue(ATypeTag typeTag, byte[] b, int s, int l, 
PrintStream ps)
@@ -154,7 +159,7 @@
         final ARecordVisitablePointable recordVisitablePointable =
                 new 
ARecordVisitablePointable(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
         final Pair<PrintStream, ATypeTag> streamTag = new Pair<>(null, null);
-        final IPrintVisitor visitor = new APrintVisitor(itemType, 
configuration);
+        final IPrintVisitor visitor = new APrintVisitor(itemType, 
configuration, warningCollector);

         return (byte[] b, int s, int l, PrintStream ps) -> {
             ATypeTag typeTag = 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(b[s]);
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
index 2b31fb0..bf6354a 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ARecordPrinterFactory.java
@@ -32,6 +32,7 @@
 import org.apache.hyracks.algebricks.data.IPrinter;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;

 public class ARecordPrinterFactory implements IPrinterFactory {

@@ -39,11 +40,14 @@
     private final ARecordType recType;
     private final ARecordType itemType;
     private final Map<String, String> configuration;
+    private final IWarningCollector warningCollector;

-    public ARecordPrinterFactory(ARecordType recType, ARecordType itemType, 
Map<String, String> configuration) {
+    public ARecordPrinterFactory(ARecordType recType, ARecordType itemType, 
Map<String, String> configuration,
+            IWarningCollector warningCollector) {
         this.recType = recType;
         this.itemType = itemType;
         this.configuration = configuration;
+        this.warningCollector = warningCollector;
     }
 
     @Override
@@ -52,7 +56,7 @@
         final IAType inputType =
                 recType == null ? 
DefaultOpenFieldType.getDefaultOpenFieldType(ATypeTag.OBJECT) : recType;
         final IVisitablePointable recAccessor = 
allocator.allocateRecordValue(inputType);
-        final APrintVisitor printVisitor = new APrintVisitor(itemType, 
configuration);
+        final APrintVisitor printVisitor = new APrintVisitor(itemType, 
configuration, warningCollector);
         final Pair<PrintStream, ATypeTag> arg = new Pair<>(null, null);

         return new IPrinter() {
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/CSVPrinterFactoryProvider.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/CSVPrinterFactoryProvider.java
index 322b3e6..484d3a8 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/CSVPrinterFactoryProvider.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/CSVPrinterFactoryProvider.java
@@ -61,26 +61,29 @@
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.algebricks.data.IPrinterFactory;
 import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
 import org.apache.hyracks.api.exceptions.SourceLocation;

 public class CSVPrinterFactoryProvider implements IPrinterFactoryProvider {
     private ARecordType itemType;
     private Map<String, String> configuration;
     private SourceLocation sourceLocation;
+    private IWarningCollector warningCollector;

     public static final CSVPrinterFactoryProvider INSTANCE =
-            new CSVPrinterFactoryProvider(null, Collections.emptyMap(), null);
+            new CSVPrinterFactoryProvider(null, Collections.emptyMap(), null, 
null);

     public static final CSVPrinterFactoryProvider createInstance(ARecordType 
itemType,
-            Map<String, String> configuration, SourceLocation sourceLocation) {
-        return new CSVPrinterFactoryProvider(itemType, configuration, 
sourceLocation);
+            Map<String, String> configuration, SourceLocation sourceLocation, 
IWarningCollector warningCollector) {
+        return new CSVPrinterFactoryProvider(itemType, configuration, 
sourceLocation, warningCollector);
     }

     private CSVPrinterFactoryProvider(ARecordType itemType, Map<String, 
String> configuration,
-            SourceLocation sourceLocation) {
+            SourceLocation sourceLocation, IWarningCollector warningCollector) 
{
         this.itemType = itemType;
         this.configuration = configuration;
         this.sourceLocation = sourceLocation;
+        this.warningCollector = warningCollector;
     }

     @Override
@@ -137,7 +140,7 @@
                             configuration.get(KEY_FORCE_QUOTE), 
configuration.get(KEY_ESCAPE),
                             configuration.get(KEY_DELIMITER));
                 case OBJECT:
-                    return new ARecordPrinterFactory((ARecordType) type, 
itemType, configuration);
+                    return new ARecordPrinterFactory((ARecordType) type, 
itemType, configuration, warningCollector);
                 case ARRAY:
                     throw new NotImplementedException("'OrderedList' type 
unsupported for CSV output");
                 case MULTISET:
@@ -167,7 +170,7 @@
                     break;
             }
         }
-        return AObjectPrinterFactory.createInstance(itemType, configuration);
+        return AObjectPrinterFactory.createInstance(itemType, configuration, 
warningCollector);

     }
 }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
index ce9e3a3..1ee2534 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/ACSVRecordPrinter.java
@@ -19,6 +19,7 @@

 package org.apache.asterix.om.pointables.printer.csv;

+import static 
org.apache.asterix.common.exceptions.ErrorCode.COPY_TO_SCHEMA_MISMATCH;
 import static org.apache.asterix.om.types.hierachy.ATypeHierarchy.isCompatible;

 import java.io.PrintStream;
@@ -37,6 +38,8 @@
 import org.apache.asterix.om.types.EnumDeserializer;
 import org.apache.asterix.om.types.IAType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.util.string.UTF8StringUtil;

 public class ACSVRecordPrinter extends ARecordPrinter {
@@ -44,17 +47,20 @@
     private boolean firstRecord;
     private boolean header;
     private final String recordDelimiter;
+    private final IWarningCollector warningCollector;
     private static final List<ATypeTag> supportedTypes = 
List.of(ATypeTag.TINYINT, ATypeTag.SMALLINT, ATypeTag.INTEGER,
             ATypeTag.BIGINT, ATypeTag.UINT8, ATypeTag.UINT16, ATypeTag.UINT64, 
ATypeTag.FLOAT, ATypeTag.DOUBLE,
             ATypeTag.STRING, ATypeTag.BOOLEAN, ATypeTag.DATETIME, 
ATypeTag.UINT32, ATypeTag.DATE, ATypeTag.TIME);

     public ACSVRecordPrinter(final String startRecord, final String endRecord, 
final String fieldSeparator,
-            final String fieldNameSeparator, String recordDelimiter, 
ARecordType schema, String headerStr) {
+            final String fieldNameSeparator, String recordDelimiter, 
ARecordType schema, String headerStr,
+            IWarningCollector warningCollector) {
         super(startRecord, endRecord, fieldSeparator, fieldNameSeparator);
         this.schema = schema;
         this.header = headerStr != null && Boolean.parseBoolean(headerStr);
         this.firstRecord = true;
         this.recordDelimiter = recordDelimiter;
+        this.warningCollector = warningCollector;
     }

     @Override
@@ -108,6 +114,10 @@
                 }
                 printField(ps, visitor, fieldNamePointable, fieldValue, 
expectedTypeTag);
             }
+        } else {
+            if (warningCollector.shouldWarn()) {
+                warningCollector.warn(Warning.of(null, 
COPY_TO_SCHEMA_MISMATCH));
+            }
         }
     }

@@ -117,7 +127,6 @@
         final List<String> expectedFieldNames = 
Arrays.asList(schema.getFieldNames());
         final List<IAType> expectedFieldTypes = 
Arrays.asList(schema.getFieldTypes());
         if (fieldNames.size() != expectedFieldNames.size()) {
-            // todo: raise warning about schema mismatch
             return false;
         }
         for (int i = 0; i < fieldNames.size(); ++i) {
@@ -136,11 +145,9 @@
                         expectedType = unionType.getActualType().getTypeTag();
                         canNull = unionType.isNullableType();
                         if (!supportedTypes.contains(expectedType)) {
-                            // unsupported DataType
                             return false;
                         }
                     } else {
-                        // todo: unexpected type
                         return false;
                     }
                 } else {
@@ -148,11 +155,9 @@
                 }
                 schemaDetails.put(fieldColumnName, expectedType);
             } else {
-                // todo: raise warning about schema mismatch
                 return false;
             }
             if (typeTag.equals(ATypeTag.MISSING) || 
(typeTag.equals(ATypeTag.NULL) && !canNull)) {
-                // todo: raise warning about schema mismatch
                 return false;
             }
             if (!isCompatible(typeTag, expectedType) && !canNull) {
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java
index bbe5286..f4bb1c1 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/pointables/printer/csv/APrintVisitor.java
@@ -35,6 +35,7 @@
 import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;

 /**
  * This class is a IVisitablePointableVisitor implementation which recursively
@@ -45,11 +46,13 @@
     private final ARecordType itemType;
     private final Map<String, String> configuration;
     private AObjectPrinterFactory objectPrinterFactory;
+    private final IWarningCollector warningCollector;

-    public APrintVisitor(ARecordType itemType, Map<String, String> 
configuration) {
+    public APrintVisitor(ARecordType itemType, Map<String, String> 
configuration, IWarningCollector warningCollector) {
         super();
         this.itemType = itemType;
         this.configuration = configuration;
+        this.warningCollector = warningCollector;
     }

     @Override
@@ -62,14 +65,15 @@
         String delimiter = CSVUtils.getDelimiter(configuration);
         String recordDelimiter = configuration.get(KEY_RECORD_DELIMITER) == 
null ? (itemType == null ? "" : "\n")
                 : configuration.get(KEY_RECORD_DELIMITER);
-        return new ACSVRecordPrinter("", "", delimiter, null, recordDelimiter, 
itemType, configuration.get(KEY_HEADER));
+        return new ACSVRecordPrinter("", "", delimiter, null, recordDelimiter, 
itemType, configuration.get(KEY_HEADER),
+                warningCollector);
     }
 
     @Override
     protected boolean printFlatValue(ATypeTag typeTag, byte[] b, int s, int l, 
PrintStream ps)
             throws HyracksDataException {
         if (objectPrinterFactory == null) {
-            objectPrinterFactory = 
AObjectPrinterFactory.createInstance(itemType, configuration);
+            objectPrinterFactory = 
AObjectPrinterFactory.createInstance(itemType, configuration, warningCollector);
         }
         return objectPrinterFactory.printFlatValue(typeTag, b, s, l, ps);
     }
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
index 11c8b81..8de683a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java
@@ -62,8 +62,8 @@
     Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteFileRuntime(int sourceColumn,
             int[] partitionColumns, IBinaryComparatorFactory[] 
partitionComparatorFactories,
             IScalarEvaluatorFactory dynamicPathEvalFactory, ILogicalExpression 
staticPathExpr,
-            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType)
-            throws AlgebricksException;
+            SourceLocation pathSourceLocation, IWriteDataSink sink, 
RecordDescriptor inputDesc, Object sourceType,
+            JobGenContext context) throws AlgebricksException;

     Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> 
getWriteDatabaseWithKeyRuntime(int sourceColumn,
             IScalarEvaluatorFactory[] keyEvaluatorFactories, IWriteDataSink 
sink, RecordDescriptor inputDesc,
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
index b8c1f36..05b3ddc 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java
@@ -176,7 +176,7 @@
         if (write.isFileStoreSink()) {
             runtimeAndConstraints = mp.getWriteFileRuntime(sourceColumn, 
partitionColumns, partitionComparatorFactories,
                     dynamicPathEvalFactory, staticPathExpr, 
pathExpr.getSourceLocation(), writeDataSink, inputDesc,
-                    typeEnv.getVarType(sourceVariable));
+                    typeEnv.getVarType(sourceVariable), context);

         } else {
             runtimeAndConstraints = 
mp.getWriteDatabaseWithKeyRuntime(sourceColumn, keyEvalFactories, writeDataSink,

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19405
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I71d9e315be78a246bba733ea704f2d50ae71507c
Gerrit-Change-Number: 19405
Gerrit-PatchSet: 1
Gerrit-Owner: Utsav Singh <[email protected]>
Gerrit-MessageType: newchange

Reply via email to