This is an automated email from the ASF dual-hosted git repository.

tallison pushed a commit to branch TIKA-4235
in repository https://gitbox.apache.org/repos/asf/tika.git

commit 1071c2e14cab8418c44fa9b279eb5f9150ab377a
Author: tallison <talli...@apache.org>
AuthorDate: Thu Apr 4 14:02:53 2024 -0400

    TIKA_4235 -- add pipeline parameter
---
 .../pipes/emitter/opensearch/OpenSearchClient.java     | 18 +++++++++++++-----
 .../pipes/emitter/opensearch/OpenSearchEmitter.java    |  6 ++++--
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
index c757115a1..8be41653e 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchClient.java
@@ -21,8 +21,10 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.io.StringWriter;
+import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 
 import com.fasterxml.jackson.core.JsonFactory;
@@ -68,16 +70,21 @@ public class OpenSearchClient {
     }
 
 
-    public void emitDocuments(List<? extends EmitData> emitData) throws IOException, TikaClientException {
+    public void emitDocuments(List<? extends EmitData> emitData, Optional<String> pipeline) throws IOException,
+            TikaClientException {
         StringBuilder json = new StringBuilder();
         for (EmitData d : emitData) {
             appendDoc(d.getEmitKey().getEmitKey(), d.getMetadataList(), json);
         }
-        emitJson(json);
+        emitJson(json, pipeline);
     }
 
-    private void emitJson(StringBuilder json) throws IOException, TikaClientException {
+    private void emitJson(StringBuilder json, Optional<String> pipeline) throws IOException,
+            TikaClientException {
         String requestUrl = openSearchUrl + "/_bulk";
+        if (pipeline.isPresent()) {
+            requestUrl += "?pipeline=" + URLEncoder.encode(pipeline.get());
+        }
         JsonResponse response = postJson(requestUrl, json.toString());
         if (response.getStatus() != 200) {
             throw new TikaClientException(response.getMsg());
@@ -92,12 +99,13 @@ public class OpenSearchClient {
     }
 
 
-    public void emitDocument(String emitKey, List<Metadata> metadataList) throws IOException,
+    public void emitDocument(String emitKey, List<Metadata> metadataList,
+                             Optional<String> pipeline) throws IOException,
             TikaClientException {
 
         StringBuilder json = new StringBuilder();
         appendDoc(emitKey, metadataList, json);
-        emitJson(json);
+        emitJson(json, pipeline);
     }
 
     private void appendDoc(String emitKey, List<Metadata> metadataList, StringBuilder json)
diff --git a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
index bc010cf46..b547882cf 100644
--- a/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
+++ b/tika-pipes/tika-emitters/tika-emitter-opensearch/src/main/java/org/apache/tika/pipes/emitter/opensearch/OpenSearchEmitter.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
     private OpenSearchClient openSearchClient;
     private final HttpClientFactory httpClientFactory;
     private String embeddedFileFieldName = DEFAULT_EMBEDDED_FILE_FIELD_NAME;
+    private String pipeline = null;
 
     public OpenSearchEmitter() throws TikaConfigException {
         httpClientFactory = new HttpClientFactory();
@@ -77,7 +79,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         }
         try {
             LOG.debug("about to emit {} docs", emitData.size());
-            openSearchClient.emitDocuments(emitData);
+            openSearchClient.emitDocuments(emitData, Optional.ofNullable(pipeline));
             LOG.info("successfully emitted {} docs", emitData.size());
         } catch (TikaClientException e) {
             LOG.warn("problem emitting docs", e);
@@ -94,7 +96,7 @@ public class OpenSearchEmitter extends AbstractEmitter implements Initializable
         }
         try {
             LOG.debug("about to emit one doc");
-            openSearchClient.emitDocument(emitKey, metadataList);
+            openSearchClient.emitDocument(emitKey, metadataList, Optional.ofNullable(pipeline));
             LOG.info("successfully emitted one doc");
         } catch (TikaClientException e) {
             LOG.warn("problem emitting doc", e);

Reply via email to