This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch fixing_ingestion_job_with_output_uri in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit adeec12892809e4db59b2457d563b282abaefd0a Author: Xiang Fu <fx19880...@gmail.com> AuthorDate: Tue Aug 18 15:28:20 2020 -0700 Fixing segment push uri ingestion jobs --- .../ingestion/batch/common/SegmentPushUtils.java | 22 ++++++ .../batch/common/TestSegmentPushUtils.java | 79 ++++++++++++++++++++++ .../hadoop/HadoopSegmentUriPushJobRunner.java | 6 +- .../batch/spark/SparkSegmentUriPushJobRunner.java | 6 +- .../batch/standalone/SegmentUriPushJobRunner.java | 6 +- .../apache/pinot/plugin/filesystem/S3PinotFS.java | 2 + 6 files changed, 115 insertions(+), 6 deletions(-) diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java index 7cb4d55..462ea87 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentPushUtils.java @@ -26,6 +26,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Arrays; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.exception.HttpErrorStatusException; import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.SimpleHttpResponse; @@ -46,6 +47,27 @@ public class SegmentPushUtils implements Serializable { private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient(); + public static URI generateSegmentTarURI(URI outputDirURI, URI fileURI, String prefix, String suffix) { + if (StringUtils.isEmpty(prefix) && StringUtils.isEmpty(suffix)) { + // In case the FS doesn't provide scheme or host, will fill it up from outputDirURI. + String scheme = fileURI.getScheme(); + if (StringUtils.isEmpty(fileURI.getScheme())) { + scheme = outputDirURI.getScheme(); + } + String host = fileURI.getHost(); + if (StringUtils.isEmpty(fileURI.getHost())) { + host = outputDirURI.getHost(); + } + try { + return new URI(scheme, fileURI.getUserInfo(), host, fileURI.getPort(), fileURI.getPath(), fileURI.getQuery(), + fileURI.getFragment()); + } catch (URISyntaxException e) { + return fileURI; + } + } + return URI.create(String.format("%s%s%s", prefix, fileURI.getRawPath(), suffix)); + } + public static void pushSegments(SegmentGenerationJobSpec spec, PinotFS fileSystem, List<String> tarFilePaths) throws RetriableOperationException, AttemptsExceededException { String tableName = spec.getTableSpec().getTableName(); diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java new file mode 100644 index 0000000..f2795d1 --- /dev/null +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/TestSegmentPushUtils.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.plugin.ingestion.batch.common; + +import java.net.URI; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestSegmentPushUtils { + + private static final String RAW_DIRECTORY_PATH = "/rawdata/"; + private static final String RAW_SEGMENT_PATH = "/rawdata/segments/segment.tar.gz"; + + @Test + public void testGenerateSegmentTarURIForS3() { + String s3Base = "s3://org.pinot.data"; + URI dirURI = URI.create(s3Base + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(s3Base + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, s3Base, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), s3Base, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForGCS() { + String gcsBase = "gs://org.pinot.data"; + URI dirURI = URI.create(gcsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(gcsBase + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, gcsBase, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), gcsBase, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForHdfs() { + String hdfsBase = "hdfs://namespace1"; + URI dirURI = URI.create(hdfsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(hdfsBase + RAW_SEGMENT_PATH); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, "", ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, hdfsBase, ""), fileURI); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), hdfsBase, ""), + fileURI); + } + + @Test + public void testGenerateSegmentTarURIForWebhdfs() { + String hdfsBase = "hdfs://namespace1"; + URI dirURI = URI.create(hdfsBase + RAW_DIRECTORY_PATH); + URI fileURI = URI.create(hdfsBase + RAW_SEGMENT_PATH); + String webHdfsPrefix = "http://foo:1234/webhdfs/v1"; + String webHdfsSuffix = "?op=OPEN"; + URI webHdfsPath = URI.create("http://foo:1234/webhdfs/v1/rawdata/segments/segment.tar.gz?op=OPEN"); + Assert.assertEquals(SegmentPushUtils.generateSegmentTarURI(dirURI, fileURI, webHdfsPrefix, webHdfsSuffix), + webHdfsPath); + Assert.assertEquals( + SegmentPushUtils.generateSegmentTarURI(dirURI, URI.create(RAW_SEGMENT_PATH), webHdfsPrefix, webHdfsSuffix), + webHdfsPath); + } +} diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java index e1a2ceb..f451c2c 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentUriPushJobRunner.java @@ -87,8 +87,10 @@ public class HadoopSegmentUriPushJobRunner implements IngestionJobRunner, Serial for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } // Push from driver diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java index d8604f5..824e05a 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark/SparkSegmentUriPushJobRunner.java @@ -92,8 +92,10 @@ public class SparkSegmentUriPushJobRunner implements IngestionJobRunner, Seriali for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java index afe5430..3172dac 100644 --- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java +++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentUriPushJobRunner.java @@ -87,8 +87,10 @@ public class SegmentUriPushJobRunner implements IngestionJobRunner { for (String file : files) { URI uri = URI.create(file); if (uri.getPath().endsWith(Constants.TAR_GZ_FILE_EXT)) { - segmentUris.add(_spec.getPushJobSpec().getSegmentUriPrefix() + uri.getRawPath() + _spec.getPushJobSpec() - .getSegmentUriSuffix()); + URI updatedURI = SegmentPushUtils + .generateSegmentTarURI(outputDirURI, uri, _spec.getPushJobSpec().getSegmentUriPrefix(), + _spec.getPushJobSpec().getSegmentUriSuffix()); + segmentUris.add(updatedURI.toString()); } } try { diff --git a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java index 33892be..d70eadc 100644 --- a/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java +++ b/pinot-plugins/pinot-file-system/pinot-s3/src/main/java/org/apache/pinot/plugin/filesystem/S3PinotFS.java @@ -31,6 +31,7 @@ import java.nio.file.Paths; import java.util.HashMap; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.filesystem.PinotFS; import org.slf4j.Logger; @@ -411,6 +412,7 @@ public class S3PinotFS extends PinotFS { throws Exception { LOGGER.info("Copy {} to local {}", srcUri, dstFile.getAbsolutePath()); URI base = getBase(srcUri); + FileUtils.forceMkdir(dstFile.getParentFile()); String prefix = sanitizePath(base.relativize(srcUri).getPath()); GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(srcUri.getHost()).key(prefix).build(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org