🔐 Sid Gifari File Manager Pro
v8.0.5 | 2026-06-23 22:40:04 | PHP 8.2.31
📂
/ (Root)
/
opt
/
imunify360
/
venv
/
lib
/
python3.11
/
site-packages
/
defence360agent
/
simple_rpc
📍 /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc
🔄 Refresh
✏️
Editing: __init__.py
Read Only
""" Simple unix socket RPC server implementation """ import asyncio import functools import inspect import io import json import os import select import socket import struct import sys import time from contextlib import suppress from contextvars import ContextVar from logging import getLogger from typing import Sequence from psutil import Process import sentry_sdk from defence360agent.api import inactivity from defence360agent.application import app from defence360agent.contracts.config import Core, SimpleRpc as Config from defence360agent.feature_management.exceptions import ( FeatureManagementError, ) from defence360agent.internals.auth_protocol import UnixSocketAuthProtocol from defence360agent.model import tls_check from defence360agent.model.simplification import run_in_executor from defence360agent.utils import is_root_user, run_coro from defence360agent.utils.buffer import LineBuffer, LineBufferOverflow from defence360agent.subsys.panels import hosting_panel from defence360agent.subsys.panels.base import InvalidTokenException from defence360agent.subsys import svcctl from defence360agent.rpc_tools.exceptions import ( ResponseError, ServiceStateError, SocketError, ) from defence360agent.rpc_tools.lookup import Endpoints, UserType from defence360agent.rpc_tools.utils import ( is_running, # noqa: F401 rpc_is_running, ) from defence360agent.rpc_tools.validate import ValidationError from defence360agent.rpc_tools import ERROR, SUCCESS, WARNING logger = getLogger(__name__) caller_uid_var: ContextVar[int] = ContextVar("rpc_caller_uid") _SENSITIVE_PARAM_KEYS = frozenset({"jwt", "token", "password"}) def _redact_for_log(decoded): safe = dict(decoded) params = safe.get("params") if isinstance(params, dict): safe_params = dict(params) for key in _SENSITIVE_PARAM_KEYS: if key in safe_params: safe_params[key] = "***" safe["params"] = safe_params return safe def _safe_log_payload(raw): try: decoded = json.loads(raw) except Exception: return "<unparseable, {} chars>".format(len(raw)) if not isinstance(decoded, dict): return repr(decoded) return repr(_redact_for_log(decoded)) class RpcServiceState: # If need DB and agent should be running # e.g. on-demand scan RUNNING = "running" # Agent should be stopped STOPPED = "stopped" # It doesn't matter for operation running or stopping the agent # if agent is running - using socket, instead of direct communication ANY = "any" # No need DB and UI interaction # preferable for use direct instead any for execution external process # e.g. enable/disable plugins/features DIRECT = "direct" async def _execute_request(coro, method): try: result = await coro except ValidationError as e: result = { "result": WARNING, "messages": e.errors, } result.update(e.extra_data) return result except (PermissionError, FeatureManagementError) as e: msg, *args = e.args logger.error(msg, *args) return { "result": ERROR, "messages": [msg % tuple(args)], } except Exception as e: sentry_sdk.capture_exception(e) logger.error( "Something went wrong while processing %s (%s)", method, str(e) ) return {"result": ERROR, "messages": str(e)} else: return {"result": SUCCESS, "messages": [], "data": result} def _apply_middleware(method, user): cb = Endpoints.route_to_endpoint if isinstance(method, (list, tuple)): hashable = tuple(method) common = app.MIDDLEWARE.get(None, []) specific = app.MIDDLEWARE.get(hashable, []) excluded = app.MIDDLEWARE_EXCLUDE.get(hashable, []) for mw, users in reversed(common + specific): if (user in users) and (mw not in excluded): logger.debug("Applying middleware %s", mw.__name__) cb = mw(cb) return cb def _find_uds_inodes(socket_path: str) -> Sequence[str]: """Find inodes corresponding to the unix domain socket path.""" with open( "/proc/net/unix", encoding=sys.getfilesystemencoding(), errors=sys.getfilesystemencodeerrors(), ) as file: return [line.split()[-2] for line in file if socket_path in line] def _protocol_supports_guard(protocol_cls): """True if cls.__init__ accepts the guard kwargs; legacy *_ signatures TypeError when passed limiter=.""" try: sig = inspect.signature(protocol_cls.__init__) except (TypeError, ValueError): return False params = sig.parameters return "limiter" in params and "read_timeout" in params class ConnectionLimiter: def __init__(self, max_connections): self.max_connections = max_connections self._count = 0 self.saturation_logged = False def acquire(self): if self._count >= self.max_connections: return False self._count += 1 return True def release(self): if self._count > 0: self._count -= 1 self.saturation_logged = False @property def count(self): return self._count class ConnectionGuard: def __init__(self, loop, *, limiter=None, read_timeout=None, name): self._loop = loop self._limiter = limiter self._read_timeout = read_timeout self._name = name self._transport = None self._timeout_handle = None self._slot_acquired = False self._peer_pid = None self._peer_uid = None def try_admit(self, transport): if self._limiter is not None and not self._limiter.acquire(): if not self._limiter.saturation_logged: logger.warning( "%s connection limit (%d) reached; rejecting new client", self._name, self._limiter.max_connections, ) self._limiter.saturation_logged = True return False self._slot_acquired = self._limiter is not None self._transport = transport self._schedule_timeout() return True def note_peer(self, pid, uid): self._peer_pid = pid self._peer_uid = uid def on_data(self): self._schedule_timeout() def on_lost(self): self._cancel_timeout() if self._slot_acquired and self._limiter is not None: self._limiter.release() self._slot_acquired = False self._transport = None def _schedule_timeout(self): if self._read_timeout is None: return if self._timeout_handle is not None: self._timeout_handle.cancel() self._timeout_handle = self._loop.call_later( self._read_timeout, self._on_timeout ) def _cancel_timeout(self): if self._timeout_handle is not None: self._timeout_handle.cancel() self._timeout_handle = None def _on_timeout(self): self._timeout_handle = None if self._transport is None: return logger.warning( "Closing idle %s connection (pid=%s uid=%s, no data for %ds)", self._name, self._peer_pid, self._peer_uid, self._read_timeout, ) transport, self._transport = self._transport, None transport.close() self.on_lost() class _RpcServerProtocol(UnixSocketAuthProtocol): def __init__(self, loop, sink, user, *, limiter=None, read_timeout=None): self._loop = loop self._sink = sink self.user = user self._transport = None self._buf = LineBuffer() self._guard = ConnectionGuard( loop, limiter=limiter, read_timeout=read_timeout, name="RPC" ) def connection_made(self, transport): if not self._guard.try_admit(transport): transport.close() return try: super().connection_made(transport) except (OSError, AttributeError, struct.error) as exc: logger.warning( "Rejected RPC connection: SO_PEERCRED unavailable (%s)", exc, ) transport.close() self._transport = None self._guard.on_lost() return self._guard.note_peer(self._pid, self._uid) def preprocess_data(self, data: str): decoded = json.loads(data) user_type, user_name = hosting_panel.HostingPanel().authenticate( self, decoded ) self.user = user_type if user_name is not None: decoded["params"]["user"] = user_name # Prevent multi-user bypass: non-root callers may only operate on # their own username even when 'users' (plural) is supplied. if "users" in decoded["params"]: decoded["params"]["users"] = [user_name] # add calling process try: calling_process = Process(self._pid).cmdline() except Exception as e: calling_process = [str(e)] decoded["calling_process"] = calling_process return decoded def data_received(self, data): if self._transport is None: return self._guard.on_data() try: self._buf.append(data.decode()) except LineBufferOverflow as e: logger.warning( "Closing RPC connection (pid=%s uid=%s): %s", self._pid, self._uid, e, ) self._transport.close() self._transport = None self._guard.on_lost() return for msg in self._buf: try: result = self.preprocess_data(msg) method = result["command"] params = result["params"] logger.debug("Data received: command=%s", method) cb = _apply_middleware(method, self.user) # Scope caller_uid_var to the create_task call so the new # task captures it via copy_context, but the parent # protocol context is left untouched -- preventing leakage # into subsequent reads (tests, repeated requests, etc.). token = caller_uid_var.set(self._uid) try: # TODO: fix that there is no json flag in params self._loop.create_task( self._dispatch( method, params, cb(result, self._sink, self.user) ) ) finally: caller_uid_var.reset(token) except InvalidTokenException as e: # without events in Sentry logger.warning("Incorrect token provided") self._write_response({"result": ERROR, "messages": str(e)}) except Exception as e: logger.exception( "Something went wrong before processing %s", _safe_log_payload(msg), ) self._write_response({"result": ERROR, "messages": str(e)}) async def _dispatch(self, method, params, coro): with inactivity.track.task("rpc_{}".format(method)): # route and save result to 'result' response = await _execute_request(coro, method) logger.info( "Response: method - {}, data - {}".format(method, response) ) self._write_response(response) def connection_lost(self, transport): self._guard.on_lost() self._transport = None def _write_response(self, data): if self._transport is None: logger.warning("Cannot send RPC response: connection lost.") return else: try: self._transport.write((json.dumps(data) + "\n").encode()) except Exception as e: logger.exception(e) # TODO: need to own message error def _check_socket_folder_permissions(socket_path): dir_name = os.path.dirname(socket_path) os.makedirs(dir_name, exist_ok=True) os.chmod(dir_name, 0o755) class RpcServer: SOCKET_PATH = Config.SOCKET_PATH USER = UserType.ROOT SOCKET_MODE = 0o700 @classmethod async def create(cls, loop, sink): _check_socket_folder_permissions(cls.SOCKET_PATH) with suppress(FileNotFoundError): os.unlink(cls.SOCKET_PATH) limiter = ConnectionLimiter(Config.MAX_CONCURRENT_CONNECTIONS) server = await loop.create_unix_server( lambda: _RpcServerProtocol( loop, sink, cls.USER, limiter=limiter, read_timeout=Config.READ_TIMEOUT, ), cls.SOCKET_PATH, ) os.chmod(cls.SOCKET_PATH, cls.SOCKET_MODE) return server class RpcServerAV: USER = UserType.ROOT SOCKET_PATH = Config.SOCKET_PATH PROTOCOL_CLASS = _RpcServerProtocol @classmethod async def create(cls, loop, sink): """Looking for socket in /proc/net/unix and check which descriptor corresponded to it by comparing inode $ ls -l /proc/[pid]/fd lrwx------ 1 root root 64 Apr 11 07:20 4 -> socket:[2866765] $ cat /proc/net/unix Num RefCount Protocol Flags Type St Inode Path ffff880054c0a4c0: 00000002 00000000 00010000 0001 01 2866765 /var/run/defence360agent/simple_rpc.sock # noqa """ def safe_readlink(*args, **kwargs): """Return empty path on error.""" with suppress(OSError): return os.readlink(*args, **kwargs) return "" # find inodes for the SOCKET_PATH _socket_path = cls.SOCKET_PATH _check_socket_folder_permissions(_socket_path) if _socket_path.startswith("/var/run"): # remove /var prefix, see DEF-16201 _socket_path = _socket_path[len("/var") :] inodes = _find_uds_inodes(_socket_path) # find socket fds corresponding to the inodes last_error = None for inode in inodes: try: with os.scandir("/proc/self/fd") as it: for fd in it: if safe_readlink(fd.path) == "socket:[{}]".format( inode ): socket_fd = int(fd.name) break # found fd else: # no break, not found fd for given inode continue # try another inode break # found fd except OSError as e: last_error = e else: # no break, not found raise SocketError( "[{}] Socket {!r} for {} not found.".format( "inode" * (not inodes), cls.SOCKET_PATH, cls.USER ) ) from last_error _socket = socket.fromfd( socket_fd, socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK, ) if _protocol_supports_guard(cls.PROTOCOL_CLASS): limiter = ConnectionLimiter(Config.MAX_CONCURRENT_CONNECTIONS) factory = lambda: cls.PROTOCOL_CLASS( # noqa: E731 loop, sink, cls.USER, limiter=limiter, read_timeout=Config.READ_TIMEOUT, ) else: factory = lambda: cls.PROTOCOL_CLASS( # noqa: E731 loop, sink, cls.USER ) server = await loop.create_unix_server(factory, sock=_socket) return server class NonRootRpcServerAV(RpcServerAV): USER = UserType.NON_ROOT SOCKET_PATH = Config.NON_ROOT_SOCKET_PATH class NonRootRpcServer(RpcServer): SOCKET_PATH = Config.NON_ROOT_SOCKET_PATH USER = UserType.NON_ROOT # Match the systemd .socket unit (SocketMode=0666). UNIX domain sockets # don't use the execute bit, so granting it (the previous 0o777) only # widened the attack surface without enabling any client. SOCKET_MODE = 0o666 class _RpcClientImpl: def __init__(self, socket_path): try: self._sock = socket.socket( socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK ) self._sock.connect(socket_path) except (ConnectionRefusedError, FileNotFoundError, BlockingIOError): raise ServiceStateError() def dispatch(self, method, params): try: self._sock.sendall( ( json.dumps({"command": method, "params": params}) + "\n" ).encode() ) except BrokenPipeError as e: raise SocketError(f"communication interrupted, {e}") try: data = self._sock_recv_until(terminator_byte=b"\n") except ConnectionResetError as e: raise ResponseError(f"Connection reset: {e}") from e try: response = json.loads(data.decode()) except Exception as e: raise ResponseError( "Error parsing RPC response {!r}".format(data) ) from e return response def _sock_recv_until(self, terminator_byte): assert not self._sock.getblocking() chunks = [] while (not chunks) or (terminator_byte not in chunks[-1]): fdread_list = [self._sock.fileno()] rwx_fdlist = select.select( fdread_list, [], [], # naive timeout for one-shot response # scenario Config.CLIENT_TIMEOUT, ) fdready_list = rwx_fdlist[0] if self._sock.fileno() not in fdready_list: if any(rwx_fdlist): raise SocketError( "select() = {!r} resulted in error".format(rwx_fdlist) ) else: raise SocketError("request timeout") chunk = self._sock.recv(io.DEFAULT_BUFFER_SIZE) if len(chunk) == 0: raise SocketError("Empty response from socket.recv()") chunks.append(chunk) return b"".join(chunks) class _NoRpcImpl: def __init__(self, sink=None): self._sink = sink # suppress is for doing those things idempotent way # PSSST! simplification.run_in_executor() is main thread now! :-X # with suppress(tls_check.OverridingReset): # tls_check.reset("main CLI thread for stopped agent") with suppress(tls_check.OverridingReset): loop = asyncio.get_event_loop() loop.run_until_complete(run_in_executor(loop, tls_check.reset)) def dispatch(self, method, params): loop = asyncio.get_event_loop() logger.info("Executing {}, params: {}".format(method, params)) request = {"command": method, "params": params} token = caller_uid_var.set(os.getuid()) try: cb = _apply_middleware(method, user=UserType.ROOT) return loop.run_until_complete( _execute_request(cb(request, self._sink), method) ) finally: caller_uid_var.reset(token) class RpcClient: """ One RpcClient instance is suitable to use for multiple ipc calls :param RpcServiceState require_svc_is_running: whether to provide direct endpoints binding if the service is stopped. :param int reconnect_with_timeout: timeout in sec for reconnect retries :param int num_retries: number of reconnect retries """ def __init__( self, *, require_svc_is_running=RpcServiceState.RUNNING, reconnect_with_timeout=None, num_retries=1, ): self._impl = None self._socket_path = ( Config.SOCKET_PATH if is_root_user() else Config.NON_ROOT_SOCKET_PATH ) if ( require_svc_is_running == RpcServiceState.STOPPED and rpc_is_running() ): raise ServiceStateError(RpcServiceState.RUNNING) elif require_svc_is_running == RpcServiceState.RUNNING: # ensure that socket is active run_coro(svcctl.activate_socket_service(Core.SVC_NAME)) if require_svc_is_running in ( RpcServiceState.ANY, RpcServiceState.RUNNING, ): try: if reconnect_with_timeout: self._impl = self._reconnect_with_timeout( reconnect_with_timeout, num_retries ) else: self._impl = _RpcClientImpl(self._socket_path) return except ServiceStateError: if require_svc_is_running == RpcServiceState.RUNNING: raise if self._impl is None: # In other cases (ANY, STOPPED, DIRECT) need to use _NoRpcImpl assert ( is_root_user() ), "_NoRpcImpl is not available for non root user" self._impl = _NoRpcImpl() def __getattr__(self, method): return functools.partial(self._dispatch, method) def cmd(self, *command): return functools.partial(self._dispatch, command) def _dispatch(self, method, **params): response = self._impl.dispatch(method, params) if isinstance(method, (list, tuple)): if response["result"] in (ERROR, WARNING): return response["result"], response["messages"] else: assert response["result"] == SUCCESS return response["result"], response["data"] else: if response["result"] in (ERROR, WARNING): raise ResponseError(response["messages"]) return response["data"] def _reconnect_with_timeout(self, timeout, num_retries): while True: try: return _RpcClientImpl(self._socket_path) except ServiceStateError: if num_retries: logger.info( "Waiting %d second(s) before retry...", timeout ) time.sleep(timeout) num_retries -= 1 else: raise
💾 Save Changes
❌ Cancel