REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 10.06 KB
Close
/proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/plugins/aibolit_resident_ff_watcher.py
Text
Base64
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> DEF-43111: AV-mode watcher that restarts aibolit-resident when the malware-stack kill-switch (force_aibolit_stack) flips. Polls FLAGS_PLAIN_PATH (one-flag-name-per-line plain text written by defence360agent.plugins.feature_flags.FeatureFlagsSync) on a short interval; on a False<->True transition, scans /etc/systemd/system/ aibolit-resident.service.d/ for drop-in overrides (logs a WARNING if present) and calls imav.malwarelib.subsys.aibolit.restart_on_sigs_or_config_update. In IM360 mode the Go resident-agent already owns this transition (see src/resident-agent/plugins/feature_flags/plugin.go), so this watcher is Scope.AV only to avoid double-restarting. """ import asyncio import logging import os from defence360agent.contracts.plugins import MessageSource from defence360agent.internals.feature_flags import FLAGS_PLAIN_PATH from defence360agent.utils import Scope logger = logging.getLogger(__name__) _FORCE_AIBOLIT_STACK_FF_NAME = "force_aibolit_stack" def _env_int(name: str, default: int) -> int: """Read an int env var tolerantly. A non-numeric value (empty string, typo, etc.) must NOT raise at import time — the watcher lives in the AV agent entry point and a bad env var would otherwise kill the agent. """ raw = os.environ.get(name) if not raw: return default try: return int(raw) except ValueError: logger.warning( "aibolit-resident FF watcher: %s=%r is not an int, using" " default %d", name, raw, default, ) return default _POLL_INTERVAL = _env_int("I360_FORCE_AIBOLIT_STACK_POLL_SEC", 30) def _read_force_aibolit_stack_uncached(path: str | None = None) -> bool: """Return True iff force_aibolit_stack appears in the plain-text sidecar. Reads FLAGS_PLAIN_PATH directly (one bare flag name per line, sorted, written atomically by both FeatureFlagsSync._write_flags and the Go resident-agent). The aibolit-resident systemd unit's ExecStart= shell greps the same file, so reading it here keeps the watcher's transition detector aligned with the unit-file decision. Bypasses any caching deliberately: - Avoids the mtime cache in defence360agent.internals.feature_flags.is_enabled (whole-second resolution would race with an immediately-following write and silently mask transitions inside the same wall-clock second). - Avoids the JSON file's value-interpretation surface (truthy/falsy across bool/int/string variants); the plain file already represents the enabled-set semantic, so exact-line match is the right comparison. `path` defaults to the module-level FLAGS_PLAIN_PATH resolved at call time (NOT bound at function-definition time) so unit tests can override that constant via monkeypatch and have it take effect here. Returns False when the file is missing, unreadable, empty, or the flag name is not present — matching the agent's default-off semantics. """ if path is None: path = FLAGS_PLAIN_PATH try: with open(path, encoding="utf-8") as f: for line in f: if line.rstrip("\n") == _FORCE_AIBOLIT_STACK_FF_NAME: return True except (OSError, UnicodeDecodeError): pass return False _AIBOLIT_RESIDENT_DROPIN_DIR = "/etc/systemd/system/aibolit-resident.service.d" def _scan_aibolit_resident_dropin_dir( path: str = _AIBOLIT_RESIDENT_DROPIN_DIR, ) -> tuple[list[str], OSError | None]: """Scan `path` for .conf drop-in files (blocking syscall). Returns ``(sorted_conf_names, None)`` on success, where the list is empty if the directory exists but contains no .conf overrides. Returns ``([], None)`` when the directory is absent (clean state — no warning). Returns ``([], err)`` for unexpected OSErrors so the async caller can log with ``exc_info=err``. Lives at module scope (not a method) so the async caller can pass it to ``loop.run_in_executor`` without partially binding ``self``. """ try: entries = list(os.scandir(path)) except FileNotFoundError: return [], None except OSError as e: return [], e confs = sorted(e.name for e in entries if e.name.endswith(".conf")) return confs, None class AibolitResidentFFWatcher(MessageSource): SCOPE = Scope.AV def __init__(self) -> None: self._loop: asyncio.AbstractEventLoop | None = None self._task: asyncio.Task | None = None self._last_mtime: float | None = None self._last_value: bool | None = None # None = uninitialised async def create_source(self, loop, sink) -> None: self._loop = loop self._task = loop.create_task(self._poll_loop()) async def shutdown(self) -> None: if self._task is not None: self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _poll_loop(self) -> None: while True: try: await self._poll_once() except asyncio.CancelledError: raise except Exception: logger.warning( "aibolit-resident FF watcher poll failed", exc_info=True ) await asyncio.sleep(_POLL_INTERVAL) async def _poll_once(self) -> None: loop = asyncio.get_event_loop() try: mtime = await loop.run_in_executor( None, os.path.getmtime, FLAGS_PLAIN_PATH ) except OSError: mtime = None # missing file = "no flags enabled" semantics if mtime == self._last_mtime: return # short-circuit: nothing changed since last poll new_value = await loop.run_in_executor( None, _read_force_aibolit_stack_uncached, FLAGS_PLAIN_PATH ) old_value = self._last_value if old_value is None: # First poll initialises state silently. No risk of half-applied # state to roll back — commit immediately. self._last_mtime = mtime self._last_value = new_value return if old_value != new_value: try: await self._handle_transition(old_value, new_value) except Exception: # Restart failed (transient systemctl/dbus issue, OOM, etc). # KEEP _last_mtime and _last_value at the old values so the # next poll re-detects the transition and retries. Without # this rollback the watcher would think it had applied the # new state and stay stuck on the wrong stack until the FF # flips again. Log ERROR so operators see the failure. logger.error( "failed to apply force_aibolit_stack transition; " "will retry on next poll", exc_info=True, ) return # Transition succeeded (or the value didn't actually change despite # the mtime bump). Commit the new state. self._last_mtime = mtime self._last_value = new_value async def _handle_transition( self, old_value: bool, new_value: bool ) -> None: """Restart aibolit-resident on a force_aibolit_stack flip. Restart picks up the FF in the unit-file's ExecStart= shell. Also scans for systemd drop-ins under /etc/systemd/system/aibolit-resident.service.d/ — anything there (notably rustbolit/scripts/use_rustbolit_realtime.sh's rustbolit.conf) overrides the base unit's FF check, so we log a loud WARNING. We do NOT remove drop-ins automatically — too aggressive (could blow away legitimate operator overrides). Restart errors propagate — the caller (``_poll_once``) catches them, logs ERROR, and rolls back the recorded state so the next poll re-detects the transition. Swallowing here would silently leave the watcher stuck on the wrong state. """ logger.warning( "force_aibolit_stack transitioned %s -> %s; " "restarting aibolit-resident to pick up new selection", old_value, new_value, ) # os.scandir is a blocking syscall — dispatch to the executor. loop = asyncio.get_event_loop() confs, scan_err = await loop.run_in_executor( None, _scan_aibolit_resident_dropin_dir, _AIBOLIT_RESIDENT_DROPIN_DIR, ) if scan_err is not None: logger.warning( "could not scan %s for drop-in overrides", _AIBOLIT_RESIDENT_DROPIN_DIR, exc_info=scan_err, ) elif confs: logger.warning( "aibolit-resident has systemd drop-in overrides at %s; " "FF flip may be ineffective until they are removed. " "Files: %s", _AIBOLIT_RESIDENT_DROPIN_DIR, ", ".join(confs), ) # Direct (eager) import: this watcher lives in imav, so imav.malwarelib # is always available — no need for the lazy try/except that the old # cross-package call site needed. from imav.malwarelib.subsys import aibolit await aibolit.restart_on_sigs_or_config_update(None, True)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 1 × Files: 19
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
__pycache__
DIR
-
drwxr-xr-x
2026-06-08 20:24:19
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
aibolit_resident_ff_watcher.py
10.06 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
check_license.py
9.27 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
conflicts.py
3.18 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
detect_admin_tools_watcher.py
2.64 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
event_hooks.py
3.47 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
event_hook_executor.py
3.72 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
generic_sensor.py
5.82 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
im360_register.py
2.82 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
imunify_patch_id.py
2.33 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
inotify.py
1.75 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
malware_filters.py
3.59 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
mr_proper.py
2.67 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
plesk_notifications.py
4.59 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
post_action.py
1.72 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
restore_from_backup.py
3.37 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
send_malware_infection_state.py
11.51 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
server_pull.py
2.56 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
service_manager.py
3.93 KB
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
0 B
lrw-r--r--
2026-05-26 21:31:52
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).