Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/BatchProcessDriverCLI.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,392 @@ +package org.apache.tika.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.io.BufferedInputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; + +import org.apache.log4j.Logger; +import org.apache.tika.io.IOUtils; +
/**
 * Command-line driver that starts a child process from the supplied command
 * line, relays the child's stdout and stderr, forwards a user interrupt typed
 * on the driver's stdin to the child's stdin, and restarts the child when it
 * exits with a code other than {@link #PROCESS_COMPLETED_SUCCESSFULLY} or
 * {@link #PROCESS_NO_RESTART_EXIT_CODE}.
 * <p>
 * An optional <code>-maxRestarts &lt;int&gt;</code> argument is consumed by the
 * driver and is not passed on to the child process.
 */
public class BatchProcessDriverCLI {

    /**
     * This relies on special exit values: 254 (do not restart),
     * 0 (ended correctly), 253 (ended with exception; do restart).
     */
    public static final int PROCESS_RESTART_EXIT_CODE = 253;
    //Every exit value other than this one and PROCESS_COMPLETED_SUCCESSFULLY
    //leads to a restart; that is, if there is a system error (e.g. 143),
    //the process is restarted.
    public static final int PROCESS_NO_RESTART_EXIT_CODE = 254;
    public static final int PROCESS_COMPLETED_SUCCESSFULLY = 0;
    private static final Logger logger = Logger.getLogger(BatchProcessDriverCLI.class);

    //-1 means "no limit on the number of restarts"
    private int maxProcessRestarts = -1;
    private long pulseMillis = 1000;

    //how many times to wait pulseMillis milliseconds if a restart
    //message has been received through the child's error stream, but the
    //child process has not yet exited
    private int waitNumLoopsAfterRestartmessage = 60;


    private volatile boolean userInterrupted = false;
    //written by the StreamWatcher thread and read by the driver thread,
    //hence volatile
    private volatile boolean receivedRestartMsg = false;
    private Process process = null;

    private StreamGobbler errorWatcher = null;
    private StreamGobbler outGobbler = null;
    private InterruptWriter interruptWriter = null;
    private final InterruptWatcher interruptWatcher =
            new InterruptWatcher(System.in);

    private Thread errorWatcherThread = null;
    private Thread outGobblerThread = null;
    private Thread interruptWriterThread = null;
    private final Thread interruptWatcherThread = new Thread(interruptWatcher);

    private final String[] commandLine;
    private int numRestarts = 0;
    private boolean redirectChildProcessToStdOut = true;

    /**
     * @param commandLine command line for the child process; a
     *                    "-maxRestarts" argument and its value are removed
     *                    and used to configure the driver
     * @throws IllegalArgumentException if "-maxRestarts" is not followed by an integer
     */
    public BatchProcessDriverCLI(String[] commandLine){
        this.commandLine = tryToReadMaxRestarts(commandLine);
    }

    /**
     * Strips "-maxRestarts" and its value from the arguments and stores the
     * value in {@link #maxProcessRestarts}.
     *
     * @param commandLine raw arguments
     * @return the arguments without "-maxRestarts" and its value
     */
    private String[] tryToReadMaxRestarts(String[] commandLine) {
        List<String> args = new ArrayList<String>();
        for (int i = 0; i < commandLine.length; i++) {
            String arg = commandLine[i];
            if (arg.equals("-maxRestarts")) {
                if (i == commandLine.length-1) {
                    throw new IllegalArgumentException("Must specify an integer after \"-maxRestarts\"");
                }
                String restartNumString = commandLine[i+1];
                try {
                    maxProcessRestarts = Integer.parseInt(restartNumString);
                } catch (NumberFormatException e) {
                    //keep the cause so the offending value shows up in the stack trace
                    throw new IllegalArgumentException(
                            "Must specify an integer after \"-maxRestarts\" arg.", e);
                }
                //skip the value that was just consumed
                i++;
            } else {
                args.add(arg);
            }
        }
        return args.toArray(new String[args.size()]);
    }

    /**
     * Starts the child process and polls it every {@link #pulseMillis}
     * milliseconds until the user interrupts, the child ends with
     * {@link #PROCESS_COMPLETED_SUCCESSFULLY} or
     * {@link #PROCESS_NO_RESTART_EXIT_CODE}, or a restart is refused.
     *
     * @throws Exception if the child process cannot be started
     */
    public void execute() throws Exception {

        interruptWatcherThread.setDaemon(true);
        interruptWatcherThread.start();
        logger.trace("about to start");
        start();
        int loopsAfterRestartMessageReceived = 0;
        while (!userInterrupted) {
            Integer exit = null;
            try {
                logger.trace("about to check exit value");
                exit = process.exitValue();
                logger.trace("exit value:" + exit);
                stop();
            } catch (IllegalThreadStateException e) {
                //hasn't exited
                logger.trace("process has not exited; IllegalThreadStateException");
            }

            logger.trace("Before sleep:" +
                    " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);

            //Even if the process has exited,
            //wait just a little bit to make sure that
            //mustRestart hasn't been set to true
            try {
                Thread.sleep(pulseMillis);
            } catch (InterruptedException e) {
                logger.trace("interrupted exception during sleep");
            }
            logger.trace("After sleep:" +
                    " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
            //if we've gotten the message via the error stream to restart
            //but the process hasn't exited yet, give it another
            //chance -- but only up to waitNumLoopsAfterRestartmessage times;
            //without that bound a hung child would be waited on forever
            if (receivedRestartMsg && exit == null
                    && loopsAfterRestartMessageReceived <= waitNumLoopsAfterRestartmessage) {
                loopsAfterRestartMessageReceived++;
                logger.trace("Must restart, still not exited; loops after restart: " +
                        loopsAfterRestartMessageReceived);
                continue;
            }
            if (loopsAfterRestartMessageReceived > waitNumLoopsAfterRestartmessage) {
                logger.trace("About to try to restart because:" +
                        " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);
                logger.warn("Restarting after exceeded wait loops waiting for exit: "+
                        loopsAfterRestartMessageReceived);
                boolean restarted = restart(exit, receivedRestartMsg);
                if (!restarted) {
                    break;
                }
                //the new child starts with a fresh wait budget
                loopsAfterRestartMessageReceived = 0;
            } else if (exit != null && exit != BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE
                    && exit != BatchProcessDriverCLI.PROCESS_COMPLETED_SUCCESSFULLY) {
                logger.trace("About to try to restart because:" +
                        " exit=" + exit + " receivedRestartMsg=" + receivedRestartMsg);

                if (exit == BatchProcessDriverCLI.PROCESS_RESTART_EXIT_CODE) {
                    logger.info("Restarting on expected restart code");
                } else {
                    logger.warn("Restarting on unexpected restart code: "+exit);
                }
                boolean restarted = restart(exit, receivedRestartMsg);
                if (!restarted) {
                    break;
                }
                loopsAfterRestartMessageReceived = 0;
            } else if (exit != null && (exit == PROCESS_COMPLETED_SUCCESSFULLY
                    || exit == BatchProcessDriverCLI.PROCESS_NO_RESTART_EXIT_CODE)) {
                logger.trace("Will not restart: "+exit);
                break;
            }
        }
        logger.trace("about to call shutdown driver now");
        shutdownDriverNow();
    }

    /**
     * Waits up to ten seconds for the child process to exit on its own and
     * then cleans up; if the child is still running after that, it is
     * destroyed.
     */
    private void shutdownDriverNow() {
        if (process != null) {
            for (int i = 0; i < 10; i++) {

                logger.trace("trying to shut down: "+i);
                try {
                    int exit = process.exitValue();
                    logger.trace("trying to stop:"+exit);
                    stop();
                    interruptWatcherThread.interrupt();
                    return;
                } catch (IllegalThreadStateException e) {
                    //hasn't exited
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    //swallow
                }
            }
            logger.error("Process didn't stop after 10 seconds after shutdown. " +
                    "I am forcefully killing it.");
            //actually do what the message says: destroy the child and
            //release the stream threads
            stop();
        }
        interruptWatcherThread.interrupt();
    }

    public int getNumRestarts() {
        return numRestarts;
    }

    public boolean getUserInterrupted() {
        return userInterrupted;
    }

    /**
     * Tries to restart (stop and then start) the child process
     * @return whether or not this was successful, will be false if numRestarts >= maxProcessRestarts
     * @throws Exception
     */
    private boolean restart(Integer exitValue, boolean receivedRestartMsg) throws Exception {
        if (maxProcessRestarts > -1 && numRestarts >= maxProcessRestarts) {
            logger.warn("Hit the maximum number of process restarts. Driver is shutting down now.");
            stop();
            return false;
        }
        logger.warn("Must restart process (exitValue="+exitValue+" numRestarts="+numRestarts+
                " receivedRestartMessage="+receivedRestartMsg+")");
        stop();
        start();
        numRestarts++;
        return true;
    }

    /**
     * Destroys the child process (if any), clears the restart flag and shuts
     * down the threads that service the child's streams.
     */
    private void stop() {
        if (process != null) {
            logger.trace("destroying a non-null process");
            process.destroy();
        }

        receivedRestartMsg = false;
        //interrupt the writer thread first
        if (interruptWriterThread != null) {
            interruptWriterThread.interrupt();
        }

        if (errorWatcher != null) {
            errorWatcher.stopGobblingAndDie();
        }
        if (outGobbler != null) {
            outGobbler.stopGobblingAndDie();
        }
        if (errorWatcherThread != null) {
            errorWatcherThread.interrupt();
        }
        if (outGobblerThread != null) {
            outGobblerThread.interrupt();
        }
    }

    /**
     * Starts the child process in the current directory and starts one
     * thread each for its error stream, its output stream and its input.
     *
     * @throws Exception if the process cannot be started
     */
    private void start() throws Exception {
        ProcessBuilder builder = new ProcessBuilder(commandLine);
        builder.directory(new File("."));
        process = builder.start();

        errorWatcher = new StreamWatcher(process.getErrorStream());
        errorWatcherThread = new Thread(errorWatcher);
        errorWatcherThread.start();

        outGobbler = new StreamGobbler(process.getInputStream());
        outGobblerThread = new Thread(outGobbler);
        outGobblerThread.start();

        interruptWriter = new InterruptWriter(process.getOutputStream());
        interruptWriterThread = new Thread(interruptWriter);
        interruptWriterThread.start();

    }

    /**
     * @param redirectChildProcessToStdOut whether lines read from the child's
     *                                     stdout are echoed to this process's stdout
     */
    public void setRedirectChildProcessToStdOut(boolean redirectChildProcessToStdOut) {
        this.redirectChildProcessToStdOut = redirectChildProcessToStdOut;
    }

    /**
     * Class to watch stdin from the driver for anything that is typed.
     * This will currently cause an interrupt if anything followed by
     * a return key is entered.  We may want to add an "Are you sure?" dialogue.
     */
    private class InterruptWatcher implements Runnable {
        private BufferedReader reader;

        private InterruptWatcher(InputStream is) {
            reader = new BufferedReader(new InputStreamReader(is, IOUtils.UTF_8));
        }

        @Override
        public void run() {
            try {
                //this will block.
                //as soon as it reads anything,
                //set userInterrupted to true and stop
                reader.readLine();
                userInterrupted = true;
            } catch (IOException e) {
                //swallow
            }
        }
    }

    /**
     * Class that writes to the child process
     * to force an interrupt in the child process.
     */
    private class InterruptWriter implements Runnable {
        private final Writer writer;

        private InterruptWriter(OutputStream os) {
            this.writer = new OutputStreamWriter(os, IOUtils.UTF_8);
        }

        @Override
        public void run() {
            try {
                //poll every 500 ms; once the user has interrupted, keep
                //sending the farewell line until this thread is interrupted
                //or the write fails
                while (true) {
                    Thread.sleep(500);
                    if (userInterrupted) {
                        writer.write(String.format(Locale.ENGLISH, "Ave atque vale!%n"));
                        writer.flush();
                    }
                }
            } catch (IOException e) {
                //swallow
            } catch (InterruptedException e) {
                //job is done, ok
            }
        }
    }

    /**
     * Reads the given stream line by line and, if enabled, echoes each line
     * to stdout with a "BatchProcess:" prefix.
     */
    private class StreamGobbler implements Runnable {
        //plagiarized from org.apache.oodt's StreamGobbler
        protected final BufferedReader reader;
        //cleared by the driver thread, read by the gobbler thread
        protected volatile boolean running = true;

        private StreamGobbler(InputStream is) {
            this.reader = new BufferedReader(new InputStreamReader(new BufferedInputStream(is),
                    IOUtils.UTF_8));
        }

        @Override
        public void run() {
            String line = null;
            try {
                logger.trace("gobbler starting to read");
                while ((line = reader.readLine()) != null && this.running) {
                    if (redirectChildProcessToStdOut) {
                        System.out.println("BatchProcess:"+line);
                    }
                }
            } catch (IOException e) {
                logger.trace("gobbler io exception");
                //swallow ioe
            }
            logger.trace("gobbler done");
        }

        private void stopGobblingAndDie() {
            logger.trace("stop gobbling");
            running = false;
            IOUtils.closeQuietly(reader);
        }
    }

    /**
     * Gobbler for the child's error stream: logs every line and raises
     * {@link #receivedRestartMsg} when a line starts with the
     * BATCH_PROCESS_FATAL_MUST_RESTART marker.
     */
    private class StreamWatcher extends StreamGobbler implements Runnable {
        //plagiarized from org.apache.oodt's StreamGobbler

        private StreamWatcher(InputStream is){
            super(is);
        }

        @Override
        public void run() {
            String line = null;
            try {
                logger.trace("watcher starting to read");
                while ((line = reader.readLine()) != null && this.running) {
                    if (line.startsWith(BatchProcess.BATCH_CONSTANTS.BATCH_PROCESS_FATAL_MUST_RESTART.toString())) {
                        receivedRestartMsg = true;
                    }
                    logger.info("BatchProcess: "+line);
                }
            } catch (IOException e) {
                logger.trace("watcher io exception");
                //swallow ioe
            }
            logger.trace("watcher done");
        }
    }


    public static void main(String[] args) throws Exception {

        BatchProcessDriverCLI runner = new BatchProcessDriverCLI(args);
        runner.execute();
        System.out.println("FSBatchProcessDriver has gracefully completed");
        System.exit(0);
    }
}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ConsumersManager.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,80 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Collections; +import java.util.List; + +/** + * Simple interface around a collection of consumers that allows + * for initializing and shutting shared resources (e.g. db connection, index, writer, etc.) 
+ */
public abstract class ConsumersManager {

    private final List<FileResourceConsumer> consumers;

    //upper bound, in milliseconds, granted to init() and to shutdown()
    private long consumersManagerMaxMillis = 60L * 1000L;

    /**
     * @param consumers the consumers to manage; they are exposed through a
     *                  read-only view of this list
     */
    public ConsumersManager(List<FileResourceConsumer> consumers) {
        List<FileResourceConsumer> readOnlyView = Collections.unmodifiableList(consumers);
        this.consumers = readOnlyView;
    }

    /**
     * Get the consumers
     * @return unmodifiable view of the consumers
     */
    public List<FileResourceConsumer> getConsumers() {
        return this.consumers;
    }

    /**
     * This is called by BatchProcess before submitting the threads.
     * The default implementation does nothing.
     */
    public void init() {
        //no-op by default
    }

    /**
     * This is called by BatchProcess immediately before closing.
     * Beware! Some of the consumers may have hung or may not
     * have completed.  The default implementation does nothing.
     */
    public void shutdown() {
        //no-op by default
    }

    /**
     * {@link org.apache.tika.batch.BatchProcess} will throw an exception
     * if the ConsumersManager doesn't complete init() or shutdown()
     * within this amount of time.
     * @return the maximum time, in milliseconds, allowed for init() or shutdown()
     */
    public long getConsumersManagerMaxMillis() {
        return this.consumersManagerMaxMillis;
    }

    /**
     * @see #getConsumersManagerMaxMillis()
     *
     * @param consumersManagerMaxMillis maximum number of milliseconds
     *                                  to allow for init() or shutdown()
     */
    public void setConsumersManagerMaxMillis(long consumersManagerMaxMillis) {
        this.consumersManagerMaxMillis = consumersManagerMaxMillis;
    }
}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileConsumerFutureResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,37 @@ +package 
org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +
/**
 * Immutable result of a file consumer: the {@link FileStarted} handed to the
 * constructor together with the number of files processed.
 */
class FileConsumerFutureResult implements IFileProcessorFutureResult {

    private final int filesProcessed;
    private final FileStarted fileStarted;

    /**
     * @param fs             the FileStarted to report; stored as given
     * @param filesProcessed number of files processed
     */
    public FileConsumerFutureResult(FileStarted fs, int filesProcessed) {
        this.filesProcessed = filesProcessed;
        this.fileStarted = fs;
    }

    /**
     * @return the FileStarted passed to the constructor
     */
    public FileStarted getFileStarted() {
        return this.fileStarted;
    }

    /**
     * @return the number of files processed, as passed to the constructor
     */
    public int getFilesProcessed() {
        return this.filesProcessed;
    }
}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResource.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,68 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.Property; + +import java.io.IOException; +import java.io.InputStream; + +
/**
 * Basic abstraction of a logical "file".
 * It is meant to let the batch code handle files uniformly regardless of
 * where they come from: file system, database, etc.
 */
public interface FileResource {

    //The literal lowercased extension of a file.  This may or may not
    //have any relationship to the actual type of the file.
    //(interface fields are implicitly public static final)
    Property FILE_EXTENSION = Property.internalText("tika:file_ext");

    /**
     * Identifier that is only used in logging to point at the file
     * that may have caused problems.  Unique ids are preferable for
     * debugging, but uniqueness is not required: the batch processors
     * never use this id as a hashkey, for example.
     *
     * @return an id for a FileResource
     */
    String getResourceId();

    /**
     * Returns the metadata that is available before the file is parsed.
     * This will typically be "external" metadata: file name,
     * file size, file location, data stream, etc.  That is, things
     * that are known about the file from outside information, not
     * file-internal metadata.
     *
     * @return Metadata
     */
    Metadata getMetadata();

    /**
     * Opens the content of this resource.
     *
     * @return an InputStream for the FileResource
     * @throws java.io.IOException
     */
    InputStream openInputStream() throws IOException;

}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceConsumer.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,380 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import javax.xml.stream.XMLOutputFactory; +import javax.xml.stream.XMLStreamException; +import javax.xml.stream.XMLStreamWriter; +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; + +
/**
 * This is a base class for file consumers.  The
 * goal of this class is to abstract out the multithreading
 * and recordkeeping components.
 * <p/>
 * A consumer takes {@link FileResource}s from a shared queue until it
 * swallows a {@link PoisonFileResource}, is interrupted, is asked to shut
 * down, is timed out, or has waited longer than its maximum consecutive
 * wait for a new resource.
 */
public abstract class FileResourceConsumer implements Callable<IFileProcessorFutureResult> {

    private enum STATE {
        NOT_YET_STARTED,
        ACTIVELY_CONSUMING,
        SWALLOWED_POISON,
        THREAD_INTERRUPTED,
        EXCEEDED_MAX_CONSEC_WAIT_MILLIS,
        ASKED_TO_SHUTDOWN,
        TIMED_OUT,
        CONSUMER_EXCEPTION,
        CONSUMER_ERROR,
        COMPLETED
    }

    public static String TIME_OUT = "timeout";
    public static String ELAPSED_MILLIS = "elapsedMS";

    //starts at -1 so that the first consumer gets id 0
    private static final AtomicInteger numConsumers = new AtomicInteger(-1);
    protected static Logger logger = Logger.getLogger(FileResourceConsumer.class);

    private long maxConsecWaitInMillis = 10*60*1000;// 10 minutes

    private final ArrayBlockingQueue<FileResource> fileQueue;

    private final XMLOutputFactory xmlOutputFactory = XMLOutputFactory.newFactory();
    private final int consumerId;

    //used to lock checks on and changes of currentState
    private final Object lock = new Object();

    //this records the file that is currently
    //being processed.  It is null if no file is currently being processed.
    //volatile because it is written by the consumer thread without the lock
    //and read by other threads in getCurrentFile and checkForTimedOutMillis
    private volatile FileStarted currentFile = null;

    //total number of files consumed; volatile so that reporter
    //sees the latest
    private volatile int numResourcesConsumed = 0;

    //total number of exceptions that were handled by subclasses;
    //volatile so that reporter sees the latest
    private volatile int numHandledExceptions = 0;

    //after this has been set to ACTIVELY_CONSUMING,
    //this should only be set by setEndedState.
    private volatile STATE currentState = STATE.NOT_YET_STARTED;

    /**
     * @param fileQueue shared queue from which this consumer takes its resources
     */
    public FileResourceConsumer(ArrayBlockingQueue<FileResource> fileQueue) {
        this.fileQueue = fileQueue;
        consumerId = numConsumers.incrementAndGet();
    }

    /**
     * Consumes resources from the queue until {@link #getNextFileResource()}
     * returns null or the thread is interrupted.
     *
     * @return the file that was being processed (if any) and the number of
     *         resources consumed
     */
    public IFileProcessorFutureResult call() {
        synchronized(lock) {
            //honor a pleaseShutdown() that arrived before this consumer
            //was started instead of silently overwriting it
            if (currentState != STATE.ASKED_TO_SHUTDOWN) {
                currentState = STATE.ACTIVELY_CONSUMING;
            }
        }

        try {
            FileResource fileResource = getNextFileResource();
            while (fileResource != null) {
                logger.debug("file consumer is about to process: " + fileResource.getResourceId());
                boolean consumed = _processFileResource(fileResource);
                logger.debug("file consumer has finished processing: " + fileResource.getResourceId());

                if (consumed) {
                    numResourcesConsumed++;
                }
                fileResource = getNextFileResource();
            }
        } catch (InterruptedException e) {
            setEndedState(STATE.THREAD_INTERRUPTED);
            //restore the interrupt status for whoever runs this callable
            Thread.currentThread().interrupt();
        }

        //ignored if an earlier cause has already been recorded
        setEndedState(STATE.COMPLETED);
        return new FileConsumerFutureResult(currentFile, numResourcesConsumed);
    }


    /**
     * Main piece of code that needs to be implemented.  Clients
     * are responsible for closing streams and handling the exceptions
     * that they'd like to handle.
     * <p/>
     * Unchecked throwables can be thrown past this, of course.  When an unchecked
     * throwable is thrown, the consumer records the failure in its state
     * and then rethrows the throwable.
     * Clients/subclasses should make sure to catch and handle everything they can.
     * <p/>
     * The design goal is that the whole process should close up and shutdown soon after
     * an unchecked exception or error is thrown.
     * <p/>
     * Make sure to call {@link #incrementHandledExceptions()} appropriately in
     * your implementation of this method.
     * <p/>
     *
     * @param fileResource resource to process
     * @return whether or not a file was successfully processed
     */
    public abstract boolean processFileResource(FileResource fileResource);


    /**
     * Make sure to call this appropriately!
     */
    protected void incrementHandledExceptions() {
        numHandledExceptions++;
    }


    /**
     * Returns whether or not the consumer could still process
     * a file or is still processing a file
     * (NOT_YET_STARTED, ACTIVELY_CONSUMING or ASKED_TO_SHUTDOWN).
     * @return whether this consumer is still active
     */
    public boolean isStillActive() {
        //NOTE(review): this tests the interrupt flag of the calling thread,
        //which is not necessarily the thread running call() -- confirm intent
        if (Thread.currentThread().isInterrupted()) {
            return false;
        }
        return currentState == STATE.NOT_YET_STARTED ||
                currentState == STATE.ACTIVELY_CONSUMING ||
                currentState == STATE.ASKED_TO_SHUTDOWN;
    }

    /**
     * Records the resource as the current file, delegates to
     * {@link #processFileResource(FileResource)} and clears the current file
     * again if that call returns normally.
     */
    private boolean _processFileResource(FileResource fileResource) {
        currentFile = new FileStarted(fileResource.getResourceId());
        boolean consumed = false;
        try {
            consumed = processFileResource(fileResource);
        } catch (RuntimeException e) {
            setEndedState(STATE.CONSUMER_EXCEPTION);
            throw e;
        } catch (Error e) {
            setEndedState(STATE.CONSUMER_ERROR);
            throw e;
        }
        //if anything is thrown from processFileResource, then the fileStarted
        //will remain what it was right before the exception was thrown.
        currentFile = null;
        return consumed;
    }

    /**
     * This politely asks the consumer to shutdown.
     * Before processing another file, the consumer will check to see
     * if it has been asked to terminate.
     * <p>
     * This offers another method for politely requesting
     * that a FileResourceConsumer stop processing
     * besides passing it {@link org.apache.tika.batch.PoisonFileResource}.
     *
     */
    public void pleaseShutdown() {
        setEndedState(STATE.ASKED_TO_SHUTDOWN);
    }

    /**
     * Returns the name and start time of a file that is currently being processed.
     * If no file is currently being processed, this will return null.
     *
     * @return FileStarted or null
     */
    public FileStarted getCurrentFile() {
        return currentFile;
    }

    public int getNumResourcesConsumed() {
        return numResourcesConsumed;
    }

    public int getNumHandledExceptions() {
        return numHandledExceptions;
    }

    /**
     * Checks to see if the currentFile being processed (if there is one)
     * should be timed out (still being worked on after staleThresholdMillis).
     * <p>
     * If the consumer should be timed out, this will return the currentFile and
     * set the state to TIMED_OUT.
     * <p>
     * If the consumer was already timed out earlier or
     * is not processing a file or has been working on a file
     * for less than #staleThresholdMillis, then this will return null.
     * <p>
     * @param staleThresholdMillis threshold to determine whether the consumer has gone stale.
     * @return null or the file started that triggered the stale condition
     */
    public FileStarted checkForTimedOutMillis(long staleThresholdMillis) {
        //if there isn't a current file, don't bother obtaining lock
        if (currentFile == null) {
            return null;
        }
        //if threshold is < 0, don't even look.
        if (staleThresholdMillis < 0) {
            return null;
        }
        synchronized(lock) {
            //check again once the lock has been obtained
            if (currentState != STATE.ACTIVELY_CONSUMING
                    && currentState != STATE.ASKED_TO_SHUTDOWN) {
                return null;
            }
            //read the field once; the consumer thread may clear it at any time
            FileStarted tmp = currentFile;
            if (tmp == null) {
                return null;
            }
            if (tmp.getElapsedMillis() > staleThresholdMillis) {
                setEndedState(STATE.TIMED_OUT);
                logWithResourceId(Level.FATAL, TIME_OUT,
                        tmp.getResourceId(), ELAPSED_MILLIS, Long.toString(tmp.getElapsedMillis()));
                return tmp;
            }
        }
        return null;
    }

    protected void logWithResourceId(Level level, String type, String resourceId, String... attrs) {
        logWithResourceId(level, type, resourceId, null, attrs);
    }

    /**
     * Use this for structured output that captures resourceId and other attributes.
     *
     * @param level level
     * @param type entity name for exception
     * @param resourceId resourceId string
     * @param t throwable can be null
     * @param attrs (array of key0, value0, key1, value1, etc.)
     */
    protected void logWithResourceId(Level level, String type, String resourceId, Throwable t, String... attrs) {

        StringWriter writer = new StringWriter();
        try {
            XMLStreamWriter xml = xmlOutputFactory.createXMLStreamWriter(writer);
            xml.writeStartDocument();
            xml.writeStartElement(type);
            xml.writeAttribute("resourceId", resourceId);
            if (attrs != null) {
                //attrs holds alternating names and values: name0 at 0, val0 at 1,
                //name1 at 2, val1 at 3, etc.  Step by two so that a value is never
                //reused as the name of the next attribute; a trailing name
                //without a value is ignored.
                for (int i = 0; i + 1 < attrs.length; i += 2) {
                    xml.writeAttribute(attrs[i], attrs[i + 1]);
                }
            }
            if (t != null) {
                StringWriter stackWriter = new StringWriter();
                PrintWriter printWriter = new PrintWriter(stackWriter);
                t.printStackTrace(printWriter);
                xml.writeCharacters(stackWriter.toString());
            }
            xml.writeEndElement();
            xml.writeEndDocument();
            xml.flush();
            xml.close();
        } catch (XMLStreamException e) {
            //NOTE(review): this attaches t (which may be null), not e --
            //presumably so that the reported throwable is not lost; confirm
            logger.error("error writing xml stream for: " + resourceId, t);
        }

        //whatever was written before a failure is still logged
        logger.log(level, writer.toString());
    }

    /**
     * Polls the queue in one-second steps until a resource arrives or the
     * consumer has to stop.
     *
     * @return the next resource, or null if the consumer was interrupted, is
     *         no longer ACTIVELY_CONSUMING, swallowed poison or exceeded its
     *         maximum consecutive wait
     * @throws InterruptedException if interrupted while polling the queue
     */
    private FileResource getNextFileResource() throws InterruptedException {
        FileResource fileResource = null;
        long start = System.currentTimeMillis();
        while (fileResource == null) {
            //check to see if thread is interrupted before polling
            if (Thread.currentThread().isInterrupted()) {
                setEndedState(STATE.THREAD_INTERRUPTED);
                logger.debug("Consumer thread was interrupted.");
                break;
            }

            synchronized(lock) {
                //need to lock here to prevent race condition with other threads setting state
                if (currentState != STATE.ACTIVELY_CONSUMING) {
                    logger.debug("Consumer already closed because of: "+ currentState.toString());
                    break;
                }
            }
            fileResource = fileQueue.poll(1L, TimeUnit.SECONDS);
            if (fileResource != null) {
                if (fileResource instanceof PoisonFileResource) {
                    setEndedState(STATE.SWALLOWED_POISON);
                    fileResource = null;
                }
                break;
            }
            logger.debug(consumerId + " is waiting for file and the queue size is: " + fileQueue.size());

            long elapsed = System.currentTimeMillis() - start;
            if (maxConsecWaitInMillis > 0 && elapsed > maxConsecWaitInMillis) {
                setEndedState(STATE.EXCEEDED_MAX_CONSEC_WAIT_MILLIS);
                break;
            }
        }
        return fileResource;
    }

    /**
     * Closes the closeable if it is not null; an IOException is logged,
     * not rethrown.
     */
    protected void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e){
                logger.error(e.getMessage());
            }
        }
    }

    /**
     * Flushes the closeable if it is also {@link Flushable} and then closes
     * it; IOExceptions are logged, not rethrown.
     */
    protected void flushAndClose(Closeable closeable) {
        if (closeable == null) {
            return;
        }
        if (closeable instanceof Flushable){
            try {
                ((Flushable)closeable).flush();
            } catch (IOException e) {
                logger.error(e.getMessage());
            }
        }
        close(closeable);
    }

    //do not overwrite a finished state except if
    //not yet started, actively consuming or shutting down.  This should
    //represent the initial cause; all subsequent calls
    //to set will be ignored!!!
    private void setEndedState(STATE cause) {
        synchronized(lock) {
            if (currentState == STATE.NOT_YET_STARTED ||
                    currentState == STATE.ACTIVELY_CONSUMING ||
                    currentState == STATE.ASKED_TO_SHUTDOWN) {
                currentState = cause;
            }
        }
    }

}
Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawler.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,269 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Date; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import org.apache.log4j.Logger; +import org.apache.tika.extractor.DocumentSelector; +import org.apache.tika.metadata.Metadata; + +public abstract class FileResourceCrawler implements Callable<IFileProcessorFutureResult> { + + protected final static int SKIPPED = 0; + protected final static int ADDED = 1; + protected final static int STOP_NOW = 2; + + private volatile boolean hasCompletedCrawling = false; + private volatile boolean shutDownNoPoison = false; + private volatile boolean isActive = true; + private volatile boolean timedOut = false; + + //how long to pause if can't add to queue + private static final long PAUSE_INCREMENT_MILLIS = 1000; + + protected static Logger logger = Logger.getLogger(FileResourceCrawler.class.toString()); + + private int maxFilesToAdd = -1; + private int maxFilesToConsider = -1; + + private final ArrayBlockingQueue<FileResource> queue; + private final int numConsumers; + + + private long maxConsecWaitInMillis = 300000;//300,000ms = 5 minutes + private DocumentSelector documentSelector = null; + + //number of files added to queue + private int added = 0; + //number of files considered including those that were rejected by documentSelector + private int considered = 0; + + /** + * @param queue shared queue + * @param numConsumers number of consumers (needs to know how many poisons to add when done) + */ + public FileResourceCrawler(ArrayBlockingQueue<FileResource> queue, int numConsumers) { + this.queue = queue; + this.numConsumers = numConsumers; + } + + /** + * Implement this to control the addition of FileResources. Call {@link #tryToAdd} + * to add FileResources to the queue. 
+ * + * @throws InterruptedException + */ + public abstract void start() throws InterruptedException; + + public FileResourceCrawlerFutureResult call() { + try { + start(); + } catch (InterruptedException e) { + //this can be triggered by shutdownNow in BatchProcess + logger.info("InterruptedException in FileCrawler: " + e.getMessage()); + } catch (Exception e) { + logger.error("Exception in FileResourceCrawler: " + e.getMessage()); + } finally { + isActive = false; + } + + try { + shutdown(); + } catch (InterruptedException e) { + //swallow + } + + return new FileResourceCrawlerFutureResult(considered, added); + } + + /** + * + * @param fileResource resource to add + * @return int status of the attempt (SKIPPED, ADDED, STOP_NOW) to add the resource to the queue. + * @throws InterruptedException + */ + protected int tryToAdd(FileResource fileResource) throws InterruptedException { + + if (maxFilesToAdd > -1 && added >= maxFilesToAdd) { + return STOP_NOW; + } + + if (maxFilesToConsider > -1 && considered > maxFilesToConsider) { + return STOP_NOW; + } + + boolean isAdded = false; + if (select(fileResource.getMetadata())) { + long totalConsecutiveWait = 0; + while (queue.offer(fileResource, 1L, TimeUnit.SECONDS) == false) { + + logger.info("FileResourceCrawler is pausing. 
Queue is full: " + queue.size()); + Thread.sleep(PAUSE_INCREMENT_MILLIS); + totalConsecutiveWait += PAUSE_INCREMENT_MILLIS; + if (maxConsecWaitInMillis > -1 && totalConsecutiveWait > maxConsecWaitInMillis) { + timedOut = true; + logger.error("Crawler had to wait longer than max consecutive wait time."); + throw new InterruptedException("FileResourceCrawler had to wait longer than max consecutive wait time."); + } + if (Thread.currentThread().isInterrupted()) { + logger.info("FileResourceCrawler shutting down because of interrupted thread."); + throw new InterruptedException("FileResourceCrawler interrupted."); + } + } + isAdded = true; + added++; + } else { + logger.debug("crawler did not select: "+fileResource.getResourceId()); + } + considered++; + return (isAdded)?ADDED:SKIPPED; + } + + //Warning! Depending on the value of maxConsecWaitInMillis + //this could try forever in vain to add poison to the queue. + private void shutdown() throws InterruptedException{ + logger.debug("FileResourceCrawler entering shutdown"); + if (hasCompletedCrawling || shutDownNoPoison) { + return; + } + int i = 0; + long start = new Date().getTime(); + while (queue.offer(new PoisonFileResource(), 1L, TimeUnit.SECONDS)) { + if (shutDownNoPoison) { + logger.debug("quitting the poison loop because shutDownNoPoison is now true"); + return; + } + if (Thread.currentThread().isInterrupted()) { + logger.debug("thread interrupted while trying to add poison"); + return; + } + long elapsed = new Date().getTime() - start; + if (maxConsecWaitInMillis > -1 && elapsed > maxConsecWaitInMillis) { + logger.error("Crawler timed out while trying to add poison"); + return; + } + logger.debug("added "+i+" number of PoisonFileResource(s)"); + if (i++ >= numConsumers) { + break; + } + + } + hasCompletedCrawling = true; + } + + /** + * If the crawler stops for any reason, it is no longer active. 
+ * + * @return whether crawler is active or not + */ + public boolean isActive() { + return isActive; + } + + public void setMaxConsecWaitInMillis(long maxConsecWaitInMillis) { + this.maxConsecWaitInMillis = maxConsecWaitInMillis; + } + public void setDocumentSelector(DocumentSelector documentSelector) { + this.documentSelector = documentSelector; + } + + public int getConsidered() { + return considered; + } + + protected boolean select(Metadata m) { + return documentSelector.select(m); + } + + /** + * Maximum number of files to add. If {@link #maxFilesToAdd} < 0 (default), + * then this crawler will add all documents. + * + * @param maxFilesToAdd maximum number of files to add to the queue + */ + public void setMaxFilesToAdd(int maxFilesToAdd) { + this.maxFilesToAdd = maxFilesToAdd; + } + + + /** + * Maximum number of files to consider. A file is considered + * whether or not the DocumentSelector selects a document. + * <p/> + * If {@link #maxFilesToConsider} < 0 (default), then this crawler + * will add all documents. + * + * @param maxFilesToConsider maximum number of files to consider adding to the queue + */ + public void setMaxFilesToConsider(int maxFilesToConsider) { + this.maxFilesToConsider = maxFilesToConsider; + } + + /** + * Use sparingly. This synchronizes on the queue! + * @return whether this queue contains any non-poison file resources + */ + public boolean isQueueEmpty() { + int size= 0; + synchronized(queue) { + for (FileResource aQueue : queue) { + if (!(aQueue instanceof PoisonFileResource)) { + size++; + } + } + } + return size == 0; + } + + /** + * Returns whether the crawler timed out while trying to add a resource + * to the queue. + * <p/> + * If the crawler timed out while trying to add poison, this is not + * set to true. 
+ * + * @return whether this was timed out or not + */ + public boolean wasTimedOut() { + return timedOut; + } + + /** + * + * @return number of files that this crawler added to the queue + */ + public int getAdded() { + return added; + } + + /** + * Set to true to shut down the FileResourceCrawler without + * adding poison. Do this only if you've already called another mechanism + * to request that consumers shut down. This prevents a potential deadlock issue + * where the crawler is trying to add to the queue, but it is full. + * + * @return + */ + public void shutDownNoPoison() { + this.shutDownNoPoison = true; + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileResourceCrawlerFutureResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,37 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +class FileResourceCrawlerFutureResult implements IFileProcessorFutureResult { + + private final int considered; + private final int added; + + protected FileResourceCrawlerFutureResult(int considered, int added) { + this.considered = considered; + this.added = added; + } + + protected int getConsidered() { + return considered; + } + + protected int getAdded() { + return added; + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/FileStarted.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,113 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Date; + +/** + * Simple class to record the time when a FileResource's processing started. 
+ */ +class FileStarted { + + private final String resourceId; + private final long started; + + /** + * Initializes a new FileStarted class with {@link #resourceId} + * and sets {@link #started} as new Date().getTime(). + * + * @param resourceId string for unique resource id + */ + public FileStarted(String resourceId) { + this(resourceId, new Date().getTime()); + } + + public FileStarted(String resourceId, long started) { + this.resourceId = resourceId; + this.started = started; + } + + + /** + * @return id of resource + */ + public String getResourceId() { + return resourceId; + } + + /** + * @return time at which processing on this file started + */ + public long getStarted() { + return started; + } + + /** + * @return elapsed milliseconds this the start of processing of this + * file resource + */ + public long getElapsedMillis() { + long now = new Date().getTime(); + return now - started; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((resourceId == null) ? 
0 : resourceId.hashCode()); + result = prime * result + (int) (started ^ (started >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof FileStarted)) { + return false; + } + FileStarted other = (FileStarted) obj; + if (resourceId == null) { + if (other.resourceId != null) { + return false; + } + } else if (!resourceId.equals(other.resourceId)) { + return false; + } + return started == other.started; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append("FileStarted [resourceId="); + builder.append(resourceId); + builder.append(", started="); + builder.append(started); + builder.append("]"); + return builder.toString(); + } + + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/IFileProcessorFutureResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,26 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Marker interface with no members; it lets different processors
 * return different result types through one common type.
 */
public interface IFileProcessorFutureResult {
}
+ */ + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.concurrent.Callable; + +import org.apache.log4j.Logger; +import org.apache.tika.io.IOUtils; + + +/** + * Class that waits for input on System.in. If the user enters a keystroke on + * System.in, this will send a signal to the FileResourceRunner to shutdown gracefully. + * + * <p> + * In the future, this may implement a common IInterrupter interface for more flexibility. + */ +public class Interrupter implements Callable<IFileProcessorFutureResult> { + + private Logger logger = Logger.getLogger(Interrupter.class); + public IFileProcessorFutureResult call(){ + try{ + BufferedReader reader = new BufferedReader(new InputStreamReader(System.in, IOUtils.UTF_8)); + while (true){ + if (reader.ready()){ + reader.readLine(); + break; + } else { + Thread.sleep(1000); + } + } + } catch (InterruptedException e){ + //canceller was interrupted + } catch (IOException e){ + logger.error("IOException from STDIN in CommandlineInterrupter."); + } + return new InterrupterFutureResult(); + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/InterrupterFutureResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,22 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class InterrupterFutureResult implements IFileProcessorFutureResult { + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/OutputStreamFactory.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,29 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.tika.metadata.Metadata; + +import java.io.IOException; +import java.io.OutputStream; + +public interface OutputStreamFactory { + + public OutputStream getOutputStream(Metadata metadata) throws IOException; + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/ParallelFileProcessingResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,100 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Immutable summary of a batch run: counts of resources considered, added and
 * consumed, the elapsed time, the exit status and the cause for termination.
 */
public class ParallelFileProcessingResult {

    private final int considered;
    private final int added;
    private final int consumed;
    private final double secondsElapsed;
    private final int exitStatus;
    private final String causeForTermination;

    /**
     * @param considered          number of file resources considered
     * @param added               number of resources added to the queue
     * @param consumed            number of resources that were tried to be consumed
     * @param secondsElapsed      seconds elapsed since the start of the batch processing
     * @param exitStatus          intended exit status
     * @param causeForTermination description of why processing ended
     */
    public ParallelFileProcessingResult(int considered, int added, int consumed, double secondsElapsed,
                                        int exitStatus,
                                        String causeForTermination) {
        this.causeForTermination = causeForTermination;
        this.exitStatus = exitStatus;
        this.secondsElapsed = secondsElapsed;
        this.consumed = consumed;
        this.added = added;
        this.considered = considered;
    }

    /**
     * Returns the number of file resources considered.
     * If a filter causes the crawler to ignore a number of resources,
     * this number could be higher than that returned by {@link #getConsumed()}.
     *
     * @return number of file resources considered
     */
    public int getConsidered() {
        return this.considered;
    }

    /**
     * @return number of resources added to the queue
     */
    public int getAdded() {
        return this.added;
    }

    /**
     * @return number of resources that were tried to be consumed. There
     * may have been an exception.
     */
    public int getConsumed() {
        return this.consumed;
    }

    /**
     * @return the cause for termination that was passed to the constructor
     */
    public String getCauseForTermination() {
        return this.causeForTermination;
    }

    /**
     * @return seconds elapsed since the start of the batch processing
     */
    public double secondsElapsed() {
        return this.secondsElapsed;
    }

    /**
     * @return intendedExitStatus
     */
    public int getExitStatus() {
        return this.exitStatus;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("ParallelFileProcessingResult{");
        sb.append("considered=").append(considered);
        sb.append(", added=").append(added);
        sb.append(", consumed=").append(consumed);
        sb.append(", secondsElapsed=").append(secondsElapsed);
        sb.append(", exitStatus=").append(exitStatus);
        sb.append(", causeForTermination='").append(causeForTermination).append('\'');
        sb.append('}');
        return sb.toString();
    }
}
+ */ + +import org.apache.tika.config.TikaConfig; +import org.apache.tika.parser.Parser; + +public interface ParserFactory { + + public Parser getParser(TikaConfig config); + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/PoisonFileResource.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,54 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.tika.metadata.Metadata; + +import java.io.InputStream; + +/** + * Sentinel class for the crawler to add to the queue to let + * the consumers know that they should shutdown. 
+ */ +class PoisonFileResource implements FileResource { + + /** + * always returns null + */ + @Override + public Metadata getMetadata() { + return null; + } + + /** + * always returns null + */ + @Override + public InputStream openInputStream() { + return null; + } + + /** + * always returns null + */ + @Override + public String getResourceId() { + return null; + } + +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporter.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,227 @@ +package org.apache.tika.batch; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import java.text.NumberFormat; +import java.util.Date; +import java.util.Locale; +import java.util.concurrent.Callable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tika.util.DurationFormatUtils; + +/** + * Basic class to use for reporting status from both the crawler and the consumers. + * This wakes up roughly every {@link #sleepMillis} and log.info's a status report. + */ + +public class StatusReporter implements Callable<IFileProcessorFutureResult> { + + private final Log logger = LogFactory.getLog(StatusReporter.class); + + //require references to these so that the + //StatusReporter can query them when it wakes up + private final ConsumersManager consumersManager; + private final FileResourceCrawler crawler; + + //local time that the StatusReporter started + private final long start; + //how long to sleep between reporting intervals + private long sleepMillis = 1000; + + //how long before considering a parse "stale" (potentially hung forever) + private long staleThresholdMillis = 100000; + + private volatile boolean isShuttingDown = false; + + /** + * Initialize with the crawler and consumers + * + * @param crawler crawler to ping at intervals + * @param consumersManager consumers to ping at intervals + */ + public StatusReporter(FileResourceCrawler crawler, ConsumersManager consumersManager) { + this.consumersManager = consumersManager; + this.crawler = crawler; + start = new Date().getTime(); + } + + /** + * Override for different behavior. + * <p/> + * This reports the string at the info level to this class' logger. + * + * @param s string to report + */ + protected void report(String s) { + logger.info(s); + } + + /** + * Startup the reporter. 
+ */ + public IFileProcessorFutureResult call() { + NumberFormat numberFormat = NumberFormat.getNumberInstance(Locale.ROOT); + try { + while (true) { + Thread.sleep(sleepMillis); + int cnt = getRoughCountConsumed(); + int exceptions = getRoughCountExceptions(); + long elapsed = new Date().getTime() - start; + double elapsedSecs = (double) elapsed / (double) 1000; + int avg = (elapsedSecs > 5 || cnt > 100) ? (int) ((double) cnt / elapsedSecs) : -1; + + String elapsedString = DurationFormatUtils.formatMillis(new Date().getTime() - start); + String docsPerSec = avg > -1 ? String.format(Locale.ROOT, + " (%s docs per sec)", + numberFormat.format(avg)) : ""; + String msg = + String.format( + Locale.ROOT, + "Processed %s documents in %s%s.", + numberFormat.format(cnt), elapsedString, docsPerSec); + report(msg); + if (exceptions == 1){ + msg = "There has been one handled exception."; + } else { + msg = + String.format(Locale.ROOT, + "There have been %s handled exceptions.", + numberFormat.format(exceptions)); + } + report(msg); + + reportStale(); + + int stillAlive = getStillAlive(); + if (stillAlive == 1) { + msg = "There is one file processor still active."; + } else { + msg = "There are " + numberFormat.format(stillAlive) + " file processors still active."; + } + report(msg); + + int crawled = crawler.getConsidered(); + int added = crawler.getAdded(); + if (crawled == 1) { + msg = "The directory crawler has considered 1 file,"; + } else { + msg = "The directory crawler has considered " + + numberFormat.format(crawled) + " files, "; + } + if (added == 1) { + msg += "and it has added 1 file."; + } else { + msg += "and it has added " + + numberFormat.format(crawler.getAdded()) + " files."; + } + msg += "\n"; + report(msg); + + if (! 
crawler.isActive()) { + msg = "The directory crawler has completed its crawl.\n"; + report(msg); + } + if (isShuttingDown) { + msg = "Process is shutting down now."; + report(msg); + } + } + } catch (InterruptedException e) { + //swallow + } + return new StatusReporterFutureResult(); + } + + + /** + * Set the amount of time to sleep between reports. + * @param sleepMillis length to sleep btwn reports in milliseconds + */ + public void setSleepMillis(long sleepMillis) { + this.sleepMillis = sleepMillis; + } + + /** + * Set the amount of time in milliseconds to use as the threshold for determining + * a stale parse. + * + * @param staleThresholdMillis threshold for determining whether or not to report a stale + */ + public void setStaleThresholdMillis(long staleThresholdMillis) { + this.staleThresholdMillis = staleThresholdMillis; + } + + + private void reportStale() { + for (FileResourceConsumer consumer : consumersManager.getConsumers()) { + FileStarted fs = consumer.getCurrentFile(); + if (fs == null) { + continue; + } + long elapsed = fs.getElapsedMillis(); + if (elapsed > staleThresholdMillis) { + String elapsedString = Double.toString((double) elapsed / (double) 1000); + report("A thread has been working on " + fs.getResourceId() + + " for " + elapsedString + " seconds."); + } + } + } + + /* + * This returns a rough (unsynchronized) count of resources consumed. + */ + private int getRoughCountConsumed() { + int ret = 0; + for (FileResourceConsumer consumer : consumersManager.getConsumers()) { + ret += consumer.getNumResourcesConsumed(); + } + return ret; + } + + private int getStillAlive() { + int ret = 0; + for (FileResourceConsumer consumer : consumersManager.getConsumers()) { + if ( consumer.isStillActive()) { + ret++; + } + } + return ret; + } + + /** + * This returns a rough (unsynchronized) count of caught/handled exceptions. 
+ * @return rough count of exceptions + */ + public int getRoughCountExceptions() { + int ret = 0; + for (FileResourceConsumer consumer : consumersManager.getConsumers()) { + ret += consumer.getNumHandledExceptions(); + } + return ret; + } + + /** + * Set whether the main process is in the process of shutting down. + * @param isShuttingDown + */ + public void setIsShuttingDown(boolean isShuttingDown){ + this.isShuttingDown = isShuttingDown; + } +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/StatusReporterFutureResult.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,23 @@ +package org.apache.tika.batch; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Empty class for what a StatusReporter returns when it finishes. 
+ */ +public class StatusReporterFutureResult implements IFileProcessorFutureResult { +} Added: tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java URL: http://svn.apache.org/viewvc/tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java?rev=1668673&view=auto ============================================================================== --- tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java (added) +++ tika/trunk/tika-batch/src/main/java/org/apache/tika/batch/builders/AbstractConsumersBuilder.java Mon Mar 23 16:09:10 2015 @@ -0,0 +1,38 @@ +package org.apache.tika.batch.builders; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.tika.batch.ConsumersManager; +import org.apache.tika.batch.FileResource; +import org.w3c.dom.Node; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; + +public abstract class AbstractConsumersBuilder { + + public static int getDefaultNumConsumers(){ + int n = Runtime.getRuntime().availableProcessors()-1; + return (n < 1) ? 
1 : n; + } + + public abstract ConsumersManager build(Node node, Map<String, String> runtimeAttributes, + ArrayBlockingQueue<FileResource> queue); + + +}