REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 10.91 KB
Close
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/panels/generic/panel.py
Text
Base64
"""Generic hosting-panel integration.

Panel data (users, domains, admins, panel info) is obtained by running
"integration scripts" whose command lines are read from the integration
config files. Each script is expected to print a JSON document that is
validated against a YAML schema shipped next to this module.
"""
import asyncio
import datetime
import functools
import json
import logging
import os
import shlex
from collections import defaultdict
from typing import Dict, List, Set

import cerberus
import yaml

from defence360agent.api.integration_conf import (
    ClIntegrationConfig,
    IntegrationConfig,
)
from defence360agent.api.jwt_issuer import JWTIssuer
from defence360agent.application.determine_hosting_panel import (
    is_generic_panel_installed,
)
from defence360agent.contracts.config import int_from_envvar
from defence360agent.rpc_tools.lookup import UserType
from defence360agent.utils import (
    CheckRunError,
    check_run,
    get_non_system_users,
    timed_cache,
)

from .. import base

logger = logging.getLogger(__name__)

# Template of the path to the per-script validation schema.
_SCHEMA_PATH_TMPL = (
    os.path.dirname(__file__) + "/users_script_schemas/schema-{}.yaml"
)
# Plain-text file with one additional admin name per line.
ADMIN_LIST_FILE_PATH = "/etc/sysconfig/imunify360/auth.admin"
# Name of the metadata schema and of the metadata key in script output.
METADATA = "metadata"
# How long results of get_integration_data() stay cached.
EXPIRATION_TIME_FOR_INTEGRATION_SCRIPTS = datetime.timedelta(
    seconds=int_from_envvar(
        "IMUNIFY360_EXPIRATION_FOR_INTEGRATION_SCRIPTS", 90
    )
)


# This module requests five distinct schemas ("metadata", "users",
# "domains", "admins", "panel_info"); the cache must hold all of them,
# otherwise the YAML files are re-read and re-parsed on almost every call.
@functools.lru_cache(maxsize=8)
def _get_validator(script: str) -> cerberus.Validator:
    """Return a (cached) validator for the output of the given script.

    For every schema except the metadata schema itself, the ``metadata``
    key is additionally marked as required.
    """
    with open(_SCHEMA_PATH_TMPL.format(script)) as schema_file:
        schema = yaml.safe_load(schema_file)
    # Compare by value: identity comparison of strings only works by
    # accident of interning.
    if script != METADATA:
        schema[METADATA] = {"required": True}
    return cerberus.Validator(schema)


def get_users_default_impl():
    """Return ``[{"name": <login>}, ...]`` for all non-system users."""
    return [dict(name=pw.pw_name) for pw in get_non_system_users()]


def _get_conf_path(cl, script):
    """Return the command configured for *script* in config *cl*.

    :param cl: config object exposing ``to_dict()``
    :param script: integration script name
    :return: the value of ``integration_scripts.<script>`` or ``None``
        when it is not configured
    """
    d = cl.to_dict()
    if "integration_scripts" in d and script in d["integration_scripts"]:
        return d["integration_scripts"][script]
    return None


@timed_cache(expiration=EXPIRATION_TIME_FOR_INTEGRATION_SCRIPTS, maxsize=10)
async def get_integration_data(script: str):
    """Run the integration script *script* and return its ``data`` part.

    The command is looked up first in ``IntegrationConfig`` and then in
    ``ClIntegrationConfig``.

    :raises IntegrationScriptError: if the script is not configured in
        either config, or if running/validating it fails
    """
    path = _get_conf_path(IntegrationConfig(), script)
    if not path:
        path = _get_conf_path(ClIntegrationConfig(), script)
    if not path:
        raise IntegrationScriptError(
            "%s not found neither in "
            "/etc/sysconfig/imunify360/integration.conf "
            "nor in /opt/cpvendor/etc/integration.ini." % script
        )
    return await _get_integration_data(script, path)


def _build_integration_argv(script: str, path: str) -> List[str]:
    """Tokenize a config-supplied script command into argv (no shell).

    :param script: integration script name (used in error messages only)
    :param path: command line taken from the config
    :return: argument vector whose first element is an absolute path to
        an existing, executable regular file
    :raises IntegrationScriptError: if the command is empty, cannot be
        tokenized, or its executable is not absolute / missing / not
        executable
    """
    if not path or not path.strip():
        raise IntegrationScriptError(
            "Empty integration script path for %s" % script
        )
    try:
        argv = shlex.split(path)
    except ValueError as e:
        # e.g. unbalanced quotes in the configured command
        raise IntegrationScriptError(
            "Invalid integration script path for %s: %s" % (script, e)
        )
    if not argv:
        raise IntegrationScriptError(
            "Empty integration script path for %s" % script
        )
    executable = argv[0]
    if not os.path.isabs(executable):
        raise IntegrationScriptError(
            "Integration script path for %s must be absolute: %s"
            % (script, executable)
        )
    if not os.path.isfile(executable):
        raise IntegrationScriptError(
            "Integration script for %s does not exist: %s"
            % (script, executable)
        )
    if not os.access(executable, os.X_OK):
        raise IntegrationScriptError(
            "Integration script for %s is not executable: %s"
            % (script, executable)
        )
    return argv


async def _get_integration_data(script: str, path: str):
    """Run the command *path*, validate its JSON output, return ``data``.

    The output must be a JSON object that passes the metadata schema,
    reports ``metadata.result == "ok"`` and then passes the schema of
    *script*.

    :raises IntegrationScriptError: on any failure of the steps above
    """
    argv = _build_integration_argv(script, path)
    try:
        stdout = await check_run(argv)
    except CheckRunError as e:
        raise IntegrationScriptError(
            "Integrations script {script} "
            "failed with exit code {e.returncode} \n"
            "{e.stderr}".format(script=script, e=e)
        ) from e
    try:
        data = json.loads(stdout.decode())
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        raise IntegrationScriptError(
            "Cannot decode output of %s as JSON" % path
        ) from e
    if not isinstance(data, dict):
        raise IntegrationScriptError("%s should return dict" % path)

    # Validate the metadata first so that a failure reported by the
    # script itself is surfaced instead of a generic schema error.
    metadata_validator = _get_validator(METADATA)
    if not metadata_validator.validate(data):
        raise IntegrationScriptError(
            "Validation error in metadata of %s script: %s"
            % (script, metadata_validator.errors)
        )
    if data[METADATA]["result"] != "ok":
        metadata_error = data[METADATA]["result"]
        if "message" in data[METADATA]:
            metadata_error += ": %s" % data[METADATA]["message"]
        raise IntegrationScriptError(metadata_error)

    validator = _get_validator(script)
    if not validator.validate(data):
        raise IntegrationScriptError(
            "Validation error in %s script: %s" % (script, validator.errors)
        )
    return data["data"]


async def _get_client_data():
    """Return ``[{"name": user, "domains": [...]}, ...]``.

    Domains are attached to their ``owner``; an owner that is absent from
    the users list is added. If any integration script fails, falls back
    to :func:`get_users_default_impl` (entries then have no ``domains``).
    """
    try:
        users = await get_users_integration_data()
        domains = await get_integration_data("domains")
        for k, v in domains.items():
            if v["owner"]:
                user_domains = users.setdefault(v["owner"], [])
                user_domains.append(k)
        return [{"name": k, "domains": v} for k, v in users.items()]
    except IntegrationScriptError:
        logger.warning(
            "Applying default implementation of users and domains lists"
        )
        return get_users_default_impl()


async def get_users_integration_data():
    """Return ``{username: []}`` built from the ``users`` script.

    Entries with an empty username are skipped with a warning.

    :raises IntegrationScriptError: if the ``users`` script fails
    """
    users = await get_integration_data("users")
    users_dict = {}
    for user in users:
        if not user["username"]:
            logger.warning("Found user with an empty username: %s", user)
        else:
            users_dict[user["username"]] = []
    return users_dict


async def get_domain_data():
    """Return the ``domains`` script data, or ``{}`` if the script fails."""
    try:
        return await get_integration_data("domains")
    except IntegrationScriptError:
        logger.warning("Could not parse domains lists")
        return {}


async def get_admin_list() -> List[str]:
    """Return names of admin users.

    The result always contains ``root`` and is extended with admins
    reported by the ``admins`` script of both integration configs and with
    the lines of ``ADMIN_LIST_FILE_PATH``. Failures of individual sources
    are logged and otherwise ignored.
    """
    script_name = "admins"
    admins_set = {"root"}
    # Both configs are queried; a missing/failed script yields an
    # exception object in the result list instead of raising.
    admins_from_integration_scripts = await asyncio.gather(
        _get_integration_data(
            script_name,
            _get_conf_path(
                IntegrationConfig(),
                script_name,
            ),
        ),
        _get_integration_data(
            script_name,
            _get_conf_path(
                ClIntegrationConfig(),
                script_name,
            ),
        ),
        return_exceptions=True,
    )
    custom_admins = {
        admin["name"]
        for admins in admins_from_integration_scripts
        if isinstance(admins, list)  # skip exceptions
        for admin in admins
    }
    if not custom_admins:
        logger.warning(
            "Error occurred during extracting admins "
            "from integration configs: %s",
            admins_from_integration_scripts,
        )
    admins_set |= custom_admins
    try:
        with open(ADMIN_LIST_FILE_PATH) as admin_list_file:
            admins_set.update(admin_list_file.read().splitlines())
    except OSError:
        logger.warning(
            "Failed to retrieve admins list from %s", ADMIN_LIST_FILE_PATH
        )
    return list(admins_set)


class IntegrationScriptError(base.PanelException):
    """Failure to locate, run or validate an integration script.

    Every instance is logged as a warning at construction time.
    """

    def __init__(self, *args, **kwargs):
        # Keyword arguments are accepted but not forwarded to the base.
        super().__init__(*args)
        logger.warning(self)


class GenericPanel(base.AbstractPanel):
    """
    Panel, UI to which is provided by
    imunify{-antivirus,360-firewall}-generic.{rpm,deb}
    """

    NAME = base.GENERIC_PANEL_NAME
    exception = IntegrationScriptError

    @classmethod
    def is_installed(cls):
        """Return the result of ``is_generic_panel_installed()``."""
        return is_generic_panel_installed()  # pragma: no cover

    async def enable_imunify_plugin(self, name=None):
        """No-op for the generic panel."""
        pass

    async def disable_imunify_plugin(self, plugin_name=None):
        """No-op for the generic panel."""
        pass

    @classmethod
    async def version(cls):
        """Return ``"<name> <version>"`` from ``panel_info`` or ``"0"``."""
        try:
            info = await get_integration_data("panel_info")
            return "{name} {version}".format(**info)
        except IntegrationScriptError:
            return "0"

    @classmethod
    async def name(cls):
        """Return the panel name from ``panel_info`` or ``cls.NAME``."""
        try:
            info = await get_integration_data("panel_info")
            return "{name}".format(**info)
        except IntegrationScriptError:
            return cls.NAME

    async def get_user_domains(self):
        """Return a flat list of the domains of all users."""
        users = await _get_client_data()
        result = []
        for user in users:
            result.extend(user.get("domains", tuple()))
        return result

    async def get_users(self) -> List[str]:
        """Return the names of all panel users."""
        users = await _get_client_data()
        return [user["name"] for user in users]

    async def get_domain_to_owner(self) -> Dict[str, List[str]]:
        """Return a mapping ``domain -> [owner names]``."""
        users = await _get_client_data()
        result = defaultdict(list)
        for user in users:
            for domain in user.get("domains", []):
                result[domain].append(user["name"])
        return result

    async def get_user_details(self) -> Dict[str, Dict[str, str]]:
        """Return ``{username: {"email", "locale", "level"}}``.

        Only users that are also returned by :meth:`get_users` are
        included. ``root`` gets the admin level, other names from
        :func:`get_admin_list` the reseller level, everyone else the
        regular user level.

        :raises IntegrationScriptError: if the ``users`` script fails
        """
        # Sets give O(1) membership tests in the per-user checks below.
        usernames = set(await self.get_users())
        admins = set(await get_admin_list())
        user_data = await get_integration_data("users")

        def user_info_to_level(user_info: Dict):
            if user_info.get("username") == "root":
                return base.UserLevel.ADMIN
            if user_info.get("username") in admins:
                return base.UserLevel.RESSELER
            return base.UserLevel.REGULAR_USER

        return {
            info.get("username"): {
                "email": info.get("email", ""),
                "locale": info.get("locale_code", ""),
                "level": int(user_info_to_level(info)),
            }
            for info in user_data
            if info.get("username") in usernames
        }

    async def get_domains_per_user(self) -> Dict[str, List[str]]:
        """Return a mapping ``username -> [domains]``."""
        users = await _get_client_data()
        return {user["name"]: user.get("domains", []) for user in users}

    def authenticate(self, protocol, data: dict):
        """Return ``(user_type, user_name_or_None)`` for an RPC request.

        Non-root connections (``protocol._uid != 0``) issuing anything
        but the ``login pam`` command are identified by the ``jwt`` value
        popped from ``data["params"]``; the user name is returned only
        for ``UserType.NON_ROOT``. Otherwise ``protocol.user`` is used.
        """
        if protocol._uid != 0 and data["command"] != ["login", "pam"]:
            token = data["params"].pop("jwt", None)
            parsed_token = JWTIssuer.parse_token(token)
            return parsed_token["user_type"], (
                parsed_token["user_name"]
                if parsed_token["user_type"] == UserType.NON_ROOT
                else None
            )
        else:
            return protocol.user, None

    def basedirs(self) -> Set[str]:
        """Return the whitespace-separated ``malware.basedir`` entries."""
        conf = IntegrationConfig().to_dict()
        if "malware" in conf and "basedir" in conf["malware"]:
            return set(conf["malware"]["basedir"].split())
        return set()

    async def list_docroots(self) -> Dict[str, str]:
        """Return a mapping ``document_root -> domain``."""
        domains = await get_domain_data()
        return {v["document_root"]: domain for domain, v in domains.items()}

    async def panel_user_link(self, username) -> str:
        """
        Returns panel url
        :return: str
        """
        return ""

    @classmethod
    async def get_user_domains_details(
        cls, username: str
    ) -> list[base.DomainData]:
        """Return an empty list (no details for the generic panel)."""
        return []
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 2 × Files: 2
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
users_script_schemas
DIR
-
drwxr-xr-x
2026-06-08 20:23:14
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__pycache__
DIR
-
drwxr-xr-x
2026-06-08 20:23:14
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
panel.py
10.91 KB
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
60 B
lrw-r--r--
2026-05-26 21:20:44
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).