REDROOM
PHP 8.3.31
Path:
Logout
Edit File
Size: 17.65 KB
Close
//opt/cloudlinux/venv/lib/python3.11/site-packages/clcommon/public_hooks/bundle/cpanel/lib.py
Text
Base64
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT # import argparse import hashlib import json import os import re import subprocess import logging import urllib.parse import raven from clcommon.public_hooks import ( POST_MODIFY_DOMAIN, PRE_MODIFY_USER, POST_MODIFY_USER ) from clcommon.public_hooks.lib.helpers import valid_name from clcommon.utils import run_command, ExternalProgramFailed, is_user_present from clcommon import cpapi from secureio import makedirs_secure, write_file_secure, read_file_secure EXIT_NO_USER_FOUND = 1 ERR_NO_USER_FOUND = 'Unable to find username in hook cmdline' WHMAPI1 = '/usr/sbin/whmapi1' LVE_DIR = '/var/lve' TMP_DIR = os.path.join(LVE_DIR, 'tmp') logger = logging.getLogger(__name__) # Domain validation: RFC-1035 hostname-style (letters, digits, dot, hyphen). # Use re.fullmatch at call-sites — the prior `^...$` form would have allowed # a trailing newline (Python `$` matches end-of-string OR just before a final # `\n`). bugbot 0bcbadf9 on !27 flagged this for the username regex; we apply # the same fullmatch discipline here to domains. Username validation routes # through valid_name (which already uses fullmatch). # # RFC 1035 hostname shape: each label starts with an alphanumeric, may # contain interior `-`, ends with an alphanumeric, ≤63 chars per label, # labels joined by literal `.`. Rejects bare `.`, `..`, `---`, leading/ # trailing dots and hyphens (bugbot 5a4f0354 on d1cc213). Mirrors the # regex already in post_modify_domain.py — keeping the cpanel-bundle # copy avoids a public-hooks-bin import from the cpanel bundle. _VALID_DOMAIN_RE = re.compile( r'[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?' r'(?:\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*' ) def _is_valid_username(value): """Wrap the shared valid_name helper to fit the cpanel-hook return-0 convention. valid_name raises argparse.ArgumentTypeError on bad input; cpanel hooks must return 0 (not sys.exit) so the cPanel hook chain keeps running for unrelated downstream listeners.""" try: valid_name(value) return True except argparse.ArgumentTypeError: return False def _is_valid_domain(value): """Domain-name validation matching the prior cpanel-bundle behavior but using fullmatch to defeat the trailing-newline bypass (bugbot 0bcbadf9).""" return isinstance(value, str) and bool(_VALID_DOMAIN_RE.fullmatch(value)) def print_response(hook_name, success=True): """ cPanel expects that each custom hook prints two values in the end of the execution: - status, where 1 means success - message, which explains non-1 statuses otherwise nothing really breaks, but logs are full of "script returned invalid response" msgs :param hook_name: name, path or anything else to fill message with in order to understand what exactly failed :param success: is it everything ended successfully? :return: Nothing """ if not success: print(0, f"Failed to execute hook {hook_name}; you can find logs in " "/var/log/cloudlinux/hooks/info.log " "and contact CloudLinux Support if you need " "help with the issue.") else: print(1, "Ok") # ATTENTION: we use this function in # processpaneluserspackages and lvemanager def call_sync_map(): """ Run lvectl sync-map and log possible stdout|err in case of errors. :return: None """ with subprocess.Popen( ['lvectl', 'sync-map'], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) as proc: stdout, stderr = proc.communicate() if proc.returncode != 0: logger.error('Error during "lvectl sync-map", code: %s, ' 'stderr: `%s`, stdout: `%s`. Reseller limits ' 'kernel mapping might be not synchronized.' 'Contact CloudLinux Support for help.' '', proc.returncode, stdout, stderr) def cpanel_postwwwacct_main(data): """ Post create account hook of cPanel :return: None """ user = data.get('user', None) owner = data.get('owner', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND return subprocess.call([ POST_MODIFY_USER, 'create', '--username', user, '--owner', owner], env={'CPANEL_RESTORE': str(data.get('is_restore', 0))}) def cpanel_prekillacct_main(data): """ Pre kill account hook of cPanel :return: None """ # It's necessary destroy lve before remove user home directory, # otherwise will be error due busy mount points of cagefs user = data.get('user', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND if not is_user_present(user): logger.warning('User %s does not present in the system, skip hook', user) return 0 return subprocess.call([ PRE_MODIFY_USER, 'delete', '--username', user]) def cpanel_postkillacct_main(data): """ Post kill account hook of cPanel :return: None """ user = data.get('user', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND return subprocess.call([ POST_MODIFY_USER, 'delete', '--username', user]) def cpanel_postunsuspendacct_main(data): """ Post unsuspend account hook of cPanel data: {'result': 1, 'args': {'user': 'susp2'}, 'user': 'root'} """ user = data.get('args', {}).get('user', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND if not is_user_present(user): logger.warning('User %s does not present in the system, skip hook', user) return 0 return subprocess.call([ POST_MODIFY_USER, 'unsuspend', '--username', user]) def cpanel_postsuspendacct_main(data): """ Post unsuspend account hook of cPanel data: {'result': 1, 'args': {'user': 'susp2'}, 'user': 'root'} """ user = data.get('args', {}).get('user', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND if not is_user_present(user): logger.warning('User %s does not present in the system, skip hook', user) return 0 return subprocess.call([ POST_MODIFY_USER, 'suspend', '--username', user]) def _safe_tmp_path(user): """ Build a safe file path under TMP_DIR for the given user. Raises ValueError if the resulting path escapes TMP_DIR. """ if not user or os.sep in user or (os.altsep and os.altsep in user) or '..' in user: raise ValueError(f"Invalid user value: {user!r}") filename = os.path.join(TMP_DIR, user) real = os.path.realpath(filename) if not real.startswith(os.path.realpath(TMP_DIR) + os.sep): raise ValueError(f"Path traversal detected for user: {user!r}") return filename def _read_old_domain(user): """ Read old domain for modified account :param user: name of user :return: old domain :rtype: str """ domain = None try: filename = _safe_tmp_path(user) except ValueError: logger.error("Invalid user for tmp file: %s", user) return None try: content = read_file_secure(filename, uid=0, gid=0, exit_on_error=False, # This log is intended to be used only by # cagefs update command write_log=False) domain = content[0] except (IndexError, OSError, IOError): # use Raven carefully and only in places where # you sure that sentry is already initialized raven.base.Raven.captureException( message='failed to read old domain for user (pre hook no called?)') return domain def cpanel_postmodifyacct_main(data): """ Post modify account hook of cPanel :return: None """ user = data.get('user', None) new_user = data.get('newuser', None) domain = data.get('domain', None) exit_code = 0 # changing owner of user # FIXME: this check does not work because cpanel sends `owner` always new_owner = data.get('owner', data.get('OWNER')) args = [POST_MODIFY_USER, 'modify', '-u', user] if new_owner: args += ['--new-owner', new_owner] if all((user, new_user,)) and user != new_user: args += ['--new-username', new_user] exit_code += subprocess.call(args) old_domain = _read_old_domain(user) if domain is not None and domain != old_domain: # looks like domain is renamed exit_code += subprocess.call([ POST_MODIFY_DOMAIN, 'modify', '--username', user if new_user is None else new_user, '--domain', old_domain, '--new-domain', domain, '--include-subdomains']) return exit_code def cpanel_postrestoreacct_main(data): """ Post restore account hook of cPanel :return: None """ user = data.get('user', None) if not user: logger.warning(ERR_NO_USER_FOUND) return EXIT_NO_USER_FOUND return subprocess.call([ POST_MODIFY_USER, 'restore', '--username', user]) def _get_old_domain(user): """ Get old domain for modified account :param user: name of user :return: old domain :rtype: str """ domain = None try: cmd = [ WHMAPI1, 'listaccts', f'search={user}', 'searchtype=user', 'searchmethod=exact', 'want=domain', '--output=json' ] std_out = run_command(cmd, return_full_output=True)[1] # take only std_out, ignore std_err data = json.loads(std_out) domain = data['data']['acct'][0]['domain'] except (ExternalProgramFailed, IndexError, KeyError): # use Raven carefully and only in places where # you sure that sentry is already initialized raven.base.Raven.captureException(message='failed to get old domain for user from cpanel') return domain def cpanel_premodifyacct_main(data): """ Pre modify account hook of cPanel :return: None """ user = data.get('user') # getting old domain # TODO: why not cpapi? domain = _get_old_domain(user) if domain is None: return 0 # save old domain to file try: filename = _safe_tmp_path(user) except ValueError: logger.error("Invalid user for tmp file: %s", user) return 0 makedirs_secure(TMP_DIR, perm=0o750, uid=0, gid=0, parent_path=LVE_DIR) write_file_secure([domain], filename, uid=0, gid=0, perm=0o700) return 0 def cpanel_postcreateaddondom_main(data): args = data.get('args', {}) user = data.get('user', None) newdomain = args.get('newdomain', None) or args.get('domain', None) if not all([user, newdomain]): logger.warning("Missing required arguments: user=%s, newdomain=%s", str(user), str(newdomain)) return 0 if not _is_valid_username(user): logger.warning("Invalid username value: %s", str(user)) return 0 if not _is_valid_domain(newdomain): logger.warning("Invalid domain value: %s", str(newdomain)) return 0 return subprocess.call( [ POST_MODIFY_DOMAIN, 'create', '--username', user, '--domain', newdomain] ) def cpanel_postkilladdondom_main(data): args = data.get('args', {}) user = data.get('user', None) domain = args.get('domain', None) if not all([user, domain]): logger.warning("Missing required arguments: user=%s, domain=%s", str(user), str(domain)) return 0 if not _is_valid_username(user): logger.warning("Invalid username value: %s", str(user)) return 0 if not _is_valid_domain(domain): logger.warning("Invalid domain value: %s", str(domain)) return 0 return subprocess.call( [ POST_MODIFY_DOMAIN, 'delete', '--username', user, '--domain', domain] ) def cpanel_prechangesubdomaindocroot_main(data): """ Pre change subdomain document root hook of cPanel Called when subdomain document root is being changed via Api2::SubDomain::changedocroot Saves old docroot to file for post hook to use :param data: hook data from cPanel :return: exit code """ args = data.get('args', {}) user = data.get('user', None) subdomain = args.get('subdomain', None) rootdomain = args.get('rootdomain', None) new_dir = args.get('dir', None) if not all([user, subdomain, rootdomain, new_dir]): logger.error("Missing required arguments: user=%s, subdomain=%s, rootdomain=%s, new_dir=%s", str(user), str(subdomain), str(rootdomain), str(new_dir)) return 0 # Construct full domain name (subdomain.rootdomain) domain = f"{subdomain}.{rootdomain}" # Get old docroot using cpapi try: old_docroot, _ = cpapi.docroot(domain) except Exception: raven.base.Raven.captureException( message=f'failed to get old docroot for subdomain {domain}') logger.warning("Could not get old docroot for domain %s", domain) return 0 # Save old docroot and new dir to file for post hook # Use a hash suffix to avoid TOCTOU races with concurrent changes docroot_tmp_dir = os.path.join(TMP_DIR, 'prechangesubdomaindoc') suffix = hashlib.md5(f"{user}:{new_dir}".encode()).hexdigest()[:8] filename = os.path.join(docroot_tmp_dir, f"{domain}.{suffix}") if not os.path.realpath(filename).startswith(os.path.realpath(docroot_tmp_dir) + os.sep): logger.error("Invalid domain name %s: path traversal detected", domain) return 0 if not os.path.realpath(filename).startswith(os.path.realpath(docroot_tmp_dir) + os.sep): logger.error("Invalid domain name %s: path traversal detected", domain) return 0 makedirs_secure(TMP_DIR, perm=0o750, uid=0, gid=0, parent_path=LVE_DIR) makedirs_secure(docroot_tmp_dir, perm=0o750, uid=0, gid=0, parent_path=TMP_DIR) write_file_secure([old_docroot], filename, uid=0, gid=0, perm=0o700) return 0 def _read_old_docroot(domain, suffix): """ Read saved docroot data for a subdomain """ docroot_tmp_dir = os.path.join(TMP_DIR, 'prechangesubdomaindoc') filename = os.path.join(docroot_tmp_dir, f"{domain}.{suffix}") if not os.path.realpath(filename).startswith(os.path.realpath(docroot_tmp_dir) + os.sep): logger.error("Invalid domain name %s: path traversal detected", domain) return None if not os.path.realpath(filename).startswith(os.path.realpath(docroot_tmp_dir) + os.sep): logger.error("Invalid domain name %s: path traversal detected", domain) return None try: content = read_file_secure(filename, uid=0, gid=0, exit_on_error=False, write_log=False) old_docroot = content[0] return old_docroot except (IndexError, OSError, IOError): raven.base.Raven.captureException( message='failed to read old docroot for subdomain (pre hook not called?)') return None finally: try: os.unlink(filename) except OSError: pass def cpanel_postchangesubdomaindocroot_main(data): """ Post change subdomain document root hook of cPanel Called after subdomain document root is changed via Api2::SubDomain::changedocroot Reads saved docroot data and calls post_modify_domain :param data: hook data from cPanel :return: exit code """ args = data.get('args', {}) user = data.get('user', None) subdomain = args.get('subdomain', None) rootdomain = args.get('rootdomain', None) new_dir = args.get('dir', None) # new_dir must be in the all() check: the pre-hook only writes a temp # file when new_dir is present (line ~408 above), and the suffix is # md5(user + new_dir). If we computed suffix here with new_dir=None, # the literal string "None" would produce a suffix that does NOT match # any pre-hook file — _read_old_docroot would silently return None and # the pre-hook's temp file would leak. Fixes bugbot fbaa32bc on !28. if not all([user, subdomain, rootdomain, new_dir]): logger.warning("Missing required arguments: user=%s, subdomain=%s, rootdomain=%s, new_dir=%s", str(user), str(subdomain), str(rootdomain), str(new_dir)) return 0 # Construct full domain name (subdomain.rootdomain) domain = f"{subdomain}.{rootdomain}" suffix = hashlib.md5(f"{user}:{new_dir}".encode()).hexdigest()[:8] old_docroot = _read_old_docroot(domain, suffix) if old_docroot is None: logger.warning("Could not read old docroot for domain %s", domain) return 0 # new_dir is guaranteed truthy by the all([...]) guard above. new_docroot = urllib.parse.unquote(new_dir) if '\0' in new_docroot or '..' in os.path.normpath(new_docroot).split(os.sep): logger.warning("Invalid new_docroot path for domain %s: %s", domain, new_docroot) return 0 cmd = [ POST_MODIFY_DOMAIN, 'modify', '--username', user, '--domain', domain, '--old-docroot', old_docroot, '--new-docroot', new_docroot, ] return subprocess.call(cmd)
Save
Close
Exit & Reset
Text mode: syntax highlighting auto-detects file type.
Directory Contents
Dirs: 2 × Files: 3
Delete Selected
Select All
Select None
Sort:
Name
Size
Modified
Enable drag-to-move
Name
Size
Perms
Modified
Actions
bin
DIR
-
drwxr-xr-x
2026-06-23 06:30:22
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__pycache__
DIR
-
drwxr-xr-x
2026-06-23 06:30:22
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
lib.py
17.65 KB
lrw-r--r--
2026-06-03 13:38:12
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
manager.py
6.29 KB
lrw-r--r--
2026-06-03 13:38:12
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
__init__.py
345 B
lrw-r--r--
2026-06-03 13:38:12
Edit
Download
Rename
Chmod
Change Date
Delete
OK
Cancel
recursive
OK
Cancel
recursive
OK
Cancel
Zip Selected
If ZipArchive is unavailable, a
.tar
will be created (no compression).