This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.9.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/3.9.x by this push:
new 867463f9d0 JAMES-4157 Implement Blob/copy
867463f9d0 is described below
commit 867463f9d0adedf08cefb712535e8761fa1fc9cf
Author: Trần Hồng Quân <[email protected]>
AuthorDate: Wed Jan 7 09:11:45 2026 +0700
JAMES-4157 Implement Blob/copy
(cherry picked from commit 2d46f2f4f6c2911a479decc5c064add0c75e8795)
---
.../james/jmap/rfc8621/RFC8621MethodsModule.java | 2 +
.../distributed/DistributedBlobCopyTest.java | 62 +++
.../jmap/rfc8621/contract/BlobCopyContract.scala | 617 +++++++++++++++++++++
.../jmap/rfc8621/memory/MemoryBlobCopyTest.java | 49 ++
.../rfc8621/postgres/PostgresBlobCopyTest.java | 61 ++
.../jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown | 4 +-
.../org/apache/james/jmap/core/BlobCopy.scala | 40 ++
.../org/apache/james/jmap/core/Invocation.scala | 4 +
.../james/jmap/json/BlobCopySerializer.scala | 41 ++
.../apache/james/jmap/method/BlobCopyMethod.scala | 148 +++++
.../apache/james/jmap/routes/UploadRoutes.scala | 9 +-
11 files changed, 1031 insertions(+), 6 deletions(-)
diff --git
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 8141b9093b..66437c8c70 100644
---
a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++
b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -52,6 +52,7 @@ import org.apache.james.jmap.http.rfc8621.InjectionKeys;
import org.apache.james.jmap.mail.DefaultNamespaceFactory;
import org.apache.james.jmap.mail.NamespaceFactory;
import org.apache.james.jmap.mail.SortOrderProvider;
+import org.apache.james.jmap.method.BlobCopyMethod;
import org.apache.james.jmap.method.CoreEchoMethod;
import org.apache.james.jmap.method.DelegateGetMethod;
import org.apache.james.jmap.method.DelegateSetMethod;
@@ -141,6 +142,7 @@ public class RFC8621MethodsModule extends AbstractModule {
Multibinder<Method> methods = Multibinder.newSetBinder(binder(),
Method.class);
methods.addBinding().to(CoreEchoMethod.class);
+ methods.addBinding().to(BlobCopyMethod.class);
methods.addBinding().to(EmailChangesMethod.class);
methods.addBinding().to(EmailImportMethod.class);
methods.addBinding().to(EmailGetMethod.class);
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
new file mode 100644
index 0000000000..1f5468a9d3
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedBlobCopyTest.java
@@ -0,0 +1,62 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.distributed;
+
+import org.apache.james.CassandraExtension;
+import org.apache.james.CassandraRabbitMQJamesConfiguration;
+import org.apache.james.CassandraRabbitMQJamesServerMain;
+import org.apache.james.DockerOpenSearchExtension;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.AwsS3BlobStoreExtension;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class DistributedBlobCopyTest implements BlobCopyContract {
+ @RegisterExtension
+ static JamesServerExtension testExtension = new
JamesServerBuilder<CassandraRabbitMQJamesConfiguration>(tmpDir ->
+ CassandraRabbitMQJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .enableJMAP()
+ .blobStore(BlobStoreConfiguration.builder()
+ .s3()
+ .disableCache()
+ .deduplication()
+ .noCryptoConfig())
+ .searchConfiguration(SearchConfiguration.openSearch())
+ .build())
+ .extension(new DockerOpenSearchExtension())
+ .extension(new CassandraExtension())
+ .extension(new RabbitMQExtension())
+ .extension(new AwsS3BlobStoreExtension())
+ .server(configuration ->
CassandraRabbitMQJamesServerMain.createServer(configuration)
+ .overrideWith(new
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit",
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT())),
+ new DelegationProbeModule()))
+ .build();
+}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
new file mode 100644
index 0000000000..03abf2c1d7
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/BlobCopyContract.scala
@@ -0,0 +1,617 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.contract
+
+import java.io.ByteArrayInputStream
+import java.nio.charset.StandardCharsets
+
+import com.google.common.base.Strings
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import io.restassured.http.ContentType
+import io.restassured.path.json.JsonPath
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.http.HttpStatus.{SC_CREATED, SC_OK}
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.core.AccountId
+import org.apache.james.jmap.http.UserCredential
+import
org.apache.james.jmap.rfc8621.contract.BlobCopyContract.{ALICE_ACCOUNT_ID,
TEN_KILO_BYTES}
+import
org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER,
ALICE, ALICE_PASSWORD, ANDRE, ANDRE_ACCOUNT_ID, ANDRE_PASSWORD, BOB,
BOB_PASSWORD, DOMAIN, _2_DOT_DOMAIN, authScheme, baseRequestSpecBuilder,
ACCOUNT_ID => BOB_ACCOUNT_ID}
+import org.apache.james.junit.categories.BasicFeature
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.experimental.categories.Category
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+object BlobCopyContract {
+ val TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT: String = "20K"
+ val TEN_KILO_BYTES: Array[Byte] = Strings.repeat("0123456789\r\n",
853).getBytes(StandardCharsets.UTF_8)
+ val ALICE_ACCOUNT_ID: String = AccountId.from(ALICE).toOption.get.id.value
+}
+
+trait BlobCopyContract {
+ @BeforeEach
+ def setUp(server: GuiceJamesServer): Unit = {
+ server.getProbe(classOf[DataProbeImpl])
+ .fluent
+ .addDomain(DOMAIN.asString)
+ .addDomain(_2_DOT_DOMAIN.asString())
+ .addUser(BOB.asString, BOB_PASSWORD)
+ .addUser(ALICE.asString(), ALICE_PASSWORD)
+ .addUser(ANDRE.asString(), ANDRE_PASSWORD)
+
+ requestSpecification = baseRequestSpecBuilder(server)
+ .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+ .build
+ }
+
+ @Category(Array(classOf[BasicFeature]))
+ @Test
+ def shouldCopyBlobBetweenAccountsWhenDelegated(server: GuiceJamesServer):
Unit = {
+ // Alice grants Bob delegated access to her account, so Bob may use it as the copy target
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+ // Bob uploads a blob to his own account (the default request specification set in setUp authenticates as Bob)
+ val bobBlobId: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Bob copies the blob from his account to Alice's account
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$ALICE_ACCOUNT_ID",
+ | "blobIds": [ "$bobBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val copiedBlobId: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .jsonPath()
+ .getString(s"methodResponses[0][1].copied.$bobBlobId")
+
+ // Alice downloads the copied blob from her own account; preemptive basic auth matches the other per-request credential overrides in this contract
+ val downloadResponse: Array[Byte] = `given`
+ .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .when
+ .get(s"/download/$ALICE_ACCOUNT_ID/$copiedBlobId")
+ .`then`
+ .statusCode(SC_OK)
+ .extract
+ .body
+ .asByteArray()
+
+ assertThat(new ByteArrayInputStream(downloadResponse))
+ .hasBinaryContent(TEN_KILO_BYTES)
+ }
+
+ @Test
+ def shouldCopyBlobFromAliceToBobWhenBobDelegated(server: GuiceJamesServer):
Unit = {
+ // Alice delegates Bob to access her account
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+ // Alice uploads a blob to her account
+ val aliceBlobId: String = `given`
+ .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$ALICE_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Bob copies the blob from Alice's account to his account
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$ALICE_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$aliceBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val copiedBlobId: String = `given`
+ .auth().preemptive().basic(BOB.asString(), BOB_PASSWORD)
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .jsonPath()
+ .getString(s"methodResponses[0][1].copied.$aliceBlobId")
+
+ // Bob downloads the copied blob from his account
+ val downloadResponse: Array[Byte] = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .when
+ .get(s"/download/$BOB_ACCOUNT_ID/$copiedBlobId")
+ .`then`
+ .statusCode(SC_OK)
+ .extract
+ .body
+ .asByteArray()
+
+ assertThat(new ByteArrayInputStream(downloadResponse))
+ .hasBinaryContent(TEN_KILO_BYTES)
+ }
+
+ @Test
+ def transitiveDelegationShouldNotWorkForTargetAccountId(server:
GuiceJamesServer): Unit = {
+ // Andre delegates to Alice; Alice delegates to Bob. Bob therefore has NO direct delegation on Andre's account
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ANDRE, ALICE)
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+ // Bob uploads a blob to his own account, so that the source side (fromAccountId + blobId) of the copy is valid
+ val bobBlobId: String = `given`
+ .auth().preemptive().basic(BOB.asString(), BOB_PASSWORD)
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Bob tries to copy from his account to Andre (transitive delegation
should not work)
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$ANDRE_ACCOUNT_ID",
+ | "blobIds": [ "$bobBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ """[["error",{"type":"accountNotFound"},"c1"]]""")
+ }
+
+ @Test
+ def transitiveDelegationShouldNotWorkForFromAccountId(server:
GuiceJamesServer): Unit = {
+ // Andre delegates to Alice; Alice delegates to Bob
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ANDRE, ALICE)
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+ // Andre uploads a blob to his account
+ val andreBlobId: String = `given`
+ .auth().preemptive().basic(ANDRE.asString(), ANDRE_PASSWORD)
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$ANDRE_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Bob tries to copy blob from Andre to his account (transitive delegation
should fail)
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$ANDRE_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$andreBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ """[["error",{"type":"fromAccountNotFound"},"c1"]]""")
+ }
+
+ @Test
+ def copyBlobShouldSucceedWhenUploadQuotaExceeded(): Unit = {
+ // Given upload quota is 20KB, upload 2 x 10KB blobs to reach the upload
quota
+ `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ val secondBlobId: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Upload quota should be reached, now copy 10KB blob to the same account
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$secondBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val copiedBlobId: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .jsonPath()
+ .getString(s"methodResponses[0][1].copied.$secondBlobId")
+
+ // Download the copied blob should succeed
+ val downloadResponse: Array[Byte] = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .when
+ .get(s"/download/$BOB_ACCOUNT_ID/$copiedBlobId")
+ .`then`
+ .statusCode(SC_OK)
+ .extract
+ .body
+ .asByteArray()
+ assertThat(new ByteArrayInputStream(downloadResponse))
+ .hasBinaryContent(TEN_KILO_BYTES)
+ }
+
+ @Test
+ def shouldReturnNotFoundForNonExistingBlob(): Unit = {
+ val notFoundBlobId: String = "notFoundBlobId"
+
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$notFoundBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ s"""[[
+ | "Blob/copy",
+ | {
+ | "fromAccountId":"$BOB_ACCOUNT_ID",
+ | "accountId":"$BOB_ACCOUNT_ID",
+ | "notCopied":{
+ | "$notFoundBlobId":{
+ | "type":"notFound",
+ | "description":"Blob BlobId($notFoundBlobId) could not be
found"
+ | }
+ | }
+ | },
+ | "c1"
+ |]]""".stripMargin)
+ }
+
+ @Test
+ def shouldReturnCopiedAndNotCopiedWhenMixingExistingAndMissingBlobs(): Unit
= {
+ val existingBlobId: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ val notFoundBlobId: String = "notFoundBlobId"
+
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$existingBlobId", "$notFoundBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ val copiedBlobId: String =
JsonPath.from(response).getString(s"methodResponses[0][1].copied.$existingBlobId")
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ s"""[[
+ | "Blob/copy",
+ | {
+ | "fromAccountId":"$BOB_ACCOUNT_ID",
+ | "accountId":"$BOB_ACCOUNT_ID",
+ | "copied":{
+ | "$existingBlobId":"$copiedBlobId"
+ | },
+ | "notCopied":{
+ | "$notFoundBlobId":{
+ | "type":"notFound",
+ | "description":"Blob BlobId($notFoundBlobId) could not be
found"
+ | }
+ | }
+ | },
+ | "c1"
+ |]]""".stripMargin)
+ }
+
+ @Test
+ def shouldReturnFromAccountNotFoundWhenFromAccountInvalid(): Unit = {
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "unknownFromAccountId",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "blobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ """[["error",{"type":"fromAccountNotFound"},"c1"]]""")
+ }
+
+ @Category(Array(classOf[BasicFeature]))
+ @Test
+ def shouldFailWhenTargetAccountNotDelegated(): Unit = {
+ // Alice does NOT delegate her account to Bob
+
+ // upload a blob to Bob's account
+ val bobBlobId: String = `given`
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$BOB_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Bob tries to copy the blob from his account to Alice's account
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$BOB_ACCOUNT_ID",
+ | "accountId": "$ALICE_ACCOUNT_ID",
+ | "blobIds": [ "$bobBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ """[["error",{"type":"accountNotFound"},"c1"]]""")
+ }
+
+ @Test
+ def shouldFailWhenAliceCopiesToBobWithoutDelegation(server:
GuiceJamesServer): Unit = {
+ // Alice delegates Bob to access her account
+ server.getProbe(classOf[DataProbeImpl]).addAuthorizedUser(ALICE, BOB)
+
+ // Alice uploads a blob to her account
+ val aliceBlobId: String = `given`
+ .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+ .basePath("")
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .contentType(ContentType.BINARY)
+ .body(TEN_KILO_BYTES)
+ .when
+ .post(s"/upload/$ALICE_ACCOUNT_ID")
+ .`then`
+ .statusCode(SC_CREATED)
+ .extract
+ .jsonPath()
+ .getString("blobId")
+
+ // Alice tries to copy the blob to Bob's account (no delegation from Bob
to Alice)
+ val request: String =
+ s"""{
+ | "using": [ "urn:ietf:params:jmap:core" ],
+ | "methodCalls": [[
+ | "Blob/copy",
+ | {
+ | "fromAccountId": "$ALICE_ACCOUNT_ID",
+ | "accountId": "$BOB_ACCOUNT_ID",
+ | "blobIds": [ "$aliceBlobId" ]
+ | },
+ | "c1"]]
+ |}""".stripMargin
+
+ val response: String = `given`
+ .auth().preemptive().basic(ALICE.asString(), ALICE_PASSWORD)
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .contentType(ContentType.JSON)
+ .extract
+ .body
+ .asString
+
+ assertThatJson(response)
+ .inPath("methodResponses")
+ .isEqualTo(
+ """[["error",{"type":"accountNotFound"},"c1"]]""")
+ }
+}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
new file mode 100644
index 0000000000..8f585fe9f8
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryBlobCopyTest.java
@@ -0,0 +1,49 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.memory;
+
+import static
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.MemoryJamesConfiguration;
+import org.apache.james.MemoryJamesServerMain;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class MemoryBlobCopyTest implements BlobCopyContract {
+ @RegisterExtension
+ static JamesServerExtension testExtension = new
JamesServerBuilder<MemoryJamesConfiguration>(tmpDir ->
+ MemoryJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .usersRepository(DEFAULT)
+ .enableJMAP()
+ .build())
+ .server(configuration ->
MemoryJamesServerMain.createServer(configuration)
+ .overrideWith(new
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit",
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT())),
+ new DelegationProbeModule()))
+ .build();
+}
diff --git
a/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
b/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
new file mode 100644
index 0000000000..8795e0593b
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621-integration-tests/postgres-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/postgres/PostgresBlobCopyTest.java
@@ -0,0 +1,61 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.postgres;
+
+import static
org.apache.james.data.UsersRepositoryModuleChooser.Implementation.DEFAULT;
+
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.PostgresJamesConfiguration;
+import org.apache.james.PostgresJamesServerMain;
+import org.apache.james.SearchConfiguration;
+import org.apache.james.backends.postgres.PostgresExtension;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract;
+import org.apache.james.jmap.rfc8621.contract.BlobCopyContract$;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
+import org.apache.james.modules.RabbitMQExtension;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.apache.james.modules.blobstore.BlobStoreConfiguration;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+
+public class PostgresBlobCopyTest implements BlobCopyContract {
+ @RegisterExtension
+ static JamesServerExtension testExtension = new
JamesServerBuilder<PostgresJamesConfiguration>(tmpDir ->
+ PostgresJamesConfiguration.builder()
+ .workingDirectory(tmpDir)
+ .configurationFromClasspath()
+ .searchConfiguration(SearchConfiguration.scanning())
+ .usersRepository(DEFAULT)
+ .eventBusImpl(PostgresJamesConfiguration.EventBusImpl.RABBITMQ)
+ .blobStore(BlobStoreConfiguration.builder()
+ .postgres()
+ .disableCache()
+ .deduplication()
+ .noCryptoConfig())
+ .build())
+ .extension(PostgresExtension.empty())
+ .extension(new RabbitMQExtension())
+ .server(configuration ->
PostgresJamesServerMain.createServer(configuration)
+ .overrideWith(new
TestJMAPServerModule(ImmutableMap.of("upload.quota.limit",
BlobCopyContract$.MODULE$.TWENTY_KILO_BYTES_UPLOAD_QUOTA_LIMIT())))
+ .overrideWith(new DelegationProbeModule()))
+ .build();
+}
diff --git a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
index a93efd8f84..7b8ccdf017 100644
--- a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
+++ b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/binary.mdown
@@ -67,8 +67,8 @@ When an HTTP error response is returned to the client, the
server SHOULD return
## Blob/copy
-> :warning:
-> Not implemented.
+> :information_source:
+> Implemented
Binary data may be copied **between** two different accounts using the
*Blob/copy* method rather than having to download and then reupload on the
client.
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
new file mode 100644
index 0000000000..1be3fa46fc
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/BlobCopy.scala
@@ -0,0 +1,40 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.core
+
+import org.apache.james.jmap.mail.{BlobId, BlobIds, RequestTooLargeException}
+import org.apache.james.jmap.method.{ValidableRequest, WithAccountId}
+
+case class BlobCopyRequest(fromAccountId: AccountId,
+ accountId: AccountId,
+ blobIds: BlobIds) extends WithAccountId with
ValidableRequest {
+ override def validate(configuration: JmapRfc8621Configuration):
Either[Exception, BlobCopyRequest] =
+ if (blobIds.value.size > configuration.maxObjectsInSet.value.value) {
+ Left(RequestTooLargeException(s"""Too many items in a Blob/copy request.
+ Got ${blobIds.value.size} items instead of maximum
${configuration.maxObjectsInSet.value.value}."""))
+ } else {
+ scala.Right(this)
+ }
+}
+
+case class BlobCopyResponse(fromAccountId: AccountId,
+ accountId: AccountId,
+ copied: Option[Map[BlobId, BlobId]],
+ notCopied: Option[Map[BlobId, SetError]])
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
index 8372fba09b..1d06a42f60 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/Invocation.scala
@@ -70,6 +70,10 @@ object ErrorCode {
override def code: String = "accountNotFound"
}
+ // Method-level error serialized as "fromAccountNotFound".
+ case object FromAccountNotFound extends ErrorCode {
+ override def code: String = "fromAccountNotFound"
+ }
+
case object Forbidden extends ErrorCode {
override def code: String = "forbidden"
}
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
new file mode 100644
index 0000000000..bc495956fe
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/BlobCopySerializer.scala
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.json
+
+import org.apache.james.jmap.core.{BlobCopyRequest, BlobCopyResponse, SetError}
+import org.apache.james.jmap.mail.{BlobId, BlobIds}
+import play.api.libs.json.{JsObject, JsResult, Json, OWrites, Reads, Writes}
+
+/**
+ * JSON (de)serialization of Blob/copy requests and responses.
+ *
+ * The implicit vals are declared before the macro-derived Reads/Writes that
+ * rely on them; keep this declaration order when editing.
+ */
+class BlobCopySerializer {
+ private implicit val blobIdWrites: Writes[BlobId] = Json.valueWrites[BlobId]
+ private implicit val blobIdsReads: Reads[BlobIds] = Json.valueReads[BlobIds]
+
+ // Maps are written as JSON objects keyed by the string form of the source blob id.
+ private implicit val copiedWrites: Writes[Map[BlobId, BlobId]] =
+ mapWrites[BlobId, BlobId](_.value.value, blobIdWrites)
+ private implicit val notCopiedWrites: Writes[Map[BlobId, SetError]] =
+ mapWrites[BlobId, SetError](_.value.value, setErrorWrites)
+
+ private implicit val blobCopyRequestReads: Reads[BlobCopyRequest] =
Json.reads[BlobCopyRequest]
+ private implicit val blobCopyResponseWrites: OWrites[BlobCopyResponse] =
Json.writes[BlobCopyResponse]
+
+ // Parses the arguments object of a Blob/copy invocation.
+ def deserializeBlobCopyRequest(input: JsObject): JsResult[BlobCopyRequest] =
Json.fromJson[BlobCopyRequest](input)
+
+ // Renders a Blob/copy response as the arguments object of the reply invocation.
+ def serializeBlobCopyResponse(response: BlobCopyResponse): JsObject =
Json.toJsObject(response)
+}
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
new file mode 100644
index 0000000000..b5bf6ed4f6
--- /dev/null
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/method/BlobCopyMethod.scala
@@ -0,0 +1,148 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.method
+
+import eu.timepit.refined.auto._
+import jakarta.inject.Inject
+import org.apache.james.core.Username
+import org.apache.james.jmap.api.upload.UploadService
+import org.apache.james.jmap.core.CapabilityIdentifier.{CapabilityIdentifier,
JMAP_CORE}
+import org.apache.james.jmap.core.Invocation.{Arguments, MethodName}
+import org.apache.james.jmap.core.SetError.SetErrorDescription
+import org.apache.james.jmap.core.{BlobCopyRequest, BlobCopyResponse,
ErrorCode, Invocation, JmapRfc8621Configuration, SessionTranslator, SetError}
+import org.apache.james.jmap.json.BlobCopySerializer
+import org.apache.james.jmap.mail.BlobId
+import org.apache.james.jmap.mail.MDNParse.UnparsedBlobId
+import org.apache.james.jmap.routes.UploadRoutes.asBlobId
+import org.apache.james.jmap.routes.{Blob, BlobNotFoundException,
BlobResolvers, SessionSupplier}
+import org.apache.james.mailbox.{MailboxSession, SessionProvider}
+import org.apache.james.metrics.api.MetricFactory
+import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.{SFlux, SMono}
+
+/** Outcome of copying a single blob, identified by the blob it was read from. */
+sealed trait CopyResult {
+ def sourceBlobId: BlobId
+}
+/** The source blob was copied; `copied` is the id of the newly created blob. */
+case class Copied(sourceBlobId: BlobId, copied: BlobId) extends CopyResult
+/** The source blob could not be copied; `error` carries the reason. */
+case class NotCopied(sourceBlobId: BlobId, error: SetError) extends CopyResult
+/** Groups the per-blob outcomes of a Blob/copy call into the two maps exposed by the response. */
+case class CopyResults(results: Seq[CopyResult]) {
+  /** Source blob id to created blob id for every successful copy, or None when there is none. */
+  def copied: Option[Map[BlobId, BlobId]] = {
+    val successes: Map[BlobId, BlobId] = results.collect { case Copied(origin, created) => origin -> created }.toMap
+    if (successes.isEmpty) None else Some(successes)
+  }
+
+  /** Source blob id to error for every failed copy, or None when there is none. */
+  def notCopied: Option[Map[BlobId, SetError]] = {
+    val failures: Map[BlobId, SetError] = results.collect { case NotCopied(origin, reason) => origin -> reason }.toMap
+    if (failures.isEmpty) None else Some(failures)
+  }
+}
+
+/** Signals that the source account (`fromAccountId`) of a Blob/copy request could not be resolved. */
+case class FromAccountNotFoundException() extends RuntimeException
+
+/**
+ * JMAP `Blob/copy` method.
+ *
+ * Resolves every requested blob with a session on the source account
+ * (`fromAccountId`) and uploads its content for the user of the target session.
+ * Per-blob failures are reported in `notCopied`; a source account that cannot
+ * be resolved yields a `fromAccountNotFound` method error and any other
+ * failure yields `serverFail`.
+ */
+class BlobCopyMethod @Inject()(val metricFactory: MetricFactory,
+                               val sessionSupplier: SessionSupplier,
+                               val sessionTranslator: SessionTranslator,
+                               val sessionProvider: SessionProvider,
+                               val blobResolvers: BlobResolvers,
+                               val uploadService: UploadService,
+                               val serializer: BlobCopySerializer,
+                               val configuration: JmapRfc8621Configuration) extends MethodRequiringAccountId[BlobCopyRequest] {
+  private val LOGGER: Logger = LoggerFactory.getLogger(classOf[BlobCopyMethod])
+
+  override val methodName: MethodName = MethodName("Blob/copy")
+  override val requiredCapabilities: Set[CapabilityIdentifier] = Set(JMAP_CORE)
+
+  /**
+   * Copies the requested blobs and wraps the outcome into a response invocation.
+   * Failures are never propagated: they are converted into error invocations.
+   */
+  override def doProcess(capabilities: Set[CapabilityIdentifier], invocation: InvocationWithContext, mailboxSession: MailboxSession, request: BlobCopyRequest): Publisher[InvocationWithContext] =
+    resolveSourceSession(request, mailboxSession)
+      .flatMap(sourceSession => copyBlobs(request, sourceSession, targetSession = mailboxSession))
+      .map(response => asInvocation(response, invocation))
+      .onErrorResume {
+        case _: FromAccountNotFoundException =>
+          SMono.just(asErrorInvocation(ErrorCode.FromAccountNotFound, invocation))
+        case e =>
+          LOGGER.error("Failed to copy blob", e)
+          SMono.just(asErrorInvocation(ErrorCode.ServerFail, describe(e), invocation))
+      }
+
+  /**
+   * Provides the session used to read the source blobs.
+   *
+   * When source and target accounts are the same the current session is reused.
+   * Otherwise a fresh, non-delegated session is opened for the logged-in user and
+   * `sessionTranslator.delegateIfNeeded` is asked to move it to `fromAccountId`.
+   * A missing logged-in user or an AccountNotFoundException is reported as
+   * FromAccountNotFoundException.
+   */
+  private def resolveSourceSession(request: BlobCopyRequest, mailboxSession: MailboxSession): SMono[MailboxSession] =
+    if (request.fromAccountId.equals(request.accountId)) {
+      SMono.just(mailboxSession)
+    } else {
+      SMono(Mono.justOrEmpty(mailboxSession.getLoggedInUser))
+        .flatMap(createLoggedInUserSession)
+        .flatMap(session => sessionTranslator.delegateIfNeeded(session, request.fromAccountId)
+          .onErrorResume {
+            case _: AccountNotFoundException => SMono.error[MailboxSession](FromAccountNotFoundException())
+            // Pass every other failure through unchanged: without this case the
+            // non-exhaustive match throws a MatchError that hides the real cause.
+            case e => SMono.error[MailboxSession](e)
+          })
+        .switchIfEmpty(SMono.error(FromAccountNotFoundException()))
+    }
+
+  /** Opens a non-delegated session for the logged-in user; the call is scheduled on the blocking-call wrapper. */
+  private def createLoggedInUserSession(loggedInUser: Username): SMono[MailboxSession] =
+    SMono.fromCallable(() => sessionProvider.authenticate(loggedInUser).withoutDelegation())
+      .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+
+  /** Builds the successful `Blob/copy` reply, keeping the method call id and processing context of the request. */
+  private def asInvocation(response: BlobCopyResponse, invocation: InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation(
+        methodName = methodName,
+        arguments = Arguments(serializer.serializeBlobCopyResponse(response)),
+        methodCallId = invocation.invocation.methodCallId),
+      invocation.processingContext)
+
+  /** Builds an error reply without description. */
+  private def asErrorInvocation(errorCode: ErrorCode, invocation: InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation.error(errorCode, invocation.invocation.methodCallId),
+      invocation.processingContext)
+
+  /** Builds an error reply carrying a description. */
+  private def asErrorInvocation(errorCode: ErrorCode, description: String = "", invocation: InvocationWithContext): InvocationWithContext =
+    InvocationWithContext(
+      Invocation.error(errorCode, description, invocation.invocation.methodCallId),
+      invocation.processingContext)
+
+  /** Parses the invocation arguments and applies the request size validation. */
+  override def getRequest(mailboxSession: MailboxSession, invocation: Invocation): Either[Exception, BlobCopyRequest] =
+    serializer.deserializeBlobCopyRequest(invocation.arguments.value).asEitherRequest
+      .flatMap(request => request.validate(configuration).map(_ => request))
+
+  /** Copies all requested blobs concurrently and gathers the outcomes into a response. */
+  private def copyBlobs(request: BlobCopyRequest, sourceSession: MailboxSession, targetSession: MailboxSession): SMono[BlobCopyResponse] =
+    SFlux.fromIterable(request.blobIds.value)
+      .flatMap(blobId => copyBlob(blobId, sourceSession, targetSession), ReactorUtils.DEFAULT_CONCURRENCY)
+      .collectSeq()
+      .map(CopyResults)
+      .map(results => BlobCopyResponse(request.fromAccountId, request.accountId, results.copied, results.notCopied))
+
+  /**
+   * Copies one blob. Failures while resolving or uploading it are turned into a
+   * NotCopied entry. A failure of `BlobId.of` is not recovered here and fails the
+   * returned Mono, because no BlobId is available to key the error with.
+   */
+  private def copyBlob(unparsedBlobId: UnparsedBlobId, sourceSession: MailboxSession, targetSession: MailboxSession): SMono[CopyResult] =
+    SMono.fromTry(BlobId.of(unparsedBlobId))
+      .flatMap { sourceBlobId =>
+        blobResolvers.resolve(sourceBlobId, sourceSession)
+          .flatMap(blob => uploadBlob(sourceBlobId, blob, targetSession))
+          .onErrorResume(e => SMono.just(NotCopied(sourceBlobId, asSetError(sourceBlobId, sourceSession, targetSession, e))))
+      }
+
+  /** Uploads the blob content for the user of the target session and maps the upload id to a blob id. */
+  private def uploadBlob(sourceBlobId: BlobId, blob: Blob, targetSession: MailboxSession): SMono[CopyResult] =
+    SMono.fromPublisher(uploadService.upload(blob.content, blob.contentType, targetSession.getUser))
+      .map(upload => Copied(sourceBlobId, asBlobId(upload.uploadId)))
+
+  /** Translates a per-blob failure into the SetError reported in `notCopied`. */
+  private def asSetError(sourceId: BlobId, sourceSession: MailboxSession, targetSession: MailboxSession, throwable: Throwable): SetError =
+    throwable match {
+      case e: BlobNotFoundException =>
+        LOGGER.info(s"Could not copy blob as ${e.blobId} is not found")
+        SetError.notFound(SetErrorDescription(s"Blob ${e.blobId} could not be found"))
+      case _ =>
+        LOGGER.error(s"Failed to copy blob $sourceId from account ${sourceSession.getUser.asString()} to account ${targetSession.getUser.asString()}", throwable)
+        SetError.serverFail(SetErrorDescription(describe(throwable)))
+    }
+
+  /** Text used in error descriptions; `getMessage` may be null, in which case the exception class name is used. */
+  private def describe(throwable: Throwable): String =
+    Option(throwable.getMessage).getOrElse(throwable.getClass.getName)
+}
diff --git
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
index 45f18fa67c..1d3a458351 100644
---
a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
+++
b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/UploadRoutes.scala
@@ -26,8 +26,8 @@ import java.util.stream
import java.util.stream.Stream
import io.netty.handler.codec.http.HttpHeaderNames.{CONTENT_LENGTH,
CONTENT_TYPE}
+import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, CREATED,
FORBIDDEN, INTERNAL_SERVER_ERROR, UNAUTHORIZED}
-import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
import jakarta.inject.{Inject, Named}
import org.apache.commons.fileupload.util.LimitedInputStream
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
@@ -42,7 +42,7 @@ import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.json.{ResponseSerializer, UploadSerializer}
import org.apache.james.jmap.mail.BlobId
import org.apache.james.jmap.method.AccountNotFoundException
-import org.apache.james.jmap.routes.UploadRoutes.LOGGER
+import org.apache.james.jmap.routes.UploadRoutes.{LOGGER, asBlobId}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
import org.apache.james.mailbox.MailboxSession
import org.apache.james.mailbox.model.ContentType
@@ -57,6 +57,9 @@ case class TooBigUploadException() extends RuntimeException
object UploadRoutes {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[UploadRoutes])
+
+ // Derives the blob id of an upload by prefixing the upload id with "uploads-".
+ // NOTE(review): `.get` rethrows when BlobId.of fails; presumably impossible for generated upload ids, confirm.
+ def asBlobId(uploadId: UploadId): BlobId =
+ BlobId.of(s"uploads-${uploadId.asString()}").get
}
case class UploadResponse(accountId: AccountId,
@@ -160,8 +163,6 @@ class UploadRoutes @Inject()(@Named(InjectionKeys.RFC_8621)
val authenticator: A
size = uploadMetaData.size,
accountId = accountId)
- private def asBlobId(uploadId: UploadId): BlobId =
BlobId.of(s"uploads-${uploadId.asString()}" ).get
-
private def respondDetails(httpServerResponse: HttpServerResponse, details:
ProblemDetails): SMono[Void] =
SMono.fromCallable(() => ResponseSerializer.serialize(details).toString)
.map(_.getBytes(StandardCharsets.UTF_8))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]