This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-3941 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 2fa4a2cd4004848841a85f9cae11f379fd097ac8 Author: tballison <talli...@apache.org> AuthorDate: Wed May 31 17:10:41 2023 -0400 TIKA-3941 -- WIP ... more remains --- .../org/apache/tika/parser/AutoDetectParser.java | 4 ++ .../java/org/apache/tika/pipes/PipesClient.java | 62 +++++++++++++++++++-- .../java/org/apache/tika/pipes/PipesResult.java | 28 +++++++--- .../java/org/apache/tika/pipes/PipesServer.java | 65 +++++++++++++++++++++- .../org/apache/tika/pipes/async/AsyncConfig.java | 10 ++++ .../apache/tika/pipes/async/AsyncProcessor.java | 14 ++++- .../tika/pipes/async/AsyncProcessorTest.java | 61 ++++++++++++++++---- .../org/apache/tika/pipes/async/MockReporter.java | 6 +- 8 files changed, 220 insertions(+), 30 deletions(-) diff --git a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java index 45e972c20..d333c2e9a 100644 --- a/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java +++ b/tika-core/src/main/java/org/apache/tika/parser/AutoDetectParser.java @@ -157,6 +157,10 @@ public class AutoDetectParser extends CompositeParser { this.autoDetectParserConfig = autoDetectParserConfig; } + public AutoDetectParserConfig getAutoDetectParserConfig() { + return this.autoDetectParserConfig; + } + public void parse(InputStream stream, ContentHandler handler, Metadata metadata, ParseContext context) throws IOException, SAXException, TikaException { if (autoDetectParserConfig.getMetadataWriteFilterFactory() != null) { diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java index 7a4e6eecf..3db897f79 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesClient.java @@ -30,6 +30,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Locale; import java.util.concurrent.ExecutionException; @@ -45,7 +46,9 @@ import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.emitter.EmitData; +import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.utils.ProcessUtils; import org.apache.tika.utils.StringUtils; @@ -145,6 +148,7 @@ public class PipesClient implements Closeable { private PipesResult actuallyProcess(FetchEmitTuple t) throws InterruptedException { long start = System.currentTimeMillis(); + final PipesResult[] intermediateResult = new PipesResult[1]; FutureTask<PipesResult> futureTask = new FutureTask<>(() -> { UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream(); @@ -167,6 +171,10 @@ public class PipesClient implements Closeable { throw new InterruptedException("thread interrupt"); } PipesResult result = readResults(t, start); + while (result.getStatus().equals(PipesResult.STATUS.INTERMEDIATE_RESULT)) { + intermediateResult[0] = result; + result = readResults(t, start); + } if (LOG.isDebugEnabled()) { long elapsed = System.currentTimeMillis() - readStart; LOG.debug("finished reading result in {} ms", elapsed); @@ -177,6 +185,9 @@ public class PipesClient implements Closeable { pipesClientId, System.currentTimeMillis() - readStart); } + if (result.getStatus() == PipesResult.STATUS.OOM) { + return buildFatalResult(result, intermediateResult); + } return result; }); @@ -197,7 +208,7 @@ public class PipesClient implements Closeable { if (!process.isAlive() && TIMEOUT_EXIT_CODE == process.exitValue()) { LOG.warn("pipesClientId={} server timeout: {} in {} ms", pipesClientId, t.getId(), elapsed); - return PipesResult.TIMEOUT; + return buildFatalResult(PipesResult.TIMEOUT, intermediateResult); } process.waitFor(500, TimeUnit.MILLISECONDS); if (process.isAlive()) { @@ -207,18 +218,34 @@ public class PipesClient implements Closeable { LOG.warn("pipesClientId={} crash: {} in {} ms with exit code {}", pipesClientId, t.getId(), elapsed, process.exitValue()); } - return PipesResult.UNSPECIFIED_CRASH; + return buildFatalResult(PipesResult.UNSPECIFIED_CRASH, intermediateResult); } catch (TimeoutException e) { long elapsed = System.currentTimeMillis() - start; destroyForcibly(); LOG.warn("pipesClientId={} client timeout: {} in {} ms", pipesClientId, t.getId(), elapsed); - return PipesResult.TIMEOUT; + return buildFatalResult(PipesResult.TIMEOUT, intermediateResult); } finally { futureTask.cancel(true); } } + private PipesResult buildFatalResult(PipesResult result, + PipesResult[] intermediateResult) { + + if (intermediateResult[0] == null) { + return result; + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("intermediate2 result: {}", intermediateResult[0].getEmitData()); + } + intermediateResult[0].getEmitData().getMetadataList().get(0).set("PipesResult", + result.getStatus().toString()); + return new PipesResult(result.getStatus(), + intermediateResult[0].getEmitData(), true); + } + } + private void pauseThenDestroy() throws InterruptedException { //wait just a little bit to let process end to get exit value //if there's a timeout on the server side @@ -259,9 +286,11 @@ public class PipesClient implements Closeable { try { status = lookup(statusByte); } catch (IllegalArgumentException e) { - throw new IOException("problem reading response from server " - + String.format(Locale.US, "%02x", statusByte), - e); + String byteString = "-1"; + if (statusByte > -1) { + byteString = String.format(Locale.US, "%02x", (byte)statusByte); + } + throw new IOException("problem reading response from server: " + byteString, e); } switch (status) { @@ -292,6 +321,10 @@ public class PipesClient implements Closeable { LOG.warn("pipesClientId={} fetch exception: {} in {} ms", pipesClientId, t.getId(), millis); return readMessage(PipesResult.STATUS.FETCH_EXCEPTION); + case INTERMEDIATE_RESULT: + LOG.debug("pipesClientId={} intermediate success: {} in {} ms", pipesClientId, + t.getId(), millis); + return deserializeIntermediateResult(t.getEmitKey()); case PARSE_SUCCESS: //there may have been a parse exception, but the parse didn't crash LOG.debug("pipesClientId={} parse success: {} in {} ms", pipesClientId, t.getId(), @@ -349,6 +382,23 @@ public class PipesClient implements Closeable { } } + private PipesResult deserializeIntermediateResult(EmitKey emitKey) throws IOException { + + int length = input.readInt(); + byte[] bytes = new byte[length]; + input.readFully(bytes); + try (ObjectInputStream objectInputStream = new ObjectInputStream( + new UnsynchronizedByteArrayInputStream(bytes))) { + Metadata metadata = (Metadata) objectInputStream.readObject(); + EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata)); + return new PipesResult(PipesResult.STATUS.INTERMEDIATE_RESULT, emitData, true); + } catch (ClassNotFoundException e) { + LOG.error("class not found exception deserializing data", e); + //this should be catastrophic + throw new RuntimeException(e); + } + } + private void restart() throws IOException, InterruptedException, TimeoutException { if (process != null) { LOG.debug("process still alive; trying to destroy it"); diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java index ace4f3724..639bfc437 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesResult.java @@ -20,6 +20,8 @@ import org.apache.tika.pipes.emitter.EmitData; public class PipesResult { + private boolean intermediate = false; + public enum STATUS { CLIENT_UNAVAILABLE_WITHIN_MS, FETCHER_INITIALIZATION_EXCEPTION, @@ -32,7 +34,8 @@ public class PipesResult { OOM, TIMEOUT, UNSPECIFIED_CRASH, NO_EMITTER_FOUND, EMIT_SUCCESS, EMIT_SUCCESS_PARSE_EXCEPTION, EMIT_EXCEPTION, - INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND; + INTERRUPTED_EXCEPTION, NO_FETCHER_FOUND, + INTERMEDIATE_RESULT; } public static final PipesResult CLIENT_UNAVAILABLE_WITHIN_MS = @@ -48,18 +51,19 @@ public class PipesResult { private final EmitData emitData; private final String message; - private PipesResult(STATUS status, EmitData emitData, String message) { + private PipesResult(STATUS status, EmitData emitData, String message, boolean intermediate) { this.status = status; this.emitData = emitData; this.message = message; + this.intermediate = intermediate; } public PipesResult(STATUS status) { - this(status, null, null); + this(status, null, null, false); } public PipesResult(STATUS status, String message) { - this(status, null, message); + this(status, null, message, false); } /** @@ -68,7 +72,11 @@ public class PipesResult { * @param emitData */ public PipesResult(EmitData emitData) { - this(STATUS.PARSE_SUCCESS, emitData, null); + this(STATUS.PARSE_SUCCESS, emitData, null, false); + } + + public PipesResult(STATUS status, EmitData emitData, boolean intermediate) { + this(status, emitData, null, intermediate); } /** @@ -79,7 +87,7 @@ public class PipesResult { * @param message */ public PipesResult(EmitData emitData, String message) { - this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message); + this(STATUS.PARSE_SUCCESS_WITH_EXCEPTION, emitData, message, false); } public STATUS getStatus() { @@ -94,9 +102,13 @@ public class PipesResult { return message; } + public boolean isIntermediate() { + return intermediate; + } + @Override public String toString() { - return "PipesResult{" + "status=" + status + ", emitData=" + emitData + ", message='" + - message + '\'' + '}'; + return "PipesResult{" + "intermediate=" + intermediate + ", status=" + status + + ", emitData=" + emitData + ", message='" + message + '\'' + '}'; } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java index c97a4e39e..374cf7bc1 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java @@ -24,6 +24,7 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; @@ -37,12 +38,17 @@ import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; import org.apache.tika.config.TikaConfig; +import org.apache.tika.detect.Detector; import org.apache.tika.exception.EncryptedDocumentException; import org.apache.tika.exception.TikaException; import org.apache.tika.extractor.DocumentSelector; +import org.apache.tika.io.TemporaryResources; +import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.mime.MediaType; import org.apache.tika.parser.AutoDetectParser; +import org.apache.tika.parser.DigestingParser; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; import org.apache.tika.parser.RecursiveParserWrapper; @@ -76,6 +82,9 @@ public class PipesServer implements Runnable { //this has to be some number not close to 0-3 //it looks like the server crashes with exit value 3 on OOM, for example public static final int TIMEOUT_EXIT_CODE = 17; + private DigestingParser.Digester digester; + + private Detector detector; public enum STATUS { READY, @@ -93,7 +102,8 @@ public class PipesServer implements Runnable { EMIT_EXCEPTION, OOM, TIMEOUT, - EMPTY_OUTPUT; + EMPTY_OUTPUT, + INTERMEDIATE_RESULT; byte getByte() { return (byte) (ordinal() + 1); @@ -544,6 +554,7 @@ public class PipesServer implements Runnable { handlerConfig.getMaxEmbeddedResources()); ParseContext parseContext = new ParseContext(); long start = System.currentTimeMillis(); + preParse(fetchEmitTuple, stream, metadata, parseContext); try { rMetaParser.parse(stream, handler, metadata, parseContext); } catch (SAXException e) { @@ -563,6 +574,39 @@ public class PipesServer implements Runnable { return handler.getMetadataList(); } + private void preParse(FetchEmitTuple t, InputStream stream, Metadata metadata, + ParseContext parseContext) { + try (TemporaryResources temporaryResources = new TemporaryResources()) { + TikaInputStream tis = TikaInputStream.cast(stream); + if (tis == null) { + tis = TikaInputStream.get(stream, temporaryResources, metadata); + } + _preParse(t.getId(), tis, metadata, parseContext); + } catch (IOException e) { + LOG.warn("something went wrong in pre-parse casting of the inputstream to a " + + "TikaInputStream", e); + return; + } + writeIntermediate(t.getEmitKey(), metadata); + } + + private void _preParse(String id, TikaInputStream tis, Metadata metadata, + ParseContext parseContext) { + if (digester != null) { + try (InputStream is = Files.newInputStream(tis.getPath())) { + digester.digest(is, metadata, parseContext); + } catch (IOException e) { + LOG.warn("problem digesting: " + id, e); + } + } + try { + MediaType mt = detector.detect(tis, metadata); + metadata.set(Metadata.CONTENT_TYPE, mt.toString()); + } catch (IOException e) { + LOG.warn("problem detecting: " + id, e); + } + } + private void injectUserMetadata(Metadata userMetadata, List<Metadata> metadataList) { for (String n : userMetadata.names()) { //overwrite whatever was there @@ -609,10 +653,29 @@ public class PipesServer implements Runnable { this.fetcherManager = FetcherManager.load(tikaConfigPath); this.emitterManager = EmitterManager.load(tikaConfigPath); this.autoDetectParser = new AutoDetectParser(this.tikaConfig); + if (((AutoDetectParser)autoDetectParser).getAutoDetectParserConfig().getDigesterFactory() != null) { + this.digester = ((AutoDetectParser) autoDetectParser). + getAutoDetectParserConfig().getDigesterFactory().build(); + } + this.detector = ((AutoDetectParser)this.autoDetectParser).getDetector(); this.rMetaParser = new RecursiveParserWrapper(autoDetectParser); } + private void writeIntermediate(EmitKey emitKey, Metadata metadata) { + EmitData emitData = new EmitData(emitKey, Collections.singletonList(metadata)); + try { + UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream(); + try (ObjectOutputStream objectOutputStream = new ObjectOutputStream(bos)) { + objectOutputStream.writeObject(metadata); + } + write(STATUS.INTERMEDIATE_RESULT, bos.toByteArray()); + } catch (IOException e) { + LOG.error("problem writing intermediate data (forking process shutdown?)", e); + exit(1); + } + } + private void write(EmitData emitData) { try { UnsynchronizedByteArrayOutputStream bos = new UnsynchronizedByteArrayOutputStream(); diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java index 51b52af66..bc55cca5d 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncConfig.java @@ -33,6 +33,8 @@ public class AsyncConfig extends PipesConfigBase { private int queueSize = 10000; private int numEmitters = 1; + private boolean emitIntermediateResults = false; + private PipesReporter pipesReporter = PipesReporter.NO_OP_REPORTER; public static AsyncConfig load(Path p) throws IOException, TikaConfigException { @@ -107,4 +109,12 @@ public class AsyncConfig extends PipesConfigBase { public void setPipesReporter(PipesReporter pipesReporter) { this.pipesReporter = pipesReporter; } + + public void setEmitIntermediateResults(boolean emitIntermediateResults) { + this.emitIntermediateResults = emitIntermediateResults; + } + + public boolean isEmitIntermediateResults() { + return emitIntermediateResults; + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index 17292c047..3f51ef6ab 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -302,8 +302,9 @@ public class AsyncProcessor implements Closeable { System.currentTimeMillis() - start); } long offerStart = System.currentTimeMillis(); - if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS || - result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) { + + if (shouldEmit(result)) { + LOG.debug("adding result to emitter queue: " + result.getEmitData()); boolean offered = emitDataQueue.offer(result.getEmitData(), MAX_OFFER_WAIT_MS, TimeUnit.MILLISECONDS); @@ -323,5 +324,14 @@ public class AsyncProcessor implements Closeable { } } } + + private boolean shouldEmit(PipesResult result) { + + if (result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS || + result.getStatus() == PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION) { + return true; + } + return result.isIntermediate() && asyncConfig.isEmitIntermediateResults(); + } } } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java index 4104866c7..6c7748740 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/async/AsyncProcessorTest.java @@ -27,12 +27,12 @@ import java.util.HashSet; import java.util.Random; import java.util.Set; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.FetchEmitTuple; +import org.apache.tika.pipes.PipesResult; import org.apache.tika.pipes.emitter.EmitData; import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.pipes.fetcher.FetchKey; @@ -60,8 +60,6 @@ public class AsyncProcessorTest { private final int totalFiles = 100; - private Path tikaConfigPath; - @TempDir private Path inputDir; @@ -74,9 +72,12 @@ public class AsyncProcessorTest { private int crash = 0; - @BeforeEach - public void setUp() throws SQLException, IOException { - tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml"); + public Path setUp(boolean emitIntermediateResults) throws SQLException, IOException { + ok = 0; + oom = 0; + timeouts = 0; + crash = 0; + Path tikaConfigPath = Files.createTempFile(configDir, "tika-config-", ".xml"); String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" + "<properties>" + " <emitters>" + " <emitter class=\"org.apache.tika.pipes.async.MockEmitter\">\n" + @@ -86,7 +87,12 @@ public class AsyncProcessorTest { " <name>mock</name>\n" + " <basePath>" + ProcessUtils.escapeCommandLine(inputDir.toAbsolutePath().toString()) + "</basePath>\n" + " </fetcher>" + " </fetchers>" + - "<async><tikaConfig>" + + + + "<async><pipesReporter class=\"org.apache.tika.pipes.async.MockReporter\"/>" + + "<emitIntermediateResults>" + emitIntermediateResults + + "</emitIntermediateResults>" + + "<tikaConfig>" + ProcessUtils.escapeCommandLine(tikaConfigPath.toAbsolutePath().toString()) + "</tikaConfig><forkedJvmArgs><arg>-Xmx512m</arg" + "></forkedJvmArgs><maxForEmitBatchBytes>1000000</maxForEmitBatchBytes>" + @@ -101,16 +107,19 @@ public class AsyncProcessorTest { Files.write(inputDir.resolve(i + ".xml"), OOM.getBytes(StandardCharsets.UTF_8)); oom++; } else if (f < 0.10) { - Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8)); - timeouts++; - } else if (f < 0.15) { Files.write(inputDir.resolve(i + ".xml"), SYSTEM_EXIT.getBytes(StandardCharsets.UTF_8)); crash++; + } else if (f < 0.13) { + Files.write(inputDir.resolve(i + ".xml"), TIMEOUT.getBytes(StandardCharsets.UTF_8)); + timeouts++; } else { Files.write(inputDir.resolve(i + ".xml"), OK.getBytes(StandardCharsets.UTF_8)); ok++; } } + MockEmitter.EMIT_DATA.clear(); + MockReporter.RESULTS.clear(); + return tikaConfigPath; } /* @@ -128,9 +137,9 @@ public class AsyncProcessorTest { @Test public void testBasic() throws Exception { - AsyncProcessor processor = new AsyncProcessor(tikaConfigPath); + AsyncProcessor processor = new AsyncProcessor(setUp(false)); for (int i = 0; i < totalFiles; i++) { - FetchEmitTuple t = new FetchEmitTuple("myId", + FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey("mock", i + ".xml"), new EmitKey("mock", "emit-" + i), new Metadata()); processor.offer(t, 1000); @@ -148,5 +157,33 @@ public class AsyncProcessorTest { emitKeys.add(d.getEmitKey().getEmitKey()); } assertEquals(ok, emitKeys.size()); + assertEquals(100, MockReporter.RESULTS.size()); + for (PipesResult r : MockReporter.RESULTS) { + assertEquals("application/mock+xml", + r.getEmitData().getMetadataList().get(0).get(Metadata.CONTENT_TYPE)); + } + } + + @Test + public void testEmitIntermediate() throws Exception { + AsyncProcessor processor = new AsyncProcessor(setUp(true)); + for (int i = 0; i < totalFiles; i++) { + FetchEmitTuple t = new FetchEmitTuple("myId-" + i, new FetchKey("mock", i + ".xml"), + new EmitKey("mock", "emit-" + i), new Metadata()); + processor.offer(t, 1000); + } + for (int i = 0; i < 10; i++) { + processor.offer(PipesIterator.COMPLETED_SEMAPHORE, 1000); + } + //TODO clean this up + while (processor.checkActive()) { + Thread.sleep(100); + } + processor.close(); + Set<String> emitKeys = new HashSet<>(); + for (EmitData d : MockEmitter.EMIT_DATA) { + emitKeys.add(d.getEmitKey().getEmitKey()); + } + assertEquals(totalFiles, emitKeys.size()); } } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java index b8197bd82..6e8308c89 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java @@ -16,6 +16,8 @@ */ package org.apache.tika.pipes.async; +import java.util.concurrent.ArrayBlockingQueue; + import org.apache.tika.config.Field; import org.apache.tika.pipes.FetchEmitTuple; import org.apache.tika.pipes.PipesReporter; @@ -23,11 +25,13 @@ import org.apache.tika.pipes.PipesResult; public class MockReporter extends PipesReporter { + static ArrayBlockingQueue<PipesResult> RESULTS = new ArrayBlockingQueue<>(10000); + private String endpoint; @Override public void report(FetchEmitTuple t, PipesResult result, long elapsed) { - + RESULTS.add(result); } @Override