Christopher Johnson (WMDE) has uploaded a new change for review.
https://gerrit.wikimedia.org/r/179916
Change subject: adds SprintFactDaemon and PhabricatorFactSprintEngine
......................................................................
adds SprintFactDaemon and PhabricatorFactSprintEngine
bug: T78127
Change-Id: Id8d63075ca94f5204997b23b35496a2cd1e527f2
---
M src/__phutil_library_map__.php
M src/application/SprintApplication.php
A src/fact/PhabricatorFactSprintEngine.php
A src/fact/SprintFactDaemon.php
A src/fact/SprintFactUpdateIterator.php
5 files changed, 511 insertions(+), 1 deletion(-)
git pull ssh://gerrit.wikimedia.org:29418/phabricator/extensions/Sprint
refs/changes/16/179916/1
diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php
index e39343f..a6457b5 100644
--- a/src/__phutil_library_map__.php
+++ b/src/__phutil_library_map__.php
@@ -20,6 +20,7 @@
'EventTableView' => 'view/EventTableView.php',
'HistoryTableView' => 'view/HistoryTableView.php',
'OpenTasksView' => 'view/OpenTasksView.php',
+ 'PhabricatorFactSprintEngine' => 'fact/PhabricatorFactSprintEngine.php',
'ProjectOpenTasksView' => 'view/ProjectOpenTasksView.php',
'SprintApplication' => 'application/SprintApplication.php',
'SprintApplicationTest' => 'tests/SprintApplicationTest.php',
@@ -39,8 +40,11 @@
'SprintConstants' => 'constants/SprintConstants.php',
'SprintController' => 'controller/SprintController.php',
'SprintControllerTest' => 'tests/SprintControllerTest.php',
+ 'SprintDAO' => 'storage/SprintDAO.php',
'SprintDataViewController' => 'controller/SprintDataViewController.php',
'SprintEndDateField' => 'customfield/SprintEndDateField.php',
+ 'SprintFactDaemon' => 'fact/SprintFactDaemon.php',
+ 'SprintFactUpdateIterator' => 'fact/SprintFactUpdateIterator.php',
'SprintListController' => 'controller/SprintListController.php',
'SprintProjectCustomField' => 'customfield/SprintProjectCustomField.php',
'SprintProjectProfileController' =>
'controller/SprintProjectProfileController.php',
@@ -50,6 +54,7 @@
'SprintReportController' => 'controller/SprintReportController.php',
'SprintReportOpenTasksView' => 'view/SprintReportOpenTasksView.php',
'SprintTableView' => 'view/SprintTableView.php',
+ 'SprintTask' => 'storage/SprintTask.php',
'SprintTaskStoryPointsField' =>
'customfield/SprintTaskStoryPointsField.php',
'SprintTestCase' => 'tests/SprintTestCase.php',
'SprintTransaction' => 'storage/SprintTransaction.php',
@@ -67,6 +72,7 @@
'BurndownException' => 'Exception',
'CeleritySprintResources' => 'CelerityResourcesOnDisk',
'DateIterator' => 'Iterator',
+ 'PhabricatorFactSprintEngine' => 'PhabricatorFactEngine',
'ProjectOpenTasksView' => 'OpenTasksView',
'SprintApplication' => 'PhabricatorApplication',
'SprintApplicationTest' => 'SprintTestCase',
@@ -83,8 +89,11 @@
'SprintBuildStatsTest' => 'SprintTestCase',
'SprintController' => 'PhabricatorController',
'SprintControllerTest' => 'SprintTestCase',
+ 'SprintDAO' => 'PhabricatorLiskDAO',
'SprintDataViewController' => 'SprintController',
'SprintEndDateField' => 'SprintProjectCustomField',
+ 'SprintFactDaemon' => 'PhabricatorDaemon',
+ 'SprintFactUpdateIterator' => 'PhutilBufferedIterator',
'SprintListController' => 'SprintController',
'SprintProjectCustomField' => array(
'PhabricatorProjectCustomField',
@@ -95,6 +104,19 @@
'SprintReportBurndownView' => 'SprintView',
'SprintReportController' => 'SprintController',
'SprintReportOpenTasksView' => 'SprintView',
+ 'SprintTask' => array(
+ 'SprintDAO',
+ 'PhabricatorMarkupInterface',
+ 'PhabricatorPolicyInterface',
+ 'PhabricatorTokenReceiverInterface',
+ 'PhabricatorFlaggableInterface',
+ 'PhabricatorMentionableInterface',
+ 'PhrequentTrackableInterface',
+ 'PhabricatorCustomFieldInterface',
+ 'PhabricatorDestructibleInterface',
+ 'PhabricatorApplicationTransactionInterface',
+ 'PhabricatorProjectInterface',
+ ),
'SprintTaskStoryPointsField' => array(
'ManiphestCustomField',
'PhabricatorStandardCustomFieldInterface',
diff --git a/src/application/SprintApplication.php
b/src/application/SprintApplication.php
index aaf1032..bb55617 100644
--- a/src/application/SprintApplication.php
+++ b/src/application/SprintApplication.php
@@ -31,7 +31,7 @@
public function getFactObjectsForAnalysis() {
return array(
- new ManiphestTask(),
+ new ManiphestTransaction(),
);
}
diff --git a/src/fact/PhabricatorFactSprintEngine.php
b/src/fact/PhabricatorFactSprintEngine.php
new file mode 100644
index 0000000..189a210
--- /dev/null
+++ b/src/fact/PhabricatorFactSprintEngine.php
@@ -0,0 +1,161 @@
+<?php
+
+/**
+ * Simple fact engine which counts objects.
+ */
+final class PhabricatorFactSprintEngine extends PhabricatorFactEngine {
+
+ public function getFactSpecs(array $fact_types) {
+ $results = array();
+ foreach ($fact_types as $type) {
+ if (!strncmp($type, '+N:', 3)) {
+ if ($type == '+N:*') {
+ $name = 'Total Objects';
+ } else {
+ $name = 'Total Objects of type '.substr($type, 3);
+ }
+
+ $results[] = id(new PhabricatorFactSimpleSpec($type))
+ ->setName($name)
+ ->setUnit(PhabricatorFactSimpleSpec::UNIT_COUNT);
+ }
+
+ if (!strncmp($type, 'N:', 2)) {
+ if ($type == 'N:*') {
+ $name = 'Objects';
+ } else {
+ $name = 'Objects of type '.substr($type, 2);
+ }
+ $results[] = id(new PhabricatorFactSimpleSpec($type))
+ ->setName($name)
+ ->setUnit(PhabricatorFactSimpleSpec::UNIT_COUNT);
+ }
+
+ }
+ return $results;
+ }
+
+ public function shouldComputeRawFactsForObject(PhabricatorLiskDAO $object) {
+ return true;
+ }
+
+ public function computeRawFactsForObject(PhabricatorLiskDAO $object) {
+ $facts = array();
+
+ $phid = $object->getPHID();
+ $type = phid_get_type($phid);
+
+ if ($object instanceof ManiphestTransaction) {
+ $xacttype = $object->getTransactionType();
+ if ($xacttype == 'core:customfield') {
+ $oldvalue = $object->getOldValue();
+ $newvalue = $object->getNewValue();
+ $objectPHID = $object->getObjectPHID();
+ foreach (array('N:*', 'N:' . $xacttype) as $fact_type) {
+ $facts[] = id(new PhabricatorFactRaw())
+ ->setFactType($fact_type)
+ ->setObjectPHID($objectPHID)
+ ->setValueX($oldvalue)
+ ->setValueY($newvalue)
+ ->setEpoch($object->getDateCreated());
+ }
+ } elseif ($xacttype == 'status') {
+ $oldstatus = null;
+ $newstatus = null;
+
+ if ($object->getOldValue() == 'open') {
+ $oldstatus = 1;
+ } elseif ($object->getOldValue() == 'resolved') {
+ $oldstatus = 0;
+ }
+
+ if ($object->getNewValue() == 'open') {
+ $newstatus = 1;
+ } elseif ($object->getNewValue() == 'resolved') {
+ $newstatus = 0;
+ }
+
+ $objectPHID = $object->getObjectPHID();
+ foreach (array('N:*', 'N:' . $xacttype) as $fact_type) {
+ $facts[] = id(new PhabricatorFactRaw())
+ ->setFactType($fact_type)
+ ->setObjectPHID($objectPHID)
+ ->setValueX($oldstatus)
+ ->setValueY($newstatus)
+ ->setEpoch($object->getDateCreated());
+ }
+ }
+ } else {
+ foreach (array('N:*', 'N:'.$type) as $fact_type) {
+ $facts[] = id(new PhabricatorFactRaw())
+ ->setFactType($fact_type)
+ ->setObjectPHID($phid)
+ ->setValueX(1)
+ ->setValueY()
+ ->setEpoch($object->getDateCreated());
+ }
+ }
+
+ return $facts;
+ }
+
+ public function computeRawFactsfromXACTForObject(PhabricatorLiskDAO $object)
{
+ $facts = array();
+
+ $phid = $object->getPHID();
+ $type = phid_get_type($phid);
+
+ if ($object instanceof ManiphestTransaction) {
+ $oldvalue = $object->getOldValue();
+ $newvalue = $object->getNewValue();
+ $objectPHID = $object->getObjectPHID();
+ foreach (array('N:*', 'N:'.$type) as $fact_type) {
+ $facts[] = id(new PhabricatorFactRaw())
+ ->setFactType($fact_type)
+ ->setObjectPHID($objectPHID)
+ ->setValueX($oldvalue)
+ ->setValueY($newvalue)
+ ->setEpoch($object->getDateCreated());
+ }
+ } else {
+ foreach (array('N:*', 'N:'.$type) as $fact_type) {
+ $facts[] = id(new PhabricatorFactRaw())
+ ->setFactType($fact_type)
+ ->setObjectPHID($phid)
+ ->setValueX(1)
+ ->setValueY()
+ ->setEpoch($object->getDateCreated());
+ }
+ }
+
+ return $facts;
+ }
+
+ public function shouldComputeAggregateFacts() {
+ return true;
+ }
+
+ public function computeAggregateFacts() {
+ $table = new PhabricatorFactRaw();
+ $table_name = $table->getTableName();
+ $conn = $table->establishConnection('r');
+
+ $counts = queryfx_all(
+ $conn,
+ 'SELECT factType, SUM(valueX) N FROM %T WHERE factType LIKE %>
+ GROUP BY factType',
+ $table_name,
+ 'N:');
+
+ $facts = array();
+ foreach ($counts as $count) {
+ $facts[] = id(new PhabricatorFactAggregate())
+ ->setFactType('+'.$count['factType'])
+ ->setValueX($count['N']);
+ }
+
+ return $facts;
+ }
+
+
+}
diff --git a/src/fact/SprintFactDaemon.php b/src/fact/SprintFactDaemon.php
new file mode 100644
index 0000000..b2e9409
--- /dev/null
+++ b/src/fact/SprintFactDaemon.php
@@ -0,0 +1,227 @@
+<?php
+
+final class SprintFactDaemon extends PhabricatorDaemon {
+
+ private $engines;
+
+ const RAW_FACT_BUFFER_LIMIT = 128;
+
+ public function run() {
+ $this->setEngines(PhabricatorFactEngine::loadAllEngines());
+ while (!$this->shouldExit()) {
+ $iterators = $this->getAllApplicationIterators();
+ foreach ($iterators as $iterator_name => $iterator) {
+ $this->processIteratorWithCursor($iterator_name, $iterator);
+ }
+ $this->processAggregates();
+
+ $this->log('Zzz...');
+ $this->sleep(60 * 5);
+ }
+ }
+
+ public static function getAllApplicationIterators() {
+ $apps = PhabricatorApplication::getAllInstalledApplications();
+
+ $iterators = array();
+ foreach ($apps as $app) {
+ foreach ($app->getFactObjectsForAnalysis() as $object) {
+ $iterator = new PhabricatorFactUpdateIterator($object);
+ $iterators[get_class($object)] = $iterator;
+ }
+ }
+
+ return $iterators;
+ }
+
+ public function processIteratorWithCursor($iterator_name, $iterator) {
+ $this->log("Processing cursor '{$iterator_name}'.");
+
+ $cursor = id(new PhabricatorFactCursor())->loadOneWhere(
+ 'name = %s',
+ $iterator_name);
+ if (!$cursor) {
+ $cursor = new PhabricatorFactCursor();
+ $cursor->setName($iterator_name);
+ $position = null;
+ } else {
+ $position = $cursor->getPosition();
+ }
+
+ if ($position) {
+ $iterator->setPosition($position);
+ }
+
+ $new_cursor_position = $this->processIterator($iterator);
+
+ if ($new_cursor_position) {
+ $cursor->setPosition($new_cursor_position);
+ $cursor->save();
+ }
+ }
+
+ public function setEngines(array $engines) {
+ assert_instances_of($engines, 'PhabricatorFactEngine');
+
+ $this->engines = $engines;
+ return $this;
+ }
+
+ public function processIterator($iterator) {
+ $result = null;
+ $raw_facts = array();
+
+ $object = $iterator->getObject();
+
+ if ($object instanceof ManiphestTransaction) {
+ $corecustomfield = $object->loadAllWhere('transactionType = %s',
'core:customfield');
+ $status = $object->loadAllWhere('transactionType = %s', 'status');
+ foreach ($corecustomfield as $object) {
+ $phid = $object->getPHID();
+ $this->log("Processing {$phid}...");
+ $raw_facts[$phid] = $this->computeRawFacts($object);
+ }
+ foreach ($status as $object) {
+ $phid = $object->getPHID();
+ $this->log("Processing {$phid}...");
+ $raw_facts[$phid] = $this->computeRawFacts($object);
+ }
+
+ if (count($raw_facts) < self::RAW_FACT_BUFFER_LIMIT) {
+ $this->updateRawFacts($raw_facts);
+ $raw_facts = array();
+ }
+ } else {
+ foreach ($iterator as $key => $object) {
+ $phid = $object->getPHID();
+ $this->log("Processing {$phid}...");
+ $raw_facts[$phid] = $this->computeRawFacts($object);
+ if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) {
+ $this->updateRawFacts($raw_facts);
+ $raw_facts = array();
+ }
+ $result = $key;
+ }
+
+ }
+
+ if ($raw_facts) {
+ $this->updateRawFacts($raw_facts);
+ }
+
+ return $result;
+ }
+
+ public function processAggregates() {
+ $this->log('Processing aggregates.');
+
+ $facts = $this->computeAggregateFacts();
+ $this->updateAggregateFacts($facts);
+ }
+
+ private function computeAggregateFacts() {
+ $facts = array();
+ foreach ($this->engines as $engine) {
+ if (!$engine->shouldComputeAggregateFacts()) {
+ continue;
+ }
+ $facts[] = $engine->computeAggregateFacts();
+ }
+ return array_mergev($facts);
+ }
+
+ private function computeRawFacts(PhabricatorLiskDAO $object) {
+ $facts = array();
+ foreach ($this->engines as $engine) {
+ if (!$engine->shouldComputeRawFactsForObject($object)) {
+ continue;
+ }
+ $facts[] = $engine->computeRawFactsForObject($object);
+ }
+
+ return array_mergev($facts);
+ }
+
+ private function updateRawFacts(array $map) {
+ foreach ($map as $phid => $facts) {
+ assert_instances_of($facts, 'PhabricatorFactRaw');
+ }
+
+ $phids = array_keys($map);
+ if (!$phids) {
+ return;
+ }
+
+ $table = new PhabricatorFactRaw();
+ $conn = $table->establishConnection('w');
+ $table_name = $table->getTableName();
+
+ $sql = array();
+ foreach ($map as $phid => $facts) {
+ foreach ($facts as $fact) {
+ $sql[] = qsprintf(
+ $conn,
+ '(%s, %s, %s, %d, %d, %d)',
+ $fact->getFactType(),
+ $fact->getObjectPHID(),
+ $fact->getObjectA(),
+ $fact->getValueX(),
+ $fact->getValueY(),
+ $fact->getEpoch());
+ }
+ }
+
+ $table->openTransaction();
+
+ queryfx(
+ $conn,
+ 'DELETE FROM %T WHERE objectPHID IN (%Ls)',
+ $table_name,
+ $phids);
+
+ if ($sql) {
+ foreach (array_chunk($sql, 256) as $chunk) {
+ queryfx(
+ $conn,
+ 'INSERT INTO %T
+ (factType, objectPHID, objectA, valueX, valueY, epoch)
+ VALUES %Q',
+ $table_name,
+ implode(', ', $chunk));
+ }
+ }
+
+ $table->saveTransaction();
+ }
+
+ private function updateAggregateFacts(array $facts) {
+ if (!$facts) {
+ return;
+ }
+
+ $table = new PhabricatorFactAggregate();
+ $conn = $table->establishConnection('w');
+ $table_name = $table->getTableName();
+
+ $sql = array();
+ foreach ($facts as $fact) {
+ $sql[] = qsprintf(
+ $conn,
+ '(%s, %s, %d)',
+ $fact->getFactType(),
+ $fact->getObjectPHID(),
+ $fact->getValueX());
+ }
+
+ foreach (array_chunk($sql, 256) as $chunk) {
+ queryfx(
+ $conn,
+ 'INSERT INTO %T (factType, objectPHID, valueX) VALUES %Q
+ ON DUPLICATE KEY UPDATE valueX = VALUES(valueX)',
+ $table_name,
+ implode(', ', $chunk));
+ }
+
+ }
+
+}
diff --git a/src/fact/SprintFactUpdateIterator.php
b/src/fact/SprintFactUpdateIterator.php
new file mode 100644
index 0000000..4230d1d
--- /dev/null
+++ b/src/fact/SprintFactUpdateIterator.php
@@ -0,0 +1,100 @@
+<?php
+
+/**
+ * Iterate over objects by update time in a stable way. This iterator only
works
+ * for "normal" Lisk objects: objects with an autoincrement ID and a
+ * dateModified column.
+ */
+final class SprintFactUpdateIterator extends PhutilBufferedIterator {
+
+ private $cursor;
+ private $object;
+ private $position;
+ private $ignoreUpdatesDuration = 15;
+
+ private $set;
+
+ public function __construct(LiskDAO $object) {
+ $this->set = new LiskDAOSet();
+ $this->object = $object->putInSet($this->set);
+ }
+
+ public function setPosition($position) {
+ $this->position = $position;
+ return $this;
+ }
+
+ public function getObject() {
+ return $this->object;
+ }
+
+ protected function didRewind() {
+ $this->cursor = $this->position;
+ }
+
+ protected function getCursorFromObject($object) {
+ if ($object->hasProperty('dateModified')) {
+ return $object->getDateModified().':'.$object->getID();
+ } else {
+ return $object->getID();
+ }
+ }
+
+ public function key() {
+ return $this->getCursorFromObject($this->current());
+ }
+
+ protected function loadPage() {
+ $this->set->clearSet();
+
+ if ($this->object->hasProperty('dateModified')) {
+ if ($this->cursor) {
+ list($after_epoch, $after_id) = explode(':', $this->cursor);
+ } else {
+ $after_epoch = 0;
+ $after_id = 0;
+ }
+
+ // NOTE: We ignore recent updates because once we process an update we'll
+ // never process rows behind it again. We need to read only rows which
+ // we're sure no new rows will be inserted behind. If we read a row that
+ // was updated on the current second, another update later on in this
+ // second could affect an object with a lower ID, and we'd skip that
+ // update. To avoid this, just ignore any rows which have been updated in
+ // the last few seconds. This also reduces the amount of work we need to
+ // do if an object is repeatedly updated; we will just look at the end
+ // state without processing the intermediate states. Finally, this gives
+ // us reasonable protections against clock skew between the machine the
+ // daemon is running on and any machines performing writes.
+
+ $page = $this->object->loadAllWhere(
+ '((dateModified > %d) OR (dateModified = %d AND id > %d))
+ AND (dateModified < %d - %d)
+ ORDER BY dateModified ASC, id ASC LIMIT %d',
+ $after_epoch,
+ $after_epoch,
+ $after_id,
+ time(),
+ $this->ignoreUpdatesDuration,
+ $this->getPageSize());
+ } else {
+ if ($this->cursor) {
+ $after_id = $this->cursor;
+ } else {
+ $after_id = 0;
+ }
+
+ $page = $this->object->loadAllWhere(
+ 'id > %d ORDER BY id ASC LIMIT %d',
+ $after_id,
+ $this->getPageSize());
+ }
+
+ if ($page) {
+ $this->cursor = $this->getCursorFromObject(end($page));
+ }
+
+ return $page;
+ }
+
+}
--
To view, visit https://gerrit.wikimedia.org/r/179916
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Id8d63075ca94f5204997b23b35496a2cd1e527f2
Gerrit-PatchSet: 1
Gerrit-Project: phabricator/extensions/Sprint
Gerrit-Branch: master
Gerrit-Owner: Christopher Johnson (WMDE) <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits