This is an automated email from the ASF dual-hosted git repository. tallison pushed a commit to branch TIKA-4207 in repository https://gitbox.apache.org/repos/asf/tika.git
commit 0674ea4693d57cbc30f8a59417f469a48ce53c2f Author: tallison <talli...@apache.org> AuthorDate: Fri Mar 8 20:13:49 2024 -0500 TIKA-4207 -- WIP, checkpoint commit. Doesn't compile...:D --- .../java/org/apache/tika/cli/TikaCLIAsyncTest.java | 73 ++++++++ .../test/java/org/apache/tika/cli/TikaCLITest.java | 57 +------ .../AbstractEmbeddedDocumentByteStore.java | 63 +++++++ .../extractor/BasicEmbeddedDocumentByteStore.java | 46 +++++ .../tika/extractor/EmbeddedDocumentByteStore.java | 32 ++++ .../extractor/ParsingAndEmbeddedDocExtractor.java | 162 ++++++++++++++++++ .../ParsingAndEmbeddedDocExtractorFactory.java | 40 +++++ .../java/org/apache/tika/pipes/FetchEmitTuple.java | 52 ++++-- .../java/org/apache/tika/pipes/PipesServer.java | 188 +++++++++++++++++---- .../extractor/EmbeddedDocumentBytesConfig.java | 93 ++++++++++ .../extractor/EmbeddedDocumentEmitterStore.java | 63 +++++++ .../org/apache/tika/pipes/PipesServerTest.java | 2 +- .../metadata/serialization/JsonFetchEmitTuple.java | 41 ++++- 13 files changed, 811 insertions(+), 101 deletions(-) diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java new file mode 100644 index 000000000..1f6c8fc2c --- /dev/null +++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLIAsyncTest.java @@ -0,0 +1,73 @@ +package org.apache.tika.cli; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.FileUtils; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class TikaCLIAsyncTest extends TikaCLITest { + + private static Path ASYNC_CONFIG; + @TempDir + private static Path ASYNC_OUTPUT_DIR; + + @BeforeAll + public static void setUpClass() throws Exception { + ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml"); + String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" + + "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" + + "</async>" + "<fetchers>" + + "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" + + "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + + "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" + + "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" + + "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() + + "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" + + "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" + + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" + + "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" + + "</pipesIterator>" + "</properties>"; + Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8)); + } + + @Test + public void testAsync() throws Exception { + String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath()); + + int json = 0; + for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) { + if (f.getName().endsWith(".json")) { + //check one file for pretty print + if (f.getName().equals("coffee.xls.json")) { + checkForPrettyPrint(f); + } + json++; + } + } + assertEquals(17, json); + } + + private void checkForPrettyPrint(File f) throws IOException { + String json = FileUtils.readFileToString(f, UTF_8); + int previous = json.indexOf("Content-Length"); + assertTrue(previous > -1); + for (String k : new String[]{"Content-Type", "dc:creator", + "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) { + int i = json.indexOf(k); + assertTrue( i > -1, "should have found " + k); + assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous); + previous = i; + } + } + + +} diff --git a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java index ebd1d90b9..c160db396 100644 --- a/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java +++ b/tika-app/src/test/java/org/apache/tika/cli/TikaCLITest.java @@ -45,11 +45,8 @@ import org.apache.tika.utils.ProcessUtils; */ public class TikaCLITest { - private static final File TEST_DATA_FILE = new File("src/test/resources/test-data"); + static final File TEST_DATA_FILE = new File("src/test/resources/test-data"); - private static Path ASYNC_CONFIG; - @TempDir - private static Path ASYNC_OUTPUT_DIR; @TempDir private Path extractDir; @@ -61,24 +58,7 @@ public class TikaCLITest { private PrintStream stderr = null; private String resourcePrefix; - @BeforeAll - public static void setUpClass() throws Exception { - ASYNC_CONFIG = Files.createTempFile(ASYNC_OUTPUT_DIR, "async-config-", ".xml"); - String xml = "<properties>" + "<async>" + "<numClients>3</numClients>" + - "<tikaConfig>" + ASYNC_CONFIG.toAbsolutePath() + "</tikaConfig>" + - "</async>" + "<fetchers>" + - "<fetcher class=\"org.apache.tika.pipes.fetcher.fs.FileSystemFetcher\">" + - "<name>fsf</name>" + "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + - "</basePath>" + "</fetcher>" + "</fetchers>" + "<emitters>" + - "<emitter class=\"org.apache.tika.pipes.emitter.fs.FileSystemEmitter\">" + - "<name>fse</name>" + "<basePath>" + ASYNC_OUTPUT_DIR.toAbsolutePath() + - "</basePath>" + "<prettyPrint>true</prettyPrint>" + "</emitter>" + "</emitters>" + - "<pipesIterator class=\"org.apache.tika.pipes.pipesiterator.fs.FileSystemPipesIterator\">" + - "<basePath>" + TEST_DATA_FILE.getAbsolutePath() + "</basePath>" + - "<fetcherName>fsf</fetcherName>" + "<emitterName>fse</emitterName>" + - "</pipesIterator>" + "</properties>"; - Files.write(ASYNC_CONFIG, xml.getBytes(UTF_8)); - } + protected static void assertExtracted(Path p, String allFiles) throws IOException { @@ -582,42 +562,11 @@ public class TikaCLITest { assertTrue(content.contains("application/vnd.oasis.opendocument.text-web")); } - @Test - public void testAsync() throws Exception { - String content = getParamOutContent("-a", "--config=" + ASYNC_CONFIG.toAbsolutePath()); - - int json = 0; - for (File f : ASYNC_OUTPUT_DIR.toFile().listFiles()) { - if (f.getName().endsWith(".json")) { - //check one file for pretty print - if (f.getName().equals("coffee.xls.json")) { - checkForPrettyPrint(f); - } - json++; - } - } - assertEquals(17, json); - } - - private void checkForPrettyPrint(File f) throws IOException { - String json = FileUtils.readFileToString(f, UTF_8); - int previous = json.indexOf("Content-Length"); - assertTrue(previous > -1); - for (String k : new String[]{"Content-Type", "dc:creator", - "dcterms:created", "dcterms:modified", "X-TIKA:content\""}) { - int i = json.indexOf(k); - assertTrue( i > -1, "should have found " + k); - assertTrue(i > previous, "bad order: " + k + " at " + i + " not less than " + previous); - previous = i; - } - } - - /** * reset outContent and errContent if they are not empty * run given params in TikaCLI and return outContent String with UTF-8 */ - private String getParamOutContent(String... params) throws Exception { + String getParamOutContent(String... params) throws Exception { resetContent(); TikaCLI.main(params); return outContent.toString("UTF-8"); diff --git a/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java new file mode 100644 index 000000000..c435a3e6e --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/extractor/AbstractEmbeddedDocumentByteStore.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.extractor; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.tika.io.FilenameUtils; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig; +import org.apache.tika.utils.StringUtils; + +public abstract class AbstractEmbeddedDocumentByteStore implements EmbeddedDocumentByteStore { + + List<Integer> ids = new ArrayList<>(); + + public String getFetchKey(String containerFetchKey, int embeddedId, + EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, + Metadata metadata) { + String embeddedIdString = embeddedDocumentBytesConfig.getZeroPadName() > 0 ? + StringUtils.leftPad(Integer.toString(embeddedId), + embeddedDocumentBytesConfig.getZeroPadName(), "0") : + Integer.toString(embeddedId); + + StringBuilder fetchKey = new StringBuilder(containerFetchKey) + .append(embeddedDocumentBytesConfig.getEmbeddedIdPrefix()) + .append(embeddedIdString); + + if (embeddedDocumentBytesConfig.getSuffixStrategy().equals( + EmbeddedDocumentBytesConfig.SUFFIX_STRATEGY.EXISTING)) { + String fName = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); + String suffix = FilenameUtils.getSuffixFromPath(fName); + fetchKey.append(suffix); + } + return fetchKey.toString(); + } + + @Override + public void add(int id, Metadata metadata, byte[] bytes) throws IOException { + ids.add(id); + } + + @Override + public List<Integer> getIds() { + return ids; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java new file mode 100644 index 000000000..b41285eb0 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/extractor/BasicEmbeddedDocumentByteStore.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.extractor; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig; + +public class BasicEmbeddedDocumentByteStore extends AbstractEmbeddedDocumentByteStore { + private final EmbeddedDocumentBytesConfig config; + public BasicEmbeddedDocumentByteStore(EmbeddedDocumentBytesConfig config) { + this.config = config; + } + //this won't scale, but let's start fully in memory for now; + Map<Integer, byte[]> docBytes = new HashMap<>(); + public void add(int id, Metadata metadata, byte[] bytes) throws IOException { + super.add(id, metadata, bytes); + docBytes.put(id, bytes); + } + + public byte[] getDocument(int id) { + return docBytes.get(id); + } + + @Override + public void close() throws IOException { + //delete tmp dir or whatever here + } +} diff --git a/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java new file mode 100644 index 000000000..ad1bb81f3 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/extractor/EmbeddedDocumentByteStore.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.extractor; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +import org.apache.tika.metadata.Metadata; + +public interface EmbeddedDocumentByteStore extends Closeable { + //we need metadata for the emitter store...can we get away without it? + void add(int id, Metadata metadata, byte[] bytes) throws IOException; + + byte[] getDocument(int id); + + List<Integer> getIds(); +} diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java new file mode 100644 index 000000000..d88ec94c4 --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractor.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.extractor; + +import static org.apache.tika.sax.XHTMLContentHandler.XHTML; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.apache.commons.io.input.CloseShieldInputStream; +import org.xml.sax.ContentHandler; +import org.xml.sax.SAXException; +import org.xml.sax.helpers.AttributesImpl; + +import org.apache.tika.exception.CorruptedFileException; +import org.apache.tika.exception.EncryptedDocumentException; +import org.apache.tika.exception.TikaException; +import org.apache.tika.io.TemporaryResources; +import org.apache.tika.io.TikaInputStream; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.metadata.TikaCoreProperties; +import org.apache.tika.parser.DelegatingParser; +import org.apache.tika.parser.ParseContext; +import org.apache.tika.parser.ParseRecord; +import org.apache.tika.parser.Parser; +import org.apache.tika.sax.BodyContentHandler; +import org.apache.tika.sax.EmbeddedContentHandler; + +/** + * Helper class for parsers of package archives or other compound document + * formats that support embedded or attached component documents. + * + * This is intended to both parse the embedded documents and extract + * the raw bytes from the embedded attachments when possible. + * + * See also {@link ParsingEmbeddedDocumentExtractor} and {@link ParserContainerExtractor}. + * + * @since 3.0.0 + */ +public class ParsingAndEmbeddedDocExtractor implements EmbeddedDocumentExtractor { + + private static final File ABSTRACT_PATH = new File(""); + + private static final Parser DELEGATING_PARSER = new DelegatingParser(); + + private boolean writeFileNameToContent = true; + + private final ParseContext context; + + public ParsingAndEmbeddedDocExtractor(ParseContext context) { + this.context = context; + } + + public boolean shouldParseEmbedded(Metadata metadata) { + DocumentSelector selector = context.get(DocumentSelector.class); + if (selector != null) { + return selector.select(metadata); + } + + FilenameFilter filter = context.get(FilenameFilter.class); + if (filter != null) { + String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); + if (name != null) { + return filter.accept(ABSTRACT_PATH, name); + } + } + + return true; + } + + public void parseEmbedded( + InputStream stream, ContentHandler handler, Metadata metadata, boolean outputHtml) + throws SAXException, IOException { + if (outputHtml) { + AttributesImpl attributes = new AttributesImpl(); + attributes.addAttribute("", "class", "class", "CDATA", "package-entry"); + handler.startElement(XHTML, "div", "div", attributes); + } + + String name = metadata.get(TikaCoreProperties.RESOURCE_NAME_KEY); + if (writeFileNameToContent && name != null && name.length() > 0 && outputHtml) { + handler.startElement(XHTML, "h1", "h1", new AttributesImpl()); + char[] chars = name.toCharArray(); + handler.characters(chars, 0, chars.length); + handler.endElement(XHTML, "h1", "h1"); + } + + // Use the delegate parser to parse this entry + try (TemporaryResources tmp = new TemporaryResources()) { + final TikaInputStream newStream = + TikaInputStream.get(CloseShieldInputStream.wrap(stream), tmp, metadata); + if (stream instanceof TikaInputStream) { + final Object container = ((TikaInputStream) stream).getOpenContainer(); + if (container != null) { + newStream.setOpenContainer(container); + } + } + Path p = newStream.getPath(); + storeEmbeddedBytes(p, metadata); + + DELEGATING_PARSER.parse(newStream, new EmbeddedContentHandler(new BodyContentHandler(handler)), + metadata, context); + } catch (EncryptedDocumentException ede) { + recordException(ede, context); + } catch (CorruptedFileException e) { + //necessary to stop the parse to avoid infinite loops + //on corrupt sqlite3 files + throw new IOException(e); + } catch (TikaException e) { + recordException(e, context); + } + + if (outputHtml) { + handler.endElement(XHTML, "div", "div"); + } + } + + private void storeEmbeddedBytes(Path p, Metadata metadata) { + EmbeddedDocumentByteStore embeddedDocumentByteStore = + context.get(EmbeddedDocumentByteStore.class); + int id = metadata.getInt(TikaCoreProperties.EMBEDDED_ID); + try { + embeddedDocumentByteStore.add(id, metadata, Files.readAllBytes(p)); + } catch (IOException e) { + //log, or better, store embdocstore exception + } + } + + private void recordException(Exception e, ParseContext context) { + ParseRecord record = context.get(ParseRecord.class); + if (record == null) { + return; + } + record.addException(e); + } + + public Parser getDelegatingParser() { + return DELEGATING_PARSER; + } + + public void setWriteFileNameToContent(boolean writeFileNameToContent) { + this.writeFileNameToContent = writeFileNameToContent; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java new file mode 100644 index 000000000..ca4c6633c --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/extractor/ParsingAndEmbeddedDocExtractorFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.extractor; + +import org.apache.tika.config.Field; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.parser.ParseContext; + +public class ParsingAndEmbeddedDocExtractorFactory + implements EmbeddedDocumentExtractorFactory { + + private boolean writeFileNameToContent = true; + + @Field + public void setWriteFileNameToContent(boolean writeFileNameToContent) { + this.writeFileNameToContent = writeFileNameToContent; + } + + @Override + public EmbeddedDocumentExtractor newInstance(Metadata metadata, ParseContext parseContext) { + ParsingEmbeddedDocumentExtractor ex = + new ParsingEmbeddedDocumentExtractor(parseContext); + ex.setWriteFileNameToContent(writeFileNameToContent); + return ex; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java index 3a8ec2bdd..c49f3743f 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/FetchEmitTuple.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.util.Objects; import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig; import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.pipes.fetcher.FetchKey; @@ -38,6 +39,7 @@ public class FetchEmitTuple implements Serializable { private final ON_PARSE_EXCEPTION onParseException; private HandlerConfig handlerConfig; + private EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig; public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey) { this(id, fetchKey, emitKey, new Metadata(), HandlerConfig.DEFAULT_HANDLER_CONFIG, @@ -55,12 +57,20 @@ public class FetchEmitTuple implements Serializable { public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata, HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException) { + this(id, fetchKey, emitKey, metadata, handlerConfig, onParseException, + EmbeddedDocumentBytesConfig.SKIP); + } + + public FetchEmitTuple(String id, FetchKey fetchKey, EmitKey emitKey, Metadata metadata, + HandlerConfig handlerConfig, ON_PARSE_EXCEPTION onParseException, + EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig) { this.id = id; this.fetchKey = fetchKey; this.emitKey = emitKey; this.metadata = metadata; this.handlerConfig = handlerConfig; this.onParseException = onParseException; + this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig; } public String getId() { @@ -94,21 +104,40 @@ public class FetchEmitTuple implements Serializable { return handlerConfig == null ? HandlerConfig.DEFAULT_HANDLER_CONFIG : handlerConfig; } + public EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig() { + return embeddedDocumentBytesConfig; + } + @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } FetchEmitTuple that = (FetchEmitTuple) o; - if (!Objects.equals(id, that.id)) return false; - if (!Objects.equals(fetchKey, that.fetchKey)) + if (!Objects.equals(id, that.id)) { + return false; + } + if (!Objects.equals(fetchKey, that.fetchKey)) { + return false; + } + if (!Objects.equals(emitKey, that.emitKey)) { + return false; + } + if (!Objects.equals(metadata, that.metadata)) { + return false; + } + if (onParseException != that.onParseException) { return false; - if (!Objects.equals(emitKey, that.emitKey)) return false; - if (!Objects.equals(metadata, that.metadata)) + } + if (!Objects.equals(handlerConfig, that.handlerConfig)) { return false; - if (onParseException != that.onParseException) return false; - return Objects.equals(handlerConfig, that.handlerConfig); + } + return Objects.equals(embeddedDocumentBytesConfig, that.embeddedDocumentBytesConfig); } @Override @@ -119,13 +148,16 @@ public class FetchEmitTuple implements Serializable { result = 31 * result + (metadata != null ? metadata.hashCode() : 0); result = 31 * result + (onParseException != null ? onParseException.hashCode() : 0); result = 31 * result + (handlerConfig != null ? handlerConfig.hashCode() : 0); + result = 31 * result + + (embeddedDocumentBytesConfig != null ? embeddedDocumentBytesConfig.hashCode() : 0); return result; } @Override public String toString() { return "FetchEmitTuple{" + "id='" + id + '\'' + ", fetchKey=" + fetchKey + ", emitKey=" + - emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException + - ", handlerConfig=" + handlerConfig + '}'; + emitKey + ", metadata=" + metadata + ", onParseException=" + onParseException + + ", handlerConfig=" + handlerConfig + ", embeddedDocumentBytesConfig=" + + embeddedDocumentBytesConfig + '}'; } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java index ed1e5bb5e..94ef58502 100644 --- a/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java +++ b/tika-core/src/main/java/org/apache/tika/pipes/PipesServer.java @@ -16,6 +16,7 @@ */ package org.apache.tika.pipes; +import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; @@ -24,10 +25,12 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.List; +import java.util.Optional; import org.apache.commons.io.IOUtils; import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; @@ -40,8 +43,11 @@ import org.xml.sax.SAXException; import org.apache.tika.config.TikaConfig; import org.apache.tika.detect.Detector; import org.apache.tika.exception.EncryptedDocumentException; +import org.apache.tika.exception.TikaConfigException; import org.apache.tika.exception.TikaException; +import org.apache.tika.extractor.BasicEmbeddedDocumentByteStore; import org.apache.tika.extractor.DocumentSelector; +import org.apache.tika.extractor.EmbeddedDocumentByteStore; import org.apache.tika.io.TemporaryResources; import org.apache.tika.io.TikaInputStream; import org.apache.tika.metadata.Metadata; @@ -52,11 +58,14 @@ import org.apache.tika.parser.DigestingParser; import org.apache.tika.parser.ParseContext; import org.apache.tika.parser.Parser; import org.apache.tika.parser.RecursiveParserWrapper; +import org.apache.tika.pipes.emitter.StreamEmitter; +import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig; import org.apache.tika.pipes.emitter.EmitData; import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.pipes.emitter.Emitter; import org.apache.tika.pipes.emitter.EmitterManager; import org.apache.tika.pipes.emitter.TikaEmitterException; +import org.apache.tika.pipes.extractor.EmbeddedDocumentEmitterStore; import org.apache.tika.pipes.fetcher.FetchKey; import org.apache.tika.pipes.fetcher.Fetcher; import org.apache.tika.pipes.fetcher.FetcherManager; @@ -269,7 +278,7 @@ public class PipesServer implements Runnable { * @return */ private String getContainerStacktrace(FetchEmitTuple t, List<Metadata> metadataList) { - if (metadataList == null || metadataList.size() < 1) { + if (metadataIsEmpty(metadataList)) { return StringUtils.EMPTY; } String stack = metadataList.get(0).get(TikaCoreProperties.CONTAINER_EXCEPTION); @@ -277,11 +286,12 @@ public class PipesServer implements Runnable { } - private void emit(String taskId, EmitData emitData, String parseExceptionStack) { + private void emit(String taskId, EmitKey emitKey, MetadataListAndEmbeddedBytes parseData, + String parseExceptionStack) { Emitter emitter = null; try { - emitter = emitterManager.getEmitter(emitData.getEmitKey().getEmitterName()); + emitter = emitterManager.getEmitter(emitKey.getEmitterName()); } catch (IllegalArgumentException e) { String noEmitterMsg = getNoEmitterMsg(taskId); LOG.warn(noEmitterMsg); @@ -289,7 +299,11 @@ public class PipesServer implements Runnable { return; } try { - emitter.emit(emitData.getEmitKey().getEmitKey(), emitData.getMetadataList()); + if (parseData.toBePackagedForStreamEmitter()) { + emitContentsAndBytes(emitter, emitKey, parseData); + } else { + emitter.emit(emitKey.getEmitKey(), parseData.getMetadataList()); + } } catch (IOException | TikaEmitterException e) { LOG.warn("emit exception", e); String msg = ExceptionUtils.getStackTrace(e); @@ -306,6 +320,16 @@ public class PipesServer implements Runnable { } } + private void emitContentsAndBytes(Emitter emitter, EmitKey emitKey, + MetadataListAndEmbeddedBytes parseData) { + if (! (emitter instanceof StreamEmitter)) { + throw new IllegalArgumentException("The emitter for embedded document byte store must" + + " be a StreamEmitter. I see: " + emitter.getClass()); + } + //TODO: implement this + throw new UnsupportedOperationException("this is not yet implemented"); + } + private void parseOne() { synchronized (lock) { parsing = true; @@ -348,35 +372,53 @@ public class PipesServer implements Runnable { } start = System.currentTimeMillis(); - List<Metadata> metadataList = parseIt(t, fetcher); + MetadataListAndEmbeddedBytes parseData = null; - if (LOG.isTraceEnabled()) { - LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start); - } + try { + parseData = parseFromTuple(t, fetcher); - if (metadataIsEmpty(metadataList)) { - write(STATUS.EMPTY_OUTPUT); - return; - } + if (LOG.isTraceEnabled()) { + LOG.trace("timer -- to parse: {} ms", System.currentTimeMillis() - start); + } + + if (metadataIsEmpty(parseData.getMetadataList())) { + write(STATUS.EMPTY_OUTPUT); + return; + } - emitIt(t, metadataList); + emitParseData(t, parseData); + } finally { + if (parseData.hasEmbeddedDocumentByteStore() && + parseData.getEmbeddedDocumentByteStore() instanceof Closeable) { + try { + ((Closeable)parseData.getEmbeddedDocumentByteStore()).close(); + } catch (IOException e) { + LOG.warn("problem closing embedded document byte store", e); + } + } + } } - private void emitIt(FetchEmitTuple t, List<Metadata> metadataList) { + private void emitParseData(FetchEmitTuple t, MetadataListAndEmbeddedBytes parseData) { long start = System.currentTimeMillis(); - String stack = getContainerStacktrace(t, metadataList); + String stack = getContainerStacktrace(t, parseData.getMetadataList()); //we need to apply this after we pull out the stacktrace - filterMetadata(metadataList); + filterMetadata(parseData.getMetadataList()); if (StringUtils.isBlank(stack) || t.getOnParseException() == FetchEmitTuple.ON_PARSE_EXCEPTION.EMIT) { - injectUserMetadata(t.getMetadata(), metadataList); + injectUserMetadata(t.getMetadata(), parseData.getMetadataList()); EmitKey emitKey = t.getEmitKey(); if (StringUtils.isBlank(emitKey.getEmitKey())) { emitKey = new EmitKey(emitKey.getEmitterName(), t.getFetchKey().getFetchKey()); t.setEmitKey(emitKey); } - EmitData emitData = new EmitData(t.getEmitKey(), metadataList, stack); - if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) { - emit(t.getId(), emitData, stack); + EmitData emitData = new EmitData(t.getEmitKey(), parseData.getMetadataList(), stack); + if (parseData.toBePackagedForStreamEmitter()) { + emit(t.getId(), emitKey, parseData, stack); + if (LOG.isTraceEnabled()) { + LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start); + } + } else if (maxForEmitBatchBytes >= 0 && emitData.getEstimatedSizeBytes() >= maxForEmitBatchBytes) { + emit(t.getId(), emitKey, parseData, stack); if (LOG.isTraceEnabled()) { LOG.trace("timer -- emitted: {} ms", System.currentTimeMillis() - start); } @@ -418,7 +460,7 @@ public class PipesServer implements Runnable { } } - protected List<Metadata> parseIt(FetchEmitTuple t, Fetcher fetcher) { + protected MetadataListAndEmbeddedBytes parseFromTuple(FetchEmitTuple t, Fetcher fetcher) { FetchKey fetchKey = t.getFetchKey(); if (fetchKey.hasRange()) { if (! (fetcher instanceof RangeFetcher)) { @@ -428,7 +470,7 @@ public class PipesServer implements Runnable { Metadata metadata = new Metadata(); try (InputStream stream = ((RangeFetcher)fetcher).fetch(fetchKey.getFetchKey(), fetchKey.getRangeStart(), fetchKey.getRangeEnd(), metadata)) { - return parse(t, stream, metadata); + return parseWithStream(t, stream, metadata); } catch (SecurityException e) { LOG.error("security exception " + t.getId(), e); throw e; @@ -439,7 +481,7 @@ public class PipesServer implements Runnable { } else { Metadata metadata = new Metadata(); try (InputStream stream = fetcher.fetch(t.getFetchKey().getFetchKey(), metadata)) { - return parse(t, stream, metadata); + return parseWithStream(t, stream, metadata); } catch (SecurityException e) { LOG.error("security exception " + t.getId(), e); throw e; @@ -488,20 +530,46 @@ public class PipesServer implements Runnable { exit(1); } - private List<Metadata> parse(FetchEmitTuple fetchEmitTuple, InputStream stream, - Metadata metadata) { + private MetadataListAndEmbeddedBytes parseWithStream(FetchEmitTuple fetchEmitTuple, InputStream stream, + Metadata metadata) throws TikaConfigException { HandlerConfig handlerConfig = fetchEmitTuple.getHandlerConfig(); + List<Metadata> metadataList; + //this adds the EmbeddedDocumentByteStore to the parsecontext + ParseContext parseContext = createParseContext(fetchEmitTuple); + if (handlerConfig.getParseMode() == HandlerConfig.PARSE_MODE.RMETA) { - return parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata); + metadataList = parseRecursive(fetchEmitTuple, handlerConfig, stream, metadata, parseContext); } else { - return parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata); + metadataList = parseConcatenated(fetchEmitTuple, handlerConfig, stream, metadata, parseContext); } + + return new MetadataListAndEmbeddedBytes(metadataList, + parseContext.get(EmbeddedDocumentByteStore.class)); + } + + private ParseContext createParseContext(FetchEmitTuple fetchEmitTuple) + throws TikaConfigException { + ParseContext parseContext = new ParseContext(); + if (fetchEmitTuple.getEmbeddedDocumentBytesConfig() == EmbeddedDocumentBytesConfig.SKIP) { + return parseContext; + } + + if (! StringUtils.isBlank(fetchEmitTuple.getEmbeddedDocumentBytesConfig().getEmitter())) { + parseContext.set(EmbeddedDocumentByteStore.class, + new EmbeddedDocumentEmitterStore(fetchEmitTuple.getEmitKey(), + fetchEmitTuple.getEmbeddedDocumentBytesConfig(), + emitterManager)); + } else { + parseContext.set(EmbeddedDocumentByteStore.class, + new BasicEmbeddedDocumentByteStore(fetchEmitTuple.getEmbeddedDocumentBytesConfig())); + + } + return parseContext; } private List<Metadata> parseConcatenated(FetchEmitTuple fetchEmitTuple, HandlerConfig handlerConfig, InputStream stream, - Metadata metadata) { - ParseContext parseContext = new ParseContext(); + Metadata metadata, ParseContext parseContext) { ContentHandlerFactory contentHandlerFactory = new BasicContentHandlerFactory(handlerConfig.getType(), @@ -552,8 +620,7 @@ public class PipesServer implements Runnable { private List<Metadata> parseRecursive(FetchEmitTuple fetchEmitTuple, HandlerConfig handlerConfig, InputStream stream, - Metadata metadata) { - ParseContext parseContext = new ParseContext(); + Metadata metadata, ParseContext parseContext) { //Intentionally do not add the metadata filter here! //We need to let stacktraces percolate RecursiveParserWrapperHandler handler = new RecursiveParserWrapperHandler( @@ -590,7 +657,7 @@ public class PipesServer implements Runnable { if (tis == null) { tis = TikaInputStream.get(stream, tmp, metadata); } - _preParse(t.getId(), tis, metadata, parseContext); + _preParse(t, tis, metadata, parseContext); } finally { IOUtils.closeQuietly(tmp); } @@ -598,13 +665,13 @@ public class PipesServer implements Runnable { writeIntermediate(t.getEmitKey(), metadata); } - private void _preParse(String id, TikaInputStream tis, Metadata metadata, + private void _preParse(FetchEmitTuple t, TikaInputStream tis, Metadata metadata, ParseContext parseContext) { if (digester != null) { try { digester.digest(tis, metadata, parseContext); } catch (IOException e) { - LOG.warn("problem digesting: " + id, e); + LOG.warn("problem digesting: " + t.getId(), e); } } try { @@ -612,7 +679,18 @@ public class PipesServer implements Runnable { metadata.set(Metadata.CONTENT_TYPE, mt.toString()); metadata.set(TikaCoreProperties.CONTENT_TYPE_PARSER_OVERRIDE, mt.toString()); } catch (IOException e) { - LOG.warn("problem detecting: " + id, e); + LOG.warn("problem detecting: " + t.getId(), e); + } + + if (t.getEmbeddedDocumentBytesConfig() != null && + t.getEmbeddedDocumentBytesConfig().isIncludeOriginal()) { + EmbeddedDocumentByteStore embeddedDocumentByteStore = + parseContext.get(EmbeddedDocumentEmitterStore.class); + try { + embeddedDocumentByteStore.add(0, metadata, Files.readAllBytes(tis.getPath())); + } catch (IOException e) { + LOG.warn("problem reading source file into embedded document byte store", e); + } } } @@ -734,4 +812,44 @@ public class PipesServer implements Runnable { exit(1); } } + + private class MetadataListAndEmbeddedBytes { + final List<Metadata> metadataList; + final Optional<EmbeddedDocumentByteStore> embeddedDocumentByteStore; + + public MetadataListAndEmbeddedBytes(List<Metadata> metadataList, + EmbeddedDocumentByteStore embeddedDocumentByteStore) { + this.metadataList = metadataList; + this.embeddedDocumentByteStore = Optional.ofNullable(embeddedDocumentByteStore); + } + + public List<Metadata> getMetadataList() { + return metadataList; + } + + public EmbeddedDocumentByteStore getEmbeddedDocumentByteStore() { + return embeddedDocumentByteStore.get(); + } + + /** + * This tests whether there's any type of embedded document store + * ...that, for example, may require closing at the end of the parse. + * @return + */ + public boolean hasEmbeddedDocumentByteStore() { + return embeddedDocumentByteStore.isPresent(); + } + + /** + * If the intent is that the metadata and byte store be packaged in a zip + * or similar and emitted via a single stream emitter. + * + * This is basically a test that this is not an EmbeddedDocumentEmitterStore. + * + * @return + */ + public boolean toBePackagedForStreamEmitter() { + return ! (embeddedDocumentByteStore.get() instanceof EmbeddedDocumentEmitterStore); + } + } } diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java new file mode 100644 index 000000000..cdf3c77fe --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentBytesConfig.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tika.pipes.extractor; + +public class EmbeddedDocumentBytesConfig { + + public static EmbeddedDocumentBytesConfig SKIP = new EmbeddedDocumentBytesConfig(false); + + public enum SUFFIX_STRATEGY { + NONE, EXISTING, DETECTED + } + private final boolean extractEmbeddedDocumentBytes; + //TODO -- add these at some point + /* + private Set<String> includeMimeTypes = new HashSet<>(); + private Set<String> excludeMimeTypes = new HashSet<>(); + */ + private int zeroPadName = 0; + + private SUFFIX_STRATEGY suffixStrategy = SUFFIX_STRATEGY.NONE; + + private String embeddedIdPrefix = "-"; + + private String emitter; + + private boolean includeOriginal = false; + + public EmbeddedDocumentBytesConfig(boolean extractEmbeddedDocumentBytes) { + this.extractEmbeddedDocumentBytes = extractEmbeddedDocumentBytes; + } + + public static EmbeddedDocumentBytesConfig getSKIP() { + return SKIP; + } + + public boolean isExtractEmbeddedDocumentBytes() { + return extractEmbeddedDocumentBytes; + } + + public int getZeroPadName() { + return zeroPadName; + } + + public SUFFIX_STRATEGY getSuffixStrategy() { + return suffixStrategy; + } + + public String getEmbeddedIdPrefix() { + return embeddedIdPrefix; + } + + public String getEmitter() { + return emitter; + } + + public boolean isIncludeOriginal() { + return includeOriginal; + } + + public void setZeroPadNameLength(int zeroPadName) { + this.zeroPadName = zeroPadName; + } + + public void setSuffixStrategy(SUFFIX_STRATEGY suffixStrategy) { + this.suffixStrategy = suffixStrategy; + } + + public void setEmbeddedIdPrefix(String embeddedIdPrefix) { + this.embeddedIdPrefix = embeddedIdPrefix; + } + + public void setEmitter(String emitter) { + this.emitter = emitter; + } + + public void setIncludeOriginal(boolean includeOriginal) { + this.includeOriginal = includeOriginal; + } +} diff --git a/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java new file mode 100644 index 000000000..ddcca6edf --- /dev/null +++ b/tika-core/src/main/java/org/apache/tika/pipes/extractor/EmbeddedDocumentEmitterStore.java @@ -0,0 +1,63 @@ +package org.apache.tika.pipes.extractor; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.commons.io.IOExceptionWithCause; +import org.apache.commons.io.input.UnsynchronizedByteArrayInputStream; + +import org.apache.tika.exception.TikaConfigException; +import org.apache.tika.extractor.AbstractEmbeddedDocumentByteStore; +import org.apache.tika.metadata.Metadata; +import org.apache.tika.pipes.emitter.EmitKey; +import org.apache.tika.pipes.emitter.Emitter; +import org.apache.tika.pipes.emitter.EmitterManager; +import org.apache.tika.pipes.emitter.StreamEmitter; +import org.apache.tika.pipes.emitter.TikaEmitterException; + +public class EmbeddedDocumentEmitterStore extends AbstractEmbeddedDocumentByteStore { + private final EmitKey containerEmitKey; + private final EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig; + private final StreamEmitter emitter; + + private static final Metadata METADATA = new Metadata(); + public EmbeddedDocumentEmitterStore(EmitKey containerEmitKey, + EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig, + EmitterManager emitterManager) throws TikaConfigException { + this.containerEmitKey = containerEmitKey; + this.embeddedDocumentBytesConfig = embeddedDocumentBytesConfig; + Emitter tmpEmitter = + emitterManager.getEmitter(embeddedDocumentBytesConfig.getEmitter()); + if (! (tmpEmitter instanceof StreamEmitter)) { + throw new TikaConfigException("Emitter " + + embeddedDocumentBytesConfig.getEmitter() + + " must implement a StreamEmitter"); + } + this.emitter = (StreamEmitter) tmpEmitter; + } + + @Override + public void add(int id, Metadata metadata, byte[] bytes) throws IOException { + //intentionally do not call super.add, because we want the ids list to be empty + String emitKey = getFetchKey(containerEmitKey.getEmitKey(), + id, embeddedDocumentBytesConfig, metadata); + + try { + emitter.emit(emitKey, new UnsynchronizedByteArrayInputStream(bytes), METADATA); + } catch (TikaEmitterException e) { + throw new IOExceptionWithCause(e); + } + } + + @Override + public byte[] getDocument(int id) { + throw new UnsupportedOperationException("this is emit only."); + } + + @Override + public void close() throws IOException { + if (emitter instanceof Closeable) { + ((Closeable) emitter).close(); + } + } +} diff --git a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java index 53c784796..92d8c5c11 100644 --- a/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java +++ b/tika-core/src/test/java/org/apache/tika/pipes/PipesServerTest.java @@ -69,7 +69,7 @@ public class PipesServerTest extends TikaTest { new FetchKey("fs", "mock.xml"), new EmitKey("", "")); Fetcher fetcher = FetcherManager.load(tikaConfig).getFetcher(); - List<Metadata> metadataList = pipesServer.parseIt(fetchEmitTuple, fetcher); + List<Metadata> metadataList = pipesServer.parseFromTuple(fetchEmitTuple, fetcher); assertEquals("5f3b924303e960ce35d7f705e91d3018dd110a9c3cef0546a91fe013d6dad6fd", metadataList.get(0).get("X-TIKA:digest:SHA-256")); } diff --git a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java index 3fbd67c0c..e1bec421a 100644 --- a/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java +++ b/tika-serialization/src/main/java/org/apache/tika/metadata/serialization/JsonFetchEmitTuple.java @@ -32,6 +32,7 @@ import org.apache.tika.config.TikaConfig; import org.apache.tika.metadata.Metadata; import org.apache.tika.pipes.FetchEmitTuple; import org.apache.tika.pipes.HandlerConfig; +import org.apache.tika.pipes.extractor.EmbeddedDocumentBytesConfig; import org.apache.tika.pipes.emitter.EmitKey; import org.apache.tika.pipes.fetcher.FetchKey; import org.apache.tika.sax.BasicContentHandlerFactory; @@ -54,6 +55,8 @@ public class JsonFetchEmitTuple { private static final String HANDLER_CONFIG_MAX_EMBEDDED_RESOURCES = "maxEmbeddedResources"; private static final String HANDLER_CONFIG_PARSE_MODE = "parseMode"; + private static final String EMBEDDED_DOCUMENT_BYTES_CONFIG = "embeddedDocumentBytesConfig"; + public static FetchEmitTuple fromJson(Reader reader) throws IOException { try (JsonParser jParser = new JsonFactory().setStreamReadConstraints(StreamReadConstraints.builder() @@ -84,6 +87,8 @@ public class JsonFetchEmitTuple { FetchEmitTuple.DEFAULT_ON_PARSE_EXCEPTION; HandlerConfig handlerConfig = HandlerConfig.DEFAULT_HANDLER_CONFIG; Metadata metadata = new Metadata(); + EmbeddedDocumentBytesConfig embeddedDocumentBytesConfig = EmbeddedDocumentBytesConfig.SKIP; + while (token != JsonToken.END_OBJECT) { if (token != JsonToken.FIELD_NAME) { throw new IOException("required field name, but see: " + token.name()); @@ -120,6 +125,8 @@ public class JsonFetchEmitTuple { fetchRangeStart = getLong(jParser); } else if (FETCH_RANGE_END.equals(name)) { fetchRangeEnd = getLong(jParser); + } else if (EMBEDDED_DOCUMENT_BYTES_CONFIG.equals(name)) { + embeddedDocumentBytesConfig = getEmbeddedDocumentBytesConfig(jParser); } token = jParser.nextToken(); } @@ -127,7 +134,39 @@ public class JsonFetchEmitTuple { id = fetchKey; } return new FetchEmitTuple(id, new FetchKey(fetcherName, fetchKey, fetchRangeStart, fetchRangeEnd), - new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException); + new EmitKey(emitterName, emitKey), metadata, handlerConfig, onParseException, + embeddedDocumentBytesConfig); + } + + private static EmbeddedDocumentBytesConfig getEmbeddedDocumentBytesConfig(JsonParser jParser) throws IOException { + JsonToken token = jParser.nextToken(); + if (token != JsonToken.START_OBJECT) { + throw new IOException("required start object, but see: " + token.name()); + } + String fieldName = jParser.nextFieldName(); + EmbeddedDocumentBytesConfig config = new EmbeddedDocumentBytesConfig(true); + while (fieldName != null) { + switch (fieldName) { + //TODO: fill in more here! + case "extractEmbeddedDocumentBytes": + boolean extract = jParser.nextBooleanValue(); + if (! extract) { + return new EmbeddedDocumentBytesConfig(false); + } + break; + case "includeOriginal": + config.setIncludeOriginal(jParser.nextBooleanValue()); + break; + case "emitter": + config.setEmitter(jParser.nextTextValue()); + break; + default: + throw new IllegalArgumentException("I regret I don't understand '" + fieldName + + "' in the context of an embeddedDocumentBytesConfig"); + } + fieldName = jParser.nextFieldName(); + } + return EmbeddedDocumentBytesConfig.SKIP; } private static HandlerConfig getHandlerConfig(JsonParser jParser) throws IOException {