import asyncio import logging from defence360agent.contracts.config import ( UserConfig, UserType, choose_value_from_config, ) from defence360agent.utils import importer panel_users = importer.get( module="imav.malwarelib.utils.user_list", name="panel_users", default=None, ) logger = logging.getLogger(__name__) def migrate(migrator, database, fake=False, **kwargs): if fake or panel_users is None: return loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: try: users = loop.run_until_complete(panel_users()) except Exception: logger.exception( "Failed to enumerate panel users for waf_enabled seed" ) return for entry in users: try: username = entry["user"] except (KeyError, TypeError) as e: logger.warning( "Skipping malformed panel entry %r during waf_enabled" " seed: %s", entry, e, ) continue try: _, source = choose_value_from_config( "WORDPRESS", "waf_enabled", username=username, ) if source != UserType.ROOT: continue except Exception as e: logger.warning( "Failed to read waf_enabled for user %s while seeding: %s", username, e, ) continue try: UserConfig(username=username).dict_to_config( {"WORDPRESS": {"waf_enabled": True}}, without_defaults=True, ) except Exception as e: logger.warning( "Failed to seed WORDPRESS.waf_enabled for user %s: %s", username, e, ) finally: loop.close() def rollback(migrator, database, fake=False, **kwargs): pass