This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch postgresql
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 392ece628f3201db667c81fd7163242e433b3576
Author: Quan Tran <[email protected]>
AuthorDate: Fri Mar 29 13:05:58 2024 +0700

    JAMES-2586 Implement PostgresTaskExecutionDetailsProjection
---
 .../james/backends/postgres/PostgresCommons.java   |   4 +
 server/task/task-postgres/pom.xml                  |   6 +
 .../PostgresTaskExecutionDetailsProjection.scala   |  54 ++++++
 ...PostgresTaskExecutionDetailsProjectionDAO.scala | 112 ++++++++++++
 ...tgresTaskExecutionDetailsProjectionModule.scala |  72 ++++++++
 ...tgresTaskExecutionDetailsProjectionDAOTest.java | 202 +++++++++++++++++++++
 ...PostgresTaskExecutionDetailsProjectionTest.java |  52 ++++++
 7 files changed, 502 insertions(+)

diff --git 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
index 5557b591b9..88201ac066 100644
--- 
a/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
+++ 
b/backends-common/postgres/src/main/java/org/apache/james/backends/postgres/PostgresCommons.java
@@ -64,6 +64,10 @@ public class PostgresCommons {
         .map(value -> LocalDateTime.ofInstant(value.toInstant(), 
ZoneOffset.UTC))
         .orElse(null);
 
+    /**
+     * Converts a {@link ZonedDateTime} into the {@link LocalDateTime} that
+     * denotes the same instant expressed in UTC.
+     *
+     * Null-tolerant: a {@code null} input yields {@code null}.
+     */
+    public static final Function<ZonedDateTime, LocalDateTime> 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME = date -> Optional.ofNullable(date)
+        .map(value -> 
value.withZoneSameInstant(ZoneOffset.UTC).toLocalDateTime())
+        .orElse(null);
+
     public static final Function<Instant, LocalDateTime> 
INSTANT_TO_LOCAL_DATE_TIME = instant -> Optional.ofNullable(instant)
         .map(value -> LocalDateTime.ofInstant(instant, ZoneOffset.UTC))
         .orElse(null);
diff --git a/server/task/task-postgres/pom.xml 
b/server/task/task-postgres/pom.xml
index 3cf839f848..35160283f7 100644
--- a/server/task/task-postgres/pom.xml
+++ b/server/task/task-postgres/pom.xml
@@ -33,6 +33,12 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-guice-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-lifecycle-api</artifactId>
diff --git 
a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala
 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala
new file mode 100644
index 0000000000..999ea770d4
--- /dev/null
+++ 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjection.scala
@@ -0,0 +1,54 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing.postgres
+
+import java.time.Instant
+
+import javax.inject.Inject
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection
+import org.apache.james.task.{TaskExecutionDetails, TaskId}
+import org.reactivestreams.Publisher
+
+import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
+
+/**
+ * [[TaskExecutionDetailsProjection]] that delegates every operation to a
+ * [[PostgresTaskExecutionDetailsProjectionDAO]].
+ *
+ * `load`, `list` and `update` block on the DAO's reactive result; the other
+ * methods return the DAO's publisher as is.
+ */
+class PostgresTaskExecutionDetailsProjection 
@Inject()(taskExecutionDetailsProjectionDAO: 
PostgresTaskExecutionDetailsProjectionDAO)
+  extends TaskExecutionDetailsProjection {
+
+  // Blocks on the DAO lookup; None when the DAO's Mono completes empty.
+  override def load(taskId: TaskId): Option[TaskExecutionDetails] =
+    
taskExecutionDetailsProjectionDAO.readDetails(taskId).blockOptional().asScala
+
+  // Blocks until every row has been collected, then converts to a Scala List.
+  override def list: List[TaskExecutionDetails] =
+    
taskExecutionDetailsProjectionDAO.listDetails().collectList().block().asScala.toList
+
+  // Blocks until the DAO has finished saving `details`.
+  override def update(details: TaskExecutionDetails): Unit =
+    taskExecutionDetailsProjectionDAO.saveDetails(details).block()
+
+  // Non-blocking counterpart of `load`.
+  override def loadReactive(taskId: TaskId): Publisher[TaskExecutionDetails] =
+    taskExecutionDetailsProjectionDAO.readDetails(taskId)
+
+  // Non-blocking counterpart of `list`.
+  override def listReactive(): Publisher[TaskExecutionDetails] = 
taskExecutionDetailsProjectionDAO.listDetails()
+
+  // Non-blocking counterpart of `update`.
+  override def updateReactive(details: TaskExecutionDetails): Publisher[Void] 
= taskExecutionDetailsProjectionDAO.saveDetails(details)
+
+  // Delegates to the DAO query that filters on the submitted date.
+  override def listDetailsByBeforeDate(beforeDate: Instant): 
Publisher[TaskExecutionDetails] = 
taskExecutionDetailsProjectionDAO.listDetailsByBeforeDate(beforeDate)
+
+  // Delegates to the DAO, which deletes by the task id of the given details.
+  override def remove(taskExecutionDetails: TaskExecutionDetails): 
Publisher[Void] = taskExecutionDetailsProjectionDAO.remove(taskExecutionDetails)
+}
diff --git 
a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala
 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala
new file mode 100644
index 0000000000..5ed08bc536
--- /dev/null
+++ 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAO.scala
@@ -0,0 +1,112 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing.postgres
+
+import java.time.{Instant, LocalDateTime}
+import java.util.Optional
+
+import com.google.common.collect.ImmutableMap
+import javax.inject.Inject
+import 
org.apache.james.backends.postgres.PostgresCommons.{LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION,
 ZONED_DATE_TIME_TO_LOCAL_DATE_TIME, INSTANT_TO_LOCAL_DATE_TIME}
+import org.apache.james.backends.postgres.utils.PostgresExecutor
+import 
org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer
+import org.apache.james.task._
+import 
org.apache.james.task.eventsourcing.postgres.PostgresTaskExecutionDetailsProjectionModule._
+import org.apache.james.util.ReactorUtils
+import org.jooq.JSONB.jsonb
+import org.jooq.{InsertQuery, Record}
+import reactor.core.publisher.{Flux, Mono}
+
+/**
+ * Data access object for the `task_execution_details_projection` table.
+ *
+ * Queries are built with jOOQ and run through the injected [[PostgresExecutor]].
+ * A task's optional additional information is stored in a JSONB column and is
+ * converted with the injected [[JsonTaskAdditionalInformationSerializer]].
+ */
+class PostgresTaskExecutionDetailsProjectionDAO @Inject()(postgresExecutor: 
PostgresExecutor, jsonTaskAdditionalInformationSerializer: 
JsonTaskAdditionalInformationSerializer) {
+
+  /**
+   * Inserts the row for `details`, or updates it when a row with the same task
+   * id already exists (conflict target: TASK_ID).
+   *
+   * NOTE(review): optional values that are empty in `details` are left out of
+   * both the insert and the update value sets (see toInsertValues), so on an
+   * update a previously stored value for such a column is presumably kept
+   * rather than cleared - confirm this is intended.
+   */
+  def saveDetails(details: TaskExecutionDetails): Mono[Void] =
+    Mono.from(serializeAdditionalInformation(details)
+      .flatMap(serializedAdditionalInformation => 
postgresExecutor.executeVoid(dsl => {
+        val insertValues: ImmutableMap[Any, Any] = toInsertValues(details, 
serializedAdditionalInformation)
+
+        val insertStatement: InsertQuery[Record] = dsl.insertQuery(TABLE_NAME)
+        // The key is added separately: it is inserted but is not part of the
+        // values reused for the update below.
+        insertStatement.addValue(TASK_ID, details.getTaskId.getValue)
+        insertStatement.addValues(insertValues)
+        insertStatement.onConflict(TASK_ID)
+        insertStatement.onDuplicateKeyUpdate(true)
+        insertStatement.addValuesForUpdate(insertValues)
+
+        Mono.from(insertStatement)
+      })))
+
+  /**
+   * Builds the column -> value map shared by the insert and the update parts
+   * of saveDetails. The task id is not included. Optional values contribute an
+   * entry only when present; dates go through
+   * ZONED_DATE_TIME_TO_LOCAL_DATE_TIME and the serialized additional
+   * information is wrapped as JSONB.
+   */
+  private def toInsertValues(details: TaskExecutionDetails, 
serializedAdditionalInformation: Optional[String]): ImmutableMap[Any, Any] = {
+    val builder: ImmutableMap.Builder[Any, Any] = ImmutableMap.builder()
+    builder.put(TYPE, details.getType.asString())
+    builder.put(STATUS, details.getStatus.getValue)
+    builder.put(SUBMITTED_DATE, 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(details.getSubmittedDate))
+    builder.put(SUBMITTED_NODE, details.getSubmittedNode.asString)
+    details.getStartedDate.ifPresent(startedDate => builder.put(STARTED_DATE, 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(startedDate)))
+    details.getRanNode.ifPresent(hostname => builder.put(RAN_NODE, 
hostname.asString))
+    details.getCompletedDate.ifPresent(completedDate => 
builder.put(COMPLETED_DATE, 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(completedDate)))
+    details.getCanceledDate.ifPresent(canceledDate => 
builder.put(CANCELED_DATE, 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(canceledDate)))
+    details.getCancelRequestedNode.ifPresent(hostname => 
builder.put(CANCEL_REQUESTED_NODE, hostname.asString))
+    details.getFailedDate.ifPresent(failedDate => builder.put(FAILED_DATE, 
ZONED_DATE_TIME_TO_LOCAL_DATE_TIME.apply(failedDate)))
+    serializedAdditionalInformation.ifPresent(info => 
builder.put(ADDITIONAL_INFORMATION, jsonb(info)))
+    builder.build()
+  }
+
+  /**
+   * Serializes the additional information of `details`, if any, inside a
+   * callable subscribed on ReactorUtils.BLOCKING_CALL_WRAPPER.
+   */
+  private def serializeAdditionalInformation(details: TaskExecutionDetails): 
Mono[Optional[String]] = Mono.fromCallable(() => details
+      .getAdditionalInformation
+      .map(jsonTaskAdditionalInformationSerializer.serialize(_)))
+    .cast(classOf[Optional[String]])
+    .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
+
+  /** Selects the row whose task id equals `taskId` and maps it to details. */
+  def readDetails(taskId: TaskId): Mono[TaskExecutionDetails] =
+    postgresExecutor.executeRow(dsl => Mono.from(dsl.selectFrom(TABLE_NAME)
+        .where(TASK_ID.eq(taskId.getValue))))
+      .map(toTaskExecutionDetails)
+
+  /** Selects every row of the table, without any filter or ordering clause. */
+  def listDetails(): Flux[TaskExecutionDetails] =
+    postgresExecutor.executeRows(dsl => Flux.from(dsl.selectFrom(TABLE_NAME)))
+      .map(toTaskExecutionDetails)
+
+  /**
+   * Selects the rows whose submitted date is strictly before `beforeDate`,
+   * after converting it with INSTANT_TO_LOCAL_DATE_TIME.
+   */
+  def listDetailsByBeforeDate(beforeDate: Instant): Flux[TaskExecutionDetails] 
=
+    postgresExecutor.executeRows(dsl => Flux.from(dsl.selectFrom(TABLE_NAME)
+        
.where(SUBMITTED_DATE.lt(INSTANT_TO_LOCAL_DATE_TIME.apply(beforeDate)))))
+      .map(toTaskExecutionDetails)
+
+  /** Deletes the row matching the task id of `details`; other fields are not used. */
+  def remove(details: TaskExecutionDetails): Mono[Void] =
+    postgresExecutor.executeVoid(dsl => Mono.from(dsl.deleteFrom(TABLE_NAME)
+      .where(TASK_ID.eq(details.getTaskId.getValue))))
+
+  /**
+   * Maps a table row to TaskExecutionDetails. Nullable columns are wrapped
+   * with Optional.ofNullable. The additional information is passed as a
+   * function, so its JSON is only deserialized when that function is invoked.
+   */
+  private def toTaskExecutionDetails(record: Record): TaskExecutionDetails =
+    new TaskExecutionDetails(
+      taskId = TaskId.fromUUID(record.get(TASK_ID)),
+      `type` = TaskType.of(record.get(TYPE)),
+      status = TaskManager.Status.fromString(record.get(STATUS)),
+      submittedDate = 
LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(SUBMITTED_DATE, 
classOf[LocalDateTime])),
+      submittedNode = Hostname(record.get(SUBMITTED_NODE)),
+      startedDate = 
Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(STARTED_DATE,
 classOf[LocalDateTime]))),
+      ranNode = Optional.ofNullable(record.get(RAN_NODE)).map(Hostname(_)),
+      completedDate = 
Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(COMPLETED_DATE,
 classOf[LocalDateTime]))),
+      canceledDate = 
Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(CANCELED_DATE,
 classOf[LocalDateTime]))),
+      cancelRequestedNode = 
Optional.ofNullable(record.get(CANCEL_REQUESTED_NODE)).map(Hostname(_)),
+      failedDate = 
Optional.ofNullable(LOCAL_DATE_TIME_ZONED_DATE_TIME_FUNCTION.apply(record.get(FAILED_DATE,
 classOf[LocalDateTime]))),
+      additionalInformation = () => deserializeAdditionalInformation(record))
+
+  /**
+   * Deserializes the JSONB additional information column of `record`; empty
+   * when the column is null.
+   */
+  private def deserializeAdditionalInformation(record: Record): 
Optional[TaskExecutionDetails.AdditionalInformation] =
+    Optional.ofNullable(record.get(ADDITIONAL_INFORMATION))
+      .map(additionalInformation => 
jsonTaskAdditionalInformationSerializer.deserialize(additionalInformation.data()))
+}
diff --git 
a/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala
 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala
new file mode 100644
index 0000000000..21918fd804
--- /dev/null
+++ 
b/server/task/task-postgres/src/main/scala/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionModule.scala
@@ -0,0 +1,72 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing.postgres
+
+import java.time.LocalDateTime
+import java.util.UUID
+
+import org.apache.james.backends.postgres.{PostgresCommons, PostgresIndex, 
PostgresModule, PostgresTable}
+import org.jooq.impl.{DSL, SQLDataType}
+import org.jooq.{Field, JSONB, Record, Table}
+
+/**
+ * Schema definition of the task execution details projection: the jOOQ table
+ * and column handles used to build queries, plus the [[PostgresModule]] that
+ * creates the table and its index.
+ */
+object PostgresTaskExecutionDetailsProjectionModule {
+  val TABLE_NAME: Table[Record] = 
DSL.table("task_execution_details_projection")
+
+  // Column handles. TASK_ID is the only column declared NOT NULL; it is also
+  // the primary key (see TABLE below).
+  val TASK_ID: Field[UUID] = DSL.field("task_id", SQLDataType.UUID.notNull)
+  val ADDITIONAL_INFORMATION: Field[JSONB] = 
DSL.field("additional_information", SQLDataType.JSONB)
+  val TYPE: Field[String] = DSL.field("type", SQLDataType.VARCHAR)
+  val STATUS: Field[String] = DSL.field("status", SQLDataType.VARCHAR)
+  val SUBMITTED_DATE: Field[LocalDateTime] = DSL.field("submitted_date", 
PostgresCommons.DataTypes.TIMESTAMP)
+  val SUBMITTED_NODE: Field[String] = DSL.field("submitted_node", 
SQLDataType.VARCHAR)
+  val STARTED_DATE: Field[LocalDateTime] = DSL.field("started_date", 
PostgresCommons.DataTypes.TIMESTAMP)
+  val RAN_NODE: Field[String] = DSL.field("ran_node", SQLDataType.VARCHAR)
+  val COMPLETED_DATE: Field[LocalDateTime] = DSL.field("completed_date", 
PostgresCommons.DataTypes.TIMESTAMP)
+  val CANCELED_DATE: Field[LocalDateTime] = DSL.field("canceled_date", 
PostgresCommons.DataTypes.TIMESTAMP)
+  val CANCEL_REQUESTED_NODE: Field[String] = 
DSL.field("cancel_requested_node", SQLDataType.VARCHAR)
+  val FAILED_DATE: Field[LocalDateTime] = DSL.field("failed_date", 
PostgresCommons.DataTypes.TIMESTAMP)
+
+  // Table creation step: every column above, primary key on task_id, row
+  // level security disabled.
+  private val TABLE: PostgresTable = PostgresTable.name(TABLE_NAME.getName)
+    .createTableStep((dsl, tableName) => dsl.createTableIfNotExists(tableName)
+      .column(TASK_ID)
+      .column(ADDITIONAL_INFORMATION)
+      .column(TYPE)
+      .column(STATUS)
+      .column(SUBMITTED_DATE)
+      .column(SUBMITTED_NODE)
+      .column(STARTED_DATE)
+      .column(RAN_NODE)
+      .column(COMPLETED_DATE)
+      .column(CANCELED_DATE)
+      .column(CANCEL_REQUESTED_NODE)
+      .column(FAILED_DATE)
+      .constraint(DSL.primaryKey(TASK_ID)))
+    .disableRowLevelSecurity
+    .build
+
+  // Index on submitted_date, the column the DAO's listDetailsByBeforeDate
+  // query filters on.
+  private val SUBMITTED_DATE_INDEX: PostgresIndex = 
PostgresIndex.name("task_execution_details_projection_submittedDate_index")
+    .createIndexStep((dsl, indexName) => dsl.createIndexIfNotExists(indexName)
+      .on(TABLE_NAME, SUBMITTED_DATE))
+
+  // Module bundling the table and its index for registration.
+  val MODULE: PostgresModule = PostgresModule
+    .builder
+    .addTable(TABLE)
+    .addIndex(SUBMITTED_DATE_INDEX)
+    .build
+}
diff --git 
a/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java
 
b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java
new file mode 100644
index 0000000000..22f07fd340
--- /dev/null
+++ 
b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionDAOTest.java
@@ -0,0 +1,202 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing.postgres;
+
+import static 
org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS;
+import static 
org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_2;
+import static 
org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_UPDATED;
+import static 
org.apache.james.task.TaskExecutionDetailsFixture.TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION;
+import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_ID;
+import static org.apache.james.task.TaskExecutionDetailsFixture.TASK_ID_2;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import 
org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
+import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskExecutionDetailsFixture;
+import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import reactor.core.publisher.Flux;
+
+/**
+ * Tests {@link PostgresTaskExecutionDetailsProjectionDAO} against the database
+ * provided by {@link PostgresExtension}.
+ *
+ * Recursive comparisons ignore {@code submittedDate}; the round-trip tests
+ * check it separately with {@code ZonedDateTime#isEqual}, which compares the
+ * instant only. NOTE(review): presumably needed because the date is converted
+ * to UTC when stored, so the zone read back can differ from the fixture's.
+ */
+class PostgresTaskExecutionDetailsProjectionDAOTest {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresTaskExecutionDetailsProjectionModule.MODULE());
+
+    private static final JsonTaskAdditionalInformationSerializer 
JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = 
JsonTaskAdditionalInformationSerializer.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE);
+
+    private PostgresTaskExecutionDetailsProjectionDAO testee;
+
+    @BeforeEach
+    void setUp() {
+        testee = new 
PostgresTaskExecutionDetailsProjectionDAO(postgresExtension.getPostgresExecutor(),
 JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
+    }
+
+    @Test
+    void readDetailsShouldBeAbleToRetrieveASavedRecord() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS()).block();
+
+        TaskExecutionDetails taskExecutionDetails = 
testee.readDetails(TASK_ID()).block();
+
+        assertThat(taskExecutionDetails)
+            .usingRecursiveComparison()
+            .ignoringFields("submittedDate")
+            .isEqualTo(TASK_EXECUTION_DETAILS());
+
+        // submittedDate is excluded above, so verify the stored instant here,
+        // as the other round-trip tests of this class do.
+        assertThat(taskExecutionDetails.getSubmittedDate()
+                .isEqual(TASK_EXECUTION_DETAILS().getSubmittedDate()))
+            .isTrue();
+    }
+
+    @Test
+    void 
readDetailsShouldBeAbleToRetrieveASavedRecordWithAdditionalInformation() {
+        
testee.saveDetails(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION()).block();
+
+        TaskExecutionDetails taskExecutionDetails = 
testee.readDetails(TASK_ID()).block();
+
+        assertThat(taskExecutionDetails)
+            .usingRecursiveComparison()
+            .ignoringFields("submittedDate")
+            .isEqualTo(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION());
+
+        
assertThat(taskExecutionDetails.getSubmittedDate().isEqual(TASK_EXECUTION_DETAILS_WITH_ADDITIONAL_INFORMATION().getSubmittedDate()))
+            .isTrue();
+    }
+
+    @Test
+    void saveDetailsShouldUpdateRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS()).block();
+
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED()).block();
+
+        TaskExecutionDetails taskExecutionDetails = 
testee.readDetails(TASK_ID()).block();
+
+        assertThat(taskExecutionDetails)
+            .usingRecursiveComparison()
+            .ignoringFields("submittedDate")
+            .isEqualTo(TASK_EXECUTION_DETAILS_UPDATED());
+
+        
assertThat(taskExecutionDetails.getSubmittedDate().isEqual(TASK_EXECUTION_DETAILS_UPDATED().getSubmittedDate()))
+            .isTrue();
+    }
+
+    @Test
+    void readDetailsShouldReturnEmptyWhenNone() {
+        Optional<TaskExecutionDetails> taskExecutionDetails = 
testee.readDetails(TASK_ID()).blockOptional();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnEmptyWhenNone() {
+        Stream<TaskExecutionDetails> taskExecutionDetails = 
testee.listDetails().toStream();
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listDetailsShouldReturnAllRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS()).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_2()).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = 
testee.listDetails().toStream();
+
+        assertThat(taskExecutionDetails)
+            
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate")
+            .containsOnly(TASK_EXECUTION_DETAILS(), 
TASK_EXECUTION_DETAILS_2());
+    }
+
+    @Test
+    void listDetailsShouldReturnLastUpdatedRecords() {
+        testee.saveDetails(TASK_EXECUTION_DETAILS()).block();
+        testee.saveDetails(TASK_EXECUTION_DETAILS_UPDATED()).block();
+
+        Stream<TaskExecutionDetails> taskExecutionDetails = 
testee.listDetails().toStream();
+        assertThat(taskExecutionDetails)
+            
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate")
+            .containsOnly(TASK_EXECUTION_DETAILS_UPDATED());
+    }
+
+    @Test
+    void listBeforeDateShouldReturnCorrectEntry() {
+        // Two tasks submitted on either side of the 2000-01-15 cut-off used
+        // below; only the earlier one is expected back.
+        TaskExecutionDetails taskExecutionDetails1 = new 
TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), 
ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        TaskExecutionDetails taskExecutionDetails2 = new 
TaskExecutionDetails(TASK_ID_2(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-20T00:00:00Z"), 
ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        testee.saveDetails(taskExecutionDetails1).block();
+        testee.saveDetails(taskExecutionDetails2).block();
+
+        
assertThat(Flux.from(testee.listDetailsByBeforeDate(Instant.parse("2000-01-15T12:00:55Z"))).collectList().block())
+            
.usingRecursiveFieldByFieldElementComparatorIgnoringFields("submittedDate")
+            .containsOnly(taskExecutionDetails1);
+    }
+
+    @Test
+    void removeShouldDeleteAssignEntry() {
+        TaskExecutionDetails taskExecutionDetails1 = new 
TaskExecutionDetails(TASK_ID(),
+            TaskType.of("type"),
+            TaskManager.Status.COMPLETED,
+            ZonedDateTime.ofInstant(Instant.parse("2000-01-01T00:00:00Z"), 
ZoneId.systemDefault()),
+            TaskExecutionDetailsFixture.SUBMITTED_NODE(),
+            Optional::empty,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+        testee.saveDetails(taskExecutionDetails1).block();
+
+        // Guard: the row must exist first, otherwise the emptiness check
+        // after remove() would pass trivially.
+        assertThat(testee.listDetails().collectList().block())
+            .hasSize(1);
+
+        testee.remove(taskExecutionDetails1).block();
+
+        assertThat(testee.listDetails().collectList().block())
+            .isEmpty();
+    }
+}
\ No newline at end of file
diff --git 
a/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java
 
b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java
new file mode 100644
index 0000000000..d64c0688d2
--- /dev/null
+++ 
b/server/task/task-postgres/src/test/java/org/apache/james/task/eventsourcing/postgres/PostgresTaskExecutionDetailsProjectionTest.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.task.eventsourcing.postgres;
+
+import java.util.function.Supplier;
+
+import org.apache.james.backends.postgres.PostgresExtension;
+import 
org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer;
+import 
org.apache.james.server.task.json.dto.MemoryReferenceWithCounterTaskAdditionalInformationDTO;
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import 
org.apache.james.task.eventsourcing.TaskExecutionDetailsProjectionContract;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+/**
+ * Runs the shared {@link TaskExecutionDetailsProjectionContract} against
+ * {@link PostgresTaskExecutionDetailsProjection}, using the database provided
+ * by {@link PostgresExtension}.
+ */
+class PostgresTaskExecutionDetailsProjectionTest implements 
TaskExecutionDetailsProjectionContract {
+    @RegisterExtension
+    static PostgresExtension postgresExtension = 
PostgresExtension.withoutRowLevelSecurity(PostgresTaskExecutionDetailsProjectionModule.MODULE());
+
+    private static final JsonTaskAdditionalInformationSerializer 
JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER = 
JsonTaskAdditionalInformationSerializer.of(MemoryReferenceWithCounterTaskAdditionalInformationDTO.SERIALIZATION_MODULE);
+
+    private Supplier<PostgresTaskExecutionDetailsProjection> testeeSupplier;
+
+    @BeforeEach
+    void setUp() {
+        // One DAO is built per test; the supplier wraps it in a new
+        // projection instance on every call.
+        PostgresTaskExecutionDetailsProjectionDAO dao = new 
PostgresTaskExecutionDetailsProjectionDAO(postgresExtension.getPostgresExecutor(),
+            JSON_TASK_ADDITIONAL_INFORMATION_SERIALIZER);
+        testeeSupplier = () -> new PostgresTaskExecutionDetailsProjection(dao);
+    }
+
+    /** Supplies the projection under test to the contract's test methods. */
+    @Override
+    public TaskExecutionDetailsProjection testee() {
+        return testeeSupplier.get();
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to