jenkins-bot has submitted this change and it was merged.

Change subject: Damaged datastore
......................................................................


Damaged datastore

Anything failed goes here, with an optional date to retry.  Instead
of keeping track of the number of times something has been retried,
we just keep track of the original message date for standard expiration.

Bug: T142028
Change-Id: I83e3672e10046867e9f6ed5c7c9ce8a1a042dd02
---
A Core/DataStores/DamagedDatabase.php
M Core/UtcDate.php
A Schema/002_CreateDamagedTable.sql
A Schema/sqlite/002_CreateDamagedTable.sqlite.sql
M SmashPig.yaml
A Tests/DamagedDatabaseTest.php
M Tests/data/config_pending_db.yaml
7 files changed, 374 insertions(+), 2 deletions(-)

Approvals:
  Awight: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/Core/DataStores/DamagedDatabase.php 
b/Core/DataStores/DamagedDatabase.php
new file mode 100644
index 0000000..a6e6322
--- /dev/null
+++ b/Core/DataStores/DamagedDatabase.php
@@ -0,0 +1,185 @@
+<?php
+namespace SmashPig\Core\DataStores;
+
+use PDO;
+use SmashPig\Core\Context;
+use SmashPig\Core\SmashPigException;
+use SmashPig\Core\UtcDate;
+
+/**
+ * Data store containing messages which were not successfully processed
+ */
+class DamagedDatabase {
+
+       /**
+        * @var PDO
+        * We do the silly singleton thing for convenient testing with in-memory
+        * databases that would otherwise not be shared between components.
+        */
+       protected static $db;
+
+       protected function __construct() {
+               if ( !self::$db ) {
+                       $config = Context::get()->getConfiguration();
+                       self::$db = $config->object( 'data-store/damaged-db' );
+               }
+       }
+
+       /**
+        * @return PDO
+        */
+       public function getDatabase() {
+               return self::$db;
+       }
+
+       public static function get() {
+               return new DamagedDatabase();
+       }
+
+       protected function validateMessage( $message ) {
+               if (
+                       empty( $message['date'] ) ||
+                       empty( $message['gateway'] )
+               ) {
+                       throw new SmashPigException( 'Message missing required 
fields' );
+               }
+       }
+
+       /**
+        * Build and insert a database record from a queue message
+        *
+        * @param array $message Unprocessable message
+        * @param string $originalQueue Queue the message was first sent to
+        * @param string $error Information about why this message is damaged
+        * @param int|null $retryDate When provided, re-process message after
+        *  this timestamp
+        * @return int ID of message in damaged database
+        * @throws SmashPigException if insert fails
+        */
+       public function storeMessage(
+               $message, $originalQueue, $error = '', $retryDate = null
+       ) {
+               $this->validateMessage( $message );
+
+               $dbRecord = array(
+                       'original_date' => UtcDate::getUtcDatabaseString(
+                               $message['date']
+                       ),
+                       'damaged_date' => UtcDate::getUtcDatabaseString(),
+                       'original_queue' => $originalQueue,
+                       'error' => $error,
+                       'message' => json_encode( $message ),
+               );
+               if ( $retryDate ) {
+                       $dbRecord['retry_date'] = UtcDate::getUtcDatabaseString(
+                               $retryDate
+                       );
+               }
+
+               // These fields have their own columns in the database
+               // Copy the values from the message to the record
+               $indexedFields = array(
+                       'gateway', 'gateway_txn_id', 'order_id'
+               );
+
+               foreach ( $indexedFields as $fieldName ) {
+                       if ( isset( $message[$fieldName] ) ) {
+                               $dbRecord[$fieldName] = $message[$fieldName];
+                       }
+               }
+               $fieldList = implode( ',', array_keys( $dbRecord ) );
+
+               // Build a list of parameter names for safe db insert. Same as
+               // the field list, but each parameter is prefixed with a colon.
+               $paramList = ':' . implode( ', :', array_keys( $dbRecord ) );
+
+               $insert = "INSERT INTO damaged ( $fieldList )
+                       VALUES ( $paramList );";
+               $prepared = self::$db->prepare( $insert );
+
+               foreach ( $dbRecord as $field => $value ) {
+                       $prepared->bindValue(
+                               ':' . $field,
+                               $value,
+                               PDO::PARAM_STR
+                       );
+               }
+               if ( $prepared->execute() ) {
+                       return self::$db->lastInsertId();
+               }
+               throw new SmashPigException( 'Unable to insert into damaged db' 
);
+       }
+
+       /**
+        * Return messages ready to be retried
+        *
+        * @return array|null Records with retry_date prior to now
+        */
+       public function fetchRetryMessages() {
+               $prepared = self::$db->prepare( '
+                       SELECT * FROM damaged
+                       WHERE retry_date < :now'
+               );
+               $prepared->bindValue(
+                       ':now', UtcDate::getUtcDatabaseString(), PDO::PARAM_STR
+               );
+               $prepared->execute();
+               $rows = $prepared->fetchAll( PDO::FETCH_ASSOC );
+               if ( !$rows ) {
+                       return null;
+               }
+               return array_map(
+                       array( $this, 'messageFromDbRow' ),
+                       $rows
+               );
+       }
+
+       /**
+        * Delete a message from the database
+        *
+        * @param array $message
+        */
+       public function deleteMessage( $message ) {
+               $prepared = self::$db->prepare( '
+                       DELETE FROM damaged
+                       WHERE id = :id'
+               );
+               $prepared->bindValue( ':id', $message['damaged_id'], 
PDO::PARAM_STR );
+               $prepared->execute();
+       }
+
+       /**
+        * Delete expired messages, optionally by original queue
+        *
+        * @param int $originalDate Oldest original timestamp to keep
+        * @param string|null $queue
+        */
+       public function deleteOldMessages( $originalDate, $queue = null ) {
+               $sql = 'DELETE FROM damaged WHERE original_date < :date';
+               if ( $queue ) {
+                       $sql .= ' AND original_queue = :queue';
+               }
+               $prepared = self::$db->prepare( $sql );
+               $prepared->bindValue(
+                       ':date',
+                       UtcDate::getUtcDatabaseString( $originalDate ),
+                       PDO::PARAM_STR
+               );
+               if ( $queue ) {
+                       $prepared->bindValue( ':queue', $queue, PDO::PARAM_STR 
);
+               }
+               $prepared->execute();
+       }
+
+       /**
+        * Parse a database row and return the normalized message.
+        * @param array $row
+        * @return array
+        */
+       protected function messageFromDbRow( $row ) {
+               $message = json_decode( $row['message'], true );
+               $message['damaged_id'] = $row['id'];
+               $message['original_queue'] = $row['original_queue'];
+               return $message;
+       }
+}
diff --git a/Core/UtcDate.php b/Core/UtcDate.php
index 85184a2..2e473cc 100644
--- a/Core/UtcDate.php
+++ b/Core/UtcDate.php
@@ -20,12 +20,17 @@
 
        /**
         * Format a UTC timestamp for database insertion
-        * @param int $timestamp
+        * @param int|null $timestamp, defaults to time()
         * @param string $format optional time format
         * @return string
         * @throws Exception
         */
-       public static function getUtcDatabaseString( $timestamp, $format = 
'YmdHis') {
+       public static function getUtcDatabaseString(
+               $timestamp = null, $format = 'YmdHis'
+       ) {
+               if ( $timestamp === null ) {
+                       $timestamp = time();
+               }
                $obj = new DateTime( '@' . $timestamp, new DateTimeZone( 'UTC' 
) );
                return $obj->format( $format );
        }
diff --git a/Schema/002_CreateDamagedTable.sql 
b/Schema/002_CreateDamagedTable.sql
new file mode 100644
index 0000000..1376d90
--- /dev/null
+++ b/Schema/002_CreateDamagedTable.sql
@@ -0,0 +1,18 @@
+CREATE TABLE damaged (
+  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
+  `original_date` datetime NOT NULL,
+  `damaged_date` datetime NOT NULL,
+  `retry_date` datetime NULL,
+  `original_queue` varchar(255) NOT NULL,
+  `gateway` varchar(255) NOT NULL,
+  `order_id` varchar(255) NULL,
+  `gateway_txn_id` varchar(255) NULL,
+  `error` text NULL,
+  `message` text NOT NULL,
+  INDEX `idx_damaged_original_date` (`original_date`),
+  INDEX `idx_damaged_original_date_original_queue` (`original_date`, 
`original_queue`),
+  INDEX `idx_damaged_retry_date` (`retry_date`),
+  INDEX `idx_damaged_order_id_gateway` (`order_id`, `gateway`),
+  INDEX `idx_damaged_gateway_txn_id_gateway` (`gateway_txn_id`, `gateway`),
+  PRIMARY KEY `pk_damaged_id` (`id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 AUTO_INCREMENT=1;
diff --git a/Schema/sqlite/002_CreateDamagedTable.sqlite.sql 
b/Schema/sqlite/002_CreateDamagedTable.sqlite.sql
new file mode 100644
index 0000000..53deea0
--- /dev/null
+++ b/Schema/sqlite/002_CreateDamagedTable.sqlite.sql
@@ -0,0 +1,12 @@
+CREATE TABLE damaged (
+  `id` integer primary key,
+  `original_date` datetime NOT NULL,
+  `damaged_date` datetime NOT NULL,
+  `retry_date` datetime NULL,
+  `original_queue` varchar(255) NOT NULL,
+  `gateway` varchar(255) NOT NULL,
+  `order_id` varchar(255) NULL,
+  `gateway_txn_id` varchar(255) NULL,
+  `error` text NULL,
+  `message` text NOT NULL
+);
diff --git a/SmashPig.yaml b/SmashPig.yaml
index 0542db1..611b598 100644
--- a/SmashPig.yaml
+++ b/SmashPig.yaml
@@ -53,6 +53,11 @@
             inst-args:
                 - 'mysql:host=127.0.0.1;dbname=pending'
 
+        damaged-db:
+            class: PDO
+            inst-args:
+                - 'mysql:host=127.0.0.1;dbname=pending'
+
         refund:
             class: SmashPig\Core\DataStores\MultiQueueWriter
             inst-args:
diff --git a/Tests/DamagedDatabaseTest.php b/Tests/DamagedDatabaseTest.php
new file mode 100644
index 0000000..8760fbb
--- /dev/null
+++ b/Tests/DamagedDatabaseTest.php
@@ -0,0 +1,142 @@
+<?php
+
+namespace SmashPig\Tests;
+
+use PDO;
+use SmashPig\Core\Context;
+use SmashPig\Core\DataStores\DamagedDatabase;
+
+class DamagedDatabaseTest extends BaseSmashPigUnitTestCase {
+
+       /**
+        * @var DamagedDatabase
+        */
+       protected $db;
+
+       public function setUp() {
+               parent::setUp();
+               Context::initWithLogger( new PendingDatabaseTestConfiguration() 
);
+               $this->db = DamagedDatabase::get();
+
+               // Create sqlite schema
+               $sql = file_get_contents( __DIR__ . 
'/../Schema/sqlite/002_CreateDamagedTable.sqlite.sql' );
+               $this->db->getDatabase()->exec( $sql );
+       }
+
+       public function tearDown() {
+               // Reset PDO static member
+               $klass = new \ReflectionClass( 
'SmashPig\Core\DataStores\DamagedDatabase' );
+               $dbProperty = $klass->getProperty( 'db' );
+               $dbProperty->setAccessible( true );
+               $dbProperty->setValue( null );
+
+               parent::tearDown();
+       }
+
+       protected static function getTestMessage( $uniq = null ) {
+               if ( !$uniq ) { 
+                       $uniq = mt_rand();
+               }
+               return array(
+                       'gateway' => 'test',
+                       'gateway_txn_id' => "txn-{$uniq}",
+                       'order_id' => "order-{$uniq}",
+                       'gateway_account' => 'default',
+                       'date' => 1468973648,
+                       'amount' => 123,
+                       'currency' => 'EUR',
+               );
+       }
+
+       public function testStoreMessage() {
+               $message = self::getTestMessage();
+               $queue = 'test_queue';
+               $err = 'ERROR MESSAGE';
+               $this->db->storeMessage( $message, $queue, $err );
+
+               // Confirm work without using the API.
+               $pdo = $this->db->getDatabase();
+               $result = $pdo->query( "
+                       SELECT * FROM damaged
+                       WHERE gateway='test'
+                       AND order_id = '{$message['order_id']}'" );
+               $rows = $result->fetchAll( PDO::FETCH_ASSOC );
+               $this->assertEquals( 1, count( $rows ),
+                       'One row stored and retrieved.' );
+               $expected = array(
+                       'id' => '1',
+                       # NOTE: This is a db-specific string, sqlite3 in this 
case, and
+                       # you'll have different formatting if using any other 
database.
+                       'original_date' => '20160720001408',
+                       'gateway' => 'test',
+                       'order_id' => $message['order_id'],
+                       'gateway_txn_id' => $message['gateway_txn_id'],
+                       'message' => json_encode( $message ),
+                       'original_queue' => $queue,
+                       'error' => $err,
+                       'retry_date' => null,
+               );
+               unset( $rows[0]['damaged_date'] );
+               $this->assertEquals( $expected, $rows[0],
+                       'Stored message had expected contents' );
+       }
+
+       public function testFetchRetryMessages() {
+               $message = self::getTestMessage();
+               $this->db->storeMessage( $message, 'test_queue', '', time() - 1 
);
+
+               $fetched = $this->db->fetchRetryMessages();
+
+               $this->assertNotNull( $fetched,
+                       'No record retrieved by fetchRetryMessages.' );
+
+               $expected = $message + array(
+                       'damaged_id' => 1,
+                       'original_queue' => 'test_queue'
+               );
+               $this->assertEquals( $expected, $fetched[0],
+                       'Fetched record does not matches stored message.' );
+       }
+
+       public function testDeleteMessage() {
+               $uniq = mt_rand();
+               $queue = 'test_queue';
+               $message1 = $this->getTestMessage( $uniq );
+               // Store a second message for a good time, and make sure we 
delete the
+               // right one.
+               $message2 = $this->getTestMessage( $uniq );
+
+               $this->db->storeMessage( $message1, $queue );
+               // store message 2 with a
+               $this->db->storeMessage( $message2, $queue );
+
+               // Confirm work without using the API.
+               $pdo = $this->db->getDatabase();
+               $result = $pdo->query( "
+                       SELECT * FROM damaged
+                       WHERE gateway='test'
+                               AND order_id = '{$message1['order_id']}'" );
+               $rows = $result->fetchAll( PDO::FETCH_ASSOC );
+               $this->assertEquals( 2, count( $rows ),
+                       'Both records were stored.' );
+               $this->assertNotNull( $rows[0]['id'],
+                       'Record includes a primary row id' );
+               $this->assertNotEquals( $rows[0]['id'], $rows[1]['id'],
+                       'Records have unique primary ids' );
+
+               $message2['damaged_id'] = $rows[1]['id'];
+               $this->db->deleteMessage( $message2 );
+
+               // Confirm work without using the API.
+               $pdo = $this->db->getDatabase();
+               $result = $pdo->query( "
+                       SELECT * FROM damaged
+                       WHERE gateway='test'
+                               AND order_id = '{$message1['order_id']}'" );
+               $rowsAfter = $result->fetchAll( PDO::FETCH_ASSOC );
+               $this->assertEquals( 1, count( $rowsAfter ),
+                       'Not only one row deleted.' );
+               $this->assertEquals( $rowsAfter[0]['id'], $rows[0]['id'],
+                       'Deleted the wrong row.' );
+       }
+}
diff --git a/Tests/data/config_pending_db.yaml 
b/Tests/data/config_pending_db.yaml
index 4e4a973..bd0de01 100644
--- a/Tests/data/config_pending_db.yaml
+++ b/Tests/data/config_pending_db.yaml
@@ -4,3 +4,8 @@
             class: PDO
             inst-args:
                 - 'sqlite::memory:'
+
+        damaged-db:
+            class: PDO
+            inst-args:
+                - 'sqlite::memory:'

-- 
To view, visit https://gerrit.wikimedia.org/r/302758
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I83e3672e10046867e9f6ed5c7c9ce8a1a042dd02
Gerrit-PatchSet: 6
Gerrit-Project: wikimedia/fundraising/SmashPig
Gerrit-Branch: master
Gerrit-Owner: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: Awight <awi...@wikimedia.org>
Gerrit-Reviewer: Cdentinger <cdentin...@wikimedia.org>
Gerrit-Reviewer: Ejegg <eeggles...@wikimedia.org>
Gerrit-Reviewer: Springle <sprin...@wikimedia.org>
Gerrit-Reviewer: XenoRyet <dkozlow...@wikimedia.org>
Gerrit-Reviewer: jenkins-bot <>

_______________________________________________
MediaWiki-commits mailing list
MediaWiki-commits@lists.wikimedia.org
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to