vttranlina commented on a change in pull request #574:
URL: https://github.com/apache/james-project/pull/574#discussion_r686601915
##########
File path:
server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/upload/CassandraUploadRepository.java
##########
@@ -0,0 +1,68 @@
+package org.apache.james.jmap.cassandra.upload;
+
+import static org.apache.james.blob.api.BlobStore.StoragePolicy.LOW_COST;
+
+import java.io.InputStream;
+
+import org.apache.james.blob.api.BlobStore;
+import org.apache.james.blob.api.BucketName;
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.Upload;
+import org.apache.james.jmap.api.model.UploadId;
+import org.apache.james.jmap.api.model.UploadMetaData;
+import org.apache.james.jmap.api.model.UploadNotFoundException;
+import org.apache.james.jmap.api.upload.UploadRepository;
+import org.apache.james.mailbox.model.ContentType;
+import org.reactivestreams.Publisher;
+
+import com.datastax.driver.core.utils.UUIDs;
+import com.google.common.io.CountingInputStream;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class CassandraUploadRepository implements UploadRepository {
+ private final UploadDAO uploadDAO;
+ private final BlobStore blobStore;
+ private final BucketNameGenerator bucketNameGenerator;
+
+ public CassandraUploadRepository(UploadDAO uploadDAO, BlobStore blobStore,
BucketNameGenerator bucketNameGenerator) {
+ this.uploadDAO = uploadDAO;
+ this.blobStore = blobStore;
+ this.bucketNameGenerator = bucketNameGenerator;
+ }
+
+ @Override
+ public Publisher<UploadId> upload(InputStream data, ContentType
contentType, Username user) {
+ UploadId uploadId = generateId();
+ UploadBucketName uploadBucketName = bucketNameGenerator.current();
+ BucketName bucketName = uploadBucketName.asBucketName();
+
+ return Mono.fromCallable(() -> new CountingInputStream(data))
+ .flatMap(countingInputStream ->
Mono.from(blobStore.save(bucketName, countingInputStream, LOW_COST))
+ .map(blobId -> new UploadDAO.UploadRepresentation(uploadId,
bucketName, blobId, contentType, countingInputStream.getCount(), user))
+ .flatMap(upload ->
uploadDAO.save(upload).thenReturn(upload.getId())));
+ }
+
+ @Override
+ public Publisher<Upload> retrieve(UploadId id, Username user) {
+ return uploadDAO.retrieve(id)
+ .filter(upload -> upload.getUser().equals(user))
+ .map(upload -> Upload.from(
+ UploadMetaData.from(id, upload.getContentType(),
upload.getSize(), upload.getBlobId()),
+ () -> blobStore.read(upload.getBucketName(),
upload.getBlobId(), LOW_COST)))
+ .switchIfEmpty(Mono.error(() -> new UploadNotFoundException(id)));
+ }
+
+ public Mono<Void> purge() {
+ return Flux.from(blobStore.listBuckets())
+ .<UploadBucketName>handle((bucketName, sink) ->
UploadBucketName.ofBucket(bucketName).ifPresentOrElse(sink::next,
sink::complete))
+ .filter(bucketNameGenerator.evictionPredicate())
+ .concatMap(bucket -> blobStore.deleteBucket(bucket.asBucketName()))
Review comment:
When I run this method, I got the exception when `deleteBucket`.
(Start the second element of the collection. It throws
`java.util.ConcurrentModificationException`,
No exception with the head element).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]