This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tika.git


commit 46901964b2221fd882b2c550e784f766f2b726bd
Author: tballison <talli...@apache.org>
AuthorDate: Thu Nov 17 09:59:46 2022 -0500

    TIKA-3929 -- add a crash option for PipesReporter
---
 CHANGES.txt                                        |  4 +++
 .../apache/tika/pipes/CompositePipesReporter.java  | 14 +++++++++
 .../apache/tika/pipes/LoggingPipesReporter.java    | 10 +++++++
 .../java/org/apache/tika/pipes/PipesReporter.java  | 24 +++++++++++++++
 .../apache/tika/pipes/async/AsyncProcessor.java    |  3 +-
 .../org/apache/tika/pipes/async/AsyncStatus.java   | 18 ++++++++---
 .../org/apache/tika/pipes/async/MockReporter.java  | 10 +++++++
 .../reporters/fs/FileSystemStatusReporter.java     | 35 ++++++++++++++++++++--
 .../opensearch/OpenSearchPipesReporter.java        | 10 +++++++
 9 files changed, 121 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index f8c546d24..c7995493b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+Release 2.6.1 - ???
+
+   * Downgraded logging in PipesClient for each parse from info to debug.
+
 Release 2.6.0 - 11/3/2022
 
    * Add optional Siegfried detector (TIKA-3901).
diff --git a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java index da34f3f98..4f78b6be8 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/CompositePipesReporter.java @@ -37,6 +37,20 @@ public class CompositePipesReporter extends PipesReporter implements Initializab } + @Override + public void error(Throwable t) { + for (PipesReporter reporter : pipesReporters) { + reporter.error(t); + } + } + + @Override + public void error(String msg) { + for (PipesReporter reporter : pipesReporters) { + reporter.error(msg); + } + } + @Field public void setPipesReporters(List<PipesReporter> pipesReporters) { this.pipesReporters = pipesReporters; diff --git a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java index bf7eb45c3..5f00880ba 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/LoggingPipesReporter.java @@ -30,4 +30,14 @@ public class LoggingPipesReporter extends PipesReporter { public void report(FetchEmitTuple t, PipesResult result, long elapsed) { LOGGER.debug("{} {} {}", t, result, elapsed); } + + @Override + public void error(Throwable t) { + LOGGER.error("pipes error", t); + } + + @Override + public void error(String msg) { + LOGGER.error("error {}", msg); + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java index 32a7c61a6..18db3fe1d 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java @@ -39,8 +39,20 @@ public abstract class PipesReporter implements Closeable { public void report(FetchEmitTuple t, PipesResult 
result, long elapsed) { } + + @Override + public void error(Throwable t) { + + } + + @Override + public void error(String msg) { + + } }; + //Implementers are responsible for preventing reporting after + //crashes if that is the desired behavior. public abstract void report(FetchEmitTuple t, PipesResult result, long elapsed); @@ -69,4 +81,16 @@ public abstract class PipesReporter implements Closeable { public void close() throws IOException { //no-op } + + /** + * This is called if the process has crashed. + * Implementers should not rely on close() to be called after this. + * @param t + */ + public abstract void error(Throwable t); + /** + * This is called if the process has crashed. + * Implementers should not rely on close() to be called after this. + * @param msg + */public abstract void error(String msg); } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java index 476e4df58..7a71f08c9 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncProcessor.java @@ -117,7 +117,7 @@ public class AsyncProcessor implements Closeable { } } catch (Exception e) { executorService.shutdownNow(); - asyncConfig.getPipesReporter().close(); + asyncConfig.getPipesReporter().error(e); throw e; } } @@ -222,6 +222,7 @@ public class AsyncProcessor implements Closeable { } } catch (ExecutionException e) { LOG.error("execution exception", e); + asyncConfig.getPipesReporter().error(e); throw new RuntimeException(e); } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java index 30408f04a..46a58ff2b 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/async/AsyncStatus.java @@ -22,22 +22,24 @@ import java.util.Map; 
import org.apache.tika.pipes.PipesResult; import org.apache.tika.pipes.pipesiterator.TotalCountResult; +import org.apache.tika.utils.StringUtils; public class AsyncStatus { public enum ASYNC_STATUS { STARTED, - COMPLETED - //CRASHED TODO: need to figure out how to set this? + COMPLETED, + CRASHED } private final Instant started; - private Instant lastUpdate; private TotalCountResult totalCountResult = new TotalCountResult(0, TotalCountResult.STATUS.NOT_COMPLETED); private Map<PipesResult.STATUS, Long> statusCounts = new HashMap<>(); private ASYNC_STATUS asyncStatus = ASYNC_STATUS.STARTED; + private String crashMessage = StringUtils.EMPTY; + public AsyncStatus() { started = Instant.now(); lastUpdate = started; @@ -51,6 +53,10 @@ public class AsyncStatus { this.asyncStatus = status; } + public void updateCrash(String msg) { + this.crashMessage = msg; + } + public Instant getStarted() { return started; } @@ -71,10 +77,14 @@ public class AsyncStatus { return asyncStatus; } + public String getCrashMessage() { + return crashMessage; + } + @Override public String toString() { return "AsyncStatus{" + "started=" + started + ", lastUpdate=" + lastUpdate + ", totalCountResult=" + totalCountResult + ", statusCounts=" + statusCounts + - ", status=" + asyncStatus + '}'; + ", asyncStatus=" + asyncStatus + ", crashMessage='" + crashMessage + '\'' + '}'; } } diff --git a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java index 112ace4c9..b8197bd82 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/async/MockReporter.java @@ -30,6 +30,16 @@ public class MockReporter extends PipesReporter { } + @Override + public void error(Throwable t) { + + } + + @Override + public void error(String msg) { + + } + @Field public void setEndpoint(String endpoint) { this.endpoint = endpoint; diff --git 
a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java index 92c3d7675..b48745a6c 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-fs-status/src/main/java/org/apache/tika/pipes/reporters/fs/FileSystemStatusReporter.java @@ -44,6 +44,7 @@ import org.apache.tika.pipes.PipesReporter; import org.apache.tika.pipes.PipesResult; import org.apache.tika.pipes.async.AsyncStatus; import org.apache.tika.pipes.pipesiterator.TotalCountResult; +import org.apache.tika.utils.ExceptionUtils; /** * This is intended to write summary statistics to disk @@ -67,6 +68,8 @@ public class FileSystemStatusReporter extends PipesReporter private long reportUpdateMillis = 1000; + private volatile boolean crashed = false; + Thread reporterThread; private ConcurrentHashMap<PipesResult.STATUS, LongAdder> counts = new ConcurrentHashMap<>(); private AsyncStatus asyncStatus = new AsyncStatus(); @@ -114,7 +117,15 @@ public class FileSystemStatusReporter extends PipesReporter try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) { objectMapper.writeValue(writer, asyncStatus); } catch (IOException e) { - e.printStackTrace(); + LOG.warn("couldn't write report", e); + } + } + + private synchronized void crash(String crashMessage) { + asyncStatus.updateCrash(crashMessage); + try (Writer writer = Files.newBufferedWriter(statusFile, StandardCharsets.UTF_8)) { + objectMapper.writeValue(writer, asyncStatus); + } catch (IOException e) { LOG.warn("couldn't write report", e); } } @@ -137,13 +148,33 @@ public class FileSystemStatusReporter extends PipesReporter @Override public void close() throws 
IOException { LOG.debug("finishing and writing last report"); + interuptThread(); + if (! crashed) { + report(AsyncStatus.ASYNC_STATUS.COMPLETED); + } + } + + private void interuptThread() { reporterThread.interrupt(); try { reporterThread.join(1000); } catch (InterruptedException e) { //swallow } - report(AsyncStatus.ASYNC_STATUS.COMPLETED); + } + + @Override + public void error(Throwable t) { + crashed = true; + interuptThread(); + crash(ExceptionUtils.getStackTrace(t)); + } + + @Override + public void error(String msg) { + crashed = true; + interuptThread(); + crash(msg); } @Override diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java index 8945678d6..7dbe13621 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-opensearch/src/main/java/org/apache/tika/pipes/reporters/opensearch/OpenSearchPipesReporter.java @@ -102,6 +102,16 @@ public class OpenSearchPipesReporter extends PipesReporter implements Initializa } } + @Override + public void error(Throwable t) { + LOG.error("crashed", t); + } + + @Override + public void error(String msg) { + LOG.error("crashed {}", msg); + } + private boolean shouldReport(PipesResult result) { if (includeStatus.size() > 0) { if (includeStatus.contains(result.getStatus().name())) {