This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tika.git
The following commit(s) were added to refs/heads/main by this push:
     new f846009d6 TIKA-3931 (#803)
f846009d6 is described below

commit f846009d6e9b4789f8057b1b27fde7caa8d4cbda
Author: Tim Allison <talli...@apache.org>
AuthorDate: Thu Nov 17 13:58:08 2022 -0500

    TIKA-3931 (#803)

    * TIKA-3931 -- add a JDBCPipesReporter

    * TIKA-3931 -- update bom
---
 CHANGES.txt                                        |   2 +
 tika-bom/pom.xml                                   |  30 ++-
 .../java/org/apache/tika/pipes/PipesReporter.java  |   4 +-
 .../org/apache/tika/pipes/PipesReporterBase.java   | 155 ++++++++++++
 tika-pipes/tika-pipes-reporters/pom.xml            |   1 +
 .../tika-pipes-reporter-jdbc/pom.xml               | 116 +++++++++
 .../pipes/reporters/jdbc/JDBCPipesReporter.java    | 270 +++++++++++++++++++++
 .../reporters/jdbc/TestJDBCPipesReporter.java      | 238 ++++++++++++++++++
 .../src/test/resources/tika-config-excludes.xml    |  46 ++++
 .../src/test/resources/tika-config-includes.xml    |  46 ++++
 10 files changed, 901 insertions(+), 7 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e9b58ee10..54d14214e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,7 @@
 Release 2.6.1 - ???
+ * Add a JDBCPipesReporter (TIKA-3931).
+
 * Add multivalued field strategy option in jdbc-emitter (TIKA-3930). Default is now 'concatenate' with ', ' as the delimiter.
diff --git a/tika-bom/pom.xml b/tika-bom/pom.xml index e8fcc606e..151dd3138 100644 --- a/tika-bom/pom.xml +++ b/tika-bom/pom.xml @@ -319,6 +319,16 @@ <artifactId>tika-emitter-gcs</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-emitter-jdbc</artifactId> + <version>2.6.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-emitter-kafka</artifactId> + <version>2.6.1-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-emitter-opensearch</artifactId> @@ -331,15 +341,24 @@ </dependency> <dependency> <groupId>org.apache.tika</groupId> - <artifactId>tika-emitter-kafka</artifactId> + <artifactId>tika-emitter-solr</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.tika</groupId> - <artifactId>tika-emitter-solr</artifactId> + <artifactId>tika-pipes-reporter-fs-status</artifactId> + <version>2.6.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-reporter-jdbc</artifactId> + <version>2.6.1-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-reporter-opensearch</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> - <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-fetcher-gcs</artifactId> @@ -355,7 +374,6 @@ <artifactId>tika-fetcher-s3</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> - <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-pipes-iterator-csv</artifactId> @@ -373,12 +391,12 @@ </dependency> <dependency> <groupId>org.apache.tika</groupId> - <artifactId>tika-pipes-iterator-s3</artifactId> + <artifactId>tika-pipes-iterator-kafka</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> <dependency> <groupId>org.apache.tika</groupId> - 
<artifactId>tika-pipes-iterator-kafka</artifactId> + <artifactId>tika-pipes-iterator-s3</artifactId> <version>2.6.1-SNAPSHOT</version> </dependency> <dependency> diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java index 18db3fe1d..3978039b4 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporter.java @@ -92,5 +92,7 @@ public abstract class PipesReporter implements Closeable { * This is called if the process has crashed. * Implementers should not rely on close() to be called after this. * @param msg - */public abstract void error(String msg); + */ + public abstract void error(String msg); + } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java new file mode 100644 index 000000000..3dcddfa71 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesReporterBase.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.tika.config.Field; +import org.apache.tika.config.Initializable; +import org.apache.tika.config.InitializableProblemHandler; +import org.apache.tika.config.Param; +import org.apache.tika.exception.TikaConfigException; + +/** + * Base class that includes filtering by {@link PipesResult.STATUS} + */ +public abstract class PipesReporterBase extends PipesReporter implements Initializable { + + private final Set<PipesResult.STATUS> includes = new HashSet<>(); + private final Set<PipesResult.STATUS> excludes = new HashSet<>(); + + private StatusFilter statusFilter; + + @Override + public void initialize(Map<String, Param> params) throws TikaConfigException { + statusFilter = buildStatusFilter(includes, excludes); + } + + private StatusFilter buildStatusFilter(Set<PipesResult.STATUS> includes, + Set<PipesResult.STATUS> excludes) throws TikaConfigException { + if (includes.size() > 0 && excludes.size() > 0) { + throw new TikaConfigException("Only one of includes and excludes may have any " + + "contents"); + } + if (includes.size() > 0) { + return new IncludesFilter(includes); + } else if (excludes.size() > 0) { + return new ExcludesFilter(excludes); + } + return new AcceptAllFilter(); + } + + @Override + public void checkInitialization(InitializableProblemHandler problemHandler) + throws TikaConfigException { + + } + + /** + * Implementations must call this for the includes/excludes filters to work! 
+ * @param status + * @return + */ + public boolean accept(PipesResult.STATUS status) { + return statusFilter.accept(status); + } + + @Field + public void setIncludes(List<String> includes) throws TikaConfigException { + for (String s : includes) { + try { + PipesResult.STATUS status = PipesResult.STATUS.valueOf(s); + this.includes.add(status); + } catch (IllegalArgumentException e) { + String optionString = getOptionString(); + throw new TikaConfigException( + "I regret I don't recognize " + s + ". I only understand: " + optionString, + e); + } + } + } + + @Field + public void setExcludes(List<String> excludes) throws TikaConfigException { + for (String s : excludes) { + try { + PipesResult.STATUS status = PipesResult.STATUS.valueOf(s); + this.excludes.add(status); + } catch (IllegalArgumentException e) { + String optionString = getOptionString(); + throw new TikaConfigException( + "I regret I don't recognize " + s + ". I only understand: " + optionString, + e); + } + } + } + + private String getOptionString() { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (PipesResult.STATUS status : PipesResult.STATUS.values()) { + if (++i > 1) { + sb.append(", "); + } + sb.append(status.name()); + } + return sb.toString(); + } + + private abstract static class StatusFilter { + abstract boolean accept(PipesResult.STATUS status); + } + + private static class IncludesFilter extends StatusFilter { + private final Set<PipesResult.STATUS> includes; + + private IncludesFilter(Set<PipesResult.STATUS> includes) { + this.includes = includes; + } + + @Override + boolean accept(PipesResult.STATUS status) { + return includes.contains(status); + } + } + + private static class ExcludesFilter extends StatusFilter { + private final Set<PipesResult.STATUS> excludes; + + ExcludesFilter(Set<PipesResult.STATUS> excludes) { + this.excludes = excludes; + } + + @Override + boolean accept(PipesResult.STATUS status) { + return !excludes.contains(status); + } + } + + private static class 
AcceptAllFilter extends StatusFilter { + + @Override + boolean accept(PipesResult.STATUS status) { + return true; + } + } + + +} diff --git a/tika-pipes/tika-pipes-reporters/pom.xml b/tika-pipes/tika-pipes-reporters/pom.xml index d574a9d74..4f4c5e917 100644 --- a/tika-pipes/tika-pipes-reporters/pom.xml +++ b/tika-pipes/tika-pipes-reporters/pom.xml @@ -34,6 +34,7 @@ <modules> <module>tika-pipes-reporter-opensearch</module> <module>tika-pipes-reporter-fs-status</module> + <module>tika-pipes-reporter-jdbc</module> </modules> <scm> diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml new file mode 100644 index 000000000..c2d990596 --- /dev/null +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/pom.xml @@ -0,0 +1,116 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.tika</groupId> + <artifactId>tika-pipes-reporters</artifactId> + <version>2.6.1-SNAPSHOT</version> + <relativePath>../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tika-pipes-reporter-jdbc</artifactId> + + <name>Apache Tika Pipes Reporter - JDBC Pipes Reporter</name> + <url>https://tika.apache.org/</url> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>tika-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.h2database</groupId> + <artifactId>h2</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <configuration> + <archive> + <manifestEntries> + <Automatic-Module-Name>org.apache.tika.pipes.reporters.jdbc</Automatic-Module-Name> + </manifestEntries> + </archive> + </configuration> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <artifactId>maven-shade-plugin</artifactId> + <version>${maven.shade.version}</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <createDependencyReducedPom> + false + </createDependencyReducedPom> + <!-- <filters> --> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*</exclude> + <exclude>LICENSE.txt</exclude> + <exclude>NOTICE.txt</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> + 
<resource>META-INF/LICENSE</resource> + <file>target/classes/META-INF/LICENSE</file> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> + <resource>META-INF/NOTICE</resource> + <file>target/classes/META-INF/NOTICE</file> + </transformer> + <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer"> + <resource>META-INF/DEPENDENCIES</resource> + <file>target/classes/META-INF/DEPENDENCIES</file> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + + <scm> + <tag>2.2.1-rc2</tag> + </scm> +</project> diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java new file mode 100644 index 000000000..f29c2f986 --- /dev/null +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/main/java/org/apache/tika/pipes/reporters/jdbc/JDBCPipesReporter.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.reporters.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.tika.config.Field; +import org.apache.tika.config.Initializable; +import org.apache.tika.config.InitializableProblemHandler; +import org.apache.tika.config.Param; +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.pipes.FetchEmitTuple; +import org.apache.tika.pipes.PipesReporterBase; +import org.apache.tika.pipes.PipesResult; +import org.apache.tika.utils.StringUtils; + +/** + * This is an initial draft of a JDBCPipesReporter. This will drop + * the tika_status table with each run. If you'd like different behavior, + * please open a ticket on our JIRA! 
+ */ +public class JDBCPipesReporter extends PipesReporterBase implements Initializable { + + private static final Logger LOG = LoggerFactory.getLogger(JDBCPipesReporter.class); + private static final int CACHE_SIZE = 100; + private static final int ARRAY_BLOCKING_QUEUE_SIZE = 1000; + + public static final String TABLE_NAME = "tika_status"; + + private static final long MAX_WAIT_MILLIS = 120000; + + private String connectionString; + private ArrayBlockingQueue<KeyStatusPair> queue = + new ArrayBlockingQueue<>(ARRAY_BLOCKING_QUEUE_SIZE); + CompletableFuture<Void> reportWorkerFuture; + + @Override + public void initialize(Map<String, Param> params) throws TikaConfigException { + super.initialize(params); + if (StringUtils.isBlank(connectionString)) { + throw new TikaConfigException("Must specify a connectionString"); + } + ReportWorker reportWorker = new ReportWorker(connectionString, queue); + reportWorker.init(); + reportWorkerFuture = CompletableFuture.runAsync(reportWorker); + } + + + @Override + public void checkInitialization(InitializableProblemHandler problemHandler) + throws TikaConfigException { + + } + + @Field + public void setConnection(String connection) { + this.connectionString = connection; + } + + + @Override + public void report(FetchEmitTuple t, PipesResult result, long elapsed) { + if (! 
accept(result.getStatus())) { + return; + } + try { + queue.offer(new KeyStatusPair(t.getEmitKey().getEmitKey(), result.getStatus()), + MAX_WAIT_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + //swallow + } + + } + + @Override + public void error(Throwable t) { + LOG.error("reported error; all bets are off", t); + } + + @Override + public void error(String msg) { + LOG.error("reported error; all bets are off: {}", msg); + } + + @Override + public void close() throws IOException { + try { + queue.offer(KeyStatusPair.END_SEMAPHORE, 60, TimeUnit.SECONDS); + } catch (InterruptedException e) { + return; + } + + try { + reportWorkerFuture.get(60, TimeUnit.SECONDS); + } catch (ExecutionException e) { + LOG.error("problem closing", e); + throw new RuntimeException(e); + } catch (TimeoutException e) { + LOG.error("timeout closing", e); + } catch (InterruptedException e) { + // + } finally { + reportWorkerFuture.cancel(true); + } + } + + private static class KeyStatusPair { + + static KeyStatusPair END_SEMAPHORE = new KeyStatusPair(null, null); + private final String emitKey; + private final PipesResult.STATUS status; + + public KeyStatusPair(String emitKey, PipesResult.STATUS status) { + this.emitKey = emitKey; + this.status = status; + } + + @Override + public String toString() { + return "KeyStatusPair{" + "emitKey='" + emitKey + '\'' + ", status=" + status + '}'; + } + } + + private static class ReportWorker implements Runnable { + + private static final int MAX_TRIES = 3; + private final String connectionString; + private final ArrayBlockingQueue<KeyStatusPair> queue; + List<KeyStatusPair> cache = new ArrayList<>(); + private Connection connection; + private PreparedStatement insert; + + public ReportWorker(String connectionString, ArrayBlockingQueue<KeyStatusPair> queue) { + this.connectionString = connectionString; + this.queue = queue; + } + + public void init() throws TikaConfigException { + try { + createConnection(); + createTable(); + 
createPreparedStatement(); + } catch (SQLException e) { + throw new TikaConfigException("Problem creating connection, etc", e); + } + } + + @Override + public void run() { + while (true) { + //blocking + KeyStatusPair p = null; + try { + p = queue.take(); + } catch (InterruptedException e) { + return; + } + if (p == KeyStatusPair.END_SEMAPHORE) { + try { + reportNow(); + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + return; + } + return; + } + cache.add(p); + if (cache.size() >= CACHE_SIZE) { + try { + reportNow(); + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + return; + } + } + } + + } + + private void reportNow() throws SQLException, InterruptedException { + int attempt = 0; + while (++attempt < MAX_TRIES) { + try { + for (KeyStatusPair p : cache) { + insert.clearParameters(); + insert.setString(1, p.emitKey); + insert.setString(2, p.status.name()); + insert.addBatch(); + } + insert.executeBatch(); + cache.clear(); + return; + } catch (SQLException e) { + LOG.warn("problem writing to the db. Will try to reconnect", e); + reconnect(); + } + } + } + + private void createTable() throws SQLException { + try (Statement st = connection.createStatement()) { + String sql = "drop table if exists " + TABLE_NAME; + st.execute(sql); + sql = "create table " + TABLE_NAME + " (emit_key varchar(512), status varchar(32))"; + st.execute(sql); + } + } + + private void reconnect() throws SQLException, InterruptedException { + int attempts = 0; + SQLException ex = null; + while (++attempts < 3) { + try { + createConnection(); + createPreparedStatement(); + return; + } catch (SQLException e) { + LOG.warn("problem reconnecting", e); + //if there's a failure, wait 10 seconds + //and hope the db is back up. 
+ Thread.sleep(10000); + ex = e; + } + } + throw ex; + } + + private void createConnection() throws SQLException { + connection = DriverManager.getConnection(connectionString); + } + + private void createPreparedStatement() throws SQLException { + String sql = "insert into " + TABLE_NAME + " (emit_key, status) values (?,?)"; + insert = connection.prepareStatement(sql); + } + } + +} diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java new file mode 100644 index 000000000..188e262a9 --- /dev/null +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/java/org/apache/tika/pipes/reporters/jdbc/TestJDBCPipesReporter.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.tika.pipes.reporters.jdbc; + +import static org.apache.tika.pipes.PipesResult.STATUS.PARSE_SUCCESS; +import static org.apache.tika.pipes.PipesResult.STATUS.PARSE_SUCCESS_WITH_EXCEPTION; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.jupiter.api.Test; + +import org.apache.tika.pipes.FetchEmitTuple; +import org.apache.tika.pipes.PipesReporter; +import org.apache.tika.pipes.PipesResult; +import org.apache.tika.pipes.async.AsyncConfig; +import org.apache.tika.pipes.emitter.EmitKey; +import org.apache.tika.pipes.fetcher.FetchKey; +import org.apache.tika.pipes.pipesiterator.TotalCountResult; + +public class TestJDBCPipesReporter { + + @Test + public void testBasic() throws Exception { + int numThreads = 10; + int numIterations = 200; + String connectionString = "jdbc:h2:mem:test_tika"; + JDBCPipesReporter reporter = new JDBCPipesReporter(); + reporter.setConnection(connectionString); + reporter.initialize(new HashMap<>()); + + Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); + reporter.close(); + Map<PipesResult.STATUS, Long> total = countReported(connectionString); + assertEquals(expected.size(), total.size()); + 
long sum = 0; + for (Map.Entry<PipesResult.STATUS, Long> e : expected.entrySet()) { + assertTrue(total.containsKey(e.getKey()), e.getKey().toString()); + assertEquals(e.getValue(), total.get(e.getKey()), e.getKey().toString()); + sum += e.getValue(); + } + assertEquals(numThreads * numIterations, sum); + } + + @Test + public void testIncludes() throws Exception { + Path p = Paths.get(this.getClass().getResource("/tika-config-includes.xml").toURI()); + AsyncConfig asyncConfig = AsyncConfig.load(p); + PipesReporter reporter = asyncConfig.getPipesReporter(); + int numThreads = 10; + int numIterations = 200; + String connectionString = "jdbc:h2:mem:test_tika"; + + Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); + reporter.close(); + Map<PipesResult.STATUS, Long> total = countReported(connectionString); + assertEquals(2, total.size()); + long sum = 0; + for (Map.Entry<PipesResult.STATUS, Long> e : expected.entrySet()) { + if (e.getKey() == PARSE_SUCCESS || e.getKey() == PARSE_SUCCESS_WITH_EXCEPTION) { + assertTrue(total.containsKey(e.getKey()), e.getKey().toString()); + assertEquals(e.getValue(), total.get(e.getKey()), e.getKey().toString()); + } else { + assertFalse(total.containsKey(e.getKey()), e.getKey().toString()); + } + sum += e.getValue(); + } + assertEquals(numThreads * numIterations, sum); + } + + @Test + public void testExcludes() throws Exception { + Path p = Paths.get(this.getClass().getResource("/tika-config-excludes.xml").toURI()); + AsyncConfig asyncConfig = AsyncConfig.load(p); + PipesReporter reporter = asyncConfig.getPipesReporter(); + int numThreads = 10; + int numIterations = 200; + String connectionString = "jdbc:h2:mem:test_tika"; + + Map<PipesResult.STATUS, Long> expected = runBatch(reporter, numThreads, numIterations); + reporter.close(); + Map<PipesResult.STATUS, Long> total = countReported(connectionString); + assertEquals(15, total.size()); + long sum = 0; + for (Map.Entry<PipesResult.STATUS, Long> e : 
expected.entrySet()) { + if (e.getKey() != PARSE_SUCCESS && e.getKey() != PARSE_SUCCESS_WITH_EXCEPTION) { + assertTrue(total.containsKey(e.getKey()), e.getKey().toString()); + assertEquals(e.getValue(), total.get(e.getKey()), e.getKey().toString()); + } else { + assertFalse(total.containsKey(e.getKey()), e.getKey().toString()); + } + sum += e.getValue(); + } + assertEquals(numThreads * numIterations, sum); + } + + + private Map<PipesResult.STATUS, Long> countReported(String connectionString) throws + SQLException { + Map<PipesResult.STATUS, Long> counts = new HashMap<>(); + try (Connection connection = DriverManager.getConnection(connectionString)) { + try (Statement st = connection.createStatement()) { + String sql = "select * from tika_status"; + try (ResultSet rs = st.executeQuery(sql)) { + while (rs.next()) { + String fetchKey = rs.getString(1); + String name = rs.getString(2); + PipesResult.STATUS status = PipesResult.STATUS.valueOf(name); + Long cnt = counts.get(status); + if (cnt == null) { + cnt = 1L; + } else { + cnt++; + } + counts.put(status, cnt); + } + } + } + } + return counts; + } + + private Map<PipesResult.STATUS, Long> runBatch(PipesReporter reporter, + int numThreads, + int numIterations) + throws ExecutionException, InterruptedException { + ExecutorService executorService = Executors.newFixedThreadPool(numThreads); + ExecutorCompletionService<Integer> executorCompletionService = + new ExecutorCompletionService(executorService); + List<ReportWorker> workerList = new ArrayList<>(); + for (int i = 0; i < numThreads; i++) { + ReportWorker reportWorker = new ReportWorker(reporter, numIterations); + workerList.add(reportWorker); + executorCompletionService.submit(reportWorker); + } + + Map<PipesResult.STATUS, Long> total = new HashMap<>(); + int finished = 0; + while (finished < numThreads) { + Future<Integer> future = executorCompletionService.poll(); + if (future != null) { + future.get(); + finished++; + } + } + for (ReportWorker r : workerList) { 
+ Map<PipesResult.STATUS, Long> local = r.getWritten(); + for (Map.Entry<PipesResult.STATUS, Long> e : local.entrySet()) { + Long t = total.get(e.getKey()); + if (t == null) { + t = e.getValue(); + } else { + t += e.getValue(); + } + total.put(e.getKey(), t); + } + } + return total; + } + + private static class ReportWorker implements Callable<Integer> { + Map<PipesResult.STATUS, Long> written = new HashMap<>(); + private static final AtomicInteger TOTAL_ADDED = new AtomicInteger(0); + private final PipesReporter reporter; + private final int numIterations; + private ReportWorker(PipesReporter reporter, int numIterations) { + this.reporter = reporter; + this.numIterations = numIterations; + } + @Override + public Integer call() throws Exception { + PipesResult.STATUS[] statuses = PipesResult.STATUS.values(); + Random random = new Random(); + for (int i = 0; i < numIterations; i++) { + PipesResult.STATUS status = statuses[random.nextInt(statuses.length)]; + PipesResult pipesResult = new PipesResult(status); + String id = "id " + TOTAL_ADDED.getAndIncrement(); + FetchEmitTuple t = new FetchEmitTuple(id, + new FetchKey("fetcher", "fetchKey"), + new EmitKey("emitter", id) + ); + + reporter.report(t, pipesResult, 100L); + Long cnt = written.get(status); + if (cnt == null) { + written.put(status, 1L); + } else { + cnt++; + written.put(status, cnt); + } + if (i % 100 == 0) { + Thread.sleep(94); + reporter.report(new TotalCountResult(Math.round((100 + (double) i / (double) 1000)), + TotalCountResult.STATUS.NOT_COMPLETED)); + } + } + return 1; + } + + Map<PipesResult.STATUS, Long> getWritten() { + return written; + } + } +} diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml new file mode 100644 index 000000000..f35b27952 --- /dev/null +++ 
b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-excludes.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<properties> + <async> + <params> + <maxForEmitBatchBytes>10000</maxForEmitBatchBytes> + <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes> + <emitWithinMillis>60000</emitWithinMillis> + <numEmitters>1</numEmitters> + <numClients>3</numClients> + <tikaConfig>{TIKA_CONFIG}</tikaConfig> + <forkedJvmArgs> + <arg>-Xmx512m</arg> + <arg>-XX:ParallelGCThreads=2</arg> + <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg> + </forkedJvmArgs> + <timeoutMillis>60000</timeoutMillis> + </params> + <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter"> + <params> + <connection>jdbc:h2:mem:test_tika</connection> + <excludes> + <exclude>PARSE_SUCCESS</exclude> + <exclude>PARSE_SUCCESS_WITH_EXCEPTION</exclude> + </excludes> + </params> + </pipesReporter> +</async> +</properties> diff --git a/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml new file mode 100644 
index 000000000..fa7c74fcf --- /dev/null +++ b/tika-pipes/tika-pipes-reporters/tika-pipes-reporter-jdbc/src/test/resources/tika-config-includes.xml @@ -0,0 +1,46 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> +<properties> + <async> + <params> + <maxForEmitBatchBytes>10000</maxForEmitBatchBytes> + <emitMaxEstimatedBytes>100000</emitMaxEstimatedBytes> + <emitWithinMillis>60000</emitWithinMillis> + <numEmitters>1</numEmitters> + <numClients>3</numClients> + <tikaConfig>{TIKA_CONFIG}</tikaConfig> + <forkedJvmArgs> + <arg>-Xmx512m</arg> + <arg>-XX:ParallelGCThreads=2</arg> + <arg>-Dlog4j.configurationFile={LOG4J_PROPERTIES_FILE}</arg> + </forkedJvmArgs> + <timeoutMillis>60000</timeoutMillis> + </params> + <pipesReporter class="org.apache.tika.pipes.reporters.jdbc.JDBCPipesReporter"> + <params> + <connection>jdbc:h2:mem:test_tika</connection> + <includes> + <include>PARSE_SUCCESS</include> + <include>PARSE_SUCCESS_WITH_EXCEPTION</include> + </includes> + </params> + </pipesReporter> +</async> +</properties>