Hussain Towaileb has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/3423


Change subject: [WIP] Reporting failed ingestion records
......................................................................

[WIP] Reporting failed ingestion records

Change-Id: Ic3fcc6b771b4393542988cd0aa6fd992ccb81028
---
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java
A 
asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java
M 
asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
M 
asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
18 files changed, 367 insertions(+), 31 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/23/3423/1

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java
new file mode 100644
index 0000000..d02eac1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.metadata.api.IDatasourceFunction;
+import org.apache.asterix.metadata.declared.DataSourceId;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.asterix.metadata.declared.MetadataProvider;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain;
+
+/**
+ * Data source of the failed ingestion reporter function. It is identified by the namespace and name of
+ * {@link FailedIngestionReporterRewriter#FAILED_INGESTION_REPORTER} and creates a
+ * {@link FailedIngestionReporterFunction} for the locations it is given.
+ */
+
+public class FailedIngestionReporterDatasource extends FunctionDataSource {
+
+    // Id built from the namespace and name of the failed ingestion reporter function identifier
+    private static final DataSourceId FAILED_INGESTION_REPORTER_DATASOURCE_ID =
+            new 
DataSourceId(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER.getNamespace(),
+                    
FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER.getName());
+
+    /**
+     * @param domain node domain passed to the parent {@link FunctionDataSource}
+     * @throws AlgebricksException declared by this constructor; presumably raised by the parent constructor
+     */
+    public FailedIngestionReporterDatasource(INodeDomain domain) throws 
AlgebricksException {
+        super(FAILED_INGESTION_REPORTER_DATASOURCE_ID, domain);
+    }
+
+    /**
+     * Creates the function backing this data source.
+     *
+     * @param metadataProvider not used by this implementation
+     * @param locations partition constraint handed to the created function
+     * @return a new {@link FailedIngestionReporterFunction} for {@code locations}
+     */
+    @Override
+    protected IDatasourceFunction createFunction(MetadataProvider 
metadataProvider,
+            AlgebricksAbsolutePartitionConstraint locations) {
+        return new FailedIngestionReporterFunction(locations);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java
new file mode 100644
index 0000000..96107f1
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.metadata.declared.AbstractDatasourceFunction;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Datasource function of the failed ingestion report. For each partition it is asked for, it creates a
+ * {@link FailedIngestionReporterReader}, which reads the log file containing the reported failed ingestion data
+ * row by row.
+ */
+
+public class FailedIngestionReporterFunction extends 
AbstractDatasourceFunction {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param locations partition constraint passed to the parent {@link AbstractDatasourceFunction}
+     */
+    public 
FailedIngestionReporterFunction(AlgebricksAbsolutePartitionConstraint 
locations) {
+        super(locations);
+    }
+
+    /**
+     * Creates the reader for one partition.
+     *
+     * @param ctx task context, not used by this implementation
+     * @param partition partition number handed to the reader
+     * @return a new {@link FailedIngestionReporterReader} for {@code partition}
+     * @throws HyracksDataException if the reader fails to open its log file
+     */
+    @Override
+    public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, 
int partition)
+            throws HyracksDataException {
+        return new FailedIngestionReporterReader(partition);
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java
new file mode 100644
index 0000000..45fb764
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.input.record.CharArrayRecord;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Reads the reported failed ingestion records of a single partition from its log file.
+ *
+ * The log file is expected to hold one String formatted record per line. The file is read one line at a time and
+ * each line is returned as is; the record format is not validated yet (see the TODO in {@link #next()}).
+ *
+ * Log files may exist on different partitions, however, this reader assumes that all the log files are using
+ * identical paths and names in all the partitions.
+ */
+
+public class FailedIngestionReporterReader extends FunctionReader {
+
+    private static final String PROGRESS_LOG_BASE_PATH = "logs";
+    private static final String FAILED_INGEST_RECORD_FILE_NAME = "failed_ingest_record.log";
+
+    // Partition whose log file is read
+    private final int partitionNumber;
+    // Null when the log file does not exist, or once it has been read to the end and closed
+    private BufferedReader bufferedReader;
+    // Line fetched by hasNext() that next() has not returned yet; null when there is none
+    private String line;
+
+    FailedIngestionReporterReader(int partitionNumber) throws HyracksDataException {
+        this.partitionNumber = partitionNumber;
+
+        openFiles();
+    }
+
+    /**
+     * Checks whether another record is available, fetching the next line of the log file if none is pending.
+     * Calling this method repeatedly without calling {@link #next()} does not skip records. When the end of the
+     * file is reached the underlying reader is closed.
+     *
+     * @return true if a record is available for {@link #next()}, false if the file does not exist or is exhausted
+     * @throws IOException (as a HyracksDataException) if reading or closing the log file fails
+     */
+    @Override
+    public boolean hasNext() throws IOException {
+        if (line != null) {
+            // A fetched line has not been consumed yet; reading again would drop it
+            return true;
+        }
+        if (bufferedReader == null) {
+            return false;
+        }
+
+        try {
+            line = bufferedReader.readLine();
+            if (line == null) {
+                // End of file: release the file handle instead of keeping it open for the reader's lifetime
+                closeReader();
+            }
+            return line != null;
+        } catch (IOException ex) {
+            // Do not leak the file handle on a failed read; keep the original failure as the primary error
+            try {
+                closeReader();
+            } catch (IOException closeEx) {
+                ex.addSuppressed(closeEx);
+            }
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    /**
+     * Returns the line fetched by the last successful {@link #hasNext()} as a record.
+     *
+     * @return the pending line wrapped in a {@link CharArrayRecord}
+     * @throws IllegalStateException if there is no pending line
+     */
+    @Override
+    public IRawRecord<char[]> next() throws IOException, InterruptedException {
+        if (line == null) {
+            // Previously this surfaced as a NullPointerException on line.toCharArray()
+            throw new IllegalStateException("next() called without a successful hasNext()");
+        }
+
+        // TODO(Hussain) Temporary assumption, each read line is a valid string json record.
+        // validated using JSONObject
+        CharArrayRecord record = new CharArrayRecord();
+        record.append(line.toCharArray());
+        record.endRecord();
+
+        // Consume the pending line so that the following hasNext() fetches a new one
+        line = null;
+        return record;
+    }
+
+    /**
+     * Opens the log file of this partition for reading, if it exists.
+     *
+     * @throws HyracksDataException if the existing file cannot be opened
+     */
+    private void openFiles() throws HyracksDataException {
+        // TODO(Hussain) Get this path from a constant somewhere
+        Path path = Paths.get(PROGRESS_LOG_BASE_PATH + File.separator + partitionNumber + File.separator
+                + FAILED_INGEST_RECORD_FILE_NAME);
+        try {
+            // A missing log file is not an error: the reader stays null and hasNext() reports no records
+            if (Files.exists(path)) {
+                bufferedReader = Files.newBufferedReader(path, StandardCharsets.UTF_8);
+            }
+        } catch (IOException ex) {
+            throw HyracksDataException.create(ex);
+        }
+    }
+
+    /**
+     * Closes the underlying reader, if any, and forgets it so that later calls are no-ops.
+     *
+     * @throws IOException if closing the reader fails
+     */
+    private void closeReader() throws IOException {
+        BufferedReader reader = bufferedReader;
+        bufferedReader = null;
+        if (reader != null) {
+            reader.close();
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java
new file mode 100644
index 0000000..31c0ddf
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.function;
+
+import org.apache.asterix.common.functions.FunctionConstants;
+import org.apache.asterix.metadata.declared.FunctionDataSource;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+/**
+ * Rewriter of the failed ingestion reporter function: maps the function identified by
+ * {@link #FAILED_INGESTION_REPORTER} to a {@link FailedIngestionReporterDatasource}.
+ */
+
+public class FailedIngestionReporterRewriter extends FunctionRewriter {
+
+    // Identifier of the "failed_ingestion_report" function in the Asterix namespace
+    public static final FunctionIdentifier FAILED_INGESTION_REPORTER =
+            new FunctionIdentifier(FunctionConstants.ASTERIX_NS, 
"failed_ingestion_report", 0);
+    // The only instance; the constructor is private
+    public static final FailedIngestionReporterRewriter INSTANCE =
+            new FailedIngestionReporterRewriter(FAILED_INGESTION_REPORTER);
+
+    private FailedIngestionReporterRewriter(FunctionIdentifier functionId) {
+        super(functionId);
+    }
+
+    /**
+     * Builds the data source for a call of the function.
+     *
+     * @param context optimization context, used for its computation node domain
+     * @param function the function call expression, not inspected by this implementation
+     * @return a new {@link FailedIngestionReporterDatasource} on the computation node domain
+     * @throws AlgebricksException if the data source cannot be created
+     */
+    @Override
+    protected FunctionDataSource toDatasource(IOptimizationContext context, 
AbstractFunctionCallExpression function)
+            throws AlgebricksException {
+
+        return new 
FailedIngestionReporterDatasource(context.getComputationNodeDomain());
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
index c708cd1..03959bc 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java
@@ -23,6 +23,7 @@
 import org.apache.asterix.app.function.DatasetResourcesRewriter;
 import org.apache.asterix.app.function.DatasetRewriter;
 import org.apache.asterix.app.function.DumpIndexRewriter;
+import org.apache.asterix.app.function.FailedIngestionReporterRewriter;
 import org.apache.asterix.app.function.FeedRewriter;
 import org.apache.asterix.app.function.JobSummariesRewriter;
 import org.apache.asterix.app.function.PingRewriter;
@@ -65,6 +66,12 @@
         
BuiltinFunctions.addUnnestFun(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR, 
true);
         
BuiltinFunctions.addDatasourceFunction(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR,
                 TPCDSDataGeneratorRewriter.INSTANCE);
+        // Failed ingestion reporter
+        
BuiltinFunctions.addFunction(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER,
+                (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
+        
BuiltinFunctions.addUnnestFun(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER,
 true);
+        
BuiltinFunctions.addDatasourceFunction(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER,
+                FailedIngestionReporterRewriter.INSTANCE);
         // Active requests function
         BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS,
                 (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, 
true);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
index 5dbc383..2275d32 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.external.api;

 import java.io.DataOutput;
+import java.util.function.Consumer;

 import org.apache.asterix.builders.IARecordBuilder;
 import org.apache.asterix.builders.OrderedListBuilder;
@@ -111,4 +112,36 @@
         DataOutput out = buffer.getDataOutput();
         serde.serialize(serializable, out);
     }
+
+    /**
+     * Implement this method if failure of parsing needs to be logged.
+     *
+     * In case of failure to parse data, the parser can construct a String formatted record containing metadata
+     * information about the data that failed to be parsed. This allows the parser to pass this information to the
+     * components responsible for logging and log this failure. The whole String record should be a single line.
+     *
+     * Recommended minimum fields with sample data: (fields can differ depending on data source)
+     * {"source": "twitter", "bucket": 1, "key": "abc", "timestamp": "ADMMaskTime"}
+     *
+     * More fields can be added if further information needs to be logged.
+     *
+     * This would enable the system to read this information and allow for reporting capability.
+     *
+     * @return String formatted record with metadata information
+     * @throws UnsupportedOperationException by default, when the parser does not override this method
+     */
+    default String constructParsingFailureMetadataStringRecord() {
+        throw new 
UnsupportedOperationException("constructParsingFailureMetadataStringRecord is 
not implemented");
+    }
+
+    /**
+     * Implement this method if failure of parsing needs to be logged.
+     *
+     * Passes a consumer function to handle the logging. The function should be consuming the string constructed
+     * containing the metadata for logging.
+     *
+     * The default implementation ignores the handler, so a parser that does not report parsing failures can be
+     * handed one without failing.
+     *
+     * @param consumer a {@link Consumer} function to consume the string with the metadata and log it.
+     */
+    default void setParsingFailureLoggingHandler(Consumer<String> consumer) {
+        // Intentionally a no-op: the feed record controller installs a handler on every parser it is constructed
+        // with, so throwing here would break every parser that does not override this method.
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
index 2533af5..11e5e5c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java
@@ -30,12 +30,15 @@
 public abstract class AbstractFeedDataFlowController implements 
IDataFlowController, Closeable {
     protected TupleForwarder tupleForwarder;
     protected final IHyracksTaskContext ctx;
+    protected final int partitionNumber;
     protected final int numOfFields;
     protected final ArrayTupleBuilder tb;
     protected final FeedLogManager feedLogManager;
     protected boolean flushing;

-    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfFields) {
+    public AbstractFeedDataFlowController(IHyracksTaskContext ctx, int 
partitionNumber, FeedLogManager feedLogManager,
+            int numOfFields) {
+        this.partitionNumber = partitionNumber;
         this.feedLogManager = feedLogManager;
         this.numOfFields = numOfFields;
         this.ctx = ctx;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
index b14722b..f419641 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java
@@ -30,10 +30,11 @@

     private final IRecordWithPKDataParser<T> dataParser;

-    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final 
FeedLogManager feedLogManager,
-            final int numOfOutputFields, final IRecordWithPKDataParser<T> 
dataParser,
-            final IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
+    public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, int 
partitionNumber,
+            final FeedLogManager feedLogManager, final int numOfOutputFields,
+            final IRecordWithPKDataParser<T> dataParser, final 
IRecordReader<T> recordReader)
+            throws HyracksDataException {
+        super(ctx, partitionNumber, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
         this.dataParser = dataParser;
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
index 621397b..9359cde 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java
@@ -28,10 +28,11 @@

 public class ChangeFeedWithMetaDataFlowController<T> extends 
FeedWithMetaDataFlowController<T> {

-    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, 
final FeedLogManager feedLogManager,
-            final int numOfOutputFields, final IRecordWithMetadataParser<T> 
dataParser,
-            final IRecordReader<T> recordReader) throws HyracksDataException {
-        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
+    public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, 
int partitionNumber,
+            final FeedLogManager feedLogManager, final int numOfOutputFields,
+            final IRecordWithMetadataParser<T> dataParser, final 
IRecordReader<T> recordReader)
+            throws HyracksDataException {
+        super(ctx, partitionNumber, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
     }

     @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
index f392139..6986ebb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java
@@ -57,13 +57,15 @@
     protected long incomingRecordsCount = 0;
     protected long failedRecordsCount = 0;

-    public FeedRecordDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfOutputFields,
-            IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) 
throws HyracksDataException {
-        super(ctx, feedLogManager, numOfOutputFields);
+    public FeedRecordDataFlowController(IHyracksTaskContext ctx, int 
partitionNumber, FeedLogManager feedLogManager,
+            int numOfOutputFields, IRecordDataParser<T> dataParser, 
IRecordReader<T> recordReader)
+            throws HyracksDataException {
+        super(ctx, partitionNumber, feedLogManager, numOfOutputFields);
         this.dataParser = dataParser;
         this.recordReader = recordReader;
         recordReader.setFeedLogManager(feedLogManager);
         recordReader.setController(this);
+        dataParser.setParsingFailureLoggingHandler(feedLogManager::logRecord);
     }

     @Override
@@ -179,9 +181,19 @@
     private boolean parseAndForward(IRawRecord<? extends T> record) throws 
IOException {
         try {
             dataParser.parse(record, tb.getDataOutput());
+        } catch (RuntimeDataException e) {
+            // If the exception carries the String record describing the data that failed to parse
+            // (as its first parameter), log it to the file
+            String stringRecord = e.getParams() != null && 
e.getParams().length > 0 ? (String) e.getParams()[0] : null;
+
+            // File logging
+            if (stringRecord != null) {
+                feedLogManager.logRecord(stringRecord);
+            }
+
+            // continue the outer loop
+            return false;
         } catch (Exception e) {
             LOGGER.log(Level.WARN, ExternalDataConstants.ERROR_PARSE_RECORD, 
e);
-            feedLogManager.logRecord(record.toString(), 
ExternalDataConstants.ERROR_PARSE_RECORD);
             // continue the outer loop
             return false;
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
index 4deb422..93d6ee1 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java
@@ -31,9 +31,9 @@
     private final AsterixInputStream stream;
     protected long incomingRecordsCount = 0;

-    public FeedStreamDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager,
+    public FeedStreamDataFlowController(IHyracksTaskContext ctx, int 
partitionNumber, FeedLogManager feedLogManager,
             IStreamDataParser streamParser, AsterixInputStream inputStream) {
-        super(ctx, feedLogManager, 1);
+        super(ctx, partitionNumber, feedLogManager, 1);
         this.dataParser = streamParser;
         this.stream = inputStream;
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
index 289c16f..044e681 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java
@@ -30,9 +30,10 @@

     protected final IRecordWithMetadataParser<T> dataParser;

-    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, 
FeedLogManager feedLogManager, int numOfOutputFields,
-            IRecordWithMetadataParser<T> dataParser, IRecordReader<T> 
recordReader) throws HyracksDataException {
-        super(ctx, feedLogManager, numOfOutputFields, dataParser, 
recordReader);
+    public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, int 
partitionNumber, FeedLogManager feedLogManager,
+            int numOfOutputFields, IRecordWithMetadataParser<T> dataParser, 
IRecordReader<T> recordReader)
+            throws HyracksDataException {
+        super(ctx, partitionNumber, feedLogManager, numOfOutputFields, 
dataParser, recordReader);
         this.dataParser = dataParser;
     }

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
index f9bc8b2..959e34d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java
@@ -100,6 +100,10 @@
         return record;
     }

+    public IAType[] getMetaTypes() {
+        return metaTypes;
+    }
+
     public ArrayBackedValueStorage getMetadata(final int index) {
         return fieldValueBuffers[index];
     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
index f9b012e..e34df1c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java
@@ -76,18 +76,19 @@
                         if (isRecordWithMeta) {
                             if (isChangeFeed) {
                                 int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
-                                return new 
ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2,
-                                        (IRecordWithMetadataParser) 
dataParser, recordReader);
+                                return new 
ChangeFeedWithMetaDataFlowController(ctx, partition, feedLogManager,
+                                        numOfKeys + 2, 
(IRecordWithMetadataParser) dataParser, recordReader);
                             } else {
-                                return new FeedWithMetaDataFlowController(ctx, 
feedLogManager, 2,
+                                return new FeedWithMetaDataFlowController(ctx, 
partition, feedLogManager, 2,
                                         (IRecordWithMetadataParser) 
dataParser, recordReader);
                             }
                         } else if (isChangeFeed) {
                             int numOfKeys = 
ExternalDataUtils.getNumberOfKeys(configuration);
-                            return new ChangeFeedDataFlowController(ctx, 
feedLogManager, numOfKeys + 1,
+                            return new ChangeFeedDataFlowController(ctx, 
partition, feedLogManager, numOfKeys + 1,
                                     (IRecordWithPKDataParser) dataParser, 
recordReader);
                         } else {
-                            return new FeedRecordDataFlowController(ctx, 
feedLogManager, 1, dataParser, recordReader);
+                            return new FeedRecordDataFlowController(ctx, 
partition, feedLogManager, 1, dataParser,
+                                    recordReader);
                         }
                     } else {
                         return new RecordDataFlowController(ctx, dataParser, 
recordReader, 1);
@@ -99,7 +100,7 @@
                     IStreamDataParser streamParser = 
streamParserFactory.createInputStreamParser(ctx, partition);
                     streamParser.setInputStream(stream);
                     if (isFeed) {
-                        return new FeedStreamDataFlowController(ctx, 
feedLogManager, streamParser, stream);
+                        return new FeedStreamDataFlowController(ctx, 
partition, feedLogManager, streamParser, stream);
                     } else {
                         return new StreamDataFlowController(ctx, streamParser);
                     }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
index 282b536..65d124b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java
@@ -36,6 +36,9 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.util.LogRedactionUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;

 public class FeedLogManager implements Closeable {

@@ -46,9 +49,13 @@
         SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored
     }

+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private static final String PROGRESS_LOG_BASE_PATH = "logs";
     private static final String PROGRESS_LOG_FILE_NAME = "progress.log";
     private static final String ERROR_LOG_FILE_NAME = "error.log";
     private static final String BAD_RECORDS_FILE_NAME = "failed_record.log";
+    private static final String FAILED_INGEST_RECORD_FILE_NAME = "failed_ingest_record.log";
     private static final String START_PREFIX = "s:";
     private static final String END_PREFIX = "e:";
     private static final String DATE_FORMAT_STRING = "MM/dd/yyyy HH:mm:ss";
@@ -56,15 +63,18 @@
     private String currentPartition;
     private final TreeSet<String> completed;
     private final Path dir;
+    private final int partitionNumber;
     private BufferedWriter progressLogger;
     private BufferedWriter errorLogger;
     private BufferedWriter recordLogger;
+    private BufferedWriter failedIngestionRecordLogger;
     private final StringBuilder stringBuilder = new StringBuilder();
     private int count = 0;
     private static final DateFormat df = new SimpleDateFormat(DATE_FORMAT_STRING);

-    public FeedLogManager(File file) throws HyracksDataException {
+    public FeedLogManager(File file, int partitionNumber) throws HyracksDataException {
         try {
+            this.partitionNumber = partitionNumber;
             this.dir = file.toPath();
             this.completed = new TreeSet<>();
             if (!exists()) {
@@ -122,6 +132,15 @@
         recordLogger = Files.newBufferedWriter(
                 Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME),
                 StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+
+        // Ensure the parent folders are all created first
+        Path path = Paths.get(PROGRESS_LOG_BASE_PATH + File.separator + partitionNumber + File.separator
+                + FAILED_INGEST_RECORD_FILE_NAME);
+        if (!Files.exists(path.getParent())) {
+            Files.createDirectories(path.getParent());
+        }
+        failedIngestionRecordLogger = Files.newBufferedWriter(path, StandardCharsets.UTF_8, StandardOpenOption.CREATE,
+                StandardOpenOption.APPEND);
     }

     @Override
@@ -184,6 +203,19 @@
         recordLogger.flush();
     }

+    public void logRecord(String record) {
+        try {
+            failedIngestionRecordLogger.write(record);
+            failedIngestionRecordLogger.newLine();
+
+            // TODO This could be optimized by flushing only at closing (or auto-flush when buffer is full)
+            // validate between safer logging vs riskier logging but with better performance
+            failedIngestionRecordLogger.flush();
+        } catch (IOException ex) {
+            LOGGER.log(Level.WARN, "Failed to log record metadata", ex);
+        }
+    }
+
     private static String getSplitId(String log) {
         return log.substring(PREFIX_SIZE);
     }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
index ecaced6..4ab1093 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java
@@ -118,13 +118,14 @@
     public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition,
             FileSplit[] feedLogFileSplits) throws HyracksDataException {
         return new FeedLogManager(
-                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile());
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile(),
+                partition);
     }

-    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit)
+    public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition, FileSplit feedLogFileSplit)
             throws HyracksDataException {
         return new FeedLogManager(
-                FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile());
+                FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile(), partition);
     }

     public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta)
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
index effd59f..6c50054 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java
@@ -55,7 +55,7 @@

     public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx,
             Map<String, String> configuration, int partition) throws IOException {
-        super(new TestTypedFeedDataFlowController(ctx));
+        super(new TestTypedFeedDataFlowController(ctx, partition));
         pos = new PipedOutputStream();
         pis = new PipedInputStream(pos);
         this.configuration = configuration;
diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
index 708cdd8..4fa9cbf 100644
--- 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
+++ 
b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java
@@ -23,8 +23,8 @@
 import org.apache.hyracks.api.context.IHyracksTaskContext;

 class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController {
-    TestTypedFeedDataFlowController(IHyracksTaskContext ctx) {
-        super(ctx, null, 0);
+    TestTypedFeedDataFlowController(IHyracksTaskContext ctx, int partitionNumber) {
+        super(ctx, partitionNumber, null, 0);
     }

     @Override

--
To view, visit https://asterix-gerrit.ics.uci.edu/3423
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic3fcc6b771b4393542988cd0aa6fd992ccb81028
Gerrit-Change-Number: 3423
Gerrit-PatchSet: 1
Gerrit-Owner: Hussain Towaileb <[email protected]>

Reply via email to