REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 13.74 KB
Close
/proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts/config_provider.py
Text
Base64
import json import logging import os import pwd from abc import abstractmethod from contextlib import suppress from textwrap import dedent from typing import Mapping, Optional, Protocol import sentry_sdk import yaml from defence360agent.utils import atomic_rewrite from defence360agent.utils.fd_ops import open_dir_no_symlinks logger = logging.getLogger(__name__) # Don't read config if its file is larger than this. _MAX_CONFIG_SIZE = 1 << 20 # 1MiB class IConfigProvider(Protocol): @abstractmethod def read_config_file( self, force_read: bool = False, ignore_errors: bool = True ): raise NotImplementedError @abstractmethod def write_config_file(self, config: Mapping) -> None: raise NotImplementedError @abstractmethod def modified_since(self, timestamp: Optional[float]) -> bool: raise NotImplementedError class ConfigError(Exception): pass class JsonMessage: """Pretty-print given *obj* as JSON. To be used for logging. Example: logging.info("object: %s", JsonMessage(obj)) """ def __init__(self, obj): self._obj = obj def __str__(self): return json.dumps(self._obj, sort_keys=True) def diff_section(prev_section: Optional[dict], section: Optional[dict]): """Return difference between config sections.""" prev_section = prev_section or {} section = section or {} removed_settings = prev_section.keys() - section.keys() added_settings = section.keys() - prev_section.keys() return { "-": {v: prev_section[v] for v in removed_settings}, "+": {v: section[v] for v in added_settings}, # modified settings "?": { v: (prev_section[v], section[v]) for v in (prev_section.keys() & section.keys()) if prev_section[v] != section[v] }, } def diff_config(prev_conf: dict, conf: dict): """Compare *prev_conf* with the current *conf*.""" removed_sections = prev_conf.keys() - conf.keys() yield {section: prev_conf[section] for section in removed_sections} added_sections = conf.keys() - prev_conf.keys() yield {section: conf[section] for section in added_sections} # changed sections yield { section: diff_section(prev_conf[section], conf[section]) for section in (prev_conf.keys() & conf.keys()) if prev_conf[section] != conf[section] } def exclude_equals(*, main_conf: dict, base_conf: dict) -> dict: """ Return dict derived from *main_conf* excluding parts that are equal in *base_conf*. For example, >>> base_conf = { "SECTION1": {"OPTION1": "default", "OPTION2": "default"}, "SECTION2": {"OPTION1": "default"} } >>> main_conf = { "SECTION1": {"OPTION1": "value", "OPTION2": "default"}, "SECTION2": {"OPTION1": "default"} } >>> >>> exclude_equals(main_conf=main_conf, base_conf=base_conf) {'SECTION1': {'OPTION1': 'value'}} >>> """ _, added, changed = diff_config(base_conf, main_conf) result = {} for section, value in main_conf.items(): if section in added.keys(): result[section] = value if section in changed.keys(): result.setdefault(section, {}).update(changed[section]["+"]) result.setdefault(section, {}).update( {k: v[1] for k, v in changed[section]["?"].items()} ) return result class ConfigReader: """ ConfigFile file for settings page. Location config file is PATH """ def __init__(self, path, disclaimer="", permissions=None): self.path = path self.disclaimer = disclaimer self.permissions = permissions def __repr__(self): return "<{classname}({path})>".format( classname=self.__class__.__qualname__, path=self.path ) def __str__(self): return f"ConfigReader at {self.path}" def read_config_file( self, force_read: bool = False, ignore_errors: bool = True ) -> dict: """Read config file into memory. Raises ConfigError. """ try: if os.path.getsize(self.path) > _MAX_CONFIG_SIZE: raise ConfigError("Config file is too large") filename = self.path with open(filename, "r") as config_file: logger.info("Reading config file %s", filename) text = config_file.read() except UnicodeDecodeError as e: raise ConfigError("Unable to decode config file") from e except FileNotFoundError: return {} try: return self.load_config_body(text) except ConfigError as e: logger.error(e) if ignore_errors: return {} raise e def load_config_body(self, text: str) -> dict: try: config = yaml.safe_load(text) except yaml.YAMLError as e: raise ConfigError( f"Imunify360 config is not valid YAML document ({e})" ) from e if config is None: return {} if not isinstance(config, dict): raise ConfigError( "Imunify360 config is invalid or empty" ": path={!r}, text={!r}".format(self.path, text) ) return config def _pre_write(self): pass def _post_write(self): pass def _serialize_config(self, config) -> str: config_text = "" if self.disclaimer: config_text += dedent(self.disclaimer) config_text += "\n" config_text += yaml.dump(config, default_flow_style=False) return config_text def write_config_file(self, config) -> str: self._pre_write() config_text = self._serialize_config(config) atomic_rewrite( self.path, config_text, backup=False, permissions=self.permissions ) self._post_write() return config_text def modified_since(self, timestamp: Optional[float]) -> bool: return True class CachedConfigReader(ConfigReader): def __init__(self, path, disclaimer="", permissions=None): super().__init__(path, disclaimer) self.mtime: Optional[float] = None self.size: Optional[float] = None self._config = {} self.permissions = permissions def __str__(self): return ( "{classname} <'{path}', modified at {mtime}, {size} bytes>".format( classname=self.__class__.__qualname__, path=self.path, mtime=self.mtime, size=self.size, ) ) def read_config_file( self, force_read: bool = False, ignore_errors: bool = True ): """Update config if config file is modified""" if self.modified_since(self.mtime) or force_read: prev_config = self._config try: self._config = super().read_config_file( ignore_errors=ignore_errors ) except ConfigError as error: sentry_sdk.capture_exception(error) logger.warning( "%s is invalid, using previous settings: %s", self, JsonMessage(self._config), ) if not ignore_errors: raise error else: if self.mtime is not None: # don't log on startup diffs = list(diff_config(prev_config, self._config)) if any(diffs): # content has changed, log it logger.info( "%s modified: removed=%s, added=%s, changed=%s", self, *map(JsonMessage, diffs), ) self._refresh_stat_cache() return self._config def _refresh_stat_cache(self) -> None: """Sync cached mtime/size with the file on disk.""" try: stat = os.stat(self.path) self.mtime = stat.st_mtime self.size = stat.st_size except FileNotFoundError: self.mtime = 0.0 self.size = 0.0 def modified_since(self, timestamp: Optional[float]) -> bool: """Whether the config has updated since *timestamp*. (as defined by its last modification time and size) :param timestamp: None means that the file has never been read before """ # On startup consider timestamp to be None if timestamp is None: timestamp = 0.0 try: stat = os.stat(self.path) except FileNotFoundError: st_mtime, st_size = 0.0, 0.0 else: st_mtime, st_size = stat.st_mtime, stat.st_size return st_mtime > timestamp or st_size != self.size class WriteOnlyConfigReader(CachedConfigReader): def __init__(self, path, disclaimer="", permissions=None): super().__init__(path, disclaimer, permissions) # write-only readers never hit the parent read path that populates # mtime/size, so seed them from disk now — otherwise the size # fallback in modified_since() (st_size != self.size) compares # against None forever and the check is stuck at True. self._refresh_stat_cache() def read_config_file(self, *_, **__): return self._config def write_config_file(self, config): config_text = super().write_config_file(config) self._config = self.load_config_body(config_text) self._refresh_stat_cache() return config_text class UserConfigReader(CachedConfigReader): """Per-user config reader that resists TOCTOU symlink attacks. The user-specific subdirectory ``<USER_CONFDIR>/<username>/`` and the config file inside it must end up owned by ``root:<user-gid>`` with modes ``0750`` / ``0640``. Earlier revisions performed the ``mkdir`` -> ``chown`` -> ``chmod`` sequence on path strings, which left a TOCTOU window: between the directory existing and the metadata syscalls, a swap to a symlink could redirect the chown to an arbitrary inode. See DEF-41586 / CLOS-3965 for context. The hardened path opens the parent ``USER_CONFDIR`` once with ``O_NOFOLLOW`` at every component, then performs every subsequent operation (``mkdir``/``chown``/``chmod``/atomic write) relative to that fd or to a fresh ``O_NOFOLLOW`` fd of the user subdir. No user-controlled path string is dereferenced more than once. """ DIR_PERMISSIONS = 0o750 FILE_PERMISSIONS = 0o640 def __init__(self, path, username): super().__init__(path) self.username = username def __str__(self): return f"Config of user {self.username}" def _open_user_subdir(self, parent_fd: int, name: str) -> int: """Return an O_NOFOLLOW fd for ``name`` inside *parent_fd*. Creates the directory first if it does not already exist. The ``O_NOFOLLOW`` flag guarantees that, if a symlink appears in the slot at any time after this call returns, every subsequent ``fchown``/``fchmod``/atomic-rewrite bound to the returned fd operates on the originally opened inode. """ with suppress(FileExistsError): os.mkdir(name, mode=self.DIR_PERMISSIONS, dir_fd=parent_fd) return os.open( name, os.O_RDONLY | os.O_DIRECTORY | os.O_NOFOLLOW, dir_fd=parent_fd, ) def write_config_file(self, config) -> str: gid = pwd.getpwnam(self.username).pw_gid confdir, basename = os.path.split(self.path) userconfdir, username = os.path.split(confdir) # Open USER_CONFDIR (root-owned, package-controlled) with full # symlink protection at every path component. Then descend to # the per-user subdir using a dir_fd-relative open with # O_NOFOLLOW so a symlink swap cannot redirect us. parent_fd = open_dir_no_symlinks(userconfdir) try: user_fd = self._open_user_subdir(parent_fd, username) try: # Apply directory ownership/permissions on the fd we # just opened — bound to the inode, not to the path. os.chown(user_fd, 0, gid) os.fchmod(user_fd, self.DIR_PERMISSIONS) config_text = self._serialize_config(config) # atomic_rewrite_fd creates a temp file via # O_CREAT|O_EXCL|O_NOFOLLOW relative to user_fd, chowns # and chmods the temp inode (not a path), then renames # it into place — all without leaving a TOCTOU window. atomic_rewrite( basename, config_text, backup=False, uid=0, gid=gid, permissions=self.FILE_PERMISSIONS, dir_fd=user_fd, ) # Re-normalize ownership/permissions on every call so # that an out-of-band ``chmod``/``chown`` between writes # cannot leave the file with weaker permissions. When # ``atomic_rewrite_fd`` short-circuits on identical # content, no chown/chmod runs there, so we apply them # here. ``O_NOFOLLOW`` keeps the fix TOCTOU-safe. file_fd = os.open( basename, os.O_RDONLY | os.O_NOFOLLOW, dir_fd=user_fd, ) try: os.chown(file_fd, 0, gid) os.fchmod(file_fd, self.FILE_PERMISSIONS) finally: os.close(file_fd) finally: os.close(user_fd) finally: os.close(parent_fd) return config_text
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 12
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-08 20:23:14
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
config.py
46.66 KB
lrw-r--r--
2026-05-26 21:25:11
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
config_provider.py
13.74 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
eula.py
1.47 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
hooks.py
6.13 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
hook_events.py
1.51 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
license.py
24.26 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
messages.py
12.65 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
myimunify_id.py
5.79 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
permissions.py
6.46 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
plugins.py
8.13 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
sentry.py
3.09 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
0 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).