This is an automated email from the ASF dual-hosted git repository. rouazana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 58a8c255feb90b7ead395108583e9f70ffde3c92 Author: Gautier DI FOLCO <[email protected]> AuthorDate: Fri Mar 20 16:25:12 2020 +0100 JAMES-3150 Add the first garbage collection properties --- pom.xml | 10 + .../blob/blob-deduplicating/doc/gc-properties.adoc | 8 +- server/blob/blob-deduplicating/pom.xml | 18 +- .../james/server/blob/deduplication/GC.scala | 187 +++++++++++++++++ .../src/test/scala/GCPropertiesTest.scala | 107 ---------- .../blob/deduplication/GCPropertiesTest.scala | 228 +++++++++++++++++++++ .../james/server/blob/deduplication/State.scala | 46 +++++ 7 files changed, 483 insertions(+), 121 deletions(-) diff --git a/pom.xml b/pom.xml index bea0a41..4c64989 100644 --- a/pom.xml +++ b/pom.xml @@ -2581,6 +2581,16 @@ <version>0.9.0</version> </dependency> <dependency> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.base}</artifactId> + <version>1.14.3</version> + </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.base}</artifactId> + <version>3.1.1</version> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>${slf4j.version}</version> diff --git a/gc-properties.adoc b/server/blob/blob-deduplicating/doc/gc-properties.adoc similarity index 74% rename from gc-properties.adoc rename to server/blob/blob-deduplicating/doc/gc-properties.adoc index 7c69c01..6ffed72 100644 --- a/gc-properties.adoc +++ b/server/blob/blob-deduplicating/doc/gc-properties.adoc @@ -1,10 +1,10 @@ = GC properties 1. the execution time of the GC should be linked to -active dataset but not to global dataset -(for scalability purpose) +active dataset (ie. where the number of references have changed) +but not to global dataset (for scalability purpose) -2. GC should run on live dataset +2. GC should run on active dataset 2.1. 
GC should not delete data being referenced by a pending process or still referenced @@ -17,7 +17,7 @@ not have a different outcome than a single one 3.1. an unreferenced piece of data should be removed after 1 day 3.2. less than 10% of unreferenced data of a significant dataset -should persist after three GC executions +should persist 4. GC should report what it does diff --git a/server/blob/blob-deduplicating/pom.xml b/server/blob/blob-deduplicating/pom.xml index e849535..ada4371 100644 --- a/server/blob/blob-deduplicating/pom.xml +++ b/server/blob/blob-deduplicating/pom.xml @@ -23,7 +23,7 @@ <parent> <artifactId>james-server-blob</artifactId> <groupId>org.apache.james</groupId> - <version>3.5.0-SNAPSHOT</version> + <version>3.6.0-SNAPSHOT</version> <relativePath>../pom.xml</relativePath> </parent> @@ -32,7 +32,7 @@ <name>Apache James :: Server :: Blob :: Deduplicating Blob Storage</name> <description> - An implementation of BlobStore which deduplicate the stored blobs and use a garbage collector + An implementation of BlobStore which deduplicates the stored blobs and uses a garbage collector to ensure their effective deletion. 
</description> @@ -75,21 +75,19 @@ <artifactId>scala-java8-compat_${scala.base}</artifactId> </dependency> <dependency> - <groupId>org.scalactic</groupId> - <artifactId>scalactic_2.13</artifactId> - <version>3.1.1</version> + <groupId>org.scalacheck</groupId> + <artifactId>scalacheck_${scala.base}</artifactId> <scope>test</scope> </dependency> <dependency> - <groupId>org.scalatest</groupId> - <artifactId>scalatest_2.13</artifactId> + <groupId>org.scalactic</groupId> + <artifactId>scalactic_${scala.base}</artifactId> <version>3.1.1</version> <scope>test</scope> </dependency> <dependency> - <groupId>org.scalacheck</groupId> - <artifactId>scalacheck_2.13</artifactId> - <version>1.14.3</version> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.base}</artifactId> <scope>test</scope> </dependency> </dependencies> diff --git a/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala new file mode 100644 index 0000000..0fa4ea8 --- /dev/null +++ b/server/blob/blob-deduplicating/src/main/scala/org/apache/james/server/blob/deduplication/GC.scala @@ -0,0 +1,187 @@ +/**************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. 
You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. * + ****************************************************************/ +package org.apache.james.server.blob.deduplication + +import org.apache.james.blob.api.BlobId + +/** + * Isolating and grouping Events + */ +sealed abstract class Generation extends Comparable[Generation] { + def previous: Generation + def previous(times: Long): Generation = + (0L until times).foldLeft(this)((generation, _) => generation.previous) + + def next: Generation + def next(times: Long): Generation = + (0L until times).foldLeft(this)((generation, _) => generation.next) + + /** + * List all generations the GC is able to collect + */ + def collectibles(targetedGeneration: Generation): Set[Generation] = + Generation.range(this, targetedGeneration.previous(GC.temporization)).toSet + + def <(that: Generation): Boolean = compareTo(that) < 0 + def <=(that: Generation): Boolean = compareTo(that) <= 0 + def >(that: Generation): Boolean = compareTo(that) > 0 + def >=(that: Generation): Boolean = compareTo(that) >= 0 +} + +object Generation { + val first: Generation = apply(0) + + def apply(id: Long): Generation = { + if (id < 0) { + NonExistingGeneration + } else { + ValidGeneration(id) + } + } + + def range(start: Generation, end: Generation): Seq[Generation] = (start, end) match { + case (NonExistingGeneration, NonExistingGeneration) => Seq(NonExistingGeneration) + case (ValidGeneration(_), NonExistingGeneration) => Nil + case (NonExistingGeneration, ValidGeneration(id)) => NonExistingGeneration +: (0L to id).map(Generation.apply) + case (ValidGeneration(id1), 
ValidGeneration(id2)) => (id1 to id2).map(Generation.apply) + } +} + +/** + * Generation which has existed + */ +case class ValidGeneration(id: Long) extends Generation { + override def previous: Generation = Generation(id - 1) + + override def next: Generation = copy(id + 1) + + override def compareTo(t: Generation): Int = t match { + case NonExistingGeneration => 1 + case that: ValidGeneration => id.compareTo(that.id) + } + +} + +/** + * NullObject for the initialisation of the GC + */ +case object NonExistingGeneration extends Generation { + override def previous: Generation = NonExistingGeneration + + override def next: Generation = Generation.first + + override def compareTo(t: Generation): Int = t match { + case NonExistingGeneration => 0 + case _: ValidGeneration => -1 + } +} + +/** + * A run of the GC regarding a Set of Generations + */ +case class Iteration(id: Long, processedGenerations: Set[Generation], lastGeneration: Generation) { + def next(generations: Set[Generation], lastGeneration: Generation): Iteration = Iteration(id + 1, generations, lastGeneration) +} + +object Iteration { + def initial: Iteration = Iteration(0, Set(), NonExistingGeneration) +} + +case class ExternalID(id: String) + +/** + * Modelized users' interactions related to blobs + */ +sealed trait Event { + def blob: BlobId + def externalId: ExternalID + def generation: Generation +} + +case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event { + override def blob: BlobId = blobId +} + +case class Dereference(generation: Generation, reference: Reference) extends Event { + override def blob: BlobId = reference.blob + override def externalId: ExternalID = reference.externalId +} + +object Events { + def getLastGeneration(events: Seq[Event]): Generation = events.map(_.generation).maxOption + .getOrElse(Generation.first) + +} + +case class Report(iteration: Iteration, blobsToDelete: Set[(Generation, BlobId)]) + +/** + * Accessors to the 
References/Dereferences made by generations + */ +case class StabilizedState(references: Map[Generation, Seq[Reference]], dereferences: Map[Generation, Seq[Dereference]]) { + private val referencedBlobsAcrossGenerations: Map[Generation, ReferencedBlobs] = { + val blobIds = references.keys ++ dereferences.keys + val maxGeneration = blobIds.maxOption.getOrElse(Generation.first) + val minGeneration = blobIds.minOption.getOrElse(Generation.first) + + val initialRefs = Generation.range(NonExistingGeneration, minGeneration.previous).map((_, ReferencedBlobs(Map()))).toMap + Generation.range(minGeneration, maxGeneration) + .foldLeft(initialRefs)(buildGeneration) + } + + private def buildGeneration(refs: Map[Generation, ReferencedBlobs], generation: Generation): Map[Generation, ReferencedBlobs] = { + val populatedRefs = references.getOrElse(generation, Set()) + .foldLeft(refs(generation.previous))((currentReferences, ref) => currentReferences.addReferences(ref.blobId)) + + val expungedRefs = dereferences.getOrElse(generation, Set()) + .foldLeft(populatedRefs)((currentReferences, ref) => currentReferences.removeReferences(ref.reference.blobId)) + + refs + (generation -> expungedRefs) + } + + def referencesAt(generation: Generation): ReferencedBlobs = referencedBlobsAcrossGenerations(generation) + + type ReferenceCount = Int + + case class ReferencedBlobs(blobs: Map[BlobId, ReferenceCount]) { + def isNotReferenced(blobId: BlobId): Boolean = + !blobs.contains(blobId) + + def addReferences(blobId: BlobId): ReferencedBlobs = + ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => oldCount.map(count => Some(count + 1)).getOrElse(Some(1)))) + def removeReferences(blobId: BlobId): ReferencedBlobs = + ReferencedBlobs(blobs.updatedWith(blobId)(oldCount => oldCount.map(_ - 1).filter(_ > 0))) + } + +} + +object GC { + val temporization: Long = 2 + def plan(state: StabilizedState, lastIteration: Iteration, targetedGeneration: Generation): Report = { + val processedGenerations = 
lastIteration.lastGeneration.collectibles(targetedGeneration) + val blobsToDelete = state.dereferences + .filter { case (generation, _) => processedGenerations.contains(generation) } + .flatMap { case (_, dereferences) => dereferences } + .toSet + .filter(dereference => state.referencesAt(processedGenerations.max).isNotReferenced(dereference.reference.blobId)) + .map(dereference => (dereference.reference.generation, dereference.reference.blobId)) + + Report(lastIteration.next(processedGenerations, targetedGeneration.previous(temporization)), blobsToDelete) + } +} diff --git a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala deleted file mode 100644 index 5de3f44..0000000 --- a/server/blob/blob-deduplicating/src/test/scala/GCPropertiesTest.scala +++ /dev/null @@ -1,107 +0,0 @@ -import org.apache.james.blob.api.{BlobId, TestBlobId} -import org.scalacheck.Gen -import org.scalatest.funsuite.AnyFunSuite - -case class Generation(id: Long) -case class Iteration(id: Long) -case class ExternalID(id: String) // TODO - -sealed trait Event -case class Reference(externalId: ExternalID, blobId: BlobId, generation: Generation) extends Event -case class Deletion(generation: Generation, reference: Reference) extends Event - -case class Report(iteration: Iteration, - blobsToDelete: Set[(Generation, BlobId)] - ) - -object Generators { - - val smallInteger = Gen.choose(0L,100L) - var current = 0; - val generationsGen: Gen[LazyList[Generation]] = Gen.infiniteLazyList(Gen.frequency((90, Gen.const(0)), (9, Gen.const(1)), (1, Gen.const(2)))) - .map(list => list.scanLeft(0)(_ + _)) - .map(list => list.map(_.toLong).map(Generation.apply)) - - val iterationGen = smallInteger.map(Iteration.apply) - - val blobIdFactory = new TestBlobId.Factory - - def blobIdGen(generation: Generation) : Gen[BlobId] = Gen.uuid.map(uuid => - blobIdFactory.from(s"${generation}_$uuid")) - - val externalIDGen = 
Gen.uuid.map(uuid => ExternalID(uuid.toString)) - - def referenceGen(generation: Generation): Gen[Reference] = for { - blobId <- blobIdGen(generation) - externalId <- externalIDGen - } yield Reference(externalId, blobId, generation) - - def existingReferences : Seq[Event] => Set[Reference] = _ - .foldLeft((Set[Reference](), Set[Reference]()))((acc, event) => event match { - case deletion: Deletion => (acc._1 ++ Set(deletion.reference), acc._2) - case reference: Reference => if (acc._1.contains(reference)) { - acc - } else { - (acc._1, acc._2 ++ Set(reference)) - } - })._2 - - def deletionGen(previousEvents : Seq[Event], generation: Generation): Gen[Option[Deletion]] = { - val persistingReferences = existingReferences(previousEvents) - if (persistingReferences.isEmpty) { - Gen.const(None) - } else { - Gen.oneOf(persistingReferences) - .map(reference => Deletion(generation, reference)) - .map(Some(_)) - } - } - - def duplicateReferenceGen(generation: Generation, reference: Reference): Gen[Reference] = { - if (reference.generation == generation) { - externalIDGen.map(id => reference.copy(externalId = id)) - } else { - referenceGen(generation) - } - } - - def eventGen(previousEvents: Seq[Event], generation: Generation): Gen[Event] = for { - greenAddEvent <- referenceGen(generation) - addEvents = previousEvents.flatMap { - case x: Reference => Some(x) - case _ => None - } - randomAddEvent <- Gen.oneOf(addEvents) - duplicateAddEvent <- duplicateReferenceGen(generation, randomAddEvent) - deleteEvent <- deletionGen(previousEvents, generation) - event <- Gen.oneOf(Seq(greenAddEvent, duplicateAddEvent) ++ deleteEvent) - } yield event - - def eventsGen() : Gen[Seq[Event]] = for { - nbEvents <- Gen.choose(0, 100) - generations <- generationsGen.map(_.take(nbEvents)) - startEvent <- referenceGen(Generation.apply(0)) - events <- foldM(generations, (Seq(startEvent): Seq[Event]))((previousEvents, generation) => eventGen(previousEvents, generation).map(_ +: previousEvents)) - } 
yield events.reverse - - def foldM[A, B](fa: LazyList[A], z: B)(f: (B, A) => Gen[B]): Gen[B] = { - def step(in: (LazyList[A], B)): Gen[Either[(LazyList[A], B), B]] = { - val (s, b) = in - if (s.isEmpty) - Gen.const(Right(b)) - else { - f (b, s.head).map { bnext => - Left((s.tail, bnext)) - } - } - } - - Gen.tailRecM((fa, z))(step) - } -} - -class GCPropertiesTest extends AnyFunSuite { - test("print sample") { - Generators.eventsGen().sample.foreach(_.foreach(println)) - } -} diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala new file mode 100644 index 0000000..ad90f1c --- /dev/null +++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/GCPropertiesTest.scala @@ -0,0 +1,228 @@ +/*************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ +package org.apache.james.server.blob.deduplication + +import java.nio.charset.StandardCharsets + +import com.google.common.hash +import org.apache.james.blob.api.BlobId +import org.apache.james.server.blob.deduplication.Generators.{OnePassGCTestParameters, TestParameters} +import org.scalacheck.Prop.forAll +import org.scalacheck.Test.Parameters +import org.scalacheck.{Arbitrary, Gen, Properties, Shrink} + +case class GenerationAwareBlobId(generation: Generation, hash: String) extends BlobId { + override def asString(): String = s"${generation}_$hash" +} + +object Generators { + + // generate a sequence of Generations with monotonic numeric ids + // 80% of the time, the generation id is not incremented + // 19% of the time, the generation id is incremented by 1 + // 1% of the time, the generation id is incremented by 2 + // i.e. (0, 0, 0, 2, 3, 3, 4, 5, 5, 5, 5) + def nextGenerationsGen(previousGeneration: Generation): Gen[Generation] = + Gen.frequency((80, Gen.const(0l)), (19, Gen.const(1l)), (1, Gen.const(2l))).map(previousGeneration.next) + + val externalIDGen: Gen[ExternalID] = Gen.uuid.map(uuid => ExternalID(uuid.toString)) + + def referenceGen(generation: Generation, hash: String): Gen[Reference] = for { + externalId <- externalIDGen + } yield Reference(externalId, GenerationAwareBlobId(generation, hash), generation) + + case class ReferenceAccumulator(dereferenced: Set[Reference], existing: Set[Reference]) { + def addDeletion(dereference: Reference) = this.copy(dereferenced = dereferenced ++ Set(dereference)) + def addExisting(reference: Reference) = if (dereferenced.contains(reference)) { + this + } else { + this.copy(existing = existing ++ Set(reference)) + } + } + + object ReferenceAccumulator { + val empty: ReferenceAccumulator = ReferenceAccumulator(Set.empty, Set.empty) + } + + def existingReferences: Seq[Event] => Set[Reference] = + _.foldLeft(ReferenceAccumulator.empty)((acc, 
event) => event match { + case dereference: Dereference => acc.addDeletion(dereference.reference) + case reference: Reference => acc.addExisting(reference) + }).existing + + def dereferenceGen(previousEvents: Seq[Event], generation: Generation): Gen[Option[Dereference]] = { + val remainingReferences: Set[Reference] = existingReferences(previousEvents) + if (remainingReferences.isEmpty) { + Gen.const(None) + } else { + Gen.oneOf(remainingReferences) + .map(reference => Dereference(generation, reference)) + .map(Some(_)) + } + } + + val hashGenerator: Gen[String] = Gen.alphaLowerStr.map(content => hash.Hashing.sha256().hashString(content, StandardCharsets.UTF_8).toString) + + // Generate an Event, either a Reference or a Dereference (10% of the time if there are previous Events) + def eventGen(previousEvents: Seq[Event], contentHashes: Seq[String]): Gen[Event] = + if (previousEvents.isEmpty) { + for { + hashForEvent <- Gen.oneOf(contentHashes) + firstEvent <- referenceGen(Generation.first, hashForEvent) + } yield firstEvent + } else { + def pickEvent(newReferenceEvent: Reference, dereferenceEventOption: Option[Dereference]): Gen[Event] = dereferenceEventOption match { + case Some(dereferenceEvent) => Gen.frequency((90, newReferenceEvent), (10, dereferenceEvent)) + case None => Gen.const(newReferenceEvent) + } + + for { + generation <- nextGenerationsGen(previousEvents.head.generation) + contentHashForEvent <- Gen.oneOf(contentHashes) + + newReferenceEvent <- referenceGen(generation, contentHashForEvent) + dereferenceEvent <- dereferenceGen(previousEvents, generation) + + event <- pickEvent(newReferenceEvent, dereferenceEvent) + } yield event + } + + // Generates a list of Events with a ratio of hashes per event to enforce referencing the same hashes multiple times. 
+ def eventsGen(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[Event]] = for { + hashes <- generateHashes(maxNbEvents, hashesPerEventsRatio) + // Generate iteratively events until the number of events is reached + events <- Gen.tailRecM(Seq[Event]())(previousEvents => { + previousEvents.size match { + case nbEvents if nbEvents >= maxNbEvents => Gen.const(Right(previousEvents)) + case _ => eventGen(previousEvents, hashes).map(event => Left(event +: previousEvents)) + } + }) + } yield events.reverse + + def generateHashes(maxNbEvents: Int, hashesPerEventsRatio: Float): Gen[Seq[String]] = { + val nbHashes = Math.ceil(maxNbEvents * hashesPerEventsRatio).intValue + for { + contentHashes <- Gen.listOfN(nbHashes, hashGenerator) + } yield contentHashes + } + + case class TestParameters(events: Seq[Event], generationsToCollect: Seq[Generation]) + case class OnePassGCTestParameters(events: Seq[Event], generationToCollect: Generation) + + def testParametersGen(eventsGen: Gen[Seq[Event]]): Gen[TestParameters] = for { + events <- eventsGen + allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events)) + generationsToCollect <- Gen.someOf(allGenerations) + } yield TestParameters(events, generationsToCollect.sorted.toSeq) + + def onePassTestParametersGen(eventsGen: Gen[Seq[Event]]): Gen[OnePassGCTestParameters] = for { + events <- eventsGen + generation <- Gen.oneOf(if(events.isEmpty) Seq(Generation.first) else events.map(_.generation).toSet) + } yield OnePassGCTestParameters(events, generation) +} + +object GCPropertiesTest extends Properties("GC") { + val maxNbEvents = 100 + val hashesPerEventsRatio = 0.2f + + // Arbitrary machinery to effective shrinking + val arbEvents: Arbitrary[Seq[Event]] = Arbitrary(Gen.choose(0, maxNbEvents).flatMap(Generators.eventsGen(_, hashesPerEventsRatio))) + implicit val arbTestParameters: Arbitrary[Generators.TestParameters] = Arbitrary(Generators.testParametersGen(arbEvents.arbitrary)) + implicit val 
arbTestParameter: Arbitrary[Generators.OnePassGCTestParameters] = Arbitrary(Generators.onePassTestParametersGen(arbEvents.arbitrary)) + import org.scalacheck.Shrink._ + + override def overrideParameters(p: Parameters) = + p.withMinSuccessfulTests(1000) + + def createSaneTestParameters(events: Seq[Event], generations: Seq[Generation]): TestParameters = { + val allGenerations = Generation.range(Generation.first, Events.getLastGeneration(events)) + TestParameters(events, generations.filter(allGenerations.contains(_))) + } + def createSaneOnePassGCTestParameters(events: Seq[Event], generation: Generation): OnePassGCTestParameters = { + OnePassGCTestParameters(events, generation) + } + + implicit val shrinkTestParameters: Shrink[Generators.TestParameters] = Shrink { + params: Generators.TestParameters => + shrink(params.events).flatMap(events => shrink(params.generationsToCollect).map(generations => createSaneTestParameters(events, generations))) + } + + implicit val shrinkOnePassGCTestParameters: Shrink[Generators.OnePassGCTestParameters] = Shrink { + params: Generators.OnePassGCTestParameters => + shrink(params.events).map(events => createSaneOnePassGCTestParameters(events, params.generationToCollect)) + } + + property("2.1. GC should not delete data being referenced by a pending process or still referenced") = forAll { + testParameters: Generators.TestParameters => { + + val partitionedBlobsId = partitionBlobs(testParameters.events) + testParameters.generationsToCollect.foldLeft(true)((acc, e) => { + val plannedDeletions = GC.plan(Interpreter(testParameters.events).stabilize(), Iteration.initial, e).blobsToDelete.map(_._2) + acc && partitionedBlobsId.stillReferencedBlobIds.intersect(plannedDeletions).isEmpty + }) + } + } + + property("3.2. 
less than 10% of unreferenced data of a significant dataset should persist") = forAll { + testParameters: Generators.OnePassGCTestParameters => { + if (testParameters.generationToCollect >= Events.getLastGeneration(testParameters.events).previous(GC.temporization)) + true + else { + val plan = GC.plan(Interpreter(testParameters.events).stabilize(), Iteration.initial, testParameters.generationToCollect) + // An Event belongs to a collected Generation + val relevantEvents: Event => Boolean = event => event.generation <= testParameters.generationToCollect.previous(GC.temporization) + val plannedDeletions = plan.blobsToDelete.map(_._2) + + val partitionedBlobsId = partitionBlobs(testParameters.events.filter(relevantEvents)) + plannedDeletions.size >= partitionedBlobsId.notReferencedBlobIds.size * 0.9 + } + } + } + + /* + Implement an oracle that implements BlobStore with a Ref Count reference tracking + */ + def partitionBlobs(events: Seq[Event]): PartitionedEvents = { + val (referencingEvents, dereferencingEvents) = events.partition { + case _: Reference => true + case _: Dereference => false + } + + val referencedBlobsCount = referencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap + val dereferencedBlobsCount = dereferencingEvents.groupBy(_.blob).view.mapValues(_.size).toMap + + val stillReferencedBlobIds = referencedBlobsCount.foldLeft(Set[BlobId]())((acc, kv) => { + val (blobId, referencesCount) = kv + val dereferencesCount = dereferencedBlobsCount.getOrElse(blobId, 0) + + if(referencesCount > dereferencesCount) + acc + blobId + else + acc + }) + + lazy val notReferencedBlobIds = dereferencedBlobsCount.keySet -- stillReferencedBlobIds + PartitionedEvents(stillReferencedBlobIds, notReferencedBlobIds) + } + + case class PartitionedEvents(stillReferencedBlobIds: Set[BlobId], notReferencedBlobIds: Set[BlobId]) + +} + + diff --git a/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala 
b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala new file mode 100644 index 0000000..119d0cf --- /dev/null +++ b/server/blob/blob-deduplicating/src/test/scala/org/apache/james/server/blob/deduplication/State.scala @@ -0,0 +1,46 @@ +/*************************************************************** + * Licensed to the Apache Software Foundation (ASF) under one * + * or more contributor license agreements. See the NOTICE file * + * distributed with this work for additional information * + * regarding copyright ownership. The ASF licenses this file * + * to you under the Apache License, Version 2.0 (the * + * "License"); you may not use this file except in compliance * + * with the License. You may obtain a copy of the License at * + * * + * http://www.apache.org/licenses/LICENSE-2.0 * + * * + * Unless required by applicable law or agreed to in writing, * + * software distributed under the License is distributed on an * + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * + * KIND, either express or implied. See the License for the * + * specific language governing permissions and limitations * + * under the License. 
* + ****************************************************************/ + +package org.apache.james.server.blob.deduplication + +/** + * Used to iteratively build a StabilizedState + */ +case class State(references: Map[Generation, Seq[Reference]], deletions: Map[Generation, Seq[Dereference]]) { + + def stabilize(): StabilizedState = StabilizedState(references, deletions) + + def apply(event: Event): State = event match { + case e: Reference => copy(references = addElement(references, e)) + case e: Dereference => copy(deletions = addElement(deletions, e)) + } + + private def addElement[T <: Event](collection: Map[Generation, Seq[T]], e: T): Map[Generation, Seq[T]] = { + collection.updatedWith(e.generation)(previous => Some(e +: previous.getOrElse(Seq()))) + } +} + +object State { + val initial: State = State(references = Map.empty, deletions = Map.empty) +} + +object Interpreter { + def apply(events: Seq[Event]): State = + events.foldLeft(State.initial)((state, event) => state(event)) +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
