refactor outbox parser
All checks were successful
PHP Code Checker / PHP Code Checker (pull_request) Successful in 48s
PHPUnit / PHPUnit – PHP 7.4 (pull_request) Successful in 1m3s
PHPUnit / PHPUnit – PHP 8.0 (pull_request) Successful in 1m5s
PHPUnit / PHPUnit – PHP 8.1 (pull_request) Successful in 1m0s
PHPUnit / PHPUnit – PHP 8.2 (pull_request) Successful in 1m5s
PHPUnit / PHPUnit – PHP 8.3 (pull_request) Successful in 1m6s
PHPUnit / PHPUnit – PHP 8.4 (pull_request) Successful in 1m7s

This commit is contained in:
André Menrath 2025-01-03 19:50:32 +01:00
parent b8bb698f60
commit 9a2dda0388
2 changed files with 90 additions and 29 deletions

View file

@ -150,6 +150,17 @@ class Event_Sources {
'sanitize_callback' => 'sanitize_url',
)
);
\register_post_meta(
self::POST_TYPE,
'_event_bridge_for_activitypub_event_count',
array(
'type' => 'int',
'single' => true,
'sanitize_callback' => 'absint',
'default' => '0',
)
);
}
/**

View file

@ -104,17 +104,17 @@ class Outbox_Parser {
return;
}
$count = 0;
$imported_count = 0;
foreach ( $events as $event ) {
$transmogrifier->save( $event, $actor );
++$count;
if ( $limit > 0 && $count >= $limit ) {
++$imported_count;
if ( $limit > 0 && $imported_count >= $limit ) {
break;
}
}
return count( $events );
return $imported_count;
}
/**
@ -149,9 +149,72 @@ class Outbox_Parser {
if ( ! $outbox_url ) {
return;
}
return self::queue_importing_from_outbox( $outbox_url, $actor );
// Schedule the import of events via the outbox.
return self::queue_importing_from_outbox( $outbox_url, $actor, 0 );
}
/**
* Get the current import count for the actor.
*
* @param string $actor The actor's ID/URL.
* @return int The current count of imported events.
*/
private static function get_import_count( $actor ) {
$post_id = Event_Source::get_by_id( $actor )->ID;
return (int) \get_post_meta( $post_id, '_event_bridge_for_activitypub_event_count', true );
}
/**
* Update the import count for the actor.
*
* @param string $actor The actor's ID/URL.
* @param int $count The new count of imported events.
* @return void
*/
private static function update_import_count( $actor, $count ) {
$post_id = Event_Source::get_by_id( $actor )->ID;
\update_post_meta( $post_id, '_event_bridge_for_activitypub_event_count', $count );
}
/**
* Fetch the outbox from the given URL.
*
* @param string $url The URL of the outbox.
* @return array|null The decoded outbox data, or null if fetching fails.
*/
private static function fetch_outbox( $url ) {
$response = Http::get( $url );
if ( \is_wp_error( $response ) ) {
return null;
}
$outbox = \wp_remote_retrieve_body( $response );
$outbox = \json_decode( $outbox, true );
return ( is_array( $outbox ) && isset( $outbox['type'] ) && isset( $outbox['id'] ) ) ? $outbox : null;
}
/**
* Get the pagination URL from the outbox.
*
* @param array $outbox The outbox data.
* @return string|null The pagination URL, or null if not found.
*/
private static function get_pagination_url( $outbox ) {
if ( 'OrderedCollection' === $outbox['type'] && ! empty( $outbox['first'] ) && is_string( $outbox['first'] ) ) {
return $outbox['first'];
}
if ( 'OrderedCollectionPage' === $outbox['type'] && ! empty( $outbox['next'] ) && is_string( $outbox['next'] ) ) {
return $outbox['next'];
}
return null;
}
/**
* Import events from an outbox: OrderedCollection or OrderedCollectionPage.
*
@ -160,50 +223,37 @@ class Outbox_Parser {
* @return void
*/
public static function import_events_from_outbox( $url, $actor ) {
$response = Http::get( $url );
$outbox = self::fetch_outbox( $url );
if ( \is_wp_error( $response ) ) {
if ( ! $outbox ) {
return;
}
$outbox = \wp_remote_retrieve_body( $response );
$outbox = \json_decode( $outbox, true );
// Validate the outbox type and structure.
if ( ! is_array( $outbox ) || ! isset( $outbox['type'] ) ) {
return;
}
$current_count = (int) \get_option( "event_bridge_for_activitypub_backfill_count_{$actor}", 0 );
$current_count = self::get_import_count( $actor );
if ( $current_count >= self::MAX_EVENTS_TO_IMPORT ) {
// Stop importing as the limit is reached.
return;
}
// Process orderedItems if they exist (non-paginated outbox).
if ( isset( $outbox['orderedItems'] ) && is_array( $outbox['orderedItems'] ) ) {
$current_count += self::import_events_from_items( $outbox['orderedItems'], $actor, self::MAX_EVENTS_TO_IMPORT - $current_count );
$current_count += self::import_events_from_items(
$outbox['orderedItems'],
$actor,
self::MAX_EVENTS_TO_IMPORT - $current_count
);
}
// Update the count.
\update_option( "activitypub_import_event_count_{$actor}", $current_count );
self::update_import_count( $actor, $current_count );
// If the count is already exceeded abort here.
if ( $current_count >= self::MAX_EVENTS_TO_IMPORT ) {
return;
}
// Determine the pagination URL based on the outbox type.
$pagination_url = null;
// Get next page and if it exists schedule the import of next page.
$pagination_url = self::get_pagination_url( $outbox );
if ( 'OrderedCollection' === $outbox['type'] && ! empty( $outbox['first'] ) && is_string( $outbox['first'] ) ) {
$pagination_url = $outbox['first'];
} elseif ( 'OrderedCollectionPage' === $outbox['type'] && ! empty( $outbox['next'] ) && is_string( $outbox['next'] ) ) {
$pagination_url = $outbox['next'];
}
// Trigger the action if a pagination URL is found.
if ( $pagination_url ) {
self::queue_importing_from_outbox( $pagination_url, $actor );
}