This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9c1503b [tiered storage] Use NAR plugin to package offloaders (#2393) 9c1503b is described below commit 9c1503b67eb122d2801fd9a9b5a94e5f7780efc0 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Wed Aug 29 00:52:29 2018 -0700 [tiered storage] Use NAR plugin to package offloaders (#2393) ### Motivation Offloader typically involves a new storage system, which usually involves dependencies that might be conflicting with the dependencies of Pulsar. We want to package offloader implementations as what we did for connectors, so people can decide which offloader to use and only include it. People would also become easier to write its offloader if needed. ### Changes - Update `tiered-storage/jcloud` to package it using nifi-nar plugin. - Add bunch of utils in managed-ledger to locate offloader nar packages and load `LedgerOffloaderFactory` from corresponding NAR package. - Add a new distribution module to package offloaders into one distribution, as what we did for connectors. - Update pulsar-all image to include offloaders ### NOTES This change doesn't get rid of jcloud-shaded. because jcloud is using `ServiceLoad` and Guice injection. It makes things very tricky on class loading. Not attempt to address the problem any time soon. We can pin `jcloud-shaded` version as what we did for `protobuf-shaded` in next release. --- conf/broker.conf | 3 + distribution/{ => offloaders}/pom.xml | 42 +++++- distribution/offloaders/src/assemble/README | 10 ++ .../offloaders/src/assemble/offloaders.xml | 54 ++++++++ distribution/pom.xml | 1 + distribution/server/src/assemble/LICENSE.bin.txt | 14 -- docker/pulsar-all/Dockerfile | 4 + docker/pulsar-all/pom.xml | 21 +++ .../mledger/offload/OffloaderDefinition.java | 46 +++++++ .../bookkeeper/mledger/offload/OffloaderUtils.java | 149 +++++++++++++++++++++ .../bookkeeper/mledger/offload/Offloaders.java | 57 ++++++++ .../apache/pulsar/broker/ServiceConfiguration.java | 11 ++ pulsar-broker/pom.xml | 6 - .../org/apache/pulsar/broker/PulsarService.java | 20 ++- tiered-storage/jcloud/pom.xml | 10 +- .../impl/BlobStoreManagedLedgerOffloader.java | 6 + .../META-INF/services/pulsar-offloader.yaml | 9 +- 17 files changed, 422 insertions(+), 41 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 11670d3..8927a85 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -513,6 +513,9 @@ schemaRegistryStorageClassName=org.apache.pulsar.broker.service.schema.Bookkeepe ### --- Ledger Offloading --- ### +# The directory for all the offloader implementations +offloadersDirectory=./offloaders + # Driver to use to offload old data to long term storage (Possible values: S3, aws-s3, google-cloud-storage) # When using google-cloud-storage, Make sure both Google Cloud Storage and Google Cloud Storage JSON API are enabled for # the project (check from Developers Console -> Api&auth -> APIs). diff --git a/distribution/pom.xml b/distribution/offloaders/pom.xml similarity index 53% copy from distribution/pom.xml copy to distribution/offloaders/pom.xml index 36b4917..84349da 100644 --- a/distribution/pom.xml +++ b/distribution/offloaders/pom.xml @@ -24,17 +24,45 @@ <parent> <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar</artifactId> + <artifactId>distribution</artifactId> <version>2.2.0-incubating-SNAPSHOT</version> <relativePath>..</relativePath> </parent> - <artifactId>distribution</artifactId> + <artifactId>pulsar-offloader-distribution</artifactId> <packaging>pom</packaging> - <name>Pulsar :: Distribution</name> + <name>Pulsar :: Distribution :: Offloader</name> - <modules> - <module>server</module> - <module>io</module> - </modules> + <dependencies> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>managed-ledger-original</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>distro-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <attach>true</attach> + <tarLongFileMode>posix</tarLongFileMode> + <finalName>apache-pulsar-offloaders-${project.version}</finalName> + <descriptors> + <descriptor>src/assemble/offloaders.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> diff --git a/distribution/offloaders/src/assemble/README b/distribution/offloaders/src/assemble/README new file mode 100644 index 0000000..6777ec2 --- /dev/null +++ b/distribution/offloaders/src/assemble/README @@ -0,0 +1,10 @@ + +Please refer to http://pulsar.incubator.apache.org/ for access to documentation. + +This package contains Pulsar offloader archives. Each archive +contains: + + * the offloader code plus all the dependencies + + * META-INF/DEPEDENCIES file with licensing information for all transitive + dependencies diff --git a/distribution/offloaders/src/assemble/offloaders.xml b/distribution/offloaders/src/assemble/offloaders.xml new file mode 100644 index 0000000..e9babe4 --- /dev/null +++ b/distribution/offloaders/src/assemble/offloaders.xml @@ -0,0 +1,54 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly + xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>bin</id> + <formats> + <format>tar.gz</format> + </formats> + <includeBaseDirectory>true</includeBaseDirectory> + <files> + <file> + <source>${basedir}/../../DISCLAIMER</source> + <outputDirectory>.</outputDirectory> + <fileMode>644</fileMode> + </file> + <file> + <source>${basedir}/../../LICENSE</source> + <outputDirectory>.</outputDirectory> + <fileMode>644</fileMode> + </file> + <file> + <source>${basedir}/src/assemble/README</source> + <destName>README</destName> + <outputDirectory>.</outputDirectory> + <fileMode>644</fileMode> + </file> + + <file> + <source>${basedir}/../../tiered-storage/jcloud/target/tiered-storage-jcloud-${project.version}.nar</source> + <outputDirectory>offloaders</outputDirectory> + <fileMode>644</fileMode> + </file> + </files> +</assembly> diff --git a/distribution/pom.xml b/distribution/pom.xml index 36b4917..80134e2 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -36,5 +36,6 @@ <modules> <module>server</module> <module>io</module> + <module>offloaders</module> </modules> </project> diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt index 8da89a8..e0b1eb7 100644 --- a/distribution/server/src/assemble/LICENSE.bin.txt +++ b/distribution/server/src/assemble/LICENSE.bin.txt @@ -317,7 +317,6 @@ The Apache Software License, Version 2.0 - com.fasterxml.jackson.core-jackson-annotations-2.8.4.jar - com.fasterxml.jackson.core-jackson-core-2.8.4.jar - com.fasterxml.jackson.core-jackson-databind-2.8.4.jar - - com.fasterxml.jackson.dataformat-jackson-dataformat-cbor-2.6.7.jar - com.fasterxml.jackson.dataformat-jackson-dataformat-yaml-2.8.4.jar - com.fasterxml.jackson.jaxrs-jackson-jaxrs-base-2.8.4.jar - com.fasterxml.jackson.jaxrs-jackson-jaxrs-json-provider-2.8.4.jar @@ -359,7 +358,6 @@ The Apache Software License, Version 2.0 - io.prometheus-simpleclient_hotspot-0.0.23.jar - io.prometheus-simpleclient_servlet-0.0.23.jar * Bean Validation API -- javax.validation-validation-api-1.1.0.Final.jar - * Joda Time -- joda-time-joda-time-2.8.1.jar * Log4J - log4j-log4j-1.2.17.jar - org.apache.logging.log4j-log4j-api-2.10.0.jar @@ -419,9 +417,6 @@ The Apache Software License, Version 2.0 * OkHttp - com.squareup.okhttp-okhttp-2.5.0.jar * Okio - com.squareup.okio-okio-1.13.0.jar * Javassist -- org.javassist-javassist-3.21.0-GA.jar - * Amazon AWS SDK - - com.amazonaws-aws-java-sdk-core-1.11.297.jar - - software.amazon.ion-ion-java-1.0.2.jar * gRPC - io.grpc-grpc-all-1.12.0.jar - io.grpc-grpc-auth-1.12.0.jar @@ -455,10 +450,6 @@ The Apache Software License, Version 2.0 - org.xerial.snappy-snappy-java-1.1.1.3.jar * Objenesis - org.objenesis-objenesis-2.1.jar - * Java Dependency Injection - - javax.inject-javax.inject-1.jar - * java-xmlbuilder - - com.jamesmurty.utils-java-xmlbuilder-1.1.jar BSD 3-clause "New" or "Revised" License @@ -492,7 +483,6 @@ Protocol Buffers License CDDL-1.1 -- licenses/LICENSE-CDDL-1.1.txt * Java Annotations API - javax.annotation-javax.annotation-api-1.2.jar - - javax.annotation-jsr250-api-1.0.jar * Java Servlet API -- javax.servlet-javax.servlet-api-3.1.0.jar * WebSocket Server API -- javax.websocket-javax.websocket-api-1.0.jar * Java Web Service REST API -- javax.ws.rs-javax.ws.rs-api-2.1.jar @@ -524,10 +514,6 @@ Eclipse Public License 1.0 -- licenses/LICENSE-AspectJ.txt Public Domain * XZ for Java -- licenses/LICENSE-xz.txt - org.tukaani-xz-1.5.jar - * iHarder.net Base64 - - net.iharder-base64-2.3.8.jar - * AOP Alliance - - aopalliance-aopalliance-1.0.jar Public Domain (CC0) -- licenses/LICENSE-CC0.txt * Reactive Streams -- org.reactivestreams-reactive-streams-1.0.0.jar diff --git a/docker/pulsar-all/Dockerfile b/docker/pulsar-all/Dockerfile index 46f7b9f..717a007 100644 --- a/docker/pulsar-all/Dockerfile +++ b/docker/pulsar-all/Dockerfile @@ -20,6 +20,10 @@ FROM apachepulsar/pulsar:latest ARG PULSAR_IO_TARBALL +ARG PULSAR_OFFLOADER_TARBALL ADD ${PULSAR_IO_TARBALL} / RUN mv /apache-pulsar-io-connectors-*/connectors /pulsar/connectors + +ADD ${PULSAR_OFFLOADER_TARBALL} / +RUN mv /apache-pulsar-offloaders-*/offloaders /pulsar/offloaders diff --git a/docker/pulsar-all/pom.xml b/docker/pulsar-all/pom.xml index 5149999..519a498 100644 --- a/docker/pulsar-all/pom.xml +++ b/docker/pulsar-all/pom.xml @@ -40,6 +40,14 @@ <type>tar.gz</type> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-offloader-distribution</artifactId> + <version>${project.parent.version}</version> + <classifier>bin</classifier> + <type>tar.gz</type> + <scope>provided</scope> + </dependency> </dependencies> <profiles> @@ -72,6 +80,18 @@ <excludeTransitive>true</excludeTransitive> </configuration> </execution> + <execution> + <id>copy-offloader-tarball</id> + <goals> + <goal>copy-dependencies</goal> + </goals> + <phase>generate-resources</phase> + <configuration> + <outputDirectory>${project.build.directory}/</outputDirectory> + <includeArtifactIds>pulsar-offloader-distribution</includeArtifactIds> + <excludeTransitive>true</excludeTransitive> + </configuration> + </execution> </executions> </plugin> <plugin> @@ -103,6 +123,7 @@ <tag>${project.version}</tag> <buildArgs> <PULSAR_IO_TARBALL>target/pulsar-io-distribution-${project.version}-bin.tar.gz</PULSAR_IO_TARBALL> + <PULSAR_OFFLOADER_TARBALL>target/pulsar-offloader-distribution-${project.version}-bin.tar.gz</PULSAR_OFFLOADER_TARBALL> </buildArgs> </configuration> </plugin> diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java new file mode 100644 index 0000000..414b7c8 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderDefinition.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import lombok.Data; +import lombok.NoArgsConstructor; + +/** + * Definition of an offloader NAR package. + */ +@Data +@NoArgsConstructor +public class OffloaderDefinition { + + /** + * The name of the offloader type. + */ + private String name; + + /** + * Description to be used for user help. + */ + private String description; + + /** + * The class name for the offloader factory implementation. + */ + private String offloaderFactoryClass; + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java new file mode 100644 index 0000000..a1d3349 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/OffloaderUtils.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import java.io.File; +import java.io.IOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.util.ObjectMapperFactory; + +/** + * Utils to load offloaders + */ +@Slf4j +public class OffloaderUtils { + + private static final String PULSAR_OFFLOADER_SERVICE_NAME = "pulsar-offloader.yaml"; + + /** + * Extract the Pulsar offloader class from a offloader archive. + * + * @param narPath nar package path + * @return the offloader class name + * @throws IOException when fail to retrieve the pulsar offloader class + */ + static Pair<NarClassLoader, LedgerOffloaderFactory> getOffloaderFactory(String narPath) throws IOException { + NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet()); + String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME); + + OffloaderDefinition conf = ObjectMapperFactory.getThreadLocalYaml() + .readValue(configStr, OffloaderDefinition.class); + if (StringUtils.isEmpty(conf.getOffloaderFactoryClass())) { + throw new IOException( + String.format("The '%s' offloader does not provide an offloader factory implementation", + conf.getName())); + } + + try { + // Try to load offloader factory class and check it implements Offloader interface + Class factoryClass = ncl.loadClass(conf.getOffloaderFactoryClass()); + CompletableFuture<LedgerOffloaderFactory> loadFuture = new CompletableFuture<>(); + Thread loadingThread = new Thread(() -> { + Thread.currentThread().setContextClassLoader(ncl); + + log.info("Loading offloader factory {} using class loader {}", factoryClass, ncl); + try { + Object offloader = factoryClass.newInstance(); + if (!(offloader instanceof LedgerOffloaderFactory)) { + throw new IOException("Class " + conf.getOffloaderFactoryClass() + " does not implement interface " + + LedgerOffloaderFactory.class.getName()); + } + loadFuture.complete((LedgerOffloaderFactory) offloader); + } catch (Throwable t) { + loadFuture.completeExceptionally(t); + } + }, "load-factory-" + factoryClass); + try { + loadingThread.start(); + return Pair.of(ncl, loadFuture.get()); + } finally { + loadingThread.join(); + } + } catch (Throwable t) { + rethrowIOException(t); + } + return null; + } + + private static void rethrowIOException(Throwable cause) + throws IOException { + if (cause instanceof IOException) { + throw (IOException) cause; + } else if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } else if (cause instanceof Error) { + throw (Error) cause; + } else { + throw new IOException(cause.getMessage(), cause); + } + } + + public static OffloaderDefinition getOffloaderDefinition(String narPath) throws IOException { + try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) { + String configStr = ncl.getServiceDefinition(PULSAR_OFFLOADER_SERVICE_NAME); + + return ObjectMapperFactory.getThreadLocalYaml().readValue(configStr, OffloaderDefinition.class); + } + } + + public static Offloaders searchForOffloaders(String connectorsDirectory) throws IOException { + Path path = Paths.get(connectorsDirectory).toAbsolutePath(); + log.info("Searching for connectors in {}", path); + + Offloaders offloaders = new Offloaders(); + + if (!path.toFile().exists()) { + log.warn("Offloaders archive directory not found"); + return offloaders; + } + + try (DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.nar")) { + stream.forEach(archive -> { + try { + OffloaderDefinition definition = getOffloaderDefinition(archive.toString()); + log.info("Found offloader {} from {}", definition, archive); + + if (!StringUtils.isEmpty(definition.getOffloaderFactoryClass())) { + // Validate offloader factory class to be present and of the right type + Pair<NarClassLoader, LedgerOffloaderFactory> offloaderFactoryPair = + getOffloaderFactory(archive.toString()); + if (null != offloaderFactoryPair) { + offloaders.getOffloaders().add(offloaderFactoryPair); + } + } + } catch (Throwable t) { + log.warn("Failed to load offloader from {}", archive, t); + } + }); + } + return offloaders; + } + + +} diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java new file mode 100644 index 0000000..dfe4c31 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/offload/Offloaders.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.offload; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.nar.NarClassLoader; + +@Slf4j +@Data +public class Offloaders implements AutoCloseable { + + private final List<Pair<NarClassLoader, LedgerOffloaderFactory>> offloaders = new ArrayList<>(); + + public LedgerOffloaderFactory getOffloaderFactory(String driverName) throws IOException { + for (Pair<NarClassLoader, LedgerOffloaderFactory> factory : offloaders) { + if (factory.getRight().isDriverSupported(driverName)) { + return factory.getRight(); + } + } + throw new IOException("No offloader found for driver '" + driverName + "'." + + " Please make sure you dropped the offloader nar packages under `${PULSAR_HOME}/offloaders`."); + } + + @Override + public void close() throws Exception { + offloaders.forEach(offloader -> { + try { + offloader.getLeft().close(); + } catch (IOException e) { + log.warn("Failed to close nar class loader for offloader '{}': {}", + offloader.getRight().getClass(), e.getMessage()); + } + }); + } +} diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index e94f6b9..65696f7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -499,6 +499,9 @@ public class ServiceConfiguration implements PulsarConfiguration { * NOTES: all implementation related settings should be put in implementation package. * only common settings like driver name, io threads can be added here. ****/ + // The directory to locate offloaders + private String offloadersDirectory = "./offloaders"; + // Driver to use to offload old data to long term storage private String managedLedgerOffloadDriver = null; @@ -1728,6 +1731,14 @@ public class ServiceConfiguration implements PulsarConfiguration { return this.managedLedgerOffloadMaxThreads; } + public String getOffloadersDirectory() { + return offloadersDirectory; + } + + public void setOffloadersDirectory(String dir) { + this.offloadersDirectory = dir; + } + public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) { this.brokerServiceCompactionMonitorIntervalInSeconds = interval; } diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml index f3410b5..fe575a7 100644 --- a/pulsar-broker/pom.xml +++ b/pulsar-broker/pom.xml @@ -99,12 +99,6 @@ <dependency> <groupId>${project.groupId}</groupId> - <artifactId>tiered-storage-jcloud</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>${project.groupId}</groupId> <artifactId>pulsar-broker-common</artifactId> <version>${project.version}</version> </dependency> diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index d965c7e..ca5ac5b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker; +import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; @@ -50,8 +51,10 @@ import org.apache.bookkeeper.mledger.LedgerOffloader; import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; -import org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory; +import org.apache.bookkeeper.mledger.offload.OffloaderUtils; +import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.cache.ConfigurationCacheService; @@ -140,6 +143,7 @@ public class PulsarService implements AutoCloseable { private final ScheduledExecutorService loadManagerExecutor; private ScheduledExecutorService compactorExecutor; private OrderedScheduler offloaderScheduler; + private Offloaders offloaderManager = new Offloaders(); private LedgerOffloader offloader; private ScheduledFuture<?> loadReportTask = null; private ScheduledFuture<?> loadSheddingTask = null; @@ -286,6 +290,8 @@ public class PulsarService implements AutoCloseable { schemaRegistryService.close(); } + offloaderManager.close(); + state = State.Closed; } catch (Exception e) { @@ -664,11 +670,13 @@ public class PulsarService implements AutoCloseable { public synchronized LedgerOffloader createManagedLedgerOffloader(ServiceConfiguration conf) throws PulsarServerException { try { - // TODO: will make this configurable when switching to use NAR loader to load offloaders - LedgerOffloaderFactory offloaderFactory = JCloudLedgerOffloaderFactory.of(); - - if (conf.getManagedLedgerOffloadDriver() != null - && offloaderFactory.isDriverSupported(conf.getManagedLedgerOffloadDriver())) { + if (StringUtils.isNotBlank(conf.getManagedLedgerOffloadDriver())) { + checkNotNull(conf.getOffloadersDirectory(), + "Offloader driver is configured to be '%s' but no offloaders directory is configured.", + conf.getManagedLedgerOffloadDriver()); + this.offloaderManager = OffloaderUtils.searchForOffloaders(conf.getOffloadersDirectory()); + LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( + conf.getManagedLedgerOffloadDriver()); try { return offloaderFactory.create( conf.getProperties(), diff --git a/tiered-storage/jcloud/pom.xml b/tiered-storage/jcloud/pom.xml index fcd3300..a7d516e 100644 --- a/tiered-storage/jcloud/pom.xml +++ b/tiered-storage/jcloud/pom.xml @@ -42,7 +42,6 @@ <groupId>org.apache.pulsar</groupId> <artifactId>jclouds-shaded</artifactId> <version>${project.version}</version> - <!-- <exclusions> <exclusion> <groupId>com.google.code.gson</groupId> @@ -69,7 +68,6 @@ <artifactId>*</artifactId> </exclusion> </exclusions> - --> </dependency> <dependency> <groupId>com.amazonaws</groupId> @@ -129,4 +127,12 @@ <scope>test</scope> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> + </build> </project> diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java index f96afaf..b8c2727 100644 --- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java +++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloader.java @@ -49,6 +49,7 @@ import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder; import org.apache.commons.lang3.tuple.Pair; import org.jclouds.Constants; import org.jclouds.ContextBuilder; +import org.jclouds.aws.s3.AWSS3ProviderMetadata; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.domain.Blob; @@ -61,8 +62,10 @@ import org.jclouds.domain.Location; import org.jclouds.domain.LocationBuilder; import org.jclouds.domain.LocationScope; import org.jclouds.googlecloud.GoogleCredentialsFromJson; +import org.jclouds.googlecloudstorage.GoogleCloudStorageProviderMetadata; import org.jclouds.io.Payload; import org.jclouds.io.Payloads; +import org.jclouds.osgi.ProviderRegistry; import org.jclouds.s3.reference.S3Constants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,6 +120,9 @@ public class BlobStoreManagedLedgerOffloader implements LedgerOffloader { overrides.setProperty(Constants.PROPERTY_SO_TIMEOUT, "25000"); overrides.setProperty(Constants.PROPERTY_MAX_RETRIES, Integer.toString(100)); + ProviderRegistry.registerProvider(new AWSS3ProviderMetadata()); + ProviderRegistry.registerProvider(new GoogleCloudStorageProviderMetadata()); + ContextBuilder contextBuilder = ContextBuilder.newBuilder(driver); contextBuilder.credentials(credentials.identity, credentials.credential); diff --git a/docker/pulsar-all/Dockerfile b/tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml similarity index 83% copy from docker/pulsar-all/Dockerfile copy to tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml index 46f7b9f..8f97287 100644 --- a/docker/pulsar-all/Dockerfile +++ b/tiered-storage/jcloud/src/main/resources/META-INF/services/pulsar-offloader.yaml @@ -17,9 +17,6 @@ # under the License. # -FROM apachepulsar/pulsar:latest - -ARG PULSAR_IO_TARBALL - -ADD ${PULSAR_IO_TARBALL} / -RUN mv /apache-pulsar-io-connectors-*/connectors /pulsar/connectors +name: jcloud +description: JCloud based offloader implementation +offloaderFactoryClass: org.apache.bookkeeper.mledger.offload.jcloud.JCloudLedgerOffloaderFactory