http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java new file mode 100644 index 0000000..45dc021 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api.namespace; + +import com.google.common.base.Preconditions; +import org.apache.distributedlog.BKDistributedLogNamespace; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.config.DynamicDistributedLogConfiguration; +import org.apache.distributedlog.feature.CoreFeatureKeys; +import org.apache.distributedlog.injector.AsyncFailureInjector; +import org.apache.distributedlog.injector.AsyncRandomFailureInjector; +import org.apache.distributedlog.namespace.NamespaceDriver; +import org.apache.distributedlog.namespace.NamespaceDriverManager; +import org.apache.distributedlog.util.ConfUtils; +import org.apache.distributedlog.util.DLUtils; +import org.apache.distributedlog.util.OrderedScheduler; +import org.apache.distributedlog.common.util.PermitLimiter; +import org.apache.distributedlog.util.SimplePermitLimiter; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.feature.SettableFeatureProvider; +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; + +/** + * Builder to construct a <code>Namespace</code>. + * The builder takes the responsibility of loading backend according to the uri. + * + * @see Namespace + * @since 0.3.32 + */ +public class NamespaceBuilder { + + private static final Logger logger = LoggerFactory.getLogger(NamespaceBuilder.class); + + public static NamespaceBuilder newBuilder() { + return new NamespaceBuilder(); + } + + private DistributedLogConfiguration _conf = null; + private DynamicDistributedLogConfiguration _dynConf = null; + private URI _uri = null; + private StatsLogger _statsLogger = NullStatsLogger.INSTANCE; + private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE; + private FeatureProvider _featureProvider = null; + private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID; + private int _regionId = DistributedLogConstants.LOCAL_REGION_ID; + + // private constructor + private NamespaceBuilder() {} + + /** + * DistributedLog Configuration used for the namespace. + * + * @param conf + * distributedlog configuration + * @return namespace builder. + */ + public NamespaceBuilder conf(DistributedLogConfiguration conf) { + this._conf = conf; + return this; + } + + /** + * Dynamic DistributedLog Configuration used for the namespace + * + * @param dynConf dynamic distributedlog configuration + * @return namespace builder + */ + public NamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) { + this._dynConf = dynConf; + return this; + } + + /** + * Namespace Location. + * + * @param uri + * namespace location uri. + * @see Namespace + * @return namespace builder. + */ + public NamespaceBuilder uri(URI uri) { + this._uri = uri; + return this; + } + + /** + * Stats Logger used for stats collection + * + * @param statsLogger + * stats logger + * @return namespace builder. + */ + public NamespaceBuilder statsLogger(StatsLogger statsLogger) { + this._statsLogger = statsLogger; + return this; + } + + /** + * Stats Logger used for collecting per log stats. + * + * @param statsLogger + * stats logger for collecting per log stats + * @return namespace builder. + */ + public NamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) { + this._perLogStatsLogger = statsLogger; + return this; + } + + /** + * Feature provider used to control the availabilities of features in the namespace. + * + * @param featureProvider + * feature provider to control availabilities of features. + * @return namespace builder. + */ + public NamespaceBuilder featureProvider(FeatureProvider featureProvider) { + this._featureProvider = featureProvider; + return this; + } + + /** + * Client Id used for accessing the namespace + * + * @param clientId + * client id used for accessing the namespace + * @return namespace builder. + */ + public NamespaceBuilder clientId(String clientId) { + this._clientId = clientId; + return this; + } + + /** + * Region Id used for encoding logs in the namespace. The region id + * is useful when the namespace is globally spanning over regions. + * + * @param regionId + * region id. + * @return namespace builder. + */ + public NamespaceBuilder regionId(int regionId) { + this._regionId = regionId; + return this; + } + + @SuppressWarnings("deprecation") + private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger, + StatsLogger perLogStatsLogger, + DistributedLogConfiguration conf) { + StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger; + if (perLogStatsLogger == NullStatsLogger.INSTANCE && + conf.getEnablePerStreamStat()) { + normalizedPerLogStatsLogger = statsLogger.scope("stream"); + } + return normalizedPerLogStatsLogger; + } + + /** + * Build the namespace. + * + * @return the namespace instance. + * @throws IllegalArgumentException when there is illegal argument provided in the builder + * @throws NullPointerException when there is null argument provided in the builder + * @throws IOException when fail to build the backend + */ + public Namespace build() + throws IllegalArgumentException, NullPointerException, IOException { + // Check arguments + Preconditions.checkNotNull(_conf, "No DistributedLog Configuration."); + Preconditions.checkNotNull(_uri, "No DistributedLog URI"); + + // validate the configuration + _conf.validate(); + if (null == _dynConf) { + _dynConf = ConfUtils.getConstDynConf(_conf); + } + + // retrieve the namespace driver + NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri); + URI normalizedUri = DLUtils.normalizeURI(_uri); + + // build the feature provider + FeatureProvider featureProvider; + if (null == _featureProvider) { + featureProvider = new SettableFeatureProvider("", 0); + logger.info("No feature provider is set. All features are disabled now."); + } else { + featureProvider = _featureProvider; + } + + // build the failure injector + AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder() + .injectDelays(_conf.getEIInjectReadAheadDelay(), + _conf.getEIInjectReadAheadDelayPercent(), + _conf.getEIInjectMaxReadAheadDelayMs()) + .injectErrors(false, 10) + .injectStops(_conf.getEIInjectReadAheadStall(), 10) + .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries()) + .build(); + + // normalize the per log stats logger + StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf); + + // build the scheduler + OrderedScheduler scheduler = OrderedScheduler.newBuilder() + .name("DLM-" + normalizedUri.getPath()) + .corePoolSize(_conf.getNumWorkerThreads()) + .build(); + + // initialize the namespace driver + driver.initialize( + _conf, + _dynConf, + normalizedUri, + scheduler, + featureProvider, + failureInjector, + _statsLogger, + perLogStatsLogger, + DLUtils.normalizeClientId(_clientId), + _regionId); + + // initialize the write limiter + PermitLimiter writeLimiter; + if (_conf.getGlobalOutstandingWriteLimit() < 0) { + writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER; + } else { + Feature disableWriteLimitFeature = featureProvider.getFeature( + CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase()); + writeLimiter = new SimplePermitLimiter( + _conf.getOutstandingWriteLimitDarkmode(), + _conf.getGlobalOutstandingWriteLimit(), + _statsLogger.scope("writeLimiter"), + true /* singleton */, + disableWriteLimitFeature); + } + + return new BKDistributedLogNamespace( + _conf, + normalizedUri, + driver, + scheduler, + featureProvider, + writeLimiter, + failureInjector, + _statsLogger, + perLogStatsLogger, + DLUtils.normalizeClientId(_clientId), + _regionId); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java new file mode 100644 index 0000000..fa8f288 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Namespace API and the builder to build namespace instance. + */ +package org.apache.distributedlog.api.namespace; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java new file mode 100644 index 0000000..eca11fd --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * DistributedLog NEW API. + * + * <p>This is the new Java8 {@link java.util.concurrent.CompletableFuture} based API. It is + * <strong>experimental</strong> and still under developing. + */ +package org.apache.distributedlog.api; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java new file mode 100644 index 0000000..bf4a8d3 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api.subscription; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.DLSN; + +public interface SubscriptionStateStore extends Closeable { + /** + * Get the last committed position stored for this subscription + * + * @return future represents the last commit position + */ + public CompletableFuture<DLSN> getLastCommitPosition(); + + /** + * Advances the position associated with the subscriber + * + * @param newPosition - new commit position + * @return future represents the advance result + */ + public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java new file mode 100644 index 0000000..b6a0ed1 --- /dev/null +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.api.subscription; + +import java.io.Closeable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.DLSN; + +/** + * Store to manage subscriptions + */ +public interface SubscriptionsStore extends Closeable { + + /** + * Get the last committed position stored for <i>subscriberId</i>. + * + * @param subscriberId + * subscriber id + * @return future representing last committed position. + */ + public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId); + + /** + * Get the last committed positions for all subscribers. + * + * @return future representing last committed positions for all subscribers. + */ + public CompletableFuture<Map<String, DLSN>> getLastCommitPositions(); + + /** + * Advance the last committed position for <i>subscriberId</i>. + * + * @param subscriberId + * subscriber id. + * @param newPosition + * new committed position. + * @return future representing advancing result. + */ + public CompletableFuture<Void> advanceCommitPosition(String subscriberId, DLSN newPosition); + + /** + * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the + * data stored under this subscriber will be lost. + * @param subscriberId subscriber id + * @return future represent success or failure. + * return true only if there's such subscriber and we removed it successfully. + * return false if there's no such subscriber, or we failed to remove. + */ + public CompletableFuture<Boolean> deleteSubscriber(String subscriberId); + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java index 56a4f2e..34011b5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java @@ -20,20 +20,20 @@ package org.apache.distributedlog.auditor; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.SettableFuture; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.api.DistributedLogManager; import org.apache.distributedlog.LogSegmentMetadata; +import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.namespace.DistributedLogNamespace; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.ZooKeeperClientBuilder; import org.apache.distributedlog.exceptions.DLInterruptedException; import org.apache.distributedlog.exceptions.ZKException; import org.apache.distributedlog.impl.metadata.BKDLConfig; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; import org.apache.distributedlog.namespace.NamespaceDriver; import org.apache.distributedlog.util.DLUtils; import org.apache.bookkeeper.client.BKException; @@ -45,6 +45,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy; import org.apache.bookkeeper.zookeeper.RetryPolicy; import org.apache.commons.lang3.tuple.Pair; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.zookeeper.AsyncCallback; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; @@ -84,13 +85,13 @@ public class DLAuditor { this.conf = conf; } - private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) { + private ZooKeeperClient getZooKeeperClient(Namespace namespace) { NamespaceDriver driver = namespace.getNamespaceDriver(); assert(driver instanceof BKNamespaceDriver); return ((BKNamespaceDriver) driver).getWriterZKC(); } - private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) { + private BookKeeperClient getBookKeeperClient(Namespace namespace) { NamespaceDriver driver = namespace.getNamespaceDriver(); assert(driver instanceof BKNamespaceDriver); return ((BKNamespaceDriver) driver).getReaderBKC(); @@ -169,7 +170,7 @@ public class DLAuditor { LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get()); final Set<Long> ledgers = new HashSet<Long>(); - final SettableFuture<Void> doneFuture = SettableFuture.create(); + final CompletableFuture<Void> doneFuture = FutureUtils.createFuture(); BookkeeperInternalCallbacks.Processor<Long> collector = new BookkeeperInternalCallbacks.Processor<Long>() { @@ -195,9 +196,9 @@ public class DLAuditor { @Override public void processResult(int rc, String path, Object ctx) { if (BKException.Code.OK == rc) { - doneFuture.set(null); + doneFuture.complete(null); } else { - doneFuture.setException(BKException.create(rc)); + doneFuture.completeExceptionally(BKException.create(rc)); } } }; @@ -225,12 +226,12 @@ public class DLAuditor { private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths) throws IOException { final Set<Long> ledgers = new TreeSet<Long>(); - List<DistributedLogNamespace> namespaces = - new ArrayList<DistributedLogNamespace>(uris.size()); + List<Namespace> namespaces = + new ArrayList<Namespace>(uris.size()); try { for (URI uri : uris) { namespaces.add( - DistributedLogNamespaceBuilder.newBuilder() + NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build()); @@ -240,8 +241,8 @@ public class DLAuditor { ExecutorService executor = Executors.newFixedThreadPool(uris.size()); try { int i = 0; - for (final DistributedLogNamespace namespace : namespaces) { - final DistributedLogNamespace dlNamespace = namespace; + for (final Namespace namespace : namespaces) { + final Namespace dlNamespace = namespace; final URI uri = uris.get(i); final List<String> aps = allocationPaths.get(i); i++; @@ -278,7 +279,7 @@ public class DLAuditor { executor.shutdown(); } } finally { - for (DistributedLogNamespace namespace : namespaces) { + for (Namespace namespace : namespaces) { namespace.close(); } } @@ -286,7 +287,7 @@ public class DLAuditor { } private void collectLedgersFromAllocator(final URI uri, - final DistributedLogNamespace namespace, + final Namespace namespace, final List<String> allocationPaths, final Set<Long> ledgers) throws IOException { final LinkedBlockingQueue<String> poolQueue = @@ -346,7 +347,7 @@ public class DLAuditor { } private void collectLedgersFromDL(final URI uri, - final DistributedLogNamespace namespace, + final Namespace namespace, final Set<Long> ledgers) throws IOException { logger.info("Enumerating {} to collect streams.", uri); Iterator<String> streams = namespace.getLogs(); @@ -366,7 +367,7 @@ public class DLAuditor { }); } - private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace, + private List<Long> collectLedgersFromStream(Namespace namespace, String stream, Set<Long> ledgers) throws IOException { @@ -394,7 +395,7 @@ public class DLAuditor { */ public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException { logger.info("Collecting stream space usage for {}.", uri); - DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() + Namespace namespace = NamespaceBuilder.newBuilder() .conf(conf) .uri(uri) .build(); @@ -406,7 +407,7 @@ public class DLAuditor { } private Map<String, Long> calculateStreamSpaceUsage( - final URI uri, final DistributedLogNamespace namespace) + final URI uri, final Namespace namespace) throws IOException { Iterator<String> streams = namespace.getLogs(); final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>(); @@ -432,7 +433,7 @@ public class DLAuditor { return streamSpaceUsageMap; } - private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace, + private long calculateStreamSpaceUsage(final Namespace namespace, final String stream) throws IOException { DistributedLogManager dlm = namespace.openLog(stream); long totalBytes = 0; @@ -504,7 +505,7 @@ public class DLAuditor { LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get()); - final SettableFuture<Void> doneFuture = SettableFuture.create(); + final CompletableFuture<Void> doneFuture = FutureUtils.createFuture(); final BookKeeper bk = bkc.get(); BookkeeperInternalCallbacks.Processor<Long> collector = @@ -544,9 +545,9 @@ public class DLAuditor { @Override public void processResult(int rc, String path, Object ctx) { if (BKException.Code.OK == rc) { - doneFuture.set(null); + doneFuture.complete(null); } else { - doneFuture.setException(BKException.create(rc)); + doneFuture.completeExceptionally(BKException.create(rc)); } } }; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java index 6ea248b..ee33dc3 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java @@ -17,9 +17,10 @@ */ package org.apache.distributedlog.bk; +import java.util.concurrent.CompletableFuture; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Transaction.OpListener; -import com.twitter.util.Future; import org.apache.bookkeeper.client.LedgerHandle; import java.io.IOException; @@ -57,8 +58,8 @@ public class LedgerAllocatorDelegator implements LedgerAllocator { } @Override - public Future<Void> delete() { - return Future.exception(new UnsupportedOperationException("Can't delete an allocator by delegator")); + public CompletableFuture<Void> delete() { + return FutureUtils.exception(new UnsupportedOperationException("Can't delete an allocator by delegator")); } @Override @@ -67,17 +68,17 @@ public class LedgerAllocatorDelegator implements LedgerAllocator { } @Override - public Future<LedgerHandle> tryObtain(Transaction<Object> txn, + public CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> txn, OpListener<LedgerHandle> listener) { return this.allocator.tryObtain(txn, listener); } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { if (ownAllocator) { return this.allocator.asyncClose(); } else { - return Future.value(null); + return FutureUtils.value(null); } } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java index 4fff2f6..19c5546 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java @@ -19,17 +19,15 @@ package org.apache.distributedlog.bk; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.ZooKeeperClient; import org.apache.distributedlog.exceptions.DLInterruptedException; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Utils; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.util.ZkUtils; @@ -40,7 +38,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction1; import java.io.IOException; import java.util.ArrayList; @@ -320,25 +317,25 @@ public class LedgerAllocatorPool implements LedgerAllocator { } @Override - public Future<LedgerHandle> tryObtain(final Transaction<Object> txn, - final Transaction.OpListener<LedgerHandle> listener) { + public CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn, + final Transaction.OpListener<LedgerHandle> listener) { final SimpleLedgerAllocator allocator; synchronized (this) { if (allocatingList.isEmpty()) { - return Future.exception(new IOException("No ledger allocator available under " + poolPath + ".")); + return FutureUtils.exception(new IOException("No ledger allocator available under " + poolPath + ".")); } else { allocator = allocatingList.removeFirst(); } } - final Promise<LedgerHandle> tryObtainPromise = new Promise<LedgerHandle>(); + final CompletableFuture<LedgerHandle> tryObtainPromise = new CompletableFuture<LedgerHandle>(); final FutureEventListener<LedgerHandle> tryObtainListener = new FutureEventListener<LedgerHandle>() { @Override public void onSuccess(LedgerHandle lh) { synchronized (LedgerAllocatorPool.this) { obtainMap.put(lh, allocator); reverseObtainMap.put(allocator, lh); - tryObtainPromise.setValue(lh); + tryObtainPromise.complete(lh); } } @@ -349,7 +346,7 @@ public class LedgerAllocatorPool implements LedgerAllocator { } catch (IOException ioe) { logger.info("Failed to rescue allocator {}", allocator.allocatePath, ioe); } - tryObtainPromise.setException(cause); + tryObtainPromise.completeExceptionally(cause); } }; @@ -365,7 +362,7 @@ public class LedgerAllocatorPool implements LedgerAllocator { abortObtain(allocator); listener.onAbort(t); } - }).addEventListener(tryObtainListener); + }).whenComplete(tryObtainListener); return tryObtainPromise; } @@ -399,7 +396,7 @@ public class LedgerAllocatorPool implements LedgerAllocator { } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { List<LedgerAllocator> allocatorsToClose; synchronized (this) { allocatorsToClose = Lists.newArrayListWithExpectedSize( @@ -414,21 +411,15 @@ public class LedgerAllocatorPool implements LedgerAllocator { allocatorsToClose.add(allocator); } } - return FutureUtils.processList(allocatorsToClose, new Function<LedgerAllocator, Future<Void>>() { - @Override - public Future<Void> apply(LedgerAllocator allocator) { - return allocator.asyncClose(); - } - }, scheduledExecutorService).map(new AbstractFunction1<List<Void>, Void>() { - @Override - public Void apply(List<Void> values) { - return null; - } - }); + return FutureUtils.processList( + allocatorsToClose, + allocator -> allocator.asyncClose(), + scheduledExecutorService + ).thenApply(values -> null); } @Override - public Future<Void> delete() { + public CompletableFuture<Void> delete() { List<LedgerAllocator> allocatorsToDelete; synchronized (this) { allocatorsToDelete = Lists.newArrayListWithExpectedSize( @@ -443,16 +434,10 @@ public class LedgerAllocatorPool implements LedgerAllocator { allocatorsToDelete.add(allocator); } } - return FutureUtils.processList(allocatorsToDelete, new Function<LedgerAllocator, Future<Void>>() { - @Override - public Future<Void> apply(LedgerAllocator allocator) { - return allocator.delete(); - } - }, scheduledExecutorService).flatMap(new AbstractFunction1<List<Void>, Future<Void>>() { - @Override - public Future<Void> apply(List<Void> values) { - return Utils.zkDelete(zkc, poolPath, new ZkVersion(-1)); - } - }); + return FutureUtils.processList( + allocatorsToDelete, + allocator -> allocator.delete(), + scheduledExecutorService + ).thenCompose(values -> Utils.zkDelete(zkc, poolPath, new ZkVersion(-1))); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java index e0102f3..144b0a6 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java @@ -18,19 +18,20 @@ package org.apache.distributedlog.bk; import com.google.common.collect.Lists; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Function; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.util.DLUtils; +import org.apache.distributedlog.common.concurrent.FutureEventListener; import org.apache.distributedlog.util.Transaction; import org.apache.distributedlog.util.Transaction.OpListener; import org.apache.distributedlog.ZooKeeperClient; -import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.distributedlog.util.Utils; import org.apache.distributedlog.zk.ZKTransaction; import org.apache.distributedlog.zk.ZKVersionedSetOp; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.meta.ZkVersion; import org.apache.bookkeeper.versioning.Version; @@ -40,9 +41,6 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.runtime.AbstractFunction0; -import scala.runtime.AbstractFunction1; -import scala.runtime.BoxedUnit; import java.io.IOException; import java.util.LinkedList; @@ -96,7 +94,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen // version ZkVersion version = new ZkVersion(-1); // outstanding allocation - Promise<LedgerHandle> allocatePromise; + CompletableFuture<LedgerHandle> allocatePromise; // outstanding tryObtain transaction Transaction<Object> tryObtainTxn = null; OpListener<LedgerHandle> tryObtainListener = null; @@ -105,73 +103,71 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen // Allocated Ledger LedgerHandle allocatedLh = null; - Future<Void> closeFuture = null; - final LinkedList<Future<Void>> ledgerDeletions = - new LinkedList<Future<Void>>(); + CompletableFuture<Void> closeFuture = null; + final LinkedList<CompletableFuture<Void>> ledgerDeletions = + new LinkedList<CompletableFuture<Void>>(); // Ledger configuration private final QuorumConfigProvider quorumConfigProvider; - static Future<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath, + static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath, final ZooKeeperClient zkc) { return Utils.zkGetData(zkc, allocatePath, false) - .flatMap(new AbstractFunction1<Versioned<byte[]>, Future<Versioned<byte[]>>>() { + .thenCompose(new Function<Versioned<byte[]>, CompletionStage<Versioned<byte[]>>>() { @Override - public Future<Versioned<byte[]>> apply(Versioned<byte[]> result) { + public CompletableFuture<Versioned<byte[]>> apply(Versioned<byte[]> result) { if (null != result && null != result.getVersion() && null != result.getValue()) { - return Future.value(result); + return FutureUtils.value(result); } return createAllocationData(allocatePath, zkc); } }); } - private static Future<Versioned<byte[]>> createAllocationData(final String allocatePath, + private static CompletableFuture<Versioned<byte[]>> createAllocationData(final String allocatePath, final ZooKeeperClient zkc) { try { - final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>(); + final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>(); zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES, zkc.getDefaultACL(), CreateMode.PERSISTENT, new org.apache.zookeeper.AsyncCallback.Create2Callback() { @Override public void processResult(int rc, String path, Object ctx, String name, Stat stat) { if (KeeperException.Code.OK.intValue() == rc) { - promise.setValue(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES, + promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES, new ZkVersion(stat.getVersion()))); } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { - Utils.zkGetData(zkc, allocatePath, false).proxyTo(promise); + FutureUtils.proxyTo( + Utils.zkGetData(zkc, allocatePath, false), + promise + ); } else { - promise.setException(FutureUtils.zkException( + promise.completeExceptionally(Utils.zkException( KeeperException.create(KeeperException.Code.get(rc)), allocatePath)); } } }, null); return promise; } catch (ZooKeeperClient.ZooKeeperConnectionException e) { - return Future.exception(FutureUtils.zkException(e, allocatePath)); + return FutureUtils.exception(Utils.zkException(e, allocatePath)); } catch (InterruptedException e) { - return Future.exception(FutureUtils.zkException(e, allocatePath)); + return FutureUtils.exception(Utils.zkException(e, allocatePath)); } } - public static Future<SimpleLedgerAllocator> of(final String allocatePath, + public static CompletableFuture<SimpleLedgerAllocator> of(final String allocatePath, final Versioned<byte[]> allocationData, final QuorumConfigProvider quorumConfigProvider, final ZooKeeperClient zkc, final BookKeeperClient bkc) { if (null != allocationData && null != allocationData.getValue() && null != allocationData.getVersion()) { - return Future.value(new SimpleLedgerAllocator(allocatePath, allocationData, + return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, allocationData, quorumConfigProvider, zkc, bkc)); } return getAndCreateAllocationData(allocatePath, zkc) - .map(new AbstractFunction1<Versioned<byte[]>, SimpleLedgerAllocator>() { - @Override - public SimpleLedgerAllocator apply(Versioned<byte[]> allocationData) { - return new SimpleLedgerAllocator(allocatePath, allocationData, - quorumConfigProvider, zkc, bkc); - } - }); + .thenApply(allocationData1 -> new SimpleLedgerAllocator(allocatePath, allocationData1, + quorumConfigProvider, zkc, bkc)); } /** @@ -240,14 +236,14 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen } @Override - public synchronized Future<LedgerHandle> tryObtain(final Transaction<Object> txn, - final OpListener<LedgerHandle> listener) { + public synchronized CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn, + final OpListener<LedgerHandle> listener) { if (Phase.ERROR == phase) { - return Future.exception(new AllocationException(Phase.ERROR, + return FutureUtils.exception(new AllocationException(Phase.ERROR, "Error on allocating ledger under " + allocatePath)); } if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null != tryObtainTxn) { - return Future.exception(new ConcurrentObtainException(phase, + return FutureUtils.exception(new ConcurrentObtainException(phase, "Ledger handle is handling over to another thread : " + phase)); } tryObtainTxn = txn; @@ -328,13 +324,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen return; } setPhase(Phase.ALLOCATING); - allocatePromise = new Promise<LedgerHandle>(); + allocatePromise = new CompletableFuture<LedgerHandle>(); QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig(); bkc.createLedger( quorumConfig.getEnsembleSize(), quorumConfig.getWriteQuorumSize(), quorumConfig.getAckQuorumSize() - ).addEventListener(this); + ).whenComplete(this); } private synchronized void completeAllocation(LedgerHandle lh) { @@ -347,11 +343,11 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this); tryObtainTxn.addOp(commitOp); setPhase(Phase.HANDING_OVER); - FutureUtils.setValue(allocatePromise, lh); + allocatePromise.complete(lh); } private synchronized void failAllocation(Throwable cause) { - FutureUtils.setException(allocatePromise, cause); + allocatePromise.completeExceptionally(cause); } @Override @@ -386,7 +382,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen private void markAsAllocated(final LedgerHandle lh) { byte[] data = DLUtils.logSegmentId2Bytes(lh.getId()); Utils.zkSetData(zkc, allocatePath, data, getVersion()) - .addEventListener(new FutureEventListener<ZkVersion>() { + .whenComplete(new FutureEventListener<ZkVersion>() { @Override public void onSuccess(ZkVersion version) { // we only issue deleting ledger left from previous allocation when we could allocate first ledger @@ -411,27 +407,20 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen } void deleteLedger(final long ledgerId) { - final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true); + final CompletableFuture<Void> deleteFuture = bkc.deleteLedger(ledgerId, true); synchronized (ledgerDeletions) { ledgerDeletions.add(deleteFuture); } - deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() { - @Override - public BoxedUnit apply(Throwable cause) { + deleteFuture.whenComplete((value, cause) -> { + if (null != cause) { LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ", new Object[] { ledgerId, allocatePath, cause }); if (!isClosing()) { deleteLedger(ledgerId); } - return BoxedUnit.UNIT; } - }).ensure(new AbstractFunction0<BoxedUnit>() { - @Override - public BoxedUnit apply() { - synchronized (ledgerDeletions) { - ledgerDeletions.remove(deleteFuture); - } - return BoxedUnit.UNIT; + synchronized (ledgerDeletions) { + ledgerDeletions.remove(deleteFuture); } }); } @@ -440,25 +429,25 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen return closeFuture != null; } - private Future<Void> closeInternal(boolean cleanup) { - Promise<Void> closePromise; + private CompletableFuture<Void> closeInternal(boolean cleanup) { + CompletableFuture<Void> closePromise; synchronized (this) { if (null != closeFuture) { return closeFuture; } - closePromise = new Promise<Void>(); + closePromise = new CompletableFuture<Void>(); closeFuture = closePromise; } if (!cleanup) { LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath); - FutureUtils.setValue(closePromise, null); + closePromise.complete(null); return closePromise; } cleanupAndClose(closePromise); return closePromise; } - private void cleanupAndClose(final Promise<Void> closePromise) { + private void cleanupAndClose(final CompletableFuture<Void> closePromise) { LOG.info("Closing ledger allocator on {}.", allocatePath); final ZKTransaction txn = new ZKTransaction(zkc); // try obtain ledger handle @@ -476,21 +465,21 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen } private void complete() { - FutureUtils.setValue(closePromise, null); + closePromise.complete(null); LOG.info("Closed ledger allocator on {}.", allocatePath); } - }).addEventListener(new FutureEventListener<LedgerHandle>() { + }).whenComplete(new FutureEventListener<LedgerHandle>() { @Override public void onSuccess(LedgerHandle lh) { // try obtain succeed // if we could obtain the ledger handle, we have the responsibility to close it deleteLedger(lh.getId()); // wait for deletion to be completed - List<Future<Void>> outstandingDeletions; + List<CompletableFuture<Void>> outstandingDeletions; synchronized (ledgerDeletions) { outstandingDeletions = Lists.newArrayList(ledgerDeletions); } - Future.collect(outstandingDeletions).addEventListener(new FutureEventListener<List<Void>>() { + FutureUtils.collect(outstandingDeletions).whenComplete(new FutureEventListener<List<Void>>() { @Override public void onSuccess(List<Void> values) { txn.execute(); @@ -499,7 +488,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen @Override public void onFailure(Throwable cause) { LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause); - FutureUtils.setValue(closePromise, null); + closePromise.complete(null); } }); } @@ -507,7 +496,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen @Override public void onFailure(Throwable cause) { LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause); - FutureUtils.setValue(closePromise, null); + closePromise.complete(null); } }); @@ -519,18 +508,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen } @Override - public Future<Void> asyncClose() { + public CompletableFuture<Void> asyncClose() { return closeInternal(false); } @Override - public Future<Void> delete() { - return closeInternal(true).flatMap(new AbstractFunction1<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void value) { - return Utils.zkDelete(zkc, allocatePath, getVersion()); - } - }); + public CompletableFuture<Void> delete() { + return closeInternal(true).thenCompose(value -> Utils.zkDelete(zkc, allocatePath, getVersion())); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java b/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java deleted file mode 100644 index dccd2e8..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.callback; - -/** - * ReadAhead Callback - */ -public interface ReadAheadCallback { - void resumeReadAhead(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java deleted file mode 100644 index f189ad3..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import com.google.common.base.Preconditions; - -import org.apache.commons.configuration.AbstractConfiguration; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Iterator; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Configuration view built on concurrent hash map for fast thread-safe access. - * Notes: - * 1. Multi-property list aggregation will not work in this class. I.e. commons config - * normally combines all properties with the same key into one list property automatically. - * This class simply overwrites any existing mapping. - */ -public class ConcurrentBaseConfiguration extends AbstractConfiguration { - static final Logger LOG = LoggerFactory.getLogger(ConcurrentBaseConfiguration.class); - - private final ConcurrentHashMap<String, Object> map; - - public ConcurrentBaseConfiguration() { - this.map = new ConcurrentHashMap<String, Object>(); - } - - @Override - protected void addPropertyDirect(String key, Object value) { - Preconditions.checkNotNull(value); - map.put(key, value); - } - - @Override - public Object getProperty(String key) { - return map.get(key); - } - - @Override - public Iterator getKeys() { - return map.keySet().iterator(); - } - - @Override - public boolean containsKey(String key) { - return map.containsKey(key); - } - - @Override - public boolean isEmpty() { - return map.isEmpty(); - } - - @Override - protected void clearPropertyDirect(String key) { - map.remove(key); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java deleted file mode 100644 index 4e7f886..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import com.google.common.base.Preconditions; -import org.apache.commons.configuration.Configuration; - -/** - * Invariant thread-safe view of some configuration. - */ -public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration { - public ConcurrentConstConfiguration(Configuration conf) { - Preconditions.checkNotNull(conf); - copy(conf); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java deleted file mode 100644 index 70059d4..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -/** - * Configuration listener triggered when reloading configuration settings. - */ -public interface ConfigurationListener { - - /** - * Reload the configuration. - * - * @param conf configuration to reload - */ - void onReload(ConcurrentBaseConfiguration conf); - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java deleted file mode 100644 index 0e5c897..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import java.io.FileNotFoundException; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.Iterator; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; -import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; -import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class - * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the - * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only - * be reloaded if something changed. - * Notes: - * 1. Reload schedule is never terminated. The assumption is a finite number of these are started - * at the calling layer, and terminated only once the executor service is shut down. - * 2. The underlying FileConfiguration is not at all thread-safe, so its important to ensure access - * to this object is always single threaded. - */ -public class ConfigurationSubscription { - static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class); - - private final ConcurrentBaseConfiguration viewConfig; - private final ScheduledExecutorService executorService; - private final int reloadPeriod; - private final TimeUnit reloadUnit; - private final List<FileConfigurationBuilder> fileConfigBuilders; - private final List<FileConfiguration> fileConfigs; - private final CopyOnWriteArraySet<ConfigurationListener> confListeners; - - public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig, - List<FileConfigurationBuilder> fileConfigBuilders, - ScheduledExecutorService executorService, - int reloadPeriod, - TimeUnit reloadUnit) - throws ConfigurationException { - Preconditions.checkNotNull(fileConfigBuilders); - Preconditions.checkArgument(!fileConfigBuilders.isEmpty()); - Preconditions.checkNotNull(executorService); - Preconditions.checkNotNull(viewConfig); - this.viewConfig = viewConfig; - this.executorService = executorService; - this.reloadPeriod = reloadPeriod; - this.reloadUnit = reloadUnit; - this.fileConfigBuilders = fileConfigBuilders; - this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size()); - this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>(); - reload(); - scheduleReload(); - } - - public void registerListener(ConfigurationListener listener) { - this.confListeners.add(listener); - } - - public void unregisterListener(ConfigurationListener listener) { - this.confListeners.remove(listener); - } - - private boolean initConfig() { - if (fileConfigs.isEmpty()) { - try { - for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) { - FileConfiguration fileConfig = fileConfigBuilder.getConfiguration(); - FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy(); - reloadingStrategy.setRefreshDelay(0); - fileConfig.setReloadingStrategy(reloadingStrategy); - fileConfigs.add(fileConfig); - } - } catch (ConfigurationException ex) { - if (!fileNotFound(ex)) { - LOG.error("Config init failed {}", ex); - } - } - } - return !fileConfigs.isEmpty(); - } - - private void scheduleReload() { - executorService.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - reload(); - } - }, 0, reloadPeriod, reloadUnit); - } - - @VisibleForTesting - void reload() { - // No-op if already loaded. - if (!initConfig()) { - return; - } - // Reload if config exists. - Set<String> confKeys = Sets.newHashSet(); - for (FileConfiguration fileConfig : fileConfigs) { - LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(), - fileConfig.getFile().lastModified()); - fileConfig.reload(); - // load keys - Iterator keyIter = fileConfig.getKeys(); - while (keyIter.hasNext()) { - String key = (String) keyIter.next(); - confKeys.add(key); - } - } - // clear unexisted keys - Iterator viewIter = viewConfig.getKeys(); - while (viewIter.hasNext()) { - String key = (String) viewIter.next(); - if (!confKeys.contains(key)) { - clearViewProperty(key); - } - } - LOG.info("Reload features : {}", confKeys); - // load keys from files - for (FileConfiguration fileConfig : fileConfigs) { - try { - loadView(fileConfig); - } catch (Exception ex) { - if (!fileNotFound(ex)) { - LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex); - } - } - } - for (ConfigurationListener listener : confListeners) { - listener.onReload(viewConfig); - } - } - - private boolean fileNotFound(Exception ex) { - return ex instanceof FileNotFoundException || - ex.getCause() != null && ex.getCause() instanceof FileNotFoundException; - } - - private void loadView(FileConfiguration fileConfig) { - Iterator fileIter = fileConfig.getKeys(); - while (fileIter.hasNext()) { - String key = (String) fileIter.next(); - setViewProperty(fileConfig, key, fileConfig.getProperty(key)); - } - } - - private void clearViewProperty(String key) { - LOG.debug("Removing property, key={}", key); - viewConfig.clearProperty(key); - } - - private void setViewProperty(FileConfiguration fileConfig, - String key, - Object value) { - if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) { - LOG.debug("Setting property, key={} value={}", key, fileConfig.getProperty(key)); - viewConfig.setProperty(key, fileConfig.getProperty(key)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java index c77778a..c69b7a5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java @@ -24,7 +24,6 @@ import com.google.common.collect.Lists; import org.apache.distributedlog.DistributedLogConfiguration; import java.io.File; -import java.io.FileNotFoundException; import java.net.MalformedURLException; import java.util.HashMap; import java.util.LinkedList; @@ -35,6 +34,11 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.ConfigurationException; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentConstConfiguration; +import org.apache.distributedlog.common.config.ConfigurationSubscription; +import org.apache.distributedlog.common.config.FileConfigurationBuilder; +import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java index ea7f4a7..9e760c5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java @@ -19,6 +19,7 @@ package org.apache.distributedlog.config; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.bk.QuorumConfig; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; import static org.apache.distributedlog.DistributedLogConfiguration.*; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java deleted file mode 100644 index dbf8fe7..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; - -/** - * Abstract out FileConfiguration subclass construction. - */ -public interface FileConfigurationBuilder { - FileConfiguration getConfiguration() throws ConfigurationException; -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java deleted file mode 100644 index df1408c..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.config; - -import java.net.URL; - -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.FileConfiguration; -import org.apache.commons.configuration.PropertiesConfiguration; - -/** - * Hide PropertiesConfiguration dependency. - */ -public class PropertiesConfigurationBuilder implements FileConfigurationBuilder { - private URL url; - - public PropertiesConfigurationBuilder(URL url) { - this.url = url; - } - - @Override - public FileConfiguration getConfiguration() throws ConfigurationException { - return new PropertiesConfiguration(url); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java index 83cac22..f51302e 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java @@ -17,7 +17,7 @@ */ package org.apache.distributedlog.feature; -import org.apache.distributedlog.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; import org.apache.bookkeeper.feature.CacheableFeatureProvider; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java index 4689d51..201ed8a 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java @@ -21,11 +21,11 @@ import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.config.ConcurrentBaseConfiguration; -import org.apache.distributedlog.config.ConfigurationListener; -import org.apache.distributedlog.config.ConfigurationSubscription; -import org.apache.distributedlog.config.FileConfigurationBuilder; -import org.apache.distributedlog.config.PropertiesConfigurationBuilder; +import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration; +import org.apache.distributedlog.common.config.ConfigurationListener; +import org.apache.distributedlog.common.config.ConfigurationSubscription; +import org.apache.distributedlog.common.config.FileConfigurationBuilder; +import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder; import org.apache.bookkeeper.feature.Feature; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.feature.SettableFeature; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java deleted file mode 100644 index b1adf4a..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.function; - -import org.apache.distributedlog.io.AsyncCloseable; -import scala.Function0; -import scala.runtime.AbstractFunction0; -import scala.runtime.BoxedUnit; - -/** - * Function to close {@link org.apache.distributedlog.io.AsyncCloseable} - */ -public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> { - - /** - * Return a function to close an {@link AsyncCloseable}. - * - * @param closeable closeable to close - * @return function to close an {@link AsyncCloseable} - */ - public static Function0<BoxedUnit> of(AsyncCloseable closeable) { - return new CloseAsyncCloseableFunction(closeable); - } - - private final AsyncCloseable closeable; - - private CloseAsyncCloseableFunction(AsyncCloseable closeable) { - this.closeable = closeable; - } - - @Override - public BoxedUnit apply() { - closeable.asyncClose(); - return BoxedUnit.UNIT; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java deleted file mode 100644 index 6360f2c..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.function; - -import scala.runtime.AbstractFunction1; - -/** - * Map Function return default value - */ -public class DefaultValueMapFunction<T, R> extends AbstractFunction1<T, R> { - - public static <T, R> DefaultValueMapFunction<T, R> of(R defaultValue) { - return new DefaultValueMapFunction<T, R>(defaultValue); - } - - private final R defaultValue; - - private DefaultValueMapFunction(R defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public R apply(T any) { - return defaultValue; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java index 1bf620c..00703e3 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java @@ -17,16 +17,15 @@ */ package org.apache.distributedlog.function; +import java.util.List; +import java.util.function.Function; import org.apache.distributedlog.DistributedLogConstants; import org.apache.distributedlog.LogSegmentMetadata; -import scala.runtime.AbstractFunction1; - -import java.util.List; /** * Retrieve the last tx id from list of log segments */ -public class GetLastTxIdFunction extends AbstractFunction1<List<LogSegmentMetadata>, Long> { +public class GetLastTxIdFunction implements Function<List<LogSegmentMetadata>, Long> { public static final GetLastTxIdFunction INSTANCE = new GetLastTxIdFunction(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java deleted file mode 100644 index 98164de..0000000 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.function; - -import org.apache.distributedlog.LogSegmentMetadata; -import org.apache.bookkeeper.versioning.Versioned; -import scala.Function1; -import scala.runtime.AbstractFunction1; - -import java.util.List; - -/** - * Function to get the versioned value from {@link org.apache.bookkeeper.versioning.Versioned} - */ -public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> { - - public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>> - GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>(); - - @Override - public T apply(Versioned<T> versionedValue) { - return versionedValue.getValue(); - } -}