From c614af53bfe946e0a03bd979c9ad5353561fa584 Mon Sep 17 00:00:00 2001 From: epriestley Date: Thu, 26 Jul 2012 13:09:36 -0700 Subject: [PATCH] Add PhutilFactsUpateIterator Summary: This iterator processes objects that have been updated. Test Plan: Ran this test script: $cursor = null; $table = new DifferentialRevision(); while (true) { $iterator = new PhabricatorFactsUpdateIterator($table, $cursor); foreach ($iterator as $new_cursor => $update) { echo "{$new_cursor} => D".$update->getID()."\n"; $cursor = $new_cursor; } echo "Zzz...\n"; sleep(5); } Verified it iterated over every object and then stopped. Made a comment on a differenial revision, verified it iterated over the object after 15 seconds. Reviewers: vrana, btrahan Reviewed By: vrana CC: aran Maniphest Tasks: T1562 Differential Revision: https://secure.phabricator.com/D3077 --- src/__phutil_library_map__.php | 4 +- .../PhabricatorFactsUpdateIterator.php | 84 +++++++++++++++++++ 2 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 src/applications/facts/extract/PhabricatorFactsUpdateIterator.php diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php index ecbd7f7c6b..f32565e78d 100644 --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -624,6 +624,7 @@ phutil_register_library_map(array( 'PhabricatorEventEngine' => 'infrastructure/events/PhabricatorEventEngine.php', 'PhabricatorEventType' => 'infrastructure/events/constant/PhabricatorEventType.php', 'PhabricatorExampleEventListener' => 'infrastructure/events/PhabricatorExampleEventListener.php', + 'PhabricatorFactsUpdateIterator' => 'applications/facts/extract/PhabricatorFactsUpdateIterator.php', 'PhabricatorFeedBuilder' => 'applications/feed/builder/PhabricatorFeedBuilder.php', 'PhabricatorFeedConstants' => 'applications/feed/constants/PhabricatorFeedConstants.php', 'PhabricatorFeedController' => 'applications/feed/controller/PhabricatorFeedController.php', @@ -1507,7 +1508,7 @@ phutil_register_library_map(array( 'LiskIsolationTestCase' => 'PhabricatorTestCase', 'LiskIsolationTestDAO' => 'LiskDAO', 'LiskIsolationTestDAOException' => 'Exception', - 'LiskMigrationIterator' => 'Iterator', + 'LiskMigrationIterator' => 'PhutilBufferedIterator', 'ManiphestAction' => 'ManiphestConstants', 'ManiphestAuxiliaryFieldDefaultSpecification' => 'ManiphestAuxiliaryFieldSpecification', 'ManiphestAuxiliaryFieldTypeException' => 'Exception', @@ -1648,6 +1649,7 @@ phutil_register_library_map(array( 'PhabricatorEvent' => 'PhutilEvent', 'PhabricatorEventType' => 'PhutilEventType', 'PhabricatorExampleEventListener' => 'PhutilEventListener', + 'PhabricatorFactsUpdateIterator' => 'PhutilBufferedIterator', 'PhabricatorFeedController' => 'PhabricatorController', 'PhabricatorFeedDAO' => 'PhabricatorLiskDAO', 'PhabricatorFeedPublicStreamController' => 'PhabricatorFeedController', diff --git a/src/applications/facts/extract/PhabricatorFactsUpdateIterator.php b/src/applications/facts/extract/PhabricatorFactsUpdateIterator.php new file mode 100644 index 0000000000..19b1757396 --- /dev/null +++ b/src/applications/facts/extract/PhabricatorFactsUpdateIterator.php @@ -0,0 +1,84 @@ +object = $object; + $this->start = $start; + } + + protected function didRewind() { + $this->cursor = $this->start; + } + + protected function getCursorFromObject($object) { + return $object->getDateModified().':'.$object->getID(); + } + + public function key() { + return $this->getCursorFromObject($this->current()); + } + + protected function loadPage() { + list($after_epoch, $after_id) = explode(':', $this->cursor); + + // NOTE: We ignore recent updates because once we process an update we'll + // never process rows behind it again. We need to read only rows which + // we're sure no new rows will be inserted behind. If we read a row that + // was updated on the current second, another update later on in this second + // could affect an object with a lower ID, and we'd skip that update. To + // avoid this, just ignore any rows which have been updated in the last few + // seconds. This also reduces the amount of work we need to do if an object + // is repeatedly updated; we will just look at the end state without + // processing the intermediate states. Finally, this gives us reasonable + // protections against clock skew between the machine the daemon is running + // on and any machines performing writes. + + $page = $this->object->loadAllWhere( + '((dateModified > %d) OR (dateModified = %d AND id > %d)) + AND (dateModified < %d - %d) + ORDER BY dateModified ASC, id ASC LIMIT %d', + $after_epoch, + $after_epoch, + $after_id, + time(), + $this->ignoreUpdatesDuration, + $this->getPageSize()); + + if ($page) { + $this->cursor = $this->getCursorFromObject(end($page)); + } + + return $page; + } + +}