­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ ­ """RPC endpoints for WordPress disabled protection rules.""" import asyncio import logging import pwd import time from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSink from defence360agent.files import Index, WP_RULES from defence360agent.model.wp_disabled_rule import WPDisabledRule from defence360agent.rpc_tools import ValidationError from defence360agent.rpc_tools.lookup import CommonEndpoints, bind from defence360agent.subsys.panels import hosting_panel from defence360agent.utils import Scope, log_future_errors from defence360agent.wordpress.changelog_processor import ( ChangelogProcessor, ) from defence360agent.wordpress.plugin import ( redeploy_wp_rules, update_disabled_rules_on_sites, ) from defence360agent.wordpress.site_repository import ( get_installed_sites_by_domains, ) from defence360agent.wordpress.wp_rules import get_wp_rules_data logger = logging.getLogger(__name__) async def _get_user_domains(user: str) -> list[str]: """ Get domains for a user from the hosting panel. Returns: List of domains the user owns, or empty list on error. """ try: hp = hosting_panel.HostingPanel() domains_per_user = await hp.get_domains_per_user() return domains_per_user.get(user, []) except Exception as e: logger.warning("Failed to get domains for user %s: %s", user, e) return [] async def _validate_user_domains( user: str, domains: list[str] | None ) -> list[str]: """ Validate and filter domains for a non-root user. If no domains specified, returns all user's domains. If domains specified, filters to only those the user owns. Args: user: Username to validate domains for domains: Requested domains, or None for all user's domains Returns: List of validated domains the user can access Raises: ValidationError: If user has no domains or no access to requested domains """ user_domains = await _get_user_domains(user) if not domains: if not user_domains: raise ValidationError("No domains found for user") return user_domains authorized_domains = [d for d in domains if d in user_domains] if not authorized_domains: raise ValidationError( "You don't have access to any of the specified domains" ) return authorized_domains def _enrich_with_metadata( disabled_rules: list[dict], wp_rules_data: dict | None ) -> list[dict]: """ Enrich disabled rules with metadata from wp-rules.yaml. Args: disabled_rules: List of disabled rule dicts from WPDisabledRule.fetch() wp_rules_data: Parsed wp-rules.yaml data, or None if unavailable Returns: List of enriched rule dicts with component and versions added """ enriched = [] for rule in disabled_rules: rule_id = rule["rule_id"] metadata = wp_rules_data.get(rule_id, {}) if wp_rules_data else {} enriched.append( { **rule, "component": metadata.get("target"), "versions": metadata.get("versions"), } ) return enriched async def _jit_sync_changelogs( domains: list[str], sink: MessageSink | None = None ) -> None: """Process pending changelog files for the given domains before an API change. This "Just-in-Time" sync ensures the database reflects any WordPress-side changes before the agent applies its own disable/enable operation. File regeneration (disabled-rules.php) is intentionally skipped here because the calling API endpoint will regenerate files after its own DB mutation. """ try: sites = get_installed_sites_by_domains(domains) if not sites: return await ChangelogProcessor().process_changelogs_for_sites( sites, sink=sink ) except Exception as e: logger.warning("JIT changelog sync failed: %s", e, exc_info=True) class WPDisabledRulesEndpoints(CommonEndpoints): """Endpoints for listing disabled WordPress protection rules.""" SCOPE = Scope.AV_IM360 @bind("wordpress-plugin", "rules", "list-disabled") async def list_disabled_rules( self, limit: int = 50, offset: int = 0, domains: list[str] | None = None, user: str | None = None, ) -> tuple[int, list[dict]]: """ List disabled WordPress protection rules with metadata. When user is provided, returns rules for that user's domains. Otherwise, returns all disabled rules. Args: limit: Maximum number of rules to return offset: Number of rules to skip domains: Filter by specific domains (optional) user: Username (populated by middleware) Returns: Tuple of (total_count, list of enriched rule dicts) """ if user: user_domains = await _get_user_domains(user) if not domains: domains = user_domains else: domains = [d for d in domains if d in user_domains] # if user cannot access any of the requested domains, return empty list if not domains: return 0, [] # Fetch disabled rules from database # Root users see all rules (including global), non-root only see their domain rules total_count, disabled_rules = WPDisabledRule.fetch( limit=limit, offset=offset, user_domains=domains, include_global=user is None, ) # Load wp-rules metadata for enrichment try: wp_rules_index = Index(WP_RULES, integrity_check=False) wp_rules_data = get_wp_rules_data(wp_rules_index) except Exception as e: logger.warning("Failed to load wp-rules data: %s", e) wp_rules_data = None # Enrich with metadata enriched_rules = _enrich_with_metadata(disabled_rules, wp_rules_data) return total_count, enriched_rules async def _toggle_rule( self, action: str, rule: str, domains: list[str] | None, user: str | None, ) -> dict: """Shared implementation for disable/enable rule endpoints.""" if user is None: user_id = 0 else: try: user_id = pwd.getpwnam(user).pw_uid except KeyError: raise ValidationError(f"User '{user}' not found") if user: domains = await _validate_user_domains(user, domains) # JIT Sync: process pending changelogs before applying API changes. # Skipped for global operations (domains=None) because global and # domain-level disables are independent scopes and cannot conflict. if domains: await _jit_sync_changelogs(domains, self._sink) if action == "disable": WPDisabledRule.store( rule_id=rule, domains=domains, source=WPDisabledRule.SOURCE_AGENT, user_id=user_id, ) message_cls = MessageType.WPRuleDisabled else: WPDisabledRule.remove(rule_id=rule, domains=domains) message_cls = MessageType.WPRuleEnabled try: await self._sink.process_message( message_cls( plugin_id="wordpress", rule=rule, domains=domains or [], timestamp=time.time(), user_id=user_id, source=WPDisabledRule.SOURCE_AGENT, ) ) except Exception as e: logger.error( "Failed to report rule %s for %s: %s", action, rule, e ) if domains: # Domain-specific: update disabled-rules.php for affected sites task = asyncio.create_task( update_disabled_rules_on_sites(domains=domains) ) else: # Global: re-deploy rules.php with the rule filtered out task = asyncio.create_task(redeploy_wp_rules()) task.add_done_callback(log_future_errors) return {} @bind("wordpress-plugin", "rules", "disable") async def disable_rule( self, rule: str, domains: list[str] | None = None, user: str | None = None, ) -> dict: """ Disable a WordPress protection rule globally or for specific domains. Root users can disable globally (no domains) or for specific domains. Non-root users can disable for all their domains (by specifying no domains) or for specific domains. Non-root users can only disable for domains they own. Args: rule: The rule ID to disable (e.g., "CVE-2025-001") domains: List of domains to disable the rule for, or None for global user: Username (populated by middleware for non-root users) Returns: Empty dict on success. """ return await self._toggle_rule("disable", rule, domains, user) @bind("wordpress-plugin", "rules", "enable") async def enable_rule( self, rule: str, domains: list[str] | None = None, user: str | None = None, ) -> dict: """ Re-enable a WordPress protection rule globally or for specific domains. Root users can enable globally (no domains) or for specific domains. Non-root users can enable for all their domains (no domains) or specific ones. Non-root users can only enable for domains they own. Note: Enabling at one scope doesn't affect the other scope. E.g., enabling globally leaves domain-specific disables intact. Args: rule: The rule ID to enable (e.g., "CVE-2025-001") domains: List of domains to enable the rule for, or None for global user: Username (populated by middleware for non-root users) Returns: Empty dict on success """ return await self._toggle_rule("enable", rule, domains, user)