Hussain Towaileb has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/3423 )
Change subject: [WIP] Reporting failed ingestion records ...................................................................... [WIP] Reporting failed ingestion records Change-Id: Ic3fcc6b771b4393542988cd0aa6fd992ccb81028 --- A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java A asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java M asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java M 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java M asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java 18 files changed, 367 insertions(+), 31 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/23/3423/1 diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java new file mode 100644 index 0000000..d02eac1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterDatasource.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.function; + +import org.apache.asterix.metadata.api.IDatasourceFunction; +import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; + +/** + * This function reads the log files containing the reported failed ingestion data, row by row, and returns them. + */ + +public class FailedIngestionReporterDatasource extends FunctionDataSource { + + private static final DataSourceId FAILED_INGESTION_REPORTER_DATASOURCE_ID = + new DataSourceId(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER.getNamespace(), + FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER.getName()); + + public FailedIngestionReporterDatasource(INodeDomain domain) throws AlgebricksException { + super(FAILED_INGESTION_REPORTER_DATASOURCE_ID, domain); + } + + @Override + protected IDatasourceFunction createFunction(MetadataProvider metadataProvider, + AlgebricksAbsolutePartitionConstraint locations) { + return new FailedIngestionReporterFunction(locations); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java new file mode 100644 index 0000000..96107f1 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterFunction.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.metadata.declared.AbstractDatasourceFunction; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * This function reads the log files containing the reported failed ingestion data, row by row, and returns them. 
+ */ + +public class FailedIngestionReporterFunction extends AbstractDatasourceFunction { + + private static final long serialVersionUID = 1L; + + public FailedIngestionReporterFunction(AlgebricksAbsolutePartitionConstraint locations) { + super(locations); + } + + @Override + public IRecordReader<char[]> createRecordReader(IHyracksTaskContext ctx, int partition) + throws HyracksDataException { + return new FailedIngestionReporterReader(partition); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java new file mode 100644 index 0000000..45fb764 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterReader.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.function; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; + +import org.apache.asterix.external.api.IRawRecord; +import org.apache.asterix.external.input.record.CharArrayRecord; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * The log files should be organized as a String formatted record, each single line in the log file contains a single + * full record. This function reads the log file one line at a time, ensures the record format is valid, then returns + * it. + * + * Log files may exist on different partitions, however, this function assumes that all the log files are using + * identical paths and names in all the partitions. + */ + +public class FailedIngestionReporterReader extends FunctionReader { + + private static final String PROGRESS_LOG_BASE_PATH = "logs"; + private static final String FAILED_INGEST_RECORD_FILE_NAME = "failed_ingest_record.log"; + + // Ensure the parent folders are all created first + private final int partitionNumber; + private BufferedReader bufferedReader; + private String line; + + FailedIngestionReporterReader(int partitionNumber) throws HyracksDataException { + this.partitionNumber = partitionNumber; + + openFiles(); + } + + @Override + public boolean hasNext() throws IOException { + try { + if (bufferedReader == null) { + return false; + } + + line = bufferedReader.readLine(); + return line != null; + } catch (IOException ex) { + throw HyracksDataException.create(ex); + } + } + + @Override + public IRawRecord<char[]> next() throws IOException, InterruptedException { + // TODO(Hussain) Temporary assumption, each read line is a valid string json record. 
validated using JSONObject + CharArrayRecord record = new CharArrayRecord(); + record.append((line).toCharArray()); + record.endRecord(); + return record; + } + + /** + * Opens the files to prepare for the reading + * + * @throws HyracksDataException HyracksDataException + */ + private void openFiles() throws HyracksDataException { + // TODO(Hussain) Get this path from a constant somewhere + // Ensure the parent folders are all created first + Path path = Paths.get(PROGRESS_LOG_BASE_PATH + File.separator + partitionNumber + File.separator + + FAILED_INGEST_RECORD_FILE_NAME); + try { + if (Files.exists(path)) { + bufferedReader = Files.newBufferedReader(path, StandardCharsets.UTF_8); + } + } catch (IOException ex) { + throw HyracksDataException.create(ex); + } + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java new file mode 100644 index 0000000..31c0ddf --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/function/FailedIngestionReporterRewriter.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.app.function; + +import org.apache.asterix.common.functions.FunctionConstants; +import org.apache.asterix.metadata.declared.FunctionDataSource; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; +import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; + +/** + * This function reads the log files containing the reported failed ingestion data, row by row, and returns them. + */ + +public class FailedIngestionReporterRewriter extends FunctionRewriter { + + public static final FunctionIdentifier FAILED_INGESTION_REPORTER = + new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "failed_ingestion_report", 0); + public static final FailedIngestionReporterRewriter INSTANCE = + new FailedIngestionReporterRewriter(FAILED_INGESTION_REPORTER); + + private FailedIngestionReporterRewriter(FunctionIdentifier functionId) { + super(functionId); + } + + @Override + protected FunctionDataSource toDatasource(IOptimizationContext context, AbstractFunctionCallExpression function) + throws AlgebricksException { + + return new FailedIngestionReporterDatasource(context.getComputationNodeDomain()); + } +} diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java index c708cd1..03959bc 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/util/MetadataBuiltinFunctions.java @@ -23,6 +23,7 @@ import org.apache.asterix.app.function.DatasetResourcesRewriter; import 
org.apache.asterix.app.function.DatasetRewriter; import org.apache.asterix.app.function.DumpIndexRewriter; +import org.apache.asterix.app.function.FailedIngestionReporterRewriter; import org.apache.asterix.app.function.FeedRewriter; import org.apache.asterix.app.function.JobSummariesRewriter; import org.apache.asterix.app.function.PingRewriter; @@ -65,6 +66,12 @@ BuiltinFunctions.addUnnestFun(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR, true); BuiltinFunctions.addDatasourceFunction(TPCDSDataGeneratorRewriter.TPCDS_DATA_GENERATOR, TPCDSDataGeneratorRewriter.INSTANCE); + // Failed ingestion reporter + BuiltinFunctions.addFunction(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER, + (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); + BuiltinFunctions.addUnnestFun(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER, true); + BuiltinFunctions.addDatasourceFunction(FailedIngestionReporterRewriter.FAILED_INGESTION_REPORTER, + FailedIngestionReporterRewriter.INSTANCE); // Active requests function BuiltinFunctions.addFunction(ActiveRequestsRewriter.ACTIVE_REQUESTS, (expression, env, mp) -> RecordUtil.FULLY_OPEN_RECORD_TYPE, true); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java index 5dbc383..2275d32 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IDataParser.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.api; import java.io.DataOutput; +import java.util.function.Consumer; import org.apache.asterix.builders.IARecordBuilder; import org.apache.asterix.builders.OrderedListBuilder; @@ -111,4 +112,36 @@ DataOutput out = buffer.getDataOutput(); serde.serialize(serializable, out); } + + /** + * Implement this method if failure of parsing 
needs to be logged. + * + * In case of failure to parse data, the parser can construct a String formatted record containing metadata + * information about the data that failed to be parsed. This allows the parser to pass this information to the + * components responsible for logging and log this failure. The whole String record should be a single line. + * + * Recommended minimum fields with sample data: (fields can differ depending on data source) + * {"source": "twitter", "bucket": 1, "key": "abc", "timestamp": "ADMMaskTime"} + * + * More fields can be added if further information needs to be logged. + * + * This would enable the system to read this information and allow for reporting capability. + * + * @return String formatted record with metadata information + */ + default String constructParsingFailureMetadataStringRecord() { + throw new UnsupportedOperationException("constructParsingFailureMetadataStringRecord is not implemented"); + } + + /** + * Implement this method if failure of parsing needs to be logged. + * + * Passes a consumer function to handle the logging. The function should be consuming the string constructed + * containing the metadata for logging. + * + * @param consumer a {@link Consumer} function to consume the string with the metadata and log it. 
+ */ + default void setParsingFailureLoggingHandler(Consumer<String> consumer) { + throw new UnsupportedOperationException("setParsingFailureLoggingHandler is not implemented"); + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java index 2533af5..11e5e5c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/AbstractFeedDataFlowController.java @@ -30,12 +30,15 @@ public abstract class AbstractFeedDataFlowController implements IDataFlowController, Closeable { protected TupleForwarder tupleForwarder; protected final IHyracksTaskContext ctx; + protected final int partitionNumber; protected final int numOfFields; protected final ArrayTupleBuilder tb; protected final FeedLogManager feedLogManager; protected boolean flushing; - public AbstractFeedDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfFields) { + public AbstractFeedDataFlowController(IHyracksTaskContext ctx, int partitionNumber, FeedLogManager feedLogManager, + int numOfFields) { + this.partitionNumber = partitionNumber; this.feedLogManager = feedLogManager; this.numOfFields = numOfFields; this.ctx = ctx; diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java index b14722b..f419641 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedDataFlowController.java @@ -30,10 +30,11 @@ private final IRecordWithPKDataParser<T> dataParser; - public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager, - final int numOfOutputFields, final IRecordWithPKDataParser<T> dataParser, - final IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); + public ChangeFeedDataFlowController(final IHyracksTaskContext ctx, int partitionNumber, + final FeedLogManager feedLogManager, final int numOfOutputFields, + final IRecordWithPKDataParser<T> dataParser, final IRecordReader<T> recordReader) + throws HyracksDataException { + super(ctx, partitionNumber, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java index 621397b..9359cde 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/ChangeFeedWithMetaDataFlowController.java @@ -28,10 +28,11 @@ public class ChangeFeedWithMetaDataFlowController<T> extends FeedWithMetaDataFlowController<T> { - public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext ctx, final FeedLogManager feedLogManager, - final int numOfOutputFields, final IRecordWithMetadataParser<T> dataParser, - final IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); + public ChangeFeedWithMetaDataFlowController(final IHyracksTaskContext 
ctx, int partitionNumber, + final FeedLogManager feedLogManager, final int numOfOutputFields, + final IRecordWithMetadataParser<T> dataParser, final IRecordReader<T> recordReader) + throws HyracksDataException { + super(ctx, partitionNumber, feedLogManager, numOfOutputFields, dataParser, recordReader); } @Override diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index f392139..6986ebb 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -57,13 +57,15 @@ protected long incomingRecordsCount = 0; protected long failedRecordsCount = 0; - public FeedRecordDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields, - IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, feedLogManager, numOfOutputFields); + public FeedRecordDataFlowController(IHyracksTaskContext ctx, int partitionNumber, FeedLogManager feedLogManager, + int numOfOutputFields, IRecordDataParser<T> dataParser, IRecordReader<T> recordReader) + throws HyracksDataException { + super(ctx, partitionNumber, feedLogManager, numOfOutputFields); this.dataParser = dataParser; this.recordReader = recordReader; recordReader.setFeedLogManager(feedLogManager); recordReader.setController(this); + dataParser.setParsingFailureLoggingHandler(feedLogManager::logRecord); } @Override @@ -179,9 +181,19 @@ private boolean parseAndForward(IRawRecord<? 
extends T> record) throws IOException { try { dataParser.parse(record, tb.getDataOutput()); + } catch (RuntimeDataException e) { + // If the String record is passed for the failed-parsed data, then log it to the file + String stringRecord = e.getParams() != null && e.getParams().length > 0 ? (String) e.getParams()[0] : null; + + // File logging + if (stringRecord != null) { + feedLogManager.logRecord(stringRecord); + } + + // continue the outer loop + return false; } catch (Exception e) { LOGGER.log(Level.WARN, ExternalDataConstants.ERROR_PARSE_RECORD, e); - feedLogManager.logRecord(record.toString(), ExternalDataConstants.ERROR_PARSE_RECORD); // continue the outer loop return false; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java index 4deb422..93d6ee1 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedStreamDataFlowController.java @@ -31,9 +31,9 @@ private final AsterixInputStream stream; protected long incomingRecordsCount = 0; - public FeedStreamDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, + public FeedStreamDataFlowController(IHyracksTaskContext ctx, int partitionNumber, FeedLogManager feedLogManager, IStreamDataParser streamParser, AsterixInputStream inputStream) { - super(ctx, feedLogManager, 1); + super(ctx, partitionNumber, feedLogManager, 1); this.dataParser = streamParser; this.stream = inputStream; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java index 
289c16f..044e681 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedWithMetaDataFlowController.java @@ -30,9 +30,10 @@ protected final IRecordWithMetadataParser<T> dataParser; - public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, FeedLogManager feedLogManager, int numOfOutputFields, - IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) throws HyracksDataException { - super(ctx, feedLogManager, numOfOutputFields, dataParser, recordReader); + public FeedWithMetaDataFlowController(IHyracksTaskContext ctx, int partitionNumber, FeedLogManager feedLogManager, + int numOfOutputFields, IRecordWithMetadataParser<T> dataParser, IRecordReader<T> recordReader) + throws HyracksDataException { + super(ctx, partitionNumber, feedLogManager, numOfOutputFields, dataParser, recordReader); this.dataParser = dataParser; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java index f9bc8b2..959e34d 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/RecordWithMetadataAndPK.java @@ -100,6 +100,10 @@ return record; } + public IAType[] getMetaTypes() { + return metaTypes; + } + public ArrayBackedValueStorage getMetadata(final int index) { return fieldValueBuffers[index]; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java index 
f9b012e..e34df1c 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DataflowControllerProvider.java @@ -76,18 +76,19 @@ if (isRecordWithMeta) { if (isChangeFeed) { int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration); - return new ChangeFeedWithMetaDataFlowController(ctx, feedLogManager, numOfKeys + 2, - (IRecordWithMetadataParser) dataParser, recordReader); + return new ChangeFeedWithMetaDataFlowController(ctx, partition, feedLogManager, + numOfKeys + 2, (IRecordWithMetadataParser) dataParser, recordReader); } else { - return new FeedWithMetaDataFlowController(ctx, feedLogManager, 2, + return new FeedWithMetaDataFlowController(ctx, partition, feedLogManager, 2, (IRecordWithMetadataParser) dataParser, recordReader); } } else if (isChangeFeed) { int numOfKeys = ExternalDataUtils.getNumberOfKeys(configuration); - return new ChangeFeedDataFlowController(ctx, feedLogManager, numOfKeys + 1, + return new ChangeFeedDataFlowController(ctx, partition, feedLogManager, numOfKeys + 1, (IRecordWithPKDataParser) dataParser, recordReader); } else { - return new FeedRecordDataFlowController(ctx, feedLogManager, 1, dataParser, recordReader); + return new FeedRecordDataFlowController(ctx, partition, feedLogManager, 1, dataParser, + recordReader); } } else { return new RecordDataFlowController(ctx, dataParser, recordReader, 1); @@ -99,7 +100,7 @@ IStreamDataParser streamParser = streamParserFactory.createInputStreamParser(ctx, partition); streamParser.setInputStream(stream); if (isFeed) { - return new FeedStreamDataFlowController(ctx, feedLogManager, streamParser, stream); + return new FeedStreamDataFlowController(ctx, partition, feedLogManager, streamParser, stream); } else { return new StreamDataFlowController(ctx, streamParser); } diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java index 282b536..65d124b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedLogManager.java @@ -36,6 +36,9 @@ import org.apache.commons.io.FileUtils; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.LogRedactionUtil; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class FeedLogManager implements Closeable { @@ -46,9 +49,13 @@ SNAPSHOT // an identifier that partitions with identifiers before this one should be ignored } + private static final Logger LOGGER = LogManager.getLogger(); + + private static final String PROGRESS_LOG_BASE_PATH = "logs"; private static final String PROGRESS_LOG_FILE_NAME = "progress.log"; private static final String ERROR_LOG_FILE_NAME = "error.log"; private static final String BAD_RECORDS_FILE_NAME = "failed_record.log"; + private static final String FAILED_INGEST_RECORD_FILE_NAME = "failed_ingest_record.log"; private static final String START_PREFIX = "s:"; private static final String END_PREFIX = "e:"; private static final String DATE_FORMAT_STRING = "MM/dd/yyyy HH:mm:ss"; @@ -56,15 +63,18 @@ private String currentPartition; private final TreeSet<String> completed; private final Path dir; + private final int partitionNumber; private BufferedWriter progressLogger; private BufferedWriter errorLogger; private BufferedWriter recordLogger; + private BufferedWriter failedIngestionRecordLogger; private final StringBuilder stringBuilder = new StringBuilder(); private int count = 0; private static final DateFormat df = new SimpleDateFormat(DATE_FORMAT_STRING); - public 
FeedLogManager(File file) throws HyracksDataException { + public FeedLogManager(File file, int partitionNumber) throws HyracksDataException { try { + this.partitionNumber = partitionNumber; this.dir = file.toPath(); this.completed = new TreeSet<>(); if (!exists()) { @@ -122,6 +132,15 @@ recordLogger = Files.newBufferedWriter( Paths.get(dir.toAbsolutePath().toString() + File.separator + BAD_RECORDS_FILE_NAME), StandardCharsets.UTF_8, StandardOpenOption.APPEND); + + // Ensure the parent folders are all created first + Path path = Paths.get(PROGRESS_LOG_BASE_PATH + File.separator + partitionNumber + File.separator + + FAILED_INGEST_RECORD_FILE_NAME); + if (!Files.exists(path.getParent())) { + Files.createDirectories(path.getParent()); + } + failedIngestionRecordLogger = Files.newBufferedWriter(path, StandardCharsets.UTF_8, StandardOpenOption.CREATE, + StandardOpenOption.APPEND); } @Override @@ -184,6 +203,19 @@ recordLogger.flush(); } + public void logRecord(String record) { + try { + failedIngestionRecordLogger.write(record); + failedIngestionRecordLogger.newLine(); + + // TODO This could be optimized by flushing only at closing (or auto-flush when buffer is full) + // validate between safer logging vs riskier logging but with better performance + failedIngestionRecordLogger.flush(); + } catch (IOException ex) { + LOGGER.log(Level.WARN, "Failed to log record metadata", ex); + } + } + private static String getSplitId(String log) { return log.substring(PREFIX_SIZE); } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java index ecaced6..4ab1093 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/FeedUtils.java @@ -118,13 +118,14 @@ public static FeedLogManager 
getFeedLogManager(IHyracksTaskContext ctx, int partition, FileSplit[] feedLogFileSplits) throws HyracksDataException { return new FeedLogManager( - FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile()); + FeedUtils.getAbsoluteFileRef(feedLogFileSplits[partition].getPath(), 0, ctx.getIoManager()).getFile(), + partition); } - public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, FileSplit feedLogFileSplit) + public static FeedLogManager getFeedLogManager(IHyracksTaskContext ctx, int partition, FileSplit feedLogFileSplit) throws HyracksDataException { return new FeedLogManager( - FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile()); + FeedUtils.getAbsoluteFileRef(feedLogFileSplit.getPath(), 0, ctx.getIoManager()).getFile(), partition); } public static void processFeedMessage(ByteBuffer input, VSizeFrame message, FrameTupleAccessor fta) diff --git a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java index effd59f..6c50054 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedAdapter.java @@ -55,7 +55,7 @@ public TestTypedAdapter(ITupleParserFactory parserFactory, ARecordType sourceDatatype, IHyracksTaskContext ctx, Map<String, String> configuration, int partition) throws IOException { - super(new TestTypedFeedDataFlowController(ctx)); + super(new TestTypedFeedDataFlowController(ctx, partition)); pos = new PipedOutputStream(); pis = new PipedInputStream(pos); this.configuration = configuration; diff --git 
a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java index 708cdd8..4fa9cbf 100644 --- a/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java +++ b/asterixdb/asterix-external-data/src/test/java/org/apache/asterix/external/library/adapter/TestTypedFeedDataFlowController.java @@ -23,8 +23,8 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; class TestTypedFeedDataFlowController extends AbstractFeedDataFlowController { - TestTypedFeedDataFlowController(IHyracksTaskContext ctx) { - super(ctx, null, 0); + TestTypedFeedDataFlowController(IHyracksTaskContext ctx, int partitionNumber) { + super(ctx, partitionNumber, null, 0); } @Override -- To view, visit https://asterix-gerrit.ics.uci.edu/3423 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-MessageType: newchange Gerrit-Change-Id: Ic3fcc6b771b4393542988cd0aa6fd992ccb81028 Gerrit-Change-Number: 3423 Gerrit-PatchSet: 1 Gerrit-Owner: Hussain Towaileb <[email protected]>
