This is an automated email from the ASF dual-hosted git repository. btellier pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b49cceed1d41c8f5aa2f07bbf0b0557327021a0b Author: Benoit Tellier <btell...@linagora.com> AuthorDate: Mon Oct 21 17:23:54 2019 +0700 JAMES-2927 Prevent unwanted tombstone creation for ExecutionDetails projection A tombstone is created whenever a null value is bound in a prepared statement. This happened several times each time "task details" were saved: since a task cannot be, for example, both failed and completed, some of its optional columns are always null. The cause is that binding null means `remove`, not `unspecified`; `unspecified` is expressed by not binding the value at all. Unwanted tombstones come with a performance cost. On recent Cassandra versions, the recommended fix is to leave such values unbound instead of binding null. For further information, read: https://thelastpickle.com/blog/2016/09/15/Null-bindings-on-prepared-statements-and-undesired-tombstone-creation.html --- ...assandraTaskExecutionDetailsProjectionDAO.scala | 51 ++++++++++++++++------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala index fad2b48..3c69a62 100644 --- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala +++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala @@ -22,15 +22,17 @@ import java.util.Optional import com.datastax.driver.core.querybuilder.QueryBuilder import com.datastax.driver.core.querybuilder.QueryBuilder.{bindMarker, insertInto, select} -import com.datastax.driver.core.{Row, Session} +import com.datastax.driver.core.{BoundStatement, Row, Session, UDTValue} import javax.inject.Inject import 
org.apache.james.backends.cassandra.init.{CassandraTypesProvider, CassandraZonedDateTimeModule} import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor import org.apache.james.server.task.json.JsonTaskAdditionalInformationSerializer +import org.apache.james.task._ import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionTable._ -import org.apache.james.task.{Hostname, TaskExecutionDetails, TaskId, TaskManager, TaskType} import reactor.core.publisher.{Flux, Mono} +import scala.compat.java8.OptionConverters._ + class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typesProvider: CassandraTypesProvider, jsonTaskAdditionalInformationSerializer: JsonTaskAdditionalInformationSerializer) { private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session) private val dateType = typesProvider.getDefinedUserType(CassandraZonedDateTimeModule.ZONED_DATE_TIME) @@ -55,20 +57,44 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ private val listStatement = session.prepare(select().from(TABLE_NAME)) - def saveDetails(details: TaskExecutionDetails): Mono[Void] = cassandraAsyncExecutor.executeVoid( - insertStatement.bind + def saveDetails(details: TaskExecutionDetails): Mono[Void] = { + val boundStatement = insertStatement.bind .setUUID(TASK_ID, details.getTaskId.getValue) .setString(TYPE, details.getType.asString()) .setString(STATUS, details.getStatus.getValue) .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmittedDate)) .setString(SUBMITTED_NODE, details.getSubmittedNode.asString) - .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null)) - .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null)) - .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null)) - .setUDTValue(CANCELED_DATE, 
CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null)) - .setString(CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString).orElse(null)) - .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null)) - .setString(ADDITIONAL_INFORMATION, serializeAdditionalInformation(details).orElse(null))) + + val bindOptionalFieldOperations = List( + (statement: BoundStatement) => bindOptionalUDTValue(statement, STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate)), + (statement: BoundStatement) => bindOptionalStringValue(statement, RAN_NODE, details.getRanNode.map[String](_.asString)), + (statement: BoundStatement) => bindOptionalUDTValue(statement, COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate)), + (statement: BoundStatement) => bindOptionalUDTValue(statement, CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate)), + (statement: BoundStatement) => bindOptionalStringValue(statement, CANCEL_REQUESTED_NODE, details.getCancelRequestedNode.map[String](_.asString)), + (statement: BoundStatement) => bindOptionalUDTValue(statement, FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getFailedDate)), + (statement: BoundStatement) => bindOptionalStringValue(statement, ADDITIONAL_INFORMATION, serializeAdditionalInformation(details)), + ) + + val fullyBoundStatement = bindOptionalFieldOperations.foldLeft(boundStatement)((statement, bindFieldOperation) => { + bindFieldOperation(statement) + }) + + cassandraAsyncExecutor.executeVoid(fullyBoundStatement); + } + + private def bindOptionalStringValue(statement: BoundStatement, fieldName: String, fieldValue: Optional[String]) = { + fieldValue.asScala match { + case Some(value) => statement.setString(fieldName, value) + case None => statement + } + } + + private def bindOptionalUDTValue(statement: BoundStatement, fieldName: String, 
fieldValue: Optional[UDTValue]) = { + fieldValue.asScala match { + case Some(value) => statement.setUDTValue(fieldName, value) + case None => statement + } + } private def serializeAdditionalInformation(details: TaskExecutionDetails): Optional[String] = details .getAdditionalInformation @@ -96,8 +122,7 @@ class CassandraTaskExecutionDetailsProjectionDAO @Inject()(session: Session, typ canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)), cancelRequestedNode = Optional.ofNullable(row.getString(CANCEL_REQUESTED_NODE)).map(Hostname(_)), failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)), - additionalInformation = () => deserializeAdditionalInformation(taskType, row), - ) + additionalInformation = () => deserializeAdditionalInformation(taskType, row)) } private def deserializeAdditionalInformation(taskType: TaskType, row: Row): Optional[TaskExecutionDetails.AdditionalInformation] = { --------------------------------------------------------------------- To unsubscribe, e-mail: server-dev-unsubscr...@james.apache.org For additional commands, e-mail: server-dev-h...@james.apache.org