🔐 Sid Gifari File Manager Pro
v8.0.5 | 2026-06-23 22:16:29 | PHP 8.2.31
📂
/ (Root)
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
subsys
📍 /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys
🔄 Refresh
✏️
Editing: web_server.py
Read Only
import asyncio
import functools
import inspect
import io
import logging
import os
import re
import shlex
import shutil
import string
import xml.etree.ElementTree as ET
from contextlib import suppress
from contextvars import ContextVar
from datetime import timedelta
from packaging.version import Version
from pathlib import Path
from subprocess import CalledProcessError, check_call, check_output, DEVNULL
from typing import Any, Callable, List, Optional, Set, Tuple, Iterable

import psutil

from defence360agent.api.integration_conf import IntegrationConfig
from defence360agent.application.determine_hosting_panel import (
    is_generic_panel_installed,
    is_plesk_installed,
)
from defence360agent.internals.global_scope import g
from defence360agent.utils import (
    async_lru_cache,
    atomic_rewrite,
    check_run,
    get_system_user_names,
    OsReleaseInfo,
    CheckRunError,
    TimedCache,
    BACKUP_EXTENSION,
)
from defence360agent.utils.common import webserver_gracefull_restart

# Can be overridden through the IM360_GRACEFUL_RESTART_MIN_PERIOD environment
# variable; defaults to five minutes.
GRACEFUL_RESTART_MIN_PERIOD = int(
    os.environ.get("IM360_GRACEFUL_RESTART_MIN_PERIOD", 5 * 60)
)  # seconds
""" how many seconds should pass minimum between web server restarts. """

# cPanel's own script for restarting Apache.
CPANEL_RESTART_APACHE_SCRIPT = "/usr/local/cpanel/scripts/restartsrv_httpd"
# according to LS docs https://www.litespeedtech.com/docs/webserver/admin
LITESPEED_PID_FILE_PATH = Path("/tmp/lshttpd/lshttpd.pid")
LITESPEED_RESTART_CMD = ("/usr/local/lsws/bin/lswsctrl", "condrestart")
# Recovery needs an unconditional restart: condrestart is a no-op when the
# server is down, which is exactly when the hard restart runs.
LITESPEED_HARD_RESTART_CMD = ("/usr/local/lsws/bin/lswsctrl", "restart")
LITESPEED_CONF_PATH = "/usr/local/lsws/conf/httpd_config.xml"

APACHE2_BIN_PATH = "/usr/sbin/apache2"
HTTPD_BIN_PATH = "/usr/sbin/httpd"

# Extracts the "X.Y.Z" part of the "Server version: ..." line.
apache_version_regexp = re.compile(r"Server version:.*(\d+\.\d+\.\d+)")
# Every whitespace character as bytes, usable with bytes.startswith().
BYTE_SPACES = tuple(x.encode() for x in list(string.whitespace))
APACHE = "apache"

logger = logging.getLogger(__name__)


class NotRunningError(RuntimeError):
    """
    Error for cases when the web server is expected to be running
    but it is not.
    """


class ConfigInvalidError(RuntimeError):
    """
    Error used to indicate that the web server config is having error(s).
    """


class LiteSpeedConfig:
    """Thin wrapper around a LiteSpeed XML config parsed into an element tree.

    Gives read/write access to the ``useIpInProxyHeader`` element and to the
    ``security/accessControl/allow`` list, and serializes the tree back.
    """

    CLIENT_IP_IN_HEADER_TAG = "useIpInProxyHeader"
    SECURITY_TAG = "security"
    ACCESS_CONTROL_TAG = "accessControl"
    ACCESS_CONTROL_ALLOWED_TAG = "allow"
    ACCESS_CONTROL_DENIED_TAG = "deny"

    CLIENT_IP_IN_HEADER_DISABLED = 0
    CLIENT_IP_IN_HEADER_ENABLED = 1
    CLIENT_IP_IN_HEADER_TRUSTED_IP_ONLY = 2

    def __init__(self, content):
        """Parse *content* (XML text) into the root element."""
        self.config = ET.fromstring(content)

    def client_ip_in_header(self) -> int:
        """Return the ``useIpInProxyHeader`` value as an int.

        A missing or empty element is reported as
        CLIENT_IP_IN_HEADER_DISABLED.
        """
        element = self.config.find(self.CLIENT_IP_IN_HEADER_TAG)
        if element is None or not element.text:
            return self.CLIENT_IP_IN_HEADER_DISABLED
        return int(element.text)

    def set_client_ip_in_header(self, value: int):
        """Set ``useIpInProxyHeader`` to *value*, creating it if missing."""
        element = self.config.find(self.CLIENT_IP_IN_HEADER_TAG)
        if element is None:
            element = ET.Element(self.CLIENT_IP_IN_HEADER_TAG)
            self.config.append(element)
        element.text = str(value)

    def access_control_allowed_list(self) -> Set[Tuple[str, bool]]:
        """Return the allow list as a set of ``(item, has_T_suffix)`` pairs.

        Items are separated by commas and/or whitespace; a trailing "T" is
        stripped from the item and reported as the boolean flag.
        """
        element = self.config.find(
            "/".join(
                [
                    ".",
                    self.SECURITY_TAG,
                    self.ACCESS_CONTROL_TAG,
                    self.ACCESS_CONTROL_ALLOWED_TAG,
                ]
            )
        )
        if element is not None and element.text:
            return {
                (item[:-1] if item.endswith("T") else item, item.endswith("T"))
                for s in element.text.split()
                for item in s.split(",")
                if item
            }
        return set()

    def set_access_control_allowed_list(self, allowed):
        """Write *allowed* (iterable of ``(item, flag)``) as the allow list.

        Items whose flag is true get a "T" suffix. Missing ``security``,
        ``accessControl`` and ``allow`` elements are created on demand.
        """
        items = [item[0] + "T" if item[1] else item[0] for item in allowed]
        value = ",".join(items)
        element = self.config.find(
            "/".join(
                [
                    ".",
                    self.SECURITY_TAG,
                    self.ACCESS_CONTROL_TAG,
                    self.ACCESS_CONTROL_ALLOWED_TAG,
                ]
            )
        )
        if element is None:
            element = ET.Element(self.ACCESS_CONTROL_ALLOWED_TAG)
            access_control = self.config.find(
                "/".join(
                    [
                        ".",
                        self.SECURITY_TAG,
                        self.ACCESS_CONTROL_TAG,
                    ]
                )
            )
            if access_control is None:
                access_control = ET.Element(self.ACCESS_CONTROL_TAG)
                security = self.config.find(self.SECURITY_TAG)
                if security is None:
                    security = ET.Element(self.SECURITY_TAG)
                    self.config.append(security)
                security.append(access_control)
            access_control.append(element)
        element.text = value

    def tostring(self) -> bytes:
        """Serialize the tree to UTF-8 bytes with an XML declaration."""
        buf = io.BytesIO()
        tree = ET.ElementTree(self.config)
        tree.write(buf, encoding="utf-8", xml_declaration=True)
        return buf.getvalue()


def _get_litespeed_pid():
    """Return LiteSpeed's pid or None if it can't be read."""
    with suppress(OSError, ValueError):
        return int(LITESPEED_PID_FILE_PATH.read_bytes())
    return None


def litespeed_running():
    """
    Litespeed use constant PID file path, so using it to determinate status

    :return bool
    """
    pid = _get_litespeed_pid()
    try:
        return bool(pid and psutil.pid_exists(pid))
    except OverflowError:
        # a garbage pid value too large for psutil is treated as not running
        return False


def apache_running() -> Optional[str]:
    """
    Finding process with name 'httpd' which belongs to system user.

    :return str: path to the apache binary if it is running
    :return None: if apache is not running
    """
    info = _apache_running_process()
    return info["httpd_bin"] if info else None


async def apache_binary_call(*args) -> bytes:
    """Run the running Apache binary with *args* and return its stdout.

    :raise NotRunningError: if no running Apache is found
    :return: stdout of the call, or b"" if the call raised CheckRunError
    """
    httpd_bin = apache_running()
    if not httpd_bin:
        raise NotRunningError("Apache is not running")
    try:
        if (
            OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN
            and Path("/etc/apache2/envvars").exists()
        ):
            # on Debian OS apache requires some env variables
            # that are set in /etc/apache2/envvars (see DEF-6844)
            stdout = await check_run(
                ". /etc/apache2/envvars && {} {}".format(
                    shlex.quote(httpd_bin), shlex.join(args)
                ),
                shell=True,
            )
        else:
            stdout = await check_run([httpd_bin, *args])
    except CheckRunError:
        logger.warning("Apache doesn't work properly")
        return b""
    return stdout


def _apache_running_process(*, exclude_users=frozenset()):
    """
    Finding process with name 'httpd' which belongs to system user.

    Return process info for the apache binary if it is running.
    Return None if apache is not running

    :param exclude_users: user names removed from the system-user set
        before looking for the process
    """
    # Cpanel works on rpm based os and uses packages
    # according documentation https://documentation.cpanel.net/display/EA4/Apache  # noqa
    # httpd binary is /usr/sbin/httpd
    # Plesk/Generic uses pkgs from os
    # so it has /usr/sbin/httpd on rpm based os and /usr/sbin/apache2 on debian
    # DirectAdmin uses custombuild
    # Its httpd binary is /usr/sbin/httpd

    def is_generic_panel_on_apache():
        if is_generic_panel_installed():
            return (
                IntegrationConfig.get("web_server", "server_type") == APACHE
            )
        return False

    if (OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN) and (
        is_plesk_installed() or is_generic_panel_on_apache()
    ):
        httpd_bin = APACHE2_BIN_PATH
    else:
        httpd_bin = HTTPD_BIN_PATH

    sys_users = set(get_system_user_names()) - exclude_users
    info = _apache_running_process_info(sys_users)
    if info:
        assert info["exe"] is not None
        info["httpd_bin"] = httpd_bin
        try:
            httpd_process_exe = info["exe"]
            # only accept the process if it runs the expected binary
            if os.path.samefile(httpd_bin, httpd_process_exe):
                return info
        except OSError as exc:
            logger.info("Can't determine apache bin path: %s", exc)
    return None


def _apache_running_process_info(sys_users):
    """Retry process_iter() on IndexError.

    Return the info dict of the first process whose executable path ends
    with "/httpd" or "/apache2" and whose user is in *sys_users*, or None.
    """
    for _ in range(2):  # retry
        with suppress(IndexError):
            return next(
                (
                    p.info
                    for p in psutil.process_iter(
                        attrs=["name", "username", "exe", "uids", "gids"]
                    )
                    if (
                        p.info["exe"] is not None  # non ad_value
                        and p.info["exe"].endswith(("/httpd", "/apache2"))
                        and p.info["username"] in sys_users
                    )
                ),
                None,
            )
    return None


def chown(path):
    """Make web server user/group own *path*."""
    info = _apache_running_process(exclude_users={"root"})
    if not info:
        raise NotRunningError(
            "Can't find running apache process without root owner."
        )
    os.chown(path, info["uids"][0], info["gids"][0])


def find_running_nginx():
    """Return path to a running nginx binary or None if not found."""
    return next(
        (
            p.info["exe"]
            for p in psutil.process_iter(attrs=["name", "username", "exe"])
            if (
                p.info["name"] is not None  # non ad_value
                and p.info["name"].endswith("nginx")
                and p.info["exe"] is not None  # non ad_value
                and "nginx" in p.info["exe"]
                and p.info["username"] in ("nginx", "www-data")
            )
        ),
        None,
    )


async def check_with_timeout(
    webserver_running_cb: Callable[[], Any],
    timeout_sec=10,
    granularity: int = 10,
):
    """Poll *webserver_running_cb* until it returns a truthy value.

    The callback is called up to *granularity* times, sleeping
    ``timeout_sec / granularity`` seconds after each falsy result.

    :return: the first truthy result, or the last falsy one
    """
    assert granularity > 0
    for _ in range(granularity):
        result = webserver_running_cb()
        if result:
            return result
        await asyncio.sleep(timeout_sec / granularity)
    else:
        return result


def is_EA4_available():
    """
    though, available != running

    :return bool:
    """
    return os.path.isfile("/etc/cpanel/ea4/is_ea4")


def _apache_graceful_restart_cmd(apachectl) -> List[str]:
    """
    :return list: command which can be passed to check_call(..., shell=False)

    'apache2 -k graceful' will not work for Ubuntu and will produce
    'Invalid Mutex directory in argument file:${APACHE_LOCK_DIR}' error.
    https://serverfault.com/questions/558283/apache2-config-variable-is-not-defined
    That is why this specialization for Ubuntu graceful restart.
    """  # noqa
    restartsrv_httpd = shutil.which(CPANEL_RESTART_APACHE_SCRIPT)
    if restartsrv_httpd:
        # use cpanel specific script if found
        return [restartsrv_httpd]
    if OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN:
        # see DEF-16795 for details
        return [
            "systemctl",
            "reload",
            "--job-mode=replace-irreversibly",
            os.path.basename(apachectl),
        ]
    else:
        return [apachectl, "-k", "graceful"]


def _graceful_restart_cmd_from_integration_conf() -> Optional[Iterable[str]]:
    """Return the graceful restart command from the integration config.

    :return: the whitespace-split ``graceful_restart_script`` value when
        its first word is an existing path, otherwise None (no integration
        config, missing/empty option, or missing script)
    """
    if IntegrationConfig.exists():
        # Fallback on regular restart techniques
        # in case of missing restart script.
        try:
            restart_script = IntegrationConfig.to_dict()["web_server"][
                "graceful_restart_script"
            ]
        except KeyError:
            logger.warning(
                "Integration config is missing graceful_restart_script field"
            )
        else:
            # A whitespace-only value splits into an empty list; treat it
            # like an empty option instead of failing on cmd[0].
            cmd = restart_script.split() if restart_script else []
            if not cmd:
                logger.warning(
                    "graceful_restart_script option is empty",
                )
                return None
            if os.path.exists(cmd[0]):
                return cmd
            logger.warning(
                "Web server restart script does not exist: %s",
                restart_script,
            )
    return None


# systemd-run gained --wait (synchronous transient units that propagate the
# child's exit code) in v232. CL7/CentOS7 ship systemd 219 and lack it, so
# reload confirmation falls back to a config test there.
_SYSTEMD_RUN_WAIT_MIN_VERSION = 232


@functools.lru_cache(maxsize=1)
def _systemd_run_supports_wait() -> bool:
    """Tell whether the installed systemd-run is new enough for --wait.

    Returns False when systemd-run is not found, cannot be run, or its
    version cannot be parsed. The answer is cached.
    """
    systemd_run = shutil.which("systemd-run")
    if not systemd_run:
        return False
    try:
        out = check_output([systemd_run, "--version"], stderr=DEVNULL).decode()
    except (OSError, CalledProcessError):
        return False
    match = re.search(r"systemd\s+(\d+)", out)
    return match is not None and (
        int(match.group(1)) >= _SYSTEMD_RUN_WAIT_MIN_VERSION
    )


def _systemd_run_prefix(wait: bool) -> List[str]:
    """Return the systemd-run argv prefix, or [] if it is unavailable.

    :param wait: add --wait when the installed systemd-run supports it
    """
    # Do not restart web server in the agent cgroup
    # (to avoid attaching its processes to it).
    prefix: List[str] = []
    if systemd_run := shutil.which("systemd-run"):
        prefix += [
            systemd_run,
            "-p",
            "SendSIGKILL=no",
            "--slice=graceful_restart",
        ]
        if wait and _systemd_run_supports_wait():
            prefix.append("--wait")
        prefix.append("--")
    return prefix


def _graceful_restart_cmd(wait: bool = False) -> Iterable[str]:
    """Gracefully restart a web server.

    Builds the command only. The integration config script is preferred,
    then a running LiteSpeed, then a running Apache.

    :raise RuntimeError: if no web server could be detected
    """
    prefix = _systemd_run_prefix(wait)
    cmd = _graceful_restart_cmd_from_integration_conf()
    if cmd is not None:
        return prefix + list(cmd)
    if litespeed_running():
        return prefix + list(LITESPEED_RESTART_CMD)
    if apachectl := apache_running():
        return prefix + _apache_graceful_restart_cmd(apachectl)
    raise RuntimeError("Could not detect a web server")


def _litespeed_installed() -> bool:
    """Tell whether the LiteSpeed config file exists on this host."""
    return os.path.exists(LITESPEED_CONF_PATH)


def _apache_systemd_unit() -> Optional[str]:
    """systemd unit for this host's Apache, or None when the host is not
    Apache-based. Derived from OS/panel, not a running process — recovery
    runs precisely when the server is not alive."""
    on_generic_apache = False
    if is_generic_panel_installed():
        try:
            server_type = IntegrationConfig.get("web_server", "server_type")
        except KeyError:
            return None
        if server_type != APACHE:
            return None
        on_generic_apache = True
    if (OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN) and (
        is_plesk_installed() or on_generic_apache
    ):
        return os.path.basename(APACHE2_BIN_PATH)
    return os.path.basename(HTTPD_BIN_PATH)


def _hard_restart_cmd(wait: bool = True) -> Iterable[str]:
    """Full (non-graceful) restart to bring a web server back up after a
    reload left it down. Detects the server by install/config presence (not
    a running process, which may be down) and raises when no safe command is
    known (e.g. generic nginx, which has only a graceful integration script).

    :raise RuntimeError: if no hard-restart command is known for this host
    """
    prefix = _systemd_run_prefix(wait)
    if _litespeed_installed():
        return prefix + list(LITESPEED_HARD_RESTART_CMD)
    if restartsrv_httpd := shutil.which(CPANEL_RESTART_APACHE_SCRIPT):
        return prefix + [restartsrv_httpd, "--restart"]
    unit = _apache_systemd_unit()
    if unit is None:
        raise RuntimeError("No safe hard-restart command for this web server")
    return prefix + ["systemctl", "restart", unit]


def _configtest_cmd() -> Iterable[str]:
    """Return the command that checks the web server configuration.

    On a generic panel a non-blank ``config_test_script`` from the
    integration config wins; otherwise the command is derived from the
    running Apache, LiteSpeed or nginx, in that order.

    :raise RuntimeError: if no web server could be detected
    """
    if is_generic_panel_installed():
        try:
            cmd = IntegrationConfig.get("web_server", "config_test_script")
        except KeyError:
            # if setting is not present, fall back to default detection
            pass
        else:
            # A whitespace-only value would split into an empty command,
            # so it is handled like an unset option.
            argv = cmd.split() if cmd else []
            if argv:
                return argv
    if apache_bin := apache_running():
        if OsReleaseInfo.id_like() & OsReleaseInfo.DEBIAN:
            return ["apachectl", "configtest"]
        return [apache_bin, "-t"]
    elif litespeed_running():
        return ["litespeed", "-t"]
    elif nginx_bin := find_running_nginx():
        return [nginx_bin, "-t"]
    raise RuntimeError("Could not detect a web server")


# Name of the function that requested the restart; used only for logging.
_graceful_restart_caller = ContextVar("graceful_restart_caller")


async def safe_update_config(config_path, new_config: str) -> bool:
    """
    Update Web-server config with fallback in case of an error happens.

    It tries to do all the best but because of graceful_restart()
    the faulty config might still be applied but in practice it is
    barely probable (because of premature config check).

    1. The new config is checked before to be applied.
    2. The new config (if checked valid) is atomically applied.
    3. The graceful Web-server restart is scheduled.
       It may hold the actual restart for some time, but it is a required
       workaround of a litespeed issue.
    4. If the Web-server failed to restart the config is reverted.

    Return value: True if no errors (at least up to the server restart),
    False if There was an error and config was reverted.

    Note: It is possible that the config may be reverted even when
    return value is True. It is because the graceful_restart may delay
    the actual restart and config may be reverted on that (delayed) stage.
    """
    config_backup_path = os.fspath(config_path) + BACKUP_EXTENSION

    def remove_backup():
        with suppress(FileNotFoundError):
            os.unlink(config_backup_path)

    make_backup = os.path.exists(config_path)
    if not atomic_rewrite(config_path, new_config, backup=make_backup):
        # nothing has changed => no need to restart
        return True

    def revert():
        try:
            os.rename(config_backup_path, config_path)
        except FileNotFoundError:
            # truncate file if backup does not exist
            open(config_path, "w").close()

    try:
        await configtest(raise_exception=True)
    except ConfigInvalidError as e:
        logger.error("Web server config is invalid: %s", e)
        revert()
    else:
        try:
            restart_cmd = _graceful_restart_cmd()
        except RuntimeError as e:
            logger.error("Failed to get graceful restart command: %s", e)
            revert()
            return False

        loop = asyncio.get_running_loop()

        def restart_callback(task):
            def log_config_error(fut):
                if not fut.cancelled() and fut.exception() is not None:
                    logger.critical(
                        "The reverted config seems to be invalid",
                        exc_info=fut.exception(),
                    )

            def log_uncaught_exception(fut):
                if not fut.cancelled() and fut.exception() is not None:
                    logger.critical(
                        "uncaught exception", exc_info=fut.exception()
                    )

            if not task.cancelled() and task.exception() is not None:
                logger.error(
                    "Web server failed to start... Revert changes back. (%s)",
                    task.exception(),
                )
                revert()
                task = loop.create_task(configtest(raise_exception=True))
                task.add_done_callback(log_config_error)
                # the least we can do is to try to restart
                task = loop.create_task(_graceful_restart(restart_cmd))
                task.add_done_callback(log_uncaught_exception)
            else:
                remove_backup()

        graceful_restart = webserver_gracefull_restart.coalesce_calls(
            GRACEFUL_RESTART_MIN_PERIOD, done_callback=restart_callback
        )(_graceful_restart)

        # remember who asked for the restart so that it shows up in the log
        caller_frame = inspect.stack()[1]
        context_token = _graceful_restart_caller.set(caller_frame.function)
        try:
            await graceful_restart(restart_cmd)
        finally:
            _graceful_restart_caller.reset(context_token)
        logger.info("Successfully scheduled web server restart")
        return True
    return False


async def _graceful_restart(restart_cmd=None):
    """
    Gracefully restart a web server.

    If web server cannot be detected, do nothing.

    :param restart_cmd: command to run; detected when not given
    """
    _log_graceful_restart_start()
    try:
        await check_run(restart_cmd or _graceful_restart_cmd())
    except RuntimeError as err:
        logger.warning("Could not restart a Web server: %s", err)
    else:
        logger.info("Successfully restarted web server")


@webserver_gracefull_restart.coalesce_calls(GRACEFUL_RESTART_MIN_PERIOD)
async def _graceful_restart_coalesced(restart_cmd=None):
    """Run _graceful_restart() behind the coalesce_calls decorator.

    While the restart is awaited it is published as
    ``g.web_server_restart_task``.
    """
    task = _graceful_restart(restart_cmd)
    g.web_server_restart_task = task
    try:
        return await task
    finally:
        g.pop("web_server_restart_task")


async def graceful_restart(restart_cmd=None):
    """
    Gracefully restart a web server.

    If web server cannot be detected, do nothing.
    """
    caller_frame = inspect.stack()[1]
    context_token = _graceful_restart_caller.set(caller_frame.function)
    try:
        result = await _graceful_restart_coalesced(restart_cmd)
    finally:
        _graceful_restart_caller.reset(context_token)
    return result


def _log_graceful_restart_start():
    """Log the restart start with the recorded caller ("unknown" if unset)."""
    caller = _graceful_restart_caller.get("unknown")
    logger.info("Performing web server graceful restart, from %s", caller)


def graceful_restart_sync():
    """
    Gracefully restart a web server synchronously.

    If web server cannot be detected, do nothing.
    """
    caller_frame = inspect.stack()[1]
    context_token = _graceful_restart_caller.set(caller_frame.function)
    try:
        _log_graceful_restart_start()
    finally:
        _graceful_restart_caller.reset(context_token)
    try:
        check_call(_graceful_restart_cmd(), stdout=DEVNULL, stderr=DEVNULL)
    except RuntimeError as err:
        logger.warning("Could not restart a Web server: %s", err)
    else:
        logger.info("Successfully restarted web server")


async def graceful_restart_confirmed() -> bool:
    """Graceful web-server restart that confirms the reload actually
    completed and recovers the server if it did not.

    Unlike graceful_restart() it bypasses the coalesce throttle (the
    post-update reload must never be dropped); unlike graceful_restart_sync()
    it observes the reload outcome instead of returning as soon as
    systemd-run queues the transient unit.

    :return: True if the reload, its retry, or the hard restart succeeded
    """
    caller_frame = inspect.stack()[1]
    context_token = _graceful_restart_caller.set(caller_frame.function)
    try:
        _log_graceful_restart_start()
    finally:
        _graceful_restart_caller.reset(context_token)
    try:
        cmd = _graceful_restart_cmd(wait=True)
    except RuntimeError as err:
        logger.warning("Could not restart a Web server: %s", err)
        return False
    if await _reload_confirmed(cmd):
        logger.info("Successfully restarted web server")
        return True
    logger.error(
        "Web server reload after update did not complete cleanly;"
        " attempting recovery"
    )
    await _log_failed_configtest()
    if await _reload_confirmed(cmd):
        logger.info("Web server recovered on graceful reload retry")
        return True
    return await _hard_restart()


async def _reload_confirmed(cmd) -> bool:
    """Run *cmd* and report whether the reload truly succeeded.

    With systemd-run --wait the exit code already reflects completion; on
    older systemd (no --wait) the reload is fire-and-forget, so fall back to
    a config test to detect a broken reload.
    """
    try:
        await check_run(cmd)
    except (CheckRunError, RuntimeError) as err:
        logger.warning("Web server reload returned an error: %s", err)
        return False
    if _systemd_run_supports_wait():
        return True
    try:
        await configtest(raise_exception=True)
    except ConfigInvalidError:
        return False
    return True


async def _log_failed_configtest() -> None:
    """Run a config test and log its failure, if any."""
    # The crash is otherwise invisible in the agent log — only Apache's own
    # error_log records the failed graceful reload.
    try:
        await configtest(raise_exception=True)
    except ConfigInvalidError as err:
        logger.error("Web server config test failed after update: %s", err)


async def _hard_restart() -> bool:
    """Run the hard restart command; return True if it succeeded."""
    try:
        cmd = _hard_restart_cmd(wait=True)
    except RuntimeError as err:
        logger.error("Cannot recover web server: %s", err)
        return False
    try:
        await check_run(cmd)
    except (CheckRunError, RuntimeError) as err:
        logger.error("Web server hard restart failed: %s", err)
        return False
    logger.info("Web server hard-restarted after failed reload")
    return True


async def configtest(raise_exception=False):
    """
    Check web server's config file.

    If web server cannot be detected, do nothing.

    :param raise_exception: raise ConfigInvalidError instead of only
        logging when the check could not be run or failed
    """
    logger.info("Performing web server config test")
    try:
        await check_run(_configtest_cmd(), raise_exc=ConfigInvalidError)
    except RuntimeError as err:
        # ConfigInvalidError subclasses RuntimeError, so it is handled here
        # together with the detection failure from _configtest_cmd().
        logger.warning("Could not run configtest: %s", err)
        if raise_exception:
            raise ConfigInvalidError("Failed to check config") from err


def _parse_apache_version_output(output):
    """Return the Version found in the "Server version:" line of *output*.

    :raise ValueError: if no version can be found
    """
    match = apache_version_regexp.search(output)
    if match is not None:
        return Version(match.group(1))
    else:
        raise ValueError(
            "Failed to parse apache version string: {}".format(output)
        )


def _parse_apache_module_list(output: bytes) -> List[bytes]:
    """
    Parse response of httpd -M

    :param output: stdout of httpd -M (with spaces before module name)
        Output example:
        Loaded Modules:
         core_module (static)
         so_module (static)
         http_module (static)
         mpm_prefork_module (shared)
    :return: list with installed modules
    """
    modules = []
    for line in output.splitlines():
        if not line.startswith(BYTE_SPACES):
            continue
        fields = line.split()
        # a whitespace-only line has no module name to report
        if fields:
            modules.append(fields[0])
    return modules


def _parse_includes(dump):
    """Return the path part (from the first "/") of each line of *dump*.

    Lines without a "/" and lines that start with one are skipped.
    """
    includes = []
    for line in dump.decode().split("\n"):
        index = line.find("/")
        if index > 0:
            includes.append(line[index:].strip())
    return includes


async def dump_includes():
    """Return paths reported by ``apachectl -t -D DUMP_INCLUDES``.

    :return: list of paths, or [] if apachectl cannot be found
    """
    try:
        return _parse_includes(
            await check_run(["apachectl", "-t", "-D", "DUMP_INCLUDES"])
        )
    except FileNotFoundError:
        return []


@async_lru_cache(maxsize=1)
async def apache_version():
    """Return the version of the running Apache (``<binary> -v``).

    :raise NotRunningError: if Apache is not running
    """
    apache_bin = apache_running()
    if apache_bin is None:
        raise NotRunningError("Apache is not running")
    out = await check_run([apache_bin, "-v"])
    version = _parse_apache_version_output(out.decode())
    logger.info("Apache %s version detected", version)
    return version


@TimedCache(
    expiration=timedelta(
        seconds=int(
            os.environ.get("IMUNIFY360_APACHE_MODULES_CACHE_TIMEOUT", 600)
        )
    )
)
async def apache_modules():
    """Return the module names reported by the running Apache (``-M``)."""
    stdout = await apache_binary_call("-M")
    return _parse_apache_module_list(stdout)
💾 Save Changes
❌ Cancel