­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ """Processor for WordPress rule disable/enable changelog files. The PHP WordPress plugin writes rule change actions to changelog.php when a user disables or enables protection rules from the WordPress admin panel. This module reads, parses, and applies those actions to the agent database. The changelog.php file uses the same format as incident files: None: # changelog.php uses the same format as incident files # (base64-encoded JSON lines wrapped in PHP), so we reuse the parser self.parser = IncidentFileParser() async def process_changelogs_for_sites( self, sites: list[WPSite], sink: MessageSink | None, ) -> list[WPSite]: """Process changelog.php for all given sites. Args: sites: WordPress sites to process. sink: MessageSink for sending correlation events. Returns: Sites whose disabled rules were affected (needing disabled-rules.php regeneration). """ affected: list[WPSite] = [] for site in sites: if await self._process_site(site, sink): affected.append(site) if affected: logger.info( "Changelog processing affected %d site(s)", len(affected), ) return affected async def _process_site( self, site: WPSite, sink: MessageSink | None, ) -> bool: """Process changelog.php for a single site. Args: site: WordPress site to process. sink: MessageSink for sending correlation events. Returns: True if the site's disabled rules were affected. """ try: data_dir = await get_data_dir(site) if not data_dir.exists(): return False changelog_path = data_dir / CHANGELOG_FILENAME if changelog_path.exists(): if await self._process_changelog_file( changelog_path, site, sink ): return True if self._is_disabled_rules_file_stale(site, data_dir): return True except Exception as e: logger.error( "Error processing changelog for site %s: %s", site.docroot, e, ) return False def _consume_changelog( self, changelog_path: Path, site: WPSite ) -> list[dict]: """Parse a changelog file and delete it. The file is deleted regardless of whether parsing succeeds. """ try: return self.parser.parse_file(changelog_path) except (OSError, ValueError) as e: logger.error( "Failed to parse changelog for site %s: %s", site.docroot, e, ) return [] finally: try: changelog_path.unlink(missing_ok=True) except OSError as e: logger.error( "Failed to delete changelog for site %s: %s", site.docroot, e, ) async def _process_changelog_file( self, changelog_path: Path, site: WPSite, sink: MessageSink | None, ) -> bool: """Parse and apply actions from a changelog file. The file is always deleted after reading, even on parse errors. Actions older than the last sync timestamp are skipped to prevent stale changelog files (e.g. from backup restores) from undoing more recent changes. Returns: True if any DB changes occurred. """ actions = self._consume_changelog(changelog_path, site) if not actions: return False last_sync_ts = self._get_last_sync_ts(site) changed = False for action in actions: try: timestamp = float(action.get("ts", 0)) if timestamp <= 0: raise ValueError( "Missing or invalid timestamp in changelog action" f" for rule {action.get('rule_id', '?')}" f" on site {site.docroot}" ) if last_sync_ts is not None and timestamp <= last_sync_ts: logger.info( "Skipping stale changelog action for rule %s" " on site %s (ts=%.0f <= sync_ts=%.0f)", action.get("rule_id", "?"), site.docroot, timestamp, last_sync_ts, ) continue if self._process_action(action, site, timestamp): changed = True await self._report_action(action, site, sink, timestamp) except ValueError as e: logger.warning("Skipping invalid changelog entry: %s", e) except Exception as e: logger.error( "Failed to process changelog action %s for site %s: %s", action, site.docroot, e, ) logger.info( "Processed changelog for site %s: %d action(s), changed=%s", site.docroot, len(actions), changed, ) return changed def _process_action( self, action: dict, site: WPSite, timestamp: float ) -> bool: """Apply a single changelog action to the database. Args: action: Parsed action dict with keys: action, rule_id, ts. site: The WordPress site the action belongs to. timestamp: Pre-resolved Unix timestamp for this action. Returns: True if the database state was modified. Raises: ValueError: If the action is missing required fields or has an unknown action type. """ action_type = action.get("action") rule_id = action.get("rule_id") if not action_type or not rule_id: raise ValueError( f"Missing action or rule_id in changelog entry: {action}" ) if action_type == ACTION_DISABLE: return self._apply_disable(rule_id, site, timestamp) elif action_type == ACTION_ENABLE: return self._apply_enable(rule_id, site) else: raise ValueError( f"Unknown changelog action '{action_type}'" f" for rule {rule_id} on site {site.docroot}" ) @staticmethod def _get_last_sync_ts(site: WPSite) -> float | None: """Get the last disabled-rules sync timestamp for a site. Returns None if the site has no DB record or no sync timestamp, meaning all actions should be processed. """ try: db_site = WordpressSite.get_by_id(site.docroot) return db_site.disabled_rules_sync_ts except WordpressSite.DoesNotExist: return None @staticmethod def _apply_disable(rule_id: str, site: WPSite, timestamp: float) -> bool: """Apply a disable action from the changelog. Returns: True if a new disable entry was created (not a no-op). """ count = WPDisabledRule.store( rule_id=rule_id, domains=[site.domain], source=WPDisabledRule.SOURCE_WORDPRESS, user_id=site.uid, timestamp=timestamp, ) return count > 0 def _apply_enable(self, rule_id: str, site: WPSite) -> bool: """Apply an enable action from the changelog. Returns: True if a disable entry was removed. """ count = WPDisabledRule.remove( rule_id=rule_id, domains=[site.domain], ) return count > 0 @staticmethod async def _report_action( action: dict, site: WPSite, sink: MessageSink | None, timestamp: float, ) -> None: """Send a rule change event to the correlation server. Must only be called for valid actions (after _process_action succeeds). """ if sink is None: return action_type = action["action"] rule_id = action["rule_id"] if action_type == ACTION_DISABLE: message_cls = MessageType.WPRuleDisabled elif action_type == ACTION_ENABLE: message_cls = MessageType.WPRuleEnabled else: return try: await sink.process_message( message_cls( plugin_id="wordpress", rule=rule_id, domains=[site.domain], timestamp=timestamp, user_id=site.uid, source=WPDisabledRule.SOURCE_WORDPRESS, ) ) except Exception as e: logger.error( "Failed to report changelog action for rule %s on site %s: %s", rule_id, site.docroot, e, ) @staticmethod def _is_disabled_rules_file_stale( site: WPSite, data_dir: Path, ) -> bool: """Check if disabled-rules.php was modified externally. Reads the embedded timestamp from the file and compares it against the stored sync timestamp in the database. If they differ (e.g. file restored from backup), returns True to trigger regeneration. """ disabled_rules_path = data_dir / DISABLED_RULES_FILENAME if not disabled_rules_path.exists(): return False try: content = disabled_rules_path.read_text() data = parse_php_with_embedded_json(content) file_ts = float(data.get("ts", 0)) except (OSError, ValueError) as e: logger.warning( "Cannot read disabled-rules.php for site %s: %s", site.docroot, e, ) return False try: db_site = WordpressSite.get_by_id(site.docroot) except WordpressSite.DoesNotExist: return False db_ts = db_site.disabled_rules_sync_ts return db_ts is None or abs(file_ts - db_ts) > 1.0