This is an automated email from the ASF dual-hosted git repository. rcordier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 16d3a0e812b69d88d5190b5f7bf5b1392c8f90d9 Author: Tung Van TRAN <[email protected]> AuthorDate: Thu Aug 26 17:46:06 2021 +0700 JAMES-3150 Expose the webAdmin endpoint to trigger blob deduplicated garbage collection --- server/blob/blob-storage-strategy/pom.xml | 4 + .../server/blob/deduplication/BlobGCTask.java | 74 +--- .../BlobGCTaskAdditionalInformationDTO.java | 119 ++++++ server/protocols/webadmin/webadmin-data/pom.xml | 14 + .../apache/james/webadmin/routes/BlobRoutes.java | 168 ++++++++ .../james/webadmin/routes/BlobRoutesTest.java | 430 +++++++++++++++++++++ 6 files changed, 753 insertions(+), 56 deletions(-) diff --git a/server/blob/blob-storage-strategy/pom.xml b/server/blob/blob-storage-strategy/pom.xml index af1daa1..ceb5a5b 100644 --- a/server/blob/blob-storage-strategy/pom.xml +++ b/server/blob/blob-storage-strategy/pom.xml @@ -49,6 +49,10 @@ <artifactId>james-server-task-api</artifactId> </dependency> <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>james-server-task-json</artifactId> + </dependency> + <dependency> <!-- Added because of https://issues.apache.org/jira/browse/SUREFIRE-1266 --> <groupId>${james.groupId}</groupId> <artifactId>james-server-testing</artifactId> diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java index 7ecb0a4..2177c62 100644 --- a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java +++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTask.java @@ -21,7 +21,6 @@ package org.apache.james.server.blob.deduplication; import java.time.Clock; import java.time.Instant; -import java.util.Arrays; import java.util.Optional; import java.util.Set; @@ -35,8 +34,6 @@ import org.apache.james.task.TaskType; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - import reactor.core.scheduler.Schedulers; public class BlobGCTask implements Task { @@ -45,7 +42,7 @@ public class BlobGCTask implements Task { public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation { - private static AdditionalInformation from(Scope scope, Context context) { + private static AdditionalInformation from(Context context) { Context.Snapshot snapshot = context.snapshot(); return new AdditionalInformation( snapshot.getReferenceSourceCount(), @@ -53,8 +50,7 @@ public class BlobGCTask implements Task { snapshot.getGcedBlobCount(), snapshot.getErrorCount(), snapshot.getBloomFilterExpectedBlobCount(), - snapshot.getBloomFilterAssociatedProbability(), - scope); + snapshot.getBloomFilterAssociatedProbability()); } private final Instant timestamp; @@ -64,15 +60,13 @@ public class BlobGCTask implements Task { private final long errorCount; private final long bloomFilterExpectedBlobCount; private final double bloomFilterAssociatedProbability; - private final Scope scope; AdditionalInformation(long referenceSourceCount, long blobCount, long gcedBlobCount, long errorCount, long bloomFilterExpectedBlobCount, - double bloomFilterAssociatedProbability, - Scope scope) { + double bloomFilterAssociatedProbability) { this.referenceSourceCount = referenceSourceCount; this.blobCount = blobCount; this.gcedBlobCount = gcedBlobCount; @@ -80,7 +74,6 @@ public class BlobGCTask implements Task { this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; this.timestamp = Clock.systemUTC().instant(); - this.scope = scope; } @Override @@ -115,36 +108,13 @@ public class BlobGCTask implements Task { public double getBloomFilterAssociatedProbability() { return bloomFilterAssociatedProbability; } - - public Scope getScope() { - return scope; - } - } - - public enum Scope { - UNREFERENCED; - - 
static class ScopeInvalidException extends IllegalArgumentException { - } - - public static Optional<Scope> from(String name) { - Preconditions.checkNotNull(name); - return Arrays.stream(Scope.values()) - .filter(value -> name.equalsIgnoreCase(value.name())) - .findFirst(); - } } interface Builder { @FunctionalInterface - interface RequireScope { - BlobGCTask scope(Scope scope); - } - - @FunctionalInterface interface RequireAssociatedProbability { - RequireScope associatedProbability(double associatedProbability); + BlobGCTask associatedProbability(double associatedProbability); } @FunctionalInterface @@ -186,7 +156,7 @@ public class BlobGCTask implements Task { public static Builder.RequireBlobStoreDAO builder() { return blobStoreDao -> generationAwareBlobIdFactory -> generationAwareBlobIdConfiguration -> blobReferenceSources -> bucketName -> clock -> expectedBlobCount - -> associatedProbability -> scope + -> associatedProbability -> new BlobGCTask( blobStoreDao, generationAwareBlobIdFactory, @@ -195,8 +165,7 @@ public class BlobGCTask implements Task { bucketName, clock, expectedBlobCount, - associatedProbability, - scope); + associatedProbability); } @@ -209,7 +178,6 @@ public class BlobGCTask implements Task { private final int expectedBlobCount; private final double associatedProbability; private final Context context; - private final Scope scope; public BlobGCTask(BlobStoreDAO blobStoreDAO, @@ -219,8 +187,7 @@ public class BlobGCTask implements Task { BucketName bucketName, Clock clock, int expectedBlobCount, - double associatedProbability, - Scope scope) { + double associatedProbability) { this.blobStoreDAO = blobStoreDAO; this.generationAwareBlobIdFactory = generationAwareBlobIdFactory; this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration; @@ -230,25 +197,20 @@ public class BlobGCTask implements Task { this.expectedBlobCount = expectedBlobCount; this.associatedProbability = associatedProbability; this.context = new 
Context(expectedBlobCount, associatedProbability); - this.scope = scope; } @Override public Result run() throws InterruptedException { - if (Scope.UNREFERENCED.equals(this.scope)) { - BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm( - BlobReferenceAggregate.aggregate(blobReferenceSources), - blobStoreDAO, - generationAwareBlobIdFactory, - generationAwareBlobIdConfiguration, - clock); - - return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context) - .subscribeOn(Schedulers.elastic()) - .block(); - } else { - return Result.COMPLETED; - } + BloomFilterGCAlgorithm gcAlgorithm = new BloomFilterGCAlgorithm( + BlobReferenceAggregate.aggregate(blobReferenceSources), + blobStoreDAO, + generationAwareBlobIdFactory, + generationAwareBlobIdConfiguration, + clock); + + return gcAlgorithm.gc(expectedBlobCount, associatedProbability, bucketName, context) + .subscribeOn(Schedulers.elastic()) + .block(); } @Override @@ -258,6 +220,6 @@ public class BlobGCTask implements Task { @Override public Optional<TaskExecutionDetails.AdditionalInformation> details() { - return Optional.of(AdditionalInformation.from(scope, context)); + return Optional.of(AdditionalInformation.from(context)); } } diff --git a/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java new file mode 100644 index 0000000..f9da97a --- /dev/null +++ b/server/blob/blob-storage-strategy/src/main/java/org/apache/james/server/blob/deduplication/BlobGCTaskAdditionalInformationDTO.java @@ -0,0 +1,119 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. 
See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.server.blob.deduplication; + +import java.time.Instant; + +import org.apache.james.json.DTOModule; +import org.apache.james.server.task.json.dto.AdditionalInformationDTO; +import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class BlobGCTaskAdditionalInformationDTO implements AdditionalInformationDTO { + + public static final AdditionalInformationDTOModule<BlobGCTask.AdditionalInformation, BlobGCTaskAdditionalInformationDTO> SERIALIZATION_MODULE = + DTOModule.forDomainObject(BlobGCTask.AdditionalInformation.class) + .convertToDTO(BlobGCTaskAdditionalInformationDTO.class) + .toDomainObjectConverter(dto -> + new BlobGCTask.AdditionalInformation( + dto.referenceSourceCount, + dto.blobCount, + dto.gcedBlobCount, + dto.errorCount, + dto.bloomFilterExpectedBlobCount, + dto.bloomFilterAssociatedProbability + )) + .toDTOConverter((domain, type) -> + new BlobGCTaskAdditionalInformationDTO( + type, + domain.getTimestamp(), + domain.getReferenceSourceCount(), + domain.getBlobCount(), + domain.getGcedBlobCount(), + domain.getErrorCount(), + 
domain.getBloomFilterExpectedBlobCount(), + domain.getBloomFilterAssociatedProbability() + )) + .typeName(BlobGCTask.TASK_TYPE.asString()) + .withFactory(AdditionalInformationDTOModule::new); + + private final String type; + private final Instant timestamp; + private final long referenceSourceCount; + private final long blobCount; + private final long gcedBlobCount; + private final long errorCount; + private final long bloomFilterExpectedBlobCount; + private final double bloomFilterAssociatedProbability; + + public BlobGCTaskAdditionalInformationDTO(@JsonProperty("type") String type, + @JsonProperty("timestamp") Instant timestamp, + @JsonProperty("referenceSourceCount") long referenceSourceCount, + @JsonProperty("blobCount") long blobCount, + @JsonProperty("gcedBlobCount") long gcedBlobCount, + @JsonProperty("errorCount") long errorCount, + @JsonProperty("bloomFilterExpectedBlobCount") long bloomFilterExpectedBlobCount, + @JsonProperty("bloomFilterAssociatedProbability") double bloomFilterAssociatedProbability) { + this.type = type; + this.timestamp = timestamp; + this.referenceSourceCount = referenceSourceCount; + this.blobCount = blobCount; + this.gcedBlobCount = gcedBlobCount; + this.errorCount = errorCount; + this.bloomFilterExpectedBlobCount = bloomFilterExpectedBlobCount; + this.bloomFilterAssociatedProbability = bloomFilterAssociatedProbability; + } + + + @Override + public String getType() { + return type; + } + + @Override + public Instant getTimestamp() { + return timestamp; + } + + public long getReferenceSourceCount() { + return referenceSourceCount; + } + + public long getBlobCount() { + return blobCount; + } + + public long getGcedBlobCount() { + return gcedBlobCount; + } + + public long getErrorCount() { + return errorCount; + } + + public long getBloomFilterExpectedBlobCount() { + return bloomFilterExpectedBlobCount; + } + + public double getBloomFilterAssociatedProbability() { + return bloomFilterAssociatedProbability; + } +} diff --git 
a/server/protocols/webadmin/webadmin-data/pom.xml b/server/protocols/webadmin/webadmin-data/pom.xml index d9ce9c9..2aee4cb 100644 --- a/server/protocols/webadmin/webadmin-data/pom.xml +++ b/server/protocols/webadmin/webadmin-data/pom.xml @@ -34,6 +34,15 @@ <dependencies> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>blob-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> + <artifactId>blob-storage-strategy</artifactId> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>event-sourcing-event-store-memory</artifactId> <scope>test</scope> </dependency> @@ -67,6 +76,11 @@ </dependency> <dependency> <groupId>${james.groupId}</groupId> + <artifactId>james-server-task-memory</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>${james.groupId}</groupId> <artifactId>james-server-testing</artifactId> <scope>test</scope> </dependency> diff --git a/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java new file mode 100644 index 0000000..346ca39 --- /dev/null +++ b/server/protocols/webadmin/webadmin-data/src/main/java/org/apache/james/webadmin/routes/BlobRoutes.java @@ -0,0 +1,168 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ + +package org.apache.james.webadmin.routes; + +import java.time.Clock; +import java.util.Optional; +import java.util.Set; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.ws.rs.DELETE; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; + +import org.apache.james.blob.api.BlobReferenceSource; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.server.blob.deduplication.BlobGCTask; +import org.apache.james.server.blob.deduplication.GenerationAwareBlobId; +import org.apache.james.task.Task; +import org.apache.james.task.TaskManager; +import org.apache.james.webadmin.Routes; +import org.apache.james.webadmin.tasks.TaskFromRequest; +import org.apache.james.webadmin.tasks.TaskIdDto; +import org.apache.james.webadmin.utils.JsonTransformer; +import org.eclipse.jetty.http.HttpStatus; + +import com.google.common.base.Preconditions; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiImplicitParam; +import io.swagger.annotations.ApiImplicitParams; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import spark.Request; +import spark.Service; + +@Api(tags = "Blobs") +@Path("/blobs") +@Produces("application/json") +public class BlobRoutes implements Routes { + + public static final String BASE_PATH = "/blobs"; + public static final 
int EXPECTED_BLOB_COUNT_DEFAULT = 1_000_000; + public static final double ASSOCIATED_PROBABILITY_DEFAULT = 0.8; + + private final TaskManager taskManager; + private final JsonTransformer jsonTransformer; + private final Clock clock; + private final BlobStoreDAO blobStoreDAO; + private final BucketName bucketName; + private final Set<BlobReferenceSource> blobReferenceSources; + private final GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration; + private final GenerationAwareBlobId.Factory generationAwareBlobIdFactory; + + @Inject + public BlobRoutes(TaskManager taskManager, + JsonTransformer jsonTransformer, + Clock clock, + BlobStoreDAO blobStoreDAO, + @Named(BlobStore.DEFAULT_BUCKET_NAME_QUALIFIER) BucketName defaultBucketName, + Set<BlobReferenceSource> blobReferenceSources, + GenerationAwareBlobId.Configuration generationAwareBlobIdConfiguration, + GenerationAwareBlobId.Factory generationAwareBlobIdFactory) { + this.taskManager = taskManager; + this.jsonTransformer = jsonTransformer; + this.clock = clock; + this.blobStoreDAO = blobStoreDAO; + this.bucketName = defaultBucketName; + this.blobReferenceSources = blobReferenceSources; + this.generationAwareBlobIdConfiguration = generationAwareBlobIdConfiguration; + this.generationAwareBlobIdFactory = generationAwareBlobIdFactory; + } + + @Override + public String getBasePath() { + return BASE_PATH; + } + + @Override + public void define(Service service) { + TaskFromRequest gcUnreferencedTaskRequest = this::gcUnreferenced; + service.delete(BASE_PATH, gcUnreferencedTaskRequest.asRoute(taskManager), jsonTransformer); + } + + @DELETE + @Path("/blobs") + @ApiOperation(value = "Create a task to run blob deduplicate garbage collection", nickname = "BlobGC") + @ApiImplicitParams({ + @ApiImplicitParam(required = true, dataType = "string", name = "scope", paramType = "query", example = "scope=unreferenced"), + @ApiImplicitParam(required = false, dataType = "double", name = "associatedProbability", paramType 
= "query", + defaultValue = "1_000_000", example = "associatedProbability=1000"), + @ApiImplicitParam(required = false, dataType = "integer", name = "expectedBlobCount", paramType = "query", + defaultValue = "0.8", example = "expectedBlobCount=0.7") + }) + @ApiResponses( + { + @ApiResponse(code = HttpStatus.CREATED_201, message = "The taskId of the given scheduled task", response = TaskIdDto.class), + @ApiResponse(code = HttpStatus.BAD_REQUEST_400, message = "Invalid arguments supplied in the user request"), + @ApiResponse(code = HttpStatus.UNAUTHORIZED_401, message = "Unauthorized. The user is not authenticated on the platform"), + }) + public Task gcUnreferenced(Request request) { + Preconditions.checkArgument(Optional.ofNullable(request.queryParams("scope")) + .filter("unreferenced"::equals) + .isPresent(), + "'scope' is missing or must be 'unreferenced'"); + + int expectedBlobCount = getExpectedBlobCount(request).orElse(EXPECTED_BLOB_COUNT_DEFAULT); + double associatedProbability = getAssociatedProbability(request).orElse(ASSOCIATED_PROBABILITY_DEFAULT); + + return BlobGCTask.builder() + .blobStoreDAO(blobStoreDAO) + .generationAwareBlobIdFactory(generationAwareBlobIdFactory) + .generationAwareBlobIdConfiguration(generationAwareBlobIdConfiguration) + .blobReferenceSource(blobReferenceSources) + .bucketName(bucketName) + .clock(clock) + .expectedBlobCount(expectedBlobCount) + .associatedProbability(associatedProbability); + } + + private static Optional<Integer> getExpectedBlobCount(Request req) { + try { + return Optional.ofNullable(req.queryParams("expectedBlobCount")) + .map(Integer::parseInt) + .map(expectedBlobCount -> { + Preconditions.checkArgument(expectedBlobCount > 0, + "'expectedBlobCount' must be strictly positive"); + return expectedBlobCount; + }); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("'expectedBlobCount' must be numeric"); + } + } + + private static Optional<Double> getAssociatedProbability(Request req) { + 
try { + return Optional.ofNullable(req.queryParams("associatedProbability")) + .map(Double::parseDouble) + .map(associatedProbability -> { + Preconditions.checkArgument(associatedProbability > 0 && associatedProbability < 1, + "'associatedProbability' must be greater than 0.0 and smaller than 1.0"); + return associatedProbability; + }); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException("'associatedProbability' must be numeric"); + } + } +} diff --git a/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java new file mode 100644 index 0000000..5b64236 --- /dev/null +++ b/server/protocols/webadmin/webadmin-data/src/test/java/org/apache/james/webadmin/routes/BlobRoutesTest.java @@ -0,0 +1,430 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.webadmin.routes; + +import static io.restassured.RestAssured.given; +import static io.restassured.http.ContentType.JSON; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; +import static org.awaitility.Durations.TEN_SECONDS; +import static org.eclipse.jetty.http.HttpStatus.BAD_REQUEST_400; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.james.blob.api.BlobId; +import org.apache.james.blob.api.BlobReferenceSource; +import org.apache.james.blob.api.BlobStore; +import org.apache.james.blob.api.BlobStoreDAO; +import org.apache.james.blob.api.BucketName; +import org.apache.james.blob.api.HashBlobId; +import org.apache.james.blob.api.ObjectNotFoundException; +import org.apache.james.blob.memory.MemoryBlobStoreDAO; +import org.apache.james.json.DTOConverter; +import org.apache.james.server.blob.deduplication.BlobGCTaskAdditionalInformationDTO; +import org.apache.james.server.blob.deduplication.DeDuplicationBlobStore; +import org.apache.james.server.blob.deduplication.GenerationAwareBlobId; +import org.apache.james.task.Hostname; +import org.apache.james.task.MemoryTaskManager; +import org.apache.james.utils.UpdatableTickingClock; +import org.apache.james.webadmin.WebAdminServer; +import org.apache.james.webadmin.WebAdminUtils; +import org.apache.james.webadmin.utils.JsonTransformer; +import org.awaitility.Awaitility; +import 
org.awaitility.core.ConditionFactory; +import org.eclipse.jetty.http.HttpStatus; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.collect.ImmutableSet; + +import io.restassured.RestAssured; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class BlobRoutesTest { + private static final String BASE_PATH = "/blobs"; + private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory(); + private static final ZonedDateTime TIMESTAMP = ZonedDateTime.parse("2015-10-30T16:12:00Z"); + private static final BucketName DEFAULT_BUCKET = BucketName.of("default"); + private static final GenerationAwareBlobId.Configuration GENERATION_AWARE_BLOB_ID_CONFIGURATION = GenerationAwareBlobId.Configuration.DEFAULT; + private static final ConditionFactory CALMLY_AWAIT = Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await() + .atMost(TEN_SECONDS); + + private WebAdminServer webAdminServer; + private MemoryTaskManager taskManager; + private UpdatableTickingClock clock; + private BlobReferenceSource blobReferenceSource; + private BlobStore blobStore; + + @BeforeEach + void setUp() { + taskManager = new MemoryTaskManager(new Hostname("foo")); + clock = new UpdatableTickingClock(TIMESTAMP.toInstant()); + blobReferenceSource = mock(BlobReferenceSource.class); + when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.empty()); + GenerationAwareBlobId.Factory generationAwareBlobIdFactory = new GenerationAwareBlobId.Factory(clock, BLOB_ID_FACTORY, GENERATION_AWARE_BLOB_ID_CONFIGURATION); + + BlobStoreDAO blobStoreDAO = new MemoryBlobStoreDAO(); + blobStore = new 
DeDuplicationBlobStore(blobStoreDAO, DEFAULT_BUCKET, generationAwareBlobIdFactory); + JsonTransformer jsonTransformer = new JsonTransformer(); + TasksRoutes tasksRoutes = new TasksRoutes(taskManager, jsonTransformer, DTOConverter.of(BlobGCTaskAdditionalInformationDTO.SERIALIZATION_MODULE)); + BlobRoutes blobRoutes = new BlobRoutes( + taskManager, + jsonTransformer, + clock, + blobStoreDAO, + DEFAULT_BUCKET, + ImmutableSet.of(blobReferenceSource), + GENERATION_AWARE_BLOB_ID_CONFIGURATION, + generationAwareBlobIdFactory); + + webAdminServer = WebAdminUtils.createWebAdminServer(blobRoutes, tasksRoutes).start(); + + RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer) + .setBasePath(BASE_PATH) + .build(); + } + + @AfterEach + void stop() { + webAdminServer.destroy(); + taskManager.stop(); + } + + @Test + void deleteUnReferencedShouldReturnErrorWhenScopeInvalid() { + given() + .queryParam("scope", "invalid") + .delete() + .then() + .statusCode(BAD_REQUEST_400) + .contentType(JSON) + .body("statusCode", is(BAD_REQUEST_400)) + .body("type", is("InvalidArgument")) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'scope' is missing or must be 'unreferenced'")); + } + + @Test + void deleteUnReferencedShouldReturnErrorWhenMissingScope() { + given() + .delete() + .then() + .statusCode(BAD_REQUEST_400) + .contentType(JSON) + .body("statusCode", is(BAD_REQUEST_400)) + .body("type", is("InvalidArgument")) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", is("'scope' is missing or must be 'unreferenced'")); + } + + @Test + void deleteUnReferencedShouldReturnTaskId() { + given() + .queryParam("scope", "unreferenced") + .delete() + .then() + .statusCode(HttpStatus.CREATED_201) + .body("taskId", notNullValue()); + } + + @Test + void gcTaskShouldReturnDetail() { + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + 
.get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("taskId", is(notNullValue())) + .body("type", is("BlobGCTask")) + .body("startedDate", is(notNullValue())) + .body("submitDate", is(notNullValue())) + .body("completedDate", is(notNullValue())) + .body("additionalInformation.type", is("BlobGCTask")) + .body("additionalInformation.timestamp", is(notNullValue())) + .body("additionalInformation.referenceSourceCount", is(0)) + .body("additionalInformation.blobCount", is(0)) + .body("additionalInformation.gcedBlobCount", is(0)) + .body("additionalInformation.errorCount", is(0)) + .body("additionalInformation.bloomFilterExpectedBlobCount", is(1_000_000)) + .body("additionalInformation.bloomFilterAssociatedProbability", is(0.8F)); + } + + @Test + void deleteUnReferencedShouldAcceptBloomFilterExpectedBlobCountParam() { + String taskId = given() + .queryParam("scope", "unreferenced") + .queryParam("expectedBlobCount", 99) + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("additionalInformation.bloomFilterExpectedBlobCount", is(99)); + } + + @ParameterizedTest + @MethodSource("expectedBlobCountParameters") + void deleteUnReferencedShouldReturnErrorWhenExpectedBlobCountInvalid(Object expectedBlobCount) { + given() + .queryParam("scope", "unreferenced") + .queryParam("expectedBlobCount", expectedBlobCount) + .delete() + .then() + .statusCode(BAD_REQUEST_400) + .contentType(JSON) + .body("statusCode", is(BAD_REQUEST_400)) + .body("type", is("InvalidArgument")) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", containsString("expectedBlobCount")); + } + + private static Stream<Arguments> expectedBlobCountParameters() { + return Stream.of( + Arguments.of(-1), + Arguments.of(0), + Arguments.of("invalid") + ); + } + + @Test + void 
deleteUnReferencedShouldAcceptBloomFilterAssociatedProbabilityParam() { + String taskId = given() + .queryParam("scope", "unreferenced") + .queryParam("associatedProbability", 0.2) + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("additionalInformation.bloomFilterAssociatedProbability", is(0.2F)); + } + + /** Sends the scope=unreferenced DELETE once per value supplied by associatedProbabilityParameters() and expects a 400 JSON error of type InvalidArgument whose details mention associatedProbability. */ @ParameterizedTest + @MethodSource("associatedProbabilityParameters") + void deleteUnReferencedShouldReturnErrorWhenAssociatedProbabilityInvalid(Object associatedProbability) { + given() + .queryParam("scope", "unreferenced") + .queryParam("associatedProbability", associatedProbability) + .delete() + .then() + .statusCode(BAD_REQUEST_400) + .contentType(JSON) + .body("statusCode", is(BAD_REQUEST_400)) + .body("type", is("InvalidArgument")) + .body("message", is("Invalid arguments supplied in the user request")) + .body("details", containsString("associatedProbability")); + } + + /** Argument source for the parameterized test above: negative numbers, 1 and larger numbers, a non-numeric string and an empty string, all of which that test expects to be rejected. */ private static Stream<Arguments> associatedProbabilityParameters() { + return Stream.of( + Arguments.of(-1), + Arguments.of(-0.1F), + Arguments.of(1.1), + Arguments.of(1), + Arguments.of(Integer.MAX_VALUE), + Arguments.of("invalid"), + Arguments.of("") + ); + } + + /** Saves a single blob, moves the clock two months past TIMESTAMP, runs the GC task and expects the blob to be counted as collected (gcedBlobCount 1, referenceSourceCount 0) and to be unreadable afterwards. */ @Test + void gcTaskShouldRemoveOrphanBlob() { + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + clock.setInstant(TIMESTAMP.plusMonths(2).toInstant()); /* NOTE(review): presumably pushes the current time past the GC expiration window; the test that leaves the clock untouched expects gcedBlobCount 0 - confirm against BlobGCTask */ + + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.referenceSourceCount", is(0)) + .body("additionalInformation.blobCount", is(1)) + .body("additionalInformation.gcedBlobCount", is(1)) + .body("additionalInformation.errorCount", is(0)); + + assertThatThrownBy(() -> 
blobStore.read(DEFAULT_BUCKET, blobId)) + .isInstanceOf(ObjectNotFoundException.class); + } + + /** Saves a single blob and runs the GC task without moving the clock; expects blobCount 1, gcedBlobCount 0 and the blob to remain readable. */ @Test + void gcTaskShouldNotRemoveUnExpireBlob() { + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.referenceSourceCount", is(0)) + .body("additionalInformation.blobCount", is(1)) + .body("additionalInformation.gcedBlobCount", is(0)) + .body("additionalInformation.errorCount", is(0)); + + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull(); + } + + /** Saves a single blob, stubs blobReferenceSource.listReferencedBlobs() to return it, runs the GC task and expects referenceSourceCount 1, gcedBlobCount 0 and the blob to remain readable. NOTE(review): the clock is not advanced in this test, so the blob may survive for the same reason as in the unexpired case rather than because it is referenced - consider advancing the clock; confirm. */ @Test + void gcTaskShouldNotRemoveReferencedBlob() { + BlobId blobId = Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block(); + when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.just(blobId)); + + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.referenceSourceCount", is(1)) + .body("additionalInformation.blobCount", is(1)) + .body("additionalInformation.gcedBlobCount", is(0)) + .body("additionalInformation.errorCount", is(0)); + + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull(); + } + + /** Combines the three situations: 100 blobs reported as referenced and 50 unreferenced blobs are saved before the clock is moved two months forward, 30 more blobs are saved afterwards; then checks the task counters and that the referenced and the later-saved blobs are still readable. */ @Test + void gcTaskShouldSuccessWhenMixCase() { + List<BlobId> referencedBlobIds = IntStream.range(0, 100) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + List<BlobId> orphanBlobIds = IntStream.range(0, 50) + .mapToObj(index -> 
Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + + when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds)); + clock.setInstant(TIMESTAMP.plusMonths(2).toInstant()); + + /* saved after the clock change, unlike the two lists above */ List<BlobId> unExpiredBlobIds = IntStream.range(0, 30) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + .get("taskId"); + + given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await") + .then() + .body("status", is("completed")) + .body("additionalInformation.referenceSourceCount", is(referencedBlobIds.size())) + .body("additionalInformation.blobCount", is(referencedBlobIds.size() + orphanBlobIds.size() + unExpiredBlobIds.size())) + .body("additionalInformation.gcedBlobCount", Matchers.lessThanOrEqualTo(orphanBlobIds.size())) /* upper bound only, not an exact count - NOTE(review): presumably because bloom filter false positives can spare some orphans in a single run; confirm */ + .body("additionalInformation.errorCount", is(0)); + + referencedBlobIds.forEach(blobId -> + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull()); + + unExpiredBlobIds.forEach(blobId -> + assertThat(blobStore.read(DEFAULT_BUCKET, blobId)) + .isNotNull()); + } + + /** Saves 100 blobs reported as referenced and 50 unreferenced blobs, moves the clock two months forward, then inside CALMLY_AWAIT.untilAsserted submits the scope=unreferenced DELETE, awaits the task and asserts that reading every unreferenced blob throws ObjectNotFoundException. NOTE(review): assumes untilAsserted re-runs the block until its assertions pass, so several GC runs may be triggered - confirm against the CALMLY_AWAIT definition. */ @Test + void allOrphanBlobIdsShouldRemovedAfterMultipleCallDeleteUnreferenced() { + List<BlobId> referencedBlobIds = IntStream.range(0, 100) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + List<BlobId> orphanBlobIds = IntStream.range(0, 50) + .mapToObj(index -> Mono.from(blobStore.save(DEFAULT_BUCKET, UUID.randomUUID().toString(), BlobStore.StoragePolicy.HIGH_PERFORMANCE)).block()) + .collect(Collectors.toList()); + + 
when(blobReferenceSource.listReferencedBlobs()).thenReturn(Flux.fromIterable(referencedBlobIds)); + clock.setInstant(TIMESTAMP.plusMonths(2).toInstant()); + + CALMLY_AWAIT.untilAsserted(() -> { + String taskId = given() + .queryParam("scope", "unreferenced") + .delete() + .jsonPath() + .get("taskId"); + + /* the response of the await call is not inspected here; only the blob reads below are asserted */ given() + .basePath(TasksRoutes.BASE) + .when() + .get(taskId + "/await"); + + orphanBlobIds.forEach(blobId -> + assertThatThrownBy(() -> blobStore.read(DEFAULT_BUCKET, blobId)) + .isInstanceOf(ObjectNotFoundException.class)); + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
