"""Bulk WAF set endpoint."""

import asyncio
import logging

from defence360agent.contracts.config import Wordpress
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import RootEndpoints, bind
from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils import Scope
from defence360agent.utils.config import update_config

logger = logging.getLogger(__name__)

# Upper bound on per-user config updates awaited together in one batch.
_MAX_CONCURRENT = 10

# The only ``status`` values accepted by ``waf_set``.
_VALID_STATUSES = ("enabled", "disabled")


class WordpressWafBulkEndpoints(RootEndpoints):
    """RPC endpoints that change the WordPress WAF setting for many users."""

    SCOPE = Scope.AV_IM360

    @bind("wordpress-plugin", "waf", "set")
    async def waf_set(
        self,
        status: str,
        all_users: bool = False,
        users: list[str] | None = None,
    ) -> dict:
        """Set the per-user ``WORDPRESS.waf_enabled`` flag in bulk.

        Args:
            status: ``"enabled"`` turns the flag on, ``"disabled"`` turns it
                off. Any other value is rejected.
            all_users: Apply the change to every user reported by the
                hosting panel. Mutually exclusive with ``users``.
            users: Explicit user names to change. Duplicates are dropped
                (first occurrence wins); names the hosting panel does not
                report are returned as skipped rather than treated as
                errors.

        Returns:
            A dict with four keys: ``items`` (one
            ``{"user", "status", "reason"}`` entry per processed user, in
            the order succeeded, skipped, failed), ``succeeded`` (list of
            user names), ``skipped`` and ``failed`` (lists of
            ``{"user", "reason"}`` dicts).

        Raises:
            ValidationError: If ``status`` is not a recognised value, the
                ``all_users`` / ``users`` selection is missing, ambiguous
                or empty, the WordPress Security Plugin is disabled, or
                the hosting users cannot be enumerated.
        """
        # Without this check, any unrecognised value (typo, wrong case)
        # compares unequal to "enabled" and would silently *disable* the WAF.
        if status not in _VALID_STATUSES:
            raise ValidationError(
                f"Invalid status {status!r}:"
                " expected 'enabled' or 'disabled'"
            )
        if all_users and users is not None:
            raise ValidationError(
                "Specify either --all-users or --users, not both"
            )
        if not all_users and users is None:
            raise ValidationError("Specify either --all-users or --users")
        if users is not None and not users:
            raise ValidationError("--users must not be empty")

        if not Wordpress.SECURITY_PLUGIN_ENABLED:
            raise ValidationError(
                "WordPress Security Plugin is disabled."
                " Enable it before changing WAF settings."
            )

        logger.warning(
            "AUDIT wordpress-plugin.waf.set status=%r all_users=%r users=%r",
            status,
            all_users,
            users,
        )

        try:
            panel_users = set(await hosting_panel.HostingPanel().get_users())
        except Exception as e:
            raise ValidationError(
                f"Could not enumerate hosting users: {e}"
            ) from e

        succeeded: list[str] = []
        skipped: list[dict] = []
        failed: list[dict] = []

        if all_users:
            # Sorted so the processing and response order is reproducible
            # instead of following arbitrary set iteration order.
            valid_users = sorted(panel_users)
        else:
            valid_users = []
            # dict.fromkeys() de-duplicates while keeping the caller's order.
            for u in dict.fromkeys(users):
                if u in panel_users:
                    valid_users.append(u)
                else:
                    skipped.append({"user": u, "reason": "Not a hosting user"})

        waf_value = status == "enabled"

        async def _apply_to_user(u: str) -> tuple[str, str | None]:
            """Update one user's config; return ``(user, error or None)``."""
            try:
                await update_config(
                    self._sink,
                    {"WORDPRESS": {"waf_enabled": waf_value}},
                    user=u,
                )
            except Exception as e:
                # Deliberately best-effort: one user's failure must not
                # abort the rest. Some exceptions stringify to "", so fall
                # back to repr() to keep the reported reason meaningful.
                reason = str(e) or repr(e)
                logger.warning(
                    "Failed to set WAF status for user %s: %s", u, reason
                )
                return u, reason
            return u, None

        # Process in fixed-size batches; each batch is awaited in full
        # before the next one starts.
        for i in range(0, len(valid_users), _MAX_CONCURRENT):
            batch = [
                _apply_to_user(u)
                for u in valid_users[i : i + _MAX_CONCURRENT]
            ]
            results = await asyncio.gather(*batch)
            for u, err in results:
                if err is None:
                    succeeded.append(u)
                else:
                    failed.append({"user": u, "reason": err})

        items = [
            *[
                {"user": u, "status": "succeeded", "reason": ""}
                for u in succeeded
            ],
            *[
                {"user": s["user"], "status": "skipped", "reason": s["reason"]}
                for s in skipped
            ],
            *[
                {"user": f["user"], "status": "failed", "reason": f["reason"]}
                for f in failed
            ],
        ]

        return {
            "items": items,
            "succeeded": succeeded,
            "skipped": skipped,
            "failed": failed,
        }