This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push: new e63730e12 TIKA-4213 -- improve jdbc pipes reporter (#1669) e63730e12 is described below commit e63730e126e74b4ac36e5f2b8c6790963eb41c14 Author: Tim Allison <talli...@apache.org> AuthorDate: Thu Mar 21 08:42:25 2024 -0400 TIKA-4213 -- improve jdbc pipes reporter (#1669) * TIKA-4213 -- improve jdbc pipes reporter --- .../pipes/reporters/jdbc/JDBCPipesReporter.java | 52 ++++++++++++---------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java index 0082eb9de..ee52bf80f 100644 --- a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java @@ -22,6 +22,8 @@ import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.sql.Timestamp; +import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -68,7 +70,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl private String connectionString; private Optional<String> postConnectionString = Optional.empty(); - private final ArrayBlockingQueue<KeyStatusPair> queue = + private final ArrayBlockingQueue<IdStatusPair> queue = new ArrayBlockingQueue(ARRAY_BLOCKING_QUEUE_SIZE); CompletableFuture<Void> reportWorkerFuture; @@ -146,7 +148,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl return; } try { - queue.offer(new KeyStatusPair(t.getEmitKey().getEmitKey(), result.getStatus()), + queue.offer(new IdStatusPair(t.getId(), result.getStatus()), MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { //swallow @@ -167,7 +169,7 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl @Override public void close() throws IOException { try { - queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS); + queue.offer(IdStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS); } catch (InterruptedException e) { return; } @@ -186,20 +188,20 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl } } - private static class KeyStatusPair { + private static class IdStatusPair { - static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null); - private final String emitKey; + static IdStatusPair END_SEMAPHORE = new IdStatusPair(null, null); + private final String id; private final PipesResult.STATUS status; - public KeyStatusPair(String emitKey, PipesResult.STATUS status) { - this.emitKey = emitKey; + public IdStatusPair(String id, PipesResult.STATUS status) { + this.id = id; this.status = status; } @Override public String toString() { - return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", status=" + status + '}'; + return "KeyStatusPair{" + "id='" + id + '\'' + ", status=" + status + '}'; } } @@ -208,18 +210,18 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl private static final int MAX_TRIES = 3; private final String connectionString; private final Optional<String> postConnectionString; - private final ArrayBlockingQueue<KeyStatusPair> queue; + private final ArrayBlockingQueue<IdStatusPair> queue; private final int cacheSize; private final long reportWithinMs; - List<KeyStatusPair> cache = new ArrayList<>(); + List<IdStatusPair> cache = new ArrayList<>(); private Connection connection; private PreparedStatement insert; public ReportWorker(String connectionString, Optional<String> postConnectionString, - ArrayBlockingQueue<KeyStatusPair> queue, int cacheSize, + ArrayBlockingQueue<IdStatusPair> queue, int cacheSize, long reportWithinMs) { this.connectionString = connectionString; this.postConnectionString = postConnectionString; @@ -242,18 +244,19 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl public void run() { long lastReported = System.currentTimeMillis(); while (true) { - //blocking - KeyStatusPair p = null; + IdStatusPair p = null; try { - p = queue.take(); + p = queue.poll(reportWithinMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return; } - if (p == KeyStatusPair.END_SEMAPHORE) { - shutdownNow(); - return; + if (p != null) { + if (p == IdStatusPair.END_SEMAPHORE) { + shutdownNow(); + return; + } + cache.add(p); } - cache.add(p); long elapsed = System.currentTimeMillis() - lastReported; if (cache.size() >= cacheSize || elapsed > reportWithinMs) { @@ -296,10 +299,11 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl int attempt = 0; while (++attempt < MAX_TRIES) { try { - for (KeyStatusPair p : cache) { + for (IdStatusPair p : cache) { insert.clearParameters(); - insert.setString(1, p.emitKey); + insert.setString(1, p.id); insert.setString(2, p.status.name()); + insert.setTimestamp(3, Timestamp.from(Instant.now())); insert.addBatch(); } insert.executeBatch(); @@ -317,7 +321,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl try (Statement st = connection.createStatement()) { String sql = "drop table if exists " + TABLE_NAME; st.execute(sql); - sql = "create table " + TABLE_NAME + " (path varchar(1024), status varchar(32))"; + sql = "create table " + TABLE_NAME + " (id varchar(1024), status varchar(32), " + + "timestamp timestamp with time zone)"; st.execute(sql); } } @@ -370,7 +375,8 @@ public class JDBCPipesReporter extends PipesReporterBase implements Initializabl } private void createPreparedStatement() throws SQLException { - String sql = "insert into " + TABLE_NAME + " (path, status) values (?,?)"; + //do we want to do an upsert? + String sql = "insert into " + TABLE_NAME + " (id, status, timestamp) values (?,?,?)"; insert = connection.prepareStatement(sql); } }