Christopher Johnson (WMDE) has uploaded a new change for review.

  https://gerrit.wikimedia.org/r/179916

Change subject: adds SprintFactDaemon and PhabricatorFactSprintEngine
......................................................................

adds SprintFactDaemon and PhabricatorFactSprintEngine

bug: T78127
Change-Id: Id8d63075ca94f5204997b23b35496a2cd1e527f2
---
M src/__phutil_library_map__.php
M src/application/SprintApplication.php
A src/fact/PhabricatorFactSprintEngine.php
A src/fact/SprintFactDaemon.php
A src/fact/SprintFactUpdateIterator.php
5 files changed, 511 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.wikimedia.org:29418/phabricator/extensions/Sprint refs/changes/16/179916/1

diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
index e39343f..a6457b5 100644
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -20,6 +20,7 @@
     'EventTableView' => 'view/EventTableView.php',
     'HistoryTableView' => 'view/HistoryTableView.php',
     'OpenTasksView' => 'view/OpenTasksView.php',
+    'PhabricatorFactSprintEngine' => 'fact/PhabricatorFactSprintEngine.php',
     'ProjectOpenTasksView' => 'view/ProjectOpenTasksView.php',
     'SprintApplication' => 'application/SprintApplication.php',
     'SprintApplicationTest' => 'tests/SprintApplicationTest.php',
@@ -39,8 +40,11 @@
     'SprintConstants' => 'constants/SprintConstants.php',
     'SprintController' => 'controller/SprintController.php',
     'SprintControllerTest' => 'tests/SprintControllerTest.php',
+    'SprintDAO' => 'storage/SprintDAO.php',
     'SprintDataViewController' => 'controller/SprintDataViewController.php',
     'SprintEndDateField' => 'customfield/SprintEndDateField.php',
+    'SprintFactDaemon' => 'fact/SprintFactDaemon.php',
+    'SprintFactUpdateIterator' => 'fact/SprintFactUpdateIterator.php',
     'SprintListController' => 'controller/SprintListController.php',
     'SprintProjectCustomField' => 'customfield/SprintProjectCustomField.php',
     'SprintProjectProfileController' => 'controller/SprintProjectProfileController.php',
@@ -50,6 +54,7 @@
     'SprintReportController' => 'controller/SprintReportController.php',
     'SprintReportOpenTasksView' => 'view/SprintReportOpenTasksView.php',
     'SprintTableView' => 'view/SprintTableView.php',
+    'SprintTask' => 'storage/SprintTask.php',
     'SprintTaskStoryPointsField' => 'customfield/SprintTaskStoryPointsField.php',
     'SprintTestCase' => 'tests/SprintTestCase.php',
     'SprintTransaction' => 'storage/SprintTransaction.php',
@@ -67,6 +72,7 @@
     'BurndownException' => 'Exception',
     'CeleritySprintResources' => 'CelerityResourcesOnDisk',
     'DateIterator' => 'Iterator',
+    'PhabricatorFactSprintEngine' => 'PhabricatorFactEngine',
     'ProjectOpenTasksView' => 'OpenTasksView',
     'SprintApplication' => 'PhabricatorApplication',
     'SprintApplicationTest' => 'SprintTestCase',
@@ -83,8 +89,11 @@
     'SprintBuildStatsTest' => 'SprintTestCase',
     'SprintController' => 'PhabricatorController',
     'SprintControllerTest' => 'SprintTestCase',
+    'SprintDAO' => 'PhabricatorLiskDAO',
     'SprintDataViewController' => 'SprintController',
     'SprintEndDateField' => 'SprintProjectCustomField',
+    'SprintFactDaemon' => 'PhabricatorDaemon',
+    'SprintFactUpdateIterator' => 'PhutilBufferedIterator',
     'SprintListController' => 'SprintController',
     'SprintProjectCustomField' => array(
       'PhabricatorProjectCustomField',
@@ -95,6 +104,19 @@
     'SprintReportBurndownView' => 'SprintView',
     'SprintReportController' => 'SprintController',
     'SprintReportOpenTasksView' => 'SprintView',
+    'SprintTask' => array(
+      'SprintDAO',
+      'PhabricatorMarkupInterface',
+      'PhabricatorPolicyInterface',
+      'PhabricatorTokenReceiverInterface',
+      'PhabricatorFlaggableInterface',
+      'PhabricatorMentionableInterface',
+      'PhrequentTrackableInterface',
+      'PhabricatorCustomFieldInterface',
+      'PhabricatorDestructibleInterface',
+      'PhabricatorApplicationTransactionInterface',
+      'PhabricatorProjectInterface',
+    ),
     'SprintTaskStoryPointsField' => array(
       'ManiphestCustomField',
       'PhabricatorStandardCustomFieldInterface',
diff --git a/src/application/SprintApplication.php b/src/application/SprintApplication.php
index aaf1032..bb55617 100644
--- a/src/application/SprintApplication.php
+++ b/src/application/SprintApplication.php
@@ -31,7 +31,7 @@
 
   public function getFactObjectsForAnalysis() {
     return array(
-        new ManiphestTask(),
+        new ManiphestTransaction(),
     );
   }
 
diff --git a/src/fact/PhabricatorFactSprintEngine.php b/src/fact/PhabricatorFactSprintEngine.php
new file mode 100644
index 0000000..189a210
--- /dev/null
+++ b/src/fact/PhabricatorFactSprintEngine.php
@@ -0,0 +1,161 @@
+<?php
+
+/**
+ * Fact engine which counts objects and, for Maniphest custom field and status
+ * transactions, records the old and new values as raw facts.
+ */
+final class PhabricatorFactSprintEngine extends PhabricatorFactEngine {
+
+  /**
+   * Describe the counting fact types this engine knows how to present.
+   *
+   * Types beginning with "N:" are named "Objects ..." and types beginning
+   * with "+N:" are named "Total Objects ...". The wildcard forms ("N:*" and
+   * "+N:*") get the bare label; any other suffix is appended as
+   * "of type <suffix>". Types with neither prefix are skipped.
+   *
+   * @param  list<string> Fact type keys to describe.
+   * @return list<PhabricatorFactSimpleSpec> One spec per recognized type, in
+   *         input order.
+   */
+  public function getFactSpecs(array $fact_types) {
+    // Recognized prefix => base label. A type can match at most one prefix.
+    $labels = array(
+      '+N:' => 'Total Objects',
+      'N:' => 'Objects',
+    );
+
+    $specs = array();
+    foreach ($fact_types as $fact_type) {
+      foreach ($labels as $prefix => $label) {
+        $prefix_length = strlen($prefix);
+        if (strncmp($fact_type, $prefix, $prefix_length)) {
+          continue;
+        }
+
+        $name = $label;
+        if ($fact_type != $prefix.'*') {
+          $name = $label.' of type '.substr($fact_type, $prefix_length);
+        }
+
+        $specs[] = id(new PhabricatorFactSimpleSpec($fact_type))
+          ->setName($name)
+          ->setUnit(PhabricatorFactSimpleSpec::UNIT_COUNT);
+      }
+    }
+
+    return $specs;
+  }
+
+  /**
+   * Report whether raw facts should be computed for an object.
+   *
+   * @param  PhabricatorLiskDAO Candidate object; it is not inspected.
+   * @return bool Always true.
+   */
+  public function shouldComputeRawFactsForObject(PhabricatorLiskDAO $object) {
+    return true;
+  }
+
+  /**
+   * Compute the raw facts describing a single object.
+   *
+   * For a ManiphestTransaction of type "core:customfield" or "status", two
+   * facts ("N:*" and "N:<transaction type>") are attached to the PHID of the
+   * object the transaction applies to, with valueX holding the old value and
+   * valueY the new value. Status values are encoded as 1 for "open", 0 for
+   * "resolved" and null otherwise. Transactions of any other type yield no
+   * facts.
+   *
+   * Any other object is counted: two facts ("N:*" and "N:<PHID type>") with
+   * valueX = 1 and valueY = 0 are attached to the object's own PHID.
+   *
+   * @param  PhabricatorLiskDAO Object to analyze.
+   * @return list<PhabricatorFactRaw> Unsaved raw facts; may be empty.
+   */
+  public function computeRawFactsForObject(PhabricatorLiskDAO $object) {
+    if (!($object instanceof ManiphestTransaction)) {
+      $phid = $object->getPHID();
+
+      // valueY gets an explicit 0 so that its setter is always called with
+      // an argument.
+      return $this->newSprintRawFacts(
+        array('N:*', 'N:'.phid_get_type($phid)),
+        $phid,
+        1,
+        0,
+        $object->getDateCreated());
+    }
+
+    $xacttype = $object->getTransactionType();
+    if ($xacttype == 'core:customfield') {
+      $old_value = $object->getOldValue();
+      $new_value = $object->getNewValue();
+    } else if ($xacttype == 'status') {
+      $old_value = $this->getStatusFactValue($object->getOldValue());
+      $new_value = $this->getStatusFactValue($object->getNewValue());
+    } else {
+      // No other transaction type is tracked by this engine.
+      return array();
+    }
+
+    return $this->newSprintRawFacts(
+      array('N:*', 'N:'.$xacttype),
+      $object->getObjectPHID(),
+      $old_value,
+      $new_value,
+      $object->getDateCreated());
+  }
+
+  /**
+   * Encode a task status as the number stored in a status fact.
+   *
+   * The comparison is strict so that a non-string value (an integer 0, for
+   * instance) is never mistaken for "open".
+   *
+   * @param  wild Status value read from a transaction.
+   * @return int|null 1 for "open", 0 for "resolved", null for anything else.
+   */
+  private function getStatusFactValue($status) {
+    if ($status === 'open') {
+      return 1;
+    }
+    if ($status === 'resolved') {
+      return 0;
+    }
+    return null;
+  }
+
+  /**
+   * Build one unsaved raw fact per fact type, all carrying the same values.
+   *
+   * @param  list<string> Fact types to emit.
+   * @param  string       PHID the facts are attached to.
+   * @param  wild         Value stored as valueX.
+   * @param  wild         Value stored as valueY.
+   * @param  int          Epoch stored on each fact.
+   * @return list<PhabricatorFactRaw> The new facts, in fact-type order.
+   */
+  private function newSprintRawFacts(
+    array $fact_types,
+    $object_phid,
+    $value_x,
+    $value_y,
+    $epoch) {
+
+    $facts = array();
+    foreach ($fact_types as $fact_type) {
+      $facts[] = id(new PhabricatorFactRaw())
+        ->setFactType($fact_type)
+        ->setObjectPHID($object_phid)
+        ->setValueX($value_x)
+        ->setValueY($value_y)
+        ->setEpoch($epoch);
+    }
+    return $facts;
+  }
+
+  /**
+   * Compute raw facts for an object, recording the old and new value of any
+   * ManiphestTransaction regardless of its transaction type.
+   *
+   * Two facts are produced, typed "N:*" and "N:<PHID type of the object>".
+   * For a transaction they are attached to the PHID of the object the
+   * transaction applies to, with valueX = old value and valueY = new value.
+   * For anything else they are attached to the object's own PHID with
+   * valueX = 1 and valueY = 0.
+   *
+   * @param  PhabricatorLiskDAO Object to analyze.
+   * @return list<PhabricatorFactRaw> Unsaved raw facts.
+   */
+  public function computeRawFactsfromXACTForObject(PhabricatorLiskDAO $object) {
+    $phid = $object->getPHID();
+    $type = phid_get_type($phid);
+
+    if ($object instanceof ManiphestTransaction) {
+      $target_phid = $object->getObjectPHID();
+      $value_x = $object->getOldValue();
+      $value_y = $object->getNewValue();
+    } else {
+      $target_phid = $phid;
+      $value_x = 1;
+      // Explicit 0 so that the valueY setter always receives an argument.
+      $value_y = 0;
+    }
+
+    $epoch = $object->getDateCreated();
+
+    $facts = array();
+    foreach (array('N:*', 'N:'.$type) as $fact_type) {
+      $facts[] = id(new PhabricatorFactRaw())
+        ->setFactType($fact_type)
+        ->setObjectPHID($target_phid)
+        ->setValueX($value_x)
+        ->setValueY($value_y)
+        ->setEpoch($epoch);
+    }
+
+    return $facts;
+  }
+
+  /**
+   * Report whether this engine contributes aggregate facts.
+   *
+   * @return bool Always true.
+   */
+  public function shouldComputeAggregateFacts() {
+    return true;
+  }
+
+  /**
+   * Sum valueX over the stored raw facts, grouped by fact type, for the fact
+   * types selected by the "N:" LIKE pattern.
+   *
+   * Each sum becomes an aggregate fact whose type is the raw fact type with
+   * a "+" prepended.
+   *
+   * @return list<PhabricatorFactAggregate> Unsaved aggregate facts.
+   */
+  public function computeAggregateFacts() {
+    $raw_table = new PhabricatorFactRaw();
+    $raw_table_name = $raw_table->getTableName();
+    $conn_r = $raw_table->establishConnection('r');
+
+    $rows = queryfx_all(
+      $conn_r,
+      'SELECT factType, SUM(valueX) N FROM %T WHERE factType LIKE %>
+        GROUP BY factType',
+      $raw_table_name,
+      'N:');
+
+    $aggregates = array();
+    foreach ($rows as $row) {
+      $aggregate_type = '+'.$row['factType'];
+      $aggregates[] = id(new PhabricatorFactAggregate())
+        ->setFactType($aggregate_type)
+        ->setValueX($row['N']);
+    }
+
+    return $aggregates;
+  }
+
+
+}
diff --git a/src/fact/SprintFactDaemon.php b/src/fact/SprintFactDaemon.php
new file mode 100644
index 0000000..b2e9409
--- /dev/null
+++ b/src/fact/SprintFactDaemon.php
@@ -0,0 +1,227 @@
+<?php
+
+final class SprintFactDaemon extends PhabricatorDaemon {
+
+  private $engines;
+
+  const RAW_FACT_BUFFER_LIMIT = 128;
+
+  /**
+   * Daemon main loop.
+   *
+   * Loads all fact engines once, then repeatedly processes every application
+   * iterator followed by the aggregates, sleeping five minutes between
+   * passes until the daemon is asked to exit.
+   */
+  public function run() {
+    $engines = PhabricatorFactEngine::loadAllEngines();
+    $this->setEngines($engines);
+
+    // Seconds to idle between two passes.
+    $pause = 60 * 5;
+
+    while (!$this->shouldExit()) {
+      foreach ($this->getAllApplicationIterators() as $name => $iterator) {
+        $this->processIteratorWithCursor($name, $iterator);
+      }
+      $this->processAggregates();
+
+      $this->log('Zzz...');
+      $this->sleep($pause);
+    }
+  }
+
+  /**
+   * Build one update iterator for each object type that the installed
+   * applications expose for fact analysis.
+   *
+   * SprintFactUpdateIterator is used because processIterator() calls
+   * getObject() on the iterators returned here.
+   *
+   * @return map<string, SprintFactUpdateIterator> Iterators keyed by the
+   *         class name of the object they iterate over. If two applications
+   *         expose the same class, the later one wins.
+   */
+  public static function getAllApplicationIterators() {
+    $iterators = array();
+
+    $apps = PhabricatorApplication::getAllInstalledApplications();
+    foreach ($apps as $app) {
+      foreach ($app->getFactObjectsForAnalysis() as $object) {
+        $iterators[get_class($object)] = new SprintFactUpdateIterator($object);
+      }
+    }
+
+    return $iterators;
+  }
+
+  /**
+   * Run an iterator from its saved cursor position and store the new one.
+   *
+   * The cursor row is looked up by name and created (unsaved) if missing. A
+   * stored position, if any, is handed to the iterator before processing.
+   * The cursor is only saved when processIterator() returns a truthy
+   * position.
+   *
+   * @param  string Name identifying the cursor for this iterator.
+   * @param  wild   Iterator accepted by processIterator().
+   * @return void
+   */
+  public function processIteratorWithCursor($iterator_name, $iterator) {
+    $this->log("Processing cursor '{$iterator_name}'.");
+
+    $cursor = id(new PhabricatorFactCursor())->loadOneWhere(
+      'name = %s',
+      $iterator_name);
+
+    if ($cursor) {
+      $start = $cursor->getPosition();
+      if ($start) {
+        $iterator->setPosition($start);
+      }
+    } else {
+      // First run for this iterator: start from the beginning.
+      $cursor = new PhabricatorFactCursor();
+      $cursor->setName($iterator_name);
+    }
+
+    $reached = $this->processIterator($iterator);
+    if (!$reached) {
+      return;
+    }
+
+    $cursor->setPosition($reached);
+    $cursor->save();
+  }
+
+  /**
+   * Set the fact engines used to compute raw and aggregate facts.
+   *
+   * @param  list<PhabricatorFactEngine> Engines to use.
+   * @return this
+   */
+  public function setEngines(array $engines) {
+    // Reject anything that is not a fact engine before storing the list.
+    assert_instances_of($engines, 'PhabricatorFactEngine');
+    $this->engines = $engines;
+
+    return $this;
+  }
+
+  /**
+   * Compute and store raw facts for everything an iterator covers.
+   *
+   * When the iterator's object is a ManiphestTransaction, the iterator
+   * itself is not walked. Instead every "core:customfield" and "status"
+   * transaction is loaded and all resulting facts are written in a single
+   * updateRawFacts() call. This path returns null, so the caller does not
+   * advance the cursor and the same transactions are loaded again on the
+   * next pass.
+   *
+   * For any other object the iterator is walked; facts are flushed whenever
+   * more than RAW_FACT_BUFFER_LIMIT objects are buffered, and once more at
+   * the end.
+   *
+   * @param  wild Iterator providing getObject() and yielding Lisk objects.
+   * @return wild|null Key of the last object yielded by the iterator, or
+   *         null if nothing was iterated.
+   */
+  public function processIterator($iterator) {
+    $prototype = $iterator->getObject();
+
+    if ($prototype instanceof ManiphestTransaction) {
+      $raw_facts = array();
+      foreach (array('core:customfield', 'status') as $xaction_type) {
+        $xactions = $prototype->loadAllWhere(
+          'transactionType = %s',
+          $xaction_type);
+        foreach ($xactions as $xaction) {
+          $phid = $xaction->getPHID();
+          $this->log("Processing {$phid}...");
+          $raw_facts[$phid] = $this->computeRawFacts($xaction);
+        }
+      }
+
+      // updateRawFacts() returns early when there is nothing to write.
+      $this->updateRawFacts($raw_facts);
+      return null;
+    }
+
+    $result = null;
+    $raw_facts = array();
+    foreach ($iterator as $key => $object) {
+      $phid = $object->getPHID();
+      $this->log("Processing {$phid}...");
+      $raw_facts[$phid] = $this->computeRawFacts($object);
+      if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) {
+        $this->updateRawFacts($raw_facts);
+        $raw_facts = array();
+      }
+      $result = $key;
+    }
+
+    if ($raw_facts) {
+      $this->updateRawFacts($raw_facts);
+    }
+
+    return $result;
+  }
+
+  /**
+   * Recompute the aggregate facts from all engines and store them.
+   *
+   * @return void
+   */
+  public function processAggregates() {
+    $this->log('Processing aggregates.');
+
+    $aggregates = $this->computeAggregateFacts();
+    $this->updateAggregateFacts($aggregates);
+  }
+
+  /**
+   * Collect the aggregate facts of every engine that opts in.
+   *
+   * @return list Aggregate facts from all participating engines, merged
+   *         into one list.
+   */
+  private function computeAggregateFacts() {
+    $per_engine = array();
+    foreach ($this->engines as $engine) {
+      if ($engine->shouldComputeAggregateFacts()) {
+        $per_engine[] = $engine->computeAggregateFacts();
+      }
+    }
+
+    return array_mergev($per_engine);
+  }
+
+  /**
+   * Collect the raw facts for one object from every engine that wants it.
+   *
+   * @param  PhabricatorLiskDAO Object to analyze.
+   * @return list Raw facts from all participating engines, merged into one
+   *         list.
+   */
+  private function computeRawFacts(PhabricatorLiskDAO $object) {
+    $per_engine = array();
+    foreach ($this->engines as $engine) {
+      if ($engine->shouldComputeRawFactsForObject($object)) {
+        $per_engine[] = $engine->computeRawFactsForObject($object);
+      }
+    }
+
+    return array_mergev($per_engine);
+  }
+
+  /**
+   * Replace the stored raw facts for a batch of objects.
+   *
+   * Inside one transaction, all rows whose objectPHID is one of the map keys
+   * are deleted and the supplied facts are inserted in chunks of 256 rows.
+   *
+   * NOTE(review): the DELETE is keyed on the map keys, while each inserted
+   * row uses the fact's own objectPHID. Where the two differ, earlier rows
+   * for that fact are not removed here -- confirm this is intended.
+   *
+   * @param  map<string, list<PhabricatorFactRaw>> Facts keyed by the PHID of
+   *         the object they were computed from.
+   * @return void
+   */
+  private function updateRawFacts(array $map) {
+    foreach ($map as $facts) {
+      assert_instances_of($facts, 'PhabricatorFactRaw');
+    }
+
+    $phids = array_keys($map);
+    if (!$phids) {
+      return;
+    }
+
+    $table = new PhabricatorFactRaw();
+    $conn_w = $table->establishConnection('w');
+    $table_name = $table->getTableName();
+
+    // Pre-format every fact as one "(...)" VALUES tuple.
+    $tuples = array();
+    foreach ($map as $facts) {
+      foreach ($facts as $fact) {
+        $tuples[] = qsprintf(
+          $conn_w,
+          '(%s, %s, %s, %d, %d, %d)',
+          $fact->getFactType(),
+          $fact->getObjectPHID(),
+          $fact->getObjectA(),
+          $fact->getValueX(),
+          $fact->getValueY(),
+          $fact->getEpoch());
+      }
+    }
+
+    $table->openTransaction();
+
+    queryfx(
+      $conn_w,
+      'DELETE FROM %T WHERE objectPHID IN (%Ls)',
+      $table_name,
+      $phids);
+
+    // An empty tuple list yields no chunks, so nothing is inserted then.
+    foreach (array_chunk($tuples, 256) as $chunk) {
+      queryfx(
+        $conn_w,
+            'INSERT INTO %T
+              (factType, objectPHID, objectA, valueX, valueY, epoch)
+              VALUES %Q',
+        $table_name,
+        implode(', ', $chunk));
+    }
+
+    $table->saveTransaction();
+  }
+
+  /**
+   * Store aggregate facts, overwriting valueX on rows that already exist.
+   *
+   * Rows are written in chunks of 256 with
+   * INSERT ... ON DUPLICATE KEY UPDATE.
+   *
+   * @param  list Aggregate facts providing getFactType(), getObjectPHID()
+   *         and getValueX().
+   * @return void
+   */
+  private function updateAggregateFacts(array $facts) {
+    if (!$facts) {
+      return;
+    }
+
+    $table = new PhabricatorFactAggregate();
+    $conn_w = $table->establishConnection('w');
+    $table_name = $table->getTableName();
+
+    // Pre-format every fact as one "(...)" VALUES tuple.
+    $tuples = array();
+    foreach ($facts as $fact) {
+      $tuples[] = qsprintf(
+        $conn_w,
+        '(%s, %s, %d)',
+        $fact->getFactType(),
+        $fact->getObjectPHID(),
+        $fact->getValueX());
+    }
+
+    foreach (array_chunk($tuples, 256) as $chunk) {
+      queryfx(
+        $conn_w,
+          'INSERT INTO %T (factType, objectPHID, valueX) VALUES %Q
+          ON DUPLICATE KEY UPDATE valueX = VALUES(valueX)',
+        $table_name,
+        implode(', ', $chunk));
+    }
+  }
+
+}
diff --git a/src/fact/SprintFactUpdateIterator.php b/src/fact/SprintFactUpdateIterator.php
new file mode 100644
index 0000000..4230d1d
--- /dev/null
+++ b/src/fact/SprintFactUpdateIterator.php
@@ -0,0 +1,100 @@
+<?php
+
+/**
+ * Iterate over objects by update time in a stable way. This iterator only works
+ * for "normal" Lisk objects: objects with an autoincrement ID and a
+ * dateModified column.
+ */
+final class SprintFactUpdateIterator extends PhutilBufferedIterator {
+
+  // Cursor of the last row loaded by loadPage(); reset on rewind.
+  private $cursor;
+  // Object used to issue the page queries; also exposed via getObject().
+  private $object;
+  // Starting cursor supplied through setPosition().
+  private $position;
+  // Rows modified within this many seconds of "now" are not read yet.
+  private $ignoreUpdatesDuration = 15;
+
+  // Set that the query object is placed in; cleared before each page load.
+  private $set;
+
+  /**
+   * @param LiskDAO Object of the type to iterate over.
+   */
+  public function __construct(LiskDAO $object) {
+    $this->set = new LiskDAOSet();
+    $this->object = $object->putInSet($this->set);
+  }
+
+  /**
+   * Set the cursor from which iteration starts after the next rewind.
+   *
+   * @param  wild Cursor value, as produced by key().
+   * @return this
+   */
+  public function setPosition($position) {
+    $this->position = $position;
+    return $this;
+  }
+
+  /**
+   * Get the object this iterator issues its queries through.
+   *
+   * @return LiskDAO
+   */
+  public function getObject() {
+    return $this->object;
+  }
+
+  /**
+   * Restart paging from the configured position.
+   */
+  protected function didRewind() {
+    $this->cursor = $this->position;
+  }
+
+  /**
+   * Compute the cursor identifying a row.
+   *
+   * @param  LiskDAO Row to describe.
+   * @return wild "<dateModified>:<id>" when the object has a dateModified
+   *         property, otherwise just the ID.
+   */
+  protected function getCursorFromObject($object) {
+    if (!$object->hasProperty('dateModified')) {
+      return $object->getID();
+    }
+
+    return $object->getDateModified().':'.$object->getID();
+  }
+
+  /**
+   * The iteration key is the cursor of the current row.
+   */
+  public function key() {
+    $current = $this->current();
+    return $this->getCursorFromObject($current);
+  }
+
+  /**
+   * Load the next page of rows after the current cursor and move the cursor
+   * to the last row of that page.
+   *
+   * @return list<LiskDAO> Next page of rows; empty when none are left.
+   */
+  protected function loadPage() {
+    $this->set->clearSet();
+
+    if ($this->object->hasProperty('dateModified')) {
+      $after_epoch = 0;
+      $after_id = 0;
+      if ($this->cursor) {
+        list($after_epoch, $after_id) = explode(':', $this->cursor);
+      }
+
+      // NOTE: Rows updated very recently are deliberately left out. Once an
+      // update has been processed, nothing behind it is ever read again, so
+      // we may only read rows that no new row can still be inserted behind.
+      // A row updated during the current second could be followed, within
+      // the same second, by an update to an object with a lower ID, and that
+      // update would be skipped. Ignoring the last few seconds avoids this.
+      // It also saves work when an object is updated repeatedly (only the
+      // end state is examined) and gives reasonable protection against
+      // clock skew between the daemon host and hosts performing writes.
+
+      $page = $this->object->loadAllWhere(
+          '((dateModified > %d) OR (dateModified = %d AND id > %d))
+          AND (dateModified < %d - %d)
+          ORDER BY dateModified ASC, id ASC LIMIT %d',
+        $after_epoch,
+        $after_epoch,
+        $after_id,
+        time(),
+        $this->ignoreUpdatesDuration,
+        $this->getPageSize());
+    } else {
+      $after_id = $this->cursor ? $this->cursor : 0;
+
+      $page = $this->object->loadAllWhere(
+        'id > %d ORDER BY id ASC LIMIT %d',
+        $after_id,
+        $this->getPageSize());
+    }
+
+    if ($page) {
+      $last_row = end($page);
+      $this->cursor = $this->getCursorFromObject($last_row);
+    }
+
+    return $page;
+  }
+
+}

-- 
To view, visit https://gerrit.wikimedia.org/r/179916
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Id8d63075ca94f5204997b23b35496a2cd1e527f2
Gerrit-PatchSet: 1
Gerrit-Project: phabricator/extensions/Sprint
Gerrit-Branch: master
Gerrit-Owner: Christopher Johnson (WMDE) <[email protected]>

_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits

Reply via email to