>From Ayush Tripathi <[email protected]>: Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20650?usp=email )
Change subject: pdf commit ...................................................................... pdf commit Change-Id: I1ad074ab96da62407aba313f8bb3c3f5f8df47d3 --- A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java 3 files changed, 386 insertions(+), 0 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/50/20650/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java new file mode 100644 index 0000000..11e425e --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.pdf; + +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +// removed StandardCharsets import +// removed unused ArrayList import +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.asterix.common.api.IApplicationContext; +import org.apache.asterix.common.external.IExternalFilterEvaluator; +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.api.AsterixInputStream; +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder; +import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory; +import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.ExternalDataPrefix; +import org.apache.asterix.external.util.ExternalDataUtils; +// removed unused S3AuthUtils import +import org.apache.asterix.external.util.aws.s3.S3Utils; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; + +// removed unused direct S3 SDK imports +import software.amazon.awssdk.services.s3.model.S3Object; + +// removed unused paginator import + +public class AwsS3PdfInputStreamFactory extends AbstractExternalInputStreamFactory { + + private static final long serialVersionUID = 1L; + + @Override + public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException { + IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder(); + int partition = context.getPartition(); + IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext() + .getServiceContext().getApplicationContext(); + return new AwsS3InputStream(ncAppCtx, configuration, + partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder); + } + + @Override + public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector, + IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException { + super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory); + + IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext(); + + // Submit job and poll for completion, then list output objects from configured output-url + List<S3Object> filesOnly = java.util.Collections.emptyList(); + try { + String jobsEndpoint = configuration.getOrDefault("jobs-endpoint", "http://localhost:8000/jobs"); + String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME); + String definition = configuration.getOrDefault(ExternalDataConstants.DEFINITION_FIELD_NAME, ""); + String def = definition == null ? "" : definition.trim(); + if (!def.isEmpty() && def.startsWith("/")) { + def = def.substring(1); + } + + String defaultInputUrl = "s3://" + (container == null ? "" : container) + (def.isEmpty() ? "" : "/" + def); + String inputUrl = configuration.getOrDefault("input-url", defaultInputUrl); + String datasetName = configuration.get(ExternalDataConstants.KEY_DATASET); + datasetName = datasetName == null ? "" : datasetName.trim(); + if (!datasetName.isEmpty() && datasetName.startsWith("/")) { + datasetName = datasetName.substring(1); + } + if (!datasetName.isEmpty() && datasetName.endsWith("/")) { + datasetName = datasetName.substring(0, datasetName.length() - 1); + } + + // If a dataset name is provided, treat definition as definition/datasetName for listing + String effectiveDef = def; + if (!datasetName.isEmpty()) { + effectiveDef = def.isEmpty() ? datasetName : (def + "/" + datasetName); + } + + String defaultOutputUrl = "s3://" + (container == null ? "" : container) + + (effectiveDef.isEmpty() ? "" : ("/" + effectiveDef)); + String outputUrl = configuration.getOrDefault("output-url", defaultOutputUrl); + String parsingModel = configuration.getOrDefault("parsing-model", "gpt-4.1-nano"); + String pageRange = configuration.getOrDefault("page-range", ""); + String fileGlob = configuration.getOrDefault("file-glob", "*.pdf"); + + String openaiApiKey = configuration.get("openai_api_key"); + String awsAccessKeyId = configuration.get("aws_access_key_id"); + String awsSecretAccessKey = configuration.get("aws_secret_access_key"); + String parsingSchema = configuration.get("parsing-schema"); + String parsingGuidance = configuration.get("parsing-guidance"); + + StringBuilder postBodyBuilder = new StringBuilder(); + postBodyBuilder.append("{\"input_url\":\"").append(escapeJson(inputUrl)).append("\"") + .append(",\"output_url\":\"").append(escapeJson(outputUrl)).append("\"") + .append(",\"parsing_model\":\"").append(escapeJson(parsingModel)).append("\"") + .append(",\"page_range\":\"").append(escapeJson(pageRange)).append("\"").append(",\"file_glob\":\"") + .append(escapeJson(fileGlob)).append("\""); + + if (openaiApiKey != null && !openaiApiKey.isEmpty()) { + postBodyBuilder.append(",\"openai_api_key\":\"").append(escapeJson(openaiApiKey)).append("\""); + } + if (awsAccessKeyId != null && !awsAccessKeyId.isEmpty()) { + postBodyBuilder.append(",\"aws_access_key_id\":\"").append(escapeJson(awsAccessKeyId)).append("\""); + } + if (awsSecretAccessKey != null && !awsSecretAccessKey.isEmpty()) { + postBodyBuilder.append(",\"aws_secret_access_key\":\"").append(escapeJson(awsSecretAccessKey)) + .append("\""); + } + if (parsingSchema != null && !parsingSchema.isEmpty()) { + // config key uses hyphen, request expects snake_case: parsing_schema + postBodyBuilder.append(",\"parsing_schema\":\"").append(escapeJson(parsingSchema)).append("\""); + } + if (parsingGuidance != null && !parsingGuidance.isEmpty()) { + // config key uses hyphen, request expects snake_case: parsing_guidance + postBodyBuilder.append(",\"parsing_guidance\":\"").append(escapeJson(parsingGuidance)).append("\""); + } + postBodyBuilder.append('}'); + String postBody = postBodyBuilder.toString(); + + HttpRequest submitReq = + HttpRequest.newBuilder(URI.create(jobsEndpoint)).header("Content-Type", "application/json") + .POST(HttpRequest.BodyPublishers.ofString(postBody)).build(); + HttpResponse<String> submitResp = + HttpClient.newHttpClient().send(submitReq, HttpResponse.BodyHandlers.ofString()); + String submitBody = submitResp.body() == null ? "" : submitResp.body(); + String jobId = extractJsonString(submitBody, "job_id"); + + if (jobId != null && !jobId.isEmpty()) { + String baseJobsEndpoint = jobsEndpoint; + int qm = baseJobsEndpoint.indexOf('?'); + if (qm >= 0) { + baseJobsEndpoint = baseJobsEndpoint.substring(0, qm); + } + String statusUrl = + (baseJobsEndpoint.endsWith("/") ? (baseJobsEndpoint + jobId) : (baseJobsEndpoint + "/" + jobId)) + + "/status"; + String status = null; + long deadline = System.currentTimeMillis() + 150_000; // 150s + int attempt = 0; + final long initialBackoffMs = 500L; + final long maxBackoffMs = 5000L; + while (System.currentTimeMillis() < deadline) { + HttpRequest statusReq = HttpRequest.newBuilder(URI.create(statusUrl)).GET().build(); + HttpResponse<String> statusResp = + HttpClient.newHttpClient().send(statusReq, HttpResponse.BodyHandlers.ofString()); + String statusBody = statusResp.body() == null ? "" : statusResp.body(); + status = extractJsonString(statusBody, "status"); + if ("completed".equalsIgnoreCase(status) || "success".equalsIgnoreCase(status)) { + break; + } + if ("failed".equalsIgnoreCase(status) || "error".equalsIgnoreCase(status) + || "cancelled".equalsIgnoreCase(status)) { + status = "failed"; + break; + } + long backoff = initialBackoffMs << attempt; + if (backoff > maxBackoffMs) { + backoff = maxBackoffMs; + } + long jitter = backoff / 4; // 25% jitter + long sleepMs = backoff + (jitter == 0 ? 0 + : java.util.concurrent.ThreadLocalRandom.current().nextLong(0, jitter + 1)); + try { + Thread.sleep(sleepMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + break; + } + if (attempt < 30) { // prevent overflow on shift + attempt++; + } + } + + // Replace with direct list using S3Utils like standard factory + configuration.put(ExternalDataConstants.DEFINITION_FIELD_NAME, effectiveDef); + IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector); + ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration); + + configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); + IncludeExcludeMatcher includeExcludeMatcher = + ExternalDataUtils.getIncludeExcludeMatchers(configuration); + filesOnly = S3Utils.listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector, + externalDataPrefix, evaluator); + } else { + // Fallback: direct list as well + configuration.put(ExternalDataConstants.DEFINITION_FIELD_NAME, effectiveDef); + IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector); + ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration); + configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot()); + IncludeExcludeMatcher includeExcludeMatcher = + ExternalDataUtils.getIncludeExcludeMatchers(configuration); + filesOnly = S3Utils.listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector, + externalDataPrefix, evaluator); + } + } catch (Throwable t) { + filesOnly = java.util.Collections.emptyList(); + } + + distributeWorkLoad(filesOnly, getPartitionsCount()); + } + + // Removed log parsing approach in favor of job submission and S3 listing + + private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) { + PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, + Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); + + for (int i = 0; i < partitionsCount; i++) { + workloadQueue.add(new PartitionWorkLoadBasedOnSize()); + } + + for (S3Object object : fileObjects) { + PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); + workload.addFilePath(object.key(), object.size()); + workloadQueue.add(workload); + } + partitionWorkLoadsBasedOnSize.addAll(workloadQueue); + } + + // Removed unused helper methods for orchestrator request body + + // listOutputsFromS3 no longer needed after direct listing switch + + private static String escapeJson(String s) { + if (s == null) { + return ""; + } + return s.replace("\\", "\\\\").replace("\"", "\\\""); + } + + private static String extractJsonString(String json, String field) { + if (json == null || field == null) { + return null; + } + String pattern = "\"" + field + "\"\\s*:\\s*\""; + java.util.regex.Pattern p = java.util.regex.Pattern.compile(pattern); + java.util.regex.Matcher m = p.matcher(json); + if (m.find()) { + int start = m.end(); + int end = json.indexOf('"', start); + if (end > start) { + return json.substring(start, end); + } + } + return null; + } + + // parse helpers removed +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java new file mode 100644 index 0000000..36c92b0 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.pdf; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory; +import org.apache.asterix.external.util.ExternalDataConstants; + +public class AwsS3PdfReaderFactory extends StreamRecordReaderFactory { + + private static final long serialVersionUID = 1L; + + private static final List<String> RECORD_READER_NAMES = + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3); + + @Override + public List<String> getRecordReaderNames() { + return RECORD_READER_NAMES; + } + + @Override + protected void setStreamFactory(java.util.Map<String, String> config) { + streamFactory = new AwsS3PdfInputStreamFactory(); + } + + @Override + public Set<String> getReaderSupportedFormats() { + return Collections.singleton(ExternalDataConstants.FORMAT_PDF); + } +} \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java new file mode 100644 index 0000000..850927e --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.parser.factory; + +import java.util.Collections; +import java.util.List; + +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.api.IRecordDataParser; +import org.apache.asterix.external.api.IStreamDataParser; +import org.apache.asterix.external.parser.JSONDataParser; +import org.apache.asterix.om.types.ARecordType; + +public class PDFDataParserFactory extends AbstractGenericDataParserFactory<char[]> { + + private static final long serialVersionUID = 1L; + private static final List<String> PARSER_FORMAT = Collections.singletonList("pdf"); + + @Override + public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) { + return createParser(context); + } + + @Override + public void setMetaType(ARecordType metaType) { + // no MetaType to set. + } + + @Override + public List<String> getParserFormats() { + return PARSER_FORMAT; + } + + @Override + public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) { + return createParser(context); + } + + @Override + public Class<?> getRecordClass() { + return char[].class; + } + + private JSONDataParser createParser(IExternalDataRuntimeContext context) { + return new JSONDataParser(recordType, new com.fasterxml.jackson.core.JsonFactory(), context); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20650?usp=email To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings?usp=email Gerrit-MessageType: newchange Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I1ad074ab96da62407aba313f8bb3c3f5f8df47d3 Gerrit-Change-Number: 20650 Gerrit-PatchSet: 1 Gerrit-Owner: Ayush Tripathi <[email protected]>
