This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch fixing_ingestion_job_with_output_uri
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit adeec12892809e4db59b2457d563b282abaefd0a
Author: Xiang Fu <fx19880...@gmail.com>
AuthorDate: Tue Aug 18 15:28:20 2020 -0700

    Fix segment tar URI generation in segment URI push ingestion jobs
---
 .../ingestion/batch/common/SegmentPushUtils.java   | 22 ++++++
 .../batch/common/TestSegmentPushUtils.java         | 79 ++++++++++++++++++++++
 .../hadoop/HadoopSegmentUriPushJobRunner.java      |  6 +-
 .../batch/spark/SparkSegmentUriPushJobRunner.java  |  6 +-
 .../batch/standalone/SegmentUriPushJobRunner.java  |  6 +-
 .../apache/pinot/plugin/filesystem/S3PinotFS.java  |  2 +
 6 files changed, 115 insertions(+), 6 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
index 7cb4d55..462ea87 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java
@@ -26,6 +26,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
@@ -46,6 +47,27 @@ public class SegmentPushUtils implements Serializable {
 
   private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = 
new FileUploadDownloadClient();
 
+  /**
+   * Generates the URI under which a segment tar file is pushed in a segment URI push job.
+   *
+   * <p>When both {@code prefix} and {@code suffix} are empty, the scheme and/or authority missing from
+   * {@code fileURI} are taken from {@code outputDirURI}; a {@code fileURI} that already has both is returned
+   * unchanged. Otherwise the result is {@code prefix + fileURI.getRawPath() + suffix}, with a {@code null}
+   * prefix or suffix treated as empty.
+   *
+   * @param outputDirURI output directory URI; supplies the scheme/authority that {@code fileURI} lacks
+   * @param fileURI URI of the segment tar file, as listed by the file system
+   * @param prefix string prepended to the raw path of {@code fileURI}; may be {@code null} or empty
+   * @param suffix string appended to the raw path of {@code fileURI}; may be {@code null} or empty
+   * @return the segment tar URI; {@code fileURI} itself if the filled-in components do not form a valid
+   *         server-based URI
+   * @throws IllegalArgumentException if {@code prefix + fileURI.getRawPath() + suffix} is not a valid URI
+   */
+  public static URI generateSegmentTarURI(URI outputDirURI, URI fileURI, String prefix, String suffix) {
+    if (StringUtils.isEmpty(prefix) && StringUtils.isEmpty(suffix)) {
+      String fileScheme = fileURI.getScheme();
+      String fileAuthority = fileURI.getAuthority();
+      if (StringUtils.isNotEmpty(fileScheme) && StringUtils.isNotEmpty(fileAuthority)) {
+        // Nothing to fill in. Returning the URI as-is also keeps registry-based authorities (e.g. bucket names
+        // containing '_'), for which URI#getHost() is null and rebuilding from the host would drop the bucket.
+        return fileURI;
+      }
+      // In case the FS doesn't provide scheme or authority, fill them up from outputDirURI. The authority is
+      // copied as a whole so that user info and port (e.g. "namenode:8020") are kept, not just the host.
+      String scheme = StringUtils.isEmpty(fileScheme) ? outputDirURI.getScheme() : fileScheme;
+      String authority = StringUtils.isEmpty(fileAuthority) ? outputDirURI.getAuthority() : fileAuthority;
+      try {
+        return new URI(scheme, authority, fileURI.getPath(), fileURI.getQuery(), fileURI.getFragment());
+      } catch (URISyntaxException e) {
+        // Best effort: if the merged components do not form a valid URI, push the file URI as listed.
+        return fileURI;
+      }
+    }
+    // defaultString keeps a null prefix/suffix from being rendered as the literal "null".
+    return URI.create(StringUtils.defaultString(prefix) + fileURI.getRawPath() + StringUtils.defaultString(suffix));
+  }
+
   public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS 
fileSystem, List<String> tarFilePaths)
       throws RetriableOperationException, AttemptsExceededException {
     String tableName = spec.getTableSpec().getTableName();
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java
new file mode 100644
index 0000000..f2795d1
--- /dev/null
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.ingestion.batch.common;
+
+import java.net.URI;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link SegmentPushUtils#generateSegmentTarURI(URI, URI, String, String)}.
+ */
+public class TestSegmentPushUtils {
+
+  private static final String RAW_DIRECTORY_PATH = "/rawdata/";
+  private static final String RAW_SEGMENT_PATH = "/rawdata/segments/segment.tar.gz";
+
+  @Test
+  public void testGenerateSegmentTarURIForS3() {
+    verifyGenerateSegmentTarURI("s3://org.pinot.data");
+  }
+
+  @Test
+  public void testGenerateSegmentTarURIForGCS() {
+    verifyGenerateSegmentTarURI("gs://org.pinot.data");
+  }
+
+  @Test
+  public void testGenerateSegmentTarURIForHdfs() {
+    verifyGenerateSegmentTarURI("hdfs://namespace1");
+  }
+
+  @Test
+  public void testGenerateSegmentTarURIForWebhdfs() {
+    String hdfsBase = "hdfs://namespace1";
+    URI dirURI = URI.create(hdfsBase + RAW_DIRECTORY_PATH);
+    URI fileURI = URI.create(hdfsBase + RAW_SEGMENT_PATH);
+    String webHdfsPrefix = "http://foo:1234/webhdfs/v1";
+    String webHdfsSuffix = "?op=OPEN";
+    URI webHdfsPath = URI.create("http://foo:1234/webhdfs/v1/rawdata/segments/segment.tar.gz?op=OPEN");
+    // With a prefix/suffix only the raw path of the file URI is used, so its scheme/authority do not matter.
+    Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, webHdfsPrefix, webHdfsSuffix),
+        webHdfsPath);
+    Assert.assertEquals(
+        SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), webHdfsPrefix, webHdfsSuffix),
+        webHdfsPath);
+  }
+
+  /**
+   * Checks URI generation for segment files stored under {@code base} (a scheme plus authority).
+   */
+  private static void verifyGenerateSegmentTarURI(String base) {
+    URI dirURI = URI.create(base + RAW_DIRECTORY_PATH);
+    URI fileURI = URI.create(base + RAW_SEGMENT_PATH);
+    URI pathOnlyFileURI = URI.create(RAW_SEGMENT_PATH);
+    // No prefix/suffix: a complete file URI is kept as is ...
+    Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI);
+    // ... and a path-only file URI gets its scheme and authority from the output directory URI.
+    Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, pathOnlyFileURI, "", ""), fileURI);
+    // With the base as prefix, the raw file path is appended to it whatever the file URI's scheme.
+    Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, base, ""), fileURI);
+    Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, pathOnlyFileURI, base, ""), fileURI);
+  }
+}
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
index e1a2ceb..f451c2c 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java
@@ -87,8 +87,10 @@ public class HadoopSegmentUriPushJobRunner implements 
IngestionJobRunner, Serial
     for (String file : files) {
       URI uri = URI.create(file);
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + 
uri.getRawPath() + _spec.getPushJobSpec()
-            .getSegmentUriSuffix());
+        URI updatedURI = SegmentPushUtils
+            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
+                _spec.getPushJobSpec().getSegmentUriSuffix());
+        segmentUris.add(updatedURI.toString());
       }
     }
     // Push from driver
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
index d8604f5..824e05a 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java
@@ -92,8 +92,10 @@ public class SparkSegmentUriPushJobRunner implements 
IngestionJobRunner, Seriali
     for (String file : files) {
       URI uri = URI.create(file);
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + 
uri.getRawPath() + _spec.getPushJobSpec()
-            .getSegmentUriSuffix());
+        URI updatedURI = SegmentPushUtils
+            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
+                _spec.getPushJobSpec().getSegmentUriSuffix());
+        segmentUris.add(updatedURI.toString());
       }
     }
 
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
index afe5430..3172dac 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java
@@ -87,8 +87,10 @@ public class SegmentUriPushJobRunner implements 
IngestionJobRunner {
     for (String file : files) {
       URI uri = URI.create(file);
       if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) {
-        segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + 
uri.getRawPath() + _spec.getPushJobSpec()
-            .getSegmentUriSuffix());
+        URI updatedURI = SegmentPushUtils
+            .generateSegmentTarURI(outputDirURI, uri, 
_spec.getPushJobSpec().getSegmentUriPrefix(),
+                _spec.getPushJobSpec().getSegmentUriSuffix());
+        segmentUris.add(updatedURI.toString());
       }
     }
     try {
diff --git 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
index 33892be..d70eadc 100644
--- 
a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
+++ 
b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java
@@ -31,6 +31,7 @@ import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.slf4j.Logger;
@@ -411,6 +412,7 @@ public class S3PinotFS extends PinotFS {
       throws Exception {
     LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath());
     URI base = getBase(srcUri);
+    FileUtils.forceMkdir(dstFile.getParentFile());
     String prefix = sanitizePath(base.relativize(srcUri).getPath());
     GetObjectRequest getObjectRequest = 
GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to