diff --git a/includes/activitypub/collection/class-event-sources.php b/includes/activitypub/collection/class-event-sources.php index 209cf45..83baf71 100644 --- a/includes/activitypub/collection/class-event-sources.php +++ b/includes/activitypub/collection/class-event-sources.php @@ -150,6 +150,17 @@ class Event_Sources { 'sanitize_callback' => 'sanitize_url', ) ); + + \register_post_meta( + self::POST_TYPE, + '_event_bridge_for_activitypub_event_count', + array( + 'type' => 'int', + 'single' => true, + 'sanitize_callback' => 'absint', + 'default' => '0', + ) + ); } /** diff --git a/includes/class-outbox-parser.php b/includes/class-outbox-parser.php index 5ee882a..6eead8e 100644 --- a/includes/class-outbox-parser.php +++ b/includes/class-outbox-parser.php @@ -104,17 +104,17 @@ class Outbox_Parser { return; } - $count = 0; + $imported_count = 0; foreach ( $events as $event ) { $transmogrifier->save( $event, $actor ); - ++$count; - if ( $limit > 0 && $count >= $limit ) { + ++$imported_count; + if ( $limit > 0 && $imported_count >= $limit ) { break; } } - return count( $events ); + return $imported_count; } /** @@ -149,9 +149,72 @@ class Outbox_Parser { if ( ! $outbox_url ) { return; } - return self::queue_importing_from_outbox( $outbox_url, $actor ); + + // Schedule the import of events via the outbox. + return self::queue_importing_from_outbox( $outbox_url, $actor, 0 ); } + /** + * Get the current import count for the actor. + * + * @param string $actor The actor's ID/URL. + * @return int The current count of imported events. + */ + private static function get_import_count( $actor ) { + $post_id = Event_Source::get_by_id( $actor )->ID; + return (int) \get_post_meta( $post_id, '_event_bridge_for_activitypub_event_count', true ); + } + + /** + * Update the import count for the actor. + * + * @param string $actor The actor's ID/URL. + * @param int $count The new count of imported events. + * @return void + */ + private static function update_import_count( $actor, $count ) { + $post_id = Event_Source::get_by_id( $actor )->ID; + \update_post_meta( $post_id, '_event_bridge_for_activitypub_event_count', $count ); + } + + /** + * Fetch the outbox from the given URL. + * + * @param string $url The URL of the outbox. + * @return array|null The decoded outbox data, or null if fetching fails. + */ + private static function fetch_outbox( $url ) { + $response = Http::get( $url ); + + if ( \is_wp_error( $response ) ) { + return null; + } + + $outbox = \wp_remote_retrieve_body( $response ); + $outbox = \json_decode( $outbox, true ); + + return ( is_array( $outbox ) && isset( $outbox['type'] ) && isset( $outbox['id'] ) ) ? $outbox : null; + } + + /** + * Get the pagination URL from the outbox. + * + * @param array $outbox The outbox data. + * @return string|null The pagination URL, or null if not found. + */ + private static function get_pagination_url( $outbox ) { + if ( 'OrderedCollection' === $outbox['type'] && ! empty( $outbox['first'] ) && is_string( $outbox['first'] ) ) { + return $outbox['first']; + } + + if ( 'OrderedCollectionPage' === $outbox['type'] && ! empty( $outbox['next'] ) && is_string( $outbox['next'] ) ) { + return $outbox['next']; + } + + return null; + } + + /** * Import events from an outbox: OrderedCollection or OrderedCollectionPage. * @@ -160,50 +223,37 @@ class Outbox_Parser { * @return void */ public static function import_events_from_outbox( $url, $actor ) { - $response = Http::get( $url ); + $outbox = self::fetch_outbox( $url ); - if ( \is_wp_error( $response ) ) { + if ( ! $outbox ) { return; } - $outbox = \wp_remote_retrieve_body( $response ); - $outbox = \json_decode( $outbox, true ); - - // Validate the outbox type and structure. - if ( ! is_array( $outbox ) || ! isset( $outbox['type'] ) ) { - return; - } - - $current_count = (int) \get_option( "event_bridge_for_activitypub_backfill_count_{$actor}", 0 ); + $current_count = self::get_import_count( $actor ); if ( $current_count >= self::MAX_EVENTS_TO_IMPORT ) { - // Stop importing as the limit is reached. return; } // Process orderedItems if they exist (non-paginated outbox). if ( isset( $outbox['orderedItems'] ) && is_array( $outbox['orderedItems'] ) ) { - $current_count += self::import_events_from_items( $outbox['orderedItems'], $actor, self::MAX_EVENTS_TO_IMPORT - $current_count ); + $current_count += self::import_events_from_items( + $outbox['orderedItems'], + $actor, + self::MAX_EVENTS_TO_IMPORT - $current_count + ); } - // Update the count. - \update_option( "activitypub_import_event_count_{$actor}", $current_count ); + self::update_import_count( $actor, $current_count ); // If the count is already exceeded abort here. if ( $current_count >= self::MAX_EVENTS_TO_IMPORT ) { return; } - // Determine the pagination URL based on the outbox type. - $pagination_url = null; + // Get next page and if it exists schedule the import of next page. + $pagination_url = self::get_pagination_url( $outbox ); - if ( 'OrderedCollection' === $outbox['type'] && ! empty( $outbox['first'] ) && is_string( $outbox['first'] ) ) { - $pagination_url = $outbox['first']; - } elseif ( 'OrderedCollectionPage' === $outbox['type'] && ! empty( $outbox['next'] ) && is_string( $outbox['next'] ) ) { - $pagination_url = $outbox['next']; - } - - // Trigger the action if a pagination URL is found. if ( $pagination_url ) { self::queue_importing_from_outbox( $pagination_url, $actor ); }