This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4235
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 1071c2e14cab8418c44fa9b279eb5f9150ab377a
Author: tallison <talli...@apache.org>
AuthorDate: Thu Apr 4 14:02:53 2024 -0400

    TIKA_4235 -- add pipeline parameter
---
 .../pipes/emitter/opensearch/OpenSearchClient.java  | 18 +++++++++++++-----
 .../pipes/emitter/opensearch/OpenSearchEmitter.java |  6 ++++--
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index c757115a1..8be41653e 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.StringWriter;
+import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;

 import com.fasterxml.jackson.core.JsonFactory;
@@ -68,16 +70,21 @@ public class OpenSearchClient {
     }


-    public void emitDocuments(List<? extends EmitData> emitData) throws IOException, TikaClientException {
+    public void emitDocuments(List<? extends EmitData> emitData, Optional<String> pipeline) throws IOException,
+            TikaClientException {
         StringBuilder json = new StringBuilder();
         for (EmitData d : emitData) {
             appendDoc(d.getEmitKey().getEmitKey(), d.getMetadataList(), json);
         }
-        emitJson(json);
+        emitJson(json, pipeline);
     }

-    private void emitJson(StringBuilder json) throws IOException, TikaClientException {
+    private void emitJson(StringBuilder json, Optional<String> pipeline) throws IOException,
+            TikaClientException {
         String requestUrl = openSearchUrl + "/_bulk";
+        if (pipeline.isPresent()) {
+            requestUrl += "?pipeline=" + URLEncoder.encode(pipeline.get());
+        }
         JsonResponse response = postJson(requestUrl, json.toString());
         if (response.getStatus() != 200) {
             throw new TikaClientException(response.getMsg());
@@ -92,12 +99,13 @@ public class OpenSearchClient {
     }


-    public void emitDocument(String emitKey, List<Metadata> metadataList) throws IOException,
+    public void emitDocument(String emitKey, List<Metadata> metadataList,
+                             Optional<String> pipeline) throws IOException,
             TikaClientException {

         StringBuilder json = new StringBuilder();
         appendDoc(emitKey, metadataList, json);
-        emitJson(json);
+        emitJson(json, pipeline);
     }

     private void appendDoc(String emitKey, List<Metadata> metadataList, StringBuilder json)
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index bc010cf46..b547882cf 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
     private OpenSearchClient openSearchClient;
     private final HttpClientFactory httpClientFactory;
     private String embeddedFileFieldName = DEFAULT_EMBEDDED_FILE_FIELD_NAME;
+    private String pipeline = null;

     public OpenSearchEmitter() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
@@ -77,7 +79,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         }
         try {
             LOG.debug("about to emit {} docs", emitData.size());
-            openSearchClient.emitDocuments(emitData);
+            openSearchClient.emitDocuments(emitData, Optional.ofNullable(pipeline));
             LOG.info("successfully emitted {} docs", emitData.size());
         } catch (TikaClientException e) {
             LOG.warn("problem emitting docs", e);
@@ -94,7 +96,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         }
         try {
             LOG.debug("about to emit one doc");
-            openSearchClient.emitDocument(emitKey, metadataList, Optional.ofNullable(pipeline));
+            openSearchClient.emitDocument(emitKey, metadataList, Optional.ofNullable(pipeline));
             LOG.info("successfully emitted one doc");
         } catch (TikaClientException e) {
             LOG.warn("problem emitting doc", e);