# Preview: panel.py
# Size: 14.23 KB
# /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/subsys/panels/directadmin/panel.py
import asyncio
import base64
import configparser
import http.client
import json
import logging
import os
import pwd
import re
import socket
import urllib
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Set

from packaging.version import Version

from defence360agent.application.determine_hosting_panel import (
    DA_FILE,
    is_directadmin_installed,
)
from defence360agent.contracts.config import Core
from defence360agent.utils import (
    HTTP_REQUEST_RETRY_TIMEOUT,
    backoff_sleep,
    retry_on,
    run,
    timeit,
)

from .. import base
from ..base import PanelException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# The single base directory reported by DirectAdmin.basedirs().
BASE_DIR = "/home"
# Command that members of SUDO_GROUP may run via sudo (see SUDO_LINE).
CMD = "/usr/bin/imunify360-command-wrapper"
# Directory in which _create_hook()/_delete_hook() manage hook scripts.
HOOKS_DIR = "/usr/local/directadmin/scripts/custom"
# Unix group passed to groupadd/groupdel/gpasswd by the sudouser methods.
SUDO_GROUP = "imunify360-sudousers"
# Line added to / removed from /etc/sudoers by the plugin enable/disable
# methods; it names SUDO_GROUP and CMD.
SUDO_LINE = "%{0} ALL=NOPASSWD: {1}".format(SUDO_GROUP, CMD)
# Second /etc/sudoers line managed together with SUDO_LINE.
# NOTE: the wrapper path is spelled out here rather than taken from CMD.
SUDO_TTY_LINE = "Defaults!/usr/bin/imunify360-command-wrapper !requiretty"
# Default file read by get_user_domains(); parsed as "domain: user" lines.
_VIRTUAL_DOMAINOWNERS = "/etc/virtual/domainowners"
# Common panel TCP ports plus two DirectAdmin-specific entries; used for
# both directions of DirectAdmin.OPEN_PORTS["tcp"].
TCP_PORTS_DA = base.TCP_PORTS_COMMON + ["2222", "35000-35999"]
# Parent directory of the per-user "<username>/user.conf" files.
USERS_CONF_DIR = "/usr/local/directadmin/data/users/"
# Accepted sudouser names: a lowercase letter or "_" followed by at most 31
# characters from [a-z0-9_-]; "\Z" rejects a trailing newline.
_SUDOUSER_NAME_RE = re.compile(r"^[a-z_][a-z0-9_-]{0,31}\Z")
class DirectAdminException(base.PanelException):
    """Panel exception type used by the DirectAdmin integration."""
def _validate_sudouser(user) -> str:
    """Check that ``user`` is an acceptable sudouser name and return it.

    A name is accepted only when it is a ``str`` matching
    ``_SUDOUSER_NAME_RE``.

    :raises DirectAdminException: for any other value.
    """
    acceptable = (
        isinstance(user, str) and _SUDOUSER_NAME_RE.match(user) is not None
    )
    if acceptable:
        return user
    raise DirectAdminException(
        "Refusing to manage sudouser: invalid username %r" % (user,)
    )
def get_user_domains(path=_VIRTUAL_DOMAINOWNERS) -> Dict[str, str]:
    """Map every domain listed in ``path`` to the user that owns it.

    Each line is split at its first colon; the stripped left part is the
    domain and the stripped right part is the user. Lines without a colon
    are ignored, and lines that cannot be decoded are logged and skipped.
    """
    owner_by_domain = {}
    with open(path, "rb") as owners_file:
        for raw in owners_file:
            try:
                text = raw.decode()
            except UnicodeDecodeError as exc:
                logger.warning("Broken line in %s: %r (%s)", path, raw, exc)
                continue
            domain, colon, owner = text.partition(":")
            if not colon:
                # No separator on this line: nothing to record.
                continue
            owner_by_domain[domain.strip()] = owner.strip()
    return owner_by_domain
async def get_directadmin_version() -> Version:
    """Run the DirectAdmin binary with ``v`` and parse the version it prints.

    :raises PanelException: when no version can be extracted from stdout;
        the message carries the return code, stdout and stderr.
    """
    retcode, stdout, stderr = await run(
        ["/usr/local/directadmin/directadmin", "v"]
    )
    found = re.search(
        rb"^(Version: )?DirectAdmin (v.)?([\d.]+)",
        stdout,
        flags=re.MULTILINE,
    )
    try:
        # ``found`` is None when nothing matched, which surfaces here as
        # AttributeError; an unparsable number surfaces as ValueError.
        return Version(found.group(3).decode())
    except (ValueError, AttributeError):
        raise PanelException(
            "Failed to parse directadmin version."
            f" {retcode=}, {stdout=}, {stderr=}"
        )
class DirectAdmin(base.AbstractPanel):
    """DirectAdmin implementation of the hosting-panel interface.

    Covers sudo-user/group management for the Imunify plugin, DirectAdmin
    hook scripts, user/domain enumeration from the domain-owners file,
    document-root discovery and per-user details from ``user.conf``.
    """

    NAME = "DirectAdmin"
    DA_BINARY = DA_FILE
    OPEN_PORTS = {
        "tcp": {
            "in": ["465"] + TCP_PORTS_DA,
            "out": ["113"] + TCP_PORTS_DA,
        },
        "udp": {
            "in": ["20", "21", "53", "443", "35000-35999", "80"],
            "out": ["20", "21", "53", "113", "123", "35000-35999"],
        },
    }
    exception = DirectAdminException

    # (hook file name, command line) pairs installed by
    # enable_imunify_plugin() and removed by disable_imunify_plugin().
    # Kept in one place so both directions always agree.
    _SUDOUSER_HOOKS = (
        (
            "user_create_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        ),
        (
            "user_destroy_pre.sh",
            '/usr/bin/imunify360-agent delete-sudouser --user "$username"',
        ),
        (
            "user_restore_post.sh",
            '/usr/bin/imunify360-agent add-sudouser --user "$username"',
        ),
    )

    @classmethod
    def is_installed(cls):
        """Return the result of ``is_directadmin_installed()``."""
        return is_directadmin_installed()

    @classmethod
    async def version(cls):
        """Return the DirectAdmin version as a string."""
        # example output 'Version: DirectAdmin v.1.53.0'
        return str(await get_directadmin_version())

    @base.ensure_valid_panel()
    async def add_sudouser(self, user):
        """Add ``user`` to ``SUDO_GROUP`` when it is an admin.

        The user qualifies when listed in ``admin.list`` or when the
        ``usertype`` environment variable equals ``"admin"``.

        :raises DirectAdminException: if ``user`` is not a valid name.
        """
        _validate_sudouser(user)
        if user in self._get_admins() or os.environ.get("usertype") == "admin":
            retcode, _out, err = await run(["gpasswd", "-a", user, SUDO_GROUP])
            if retcode != 0:
                # Tolerate non-zero exit (matches prior os.system behaviour);
                # batch callers rely on this, e.g. gpasswd -d on absent users.
                logger.warning(
                    "gpasswd -a failed for %r: rc=%s err=%r",
                    user,
                    retcode,
                    err,
                )

    @base.ensure_valid_panel()
    async def delete_sudouser(self, user):
        """Remove ``user`` from ``SUDO_GROUP`` when listed in ``admin.list``.

        :raises DirectAdminException: if ``user`` is not a valid name.
        """
        _validate_sudouser(user)
        if user in self._get_admins():
            retcode, _out, err = await run(["gpasswd", "-d", user, SUDO_GROUP])
            if retcode != 0:
                # See add_sudouser; tolerate non-zero exit.
                logger.warning(
                    "gpasswd -d failed for %r: rc=%s err=%r",
                    user,
                    retcode,
                    err,
                )

    @staticmethod
    def _add_line(path, content):
        """Append ``content`` as its own line to ``path`` unless present.

        The file must already exist (it is opened in ``r+`` mode).
        """
        with open(path, "r+") as f:
            lines = f.readlines()
            # Compare without the line terminator so that a final line
            # lacking a trailing newline is still recognised as present.
            if any(line.rstrip("\n") == content for line in lines):
                return
            # Never glue the new entry onto an unterminated last line;
            # for files such as /etc/sudoers that would corrupt both.
            if lines and not lines[-1].endswith("\n"):
                f.write("\n")
            f.write(content + "\n")

    @staticmethod
    def _remove_line(path, content):
        """Rewrite ``path`` without the lines that contain ``content``.

        Matching is a substring test against each stripped line.
        """
        with open(path, "r+") as f:
            data = "".join(line for line in f if content not in line.strip())
            f.seek(0)
            f.truncate(0)
            f.write(data)

    def _get_admins(self):
        """Return the whitespace-separated names found in ``admin.list``."""
        with open("/usr/local/directadmin/data/admin/admin.list", "r") as f:
            admin_list = f.read().split()
        return admin_list

    def _get_resellers(self):
        """Return the whitespace-separated names found in ``reseller.list``."""
        with open("/usr/local/directadmin/data/admin/reseller.list", "r") as f:
            reseller_list = f.read().split()
        return reseller_list

    def _create_hook(self, hook, content):
        """Ensure hook script ``hook`` exists and contains ``content``.

        A missing script is created with a ``#!/bin/sh`` first line, handed
        over to the ``diradmin`` account and made accessible to its owner
        only (mode 0700).
        """
        path = os.path.join(HOOKS_DIR, hook)
        if not os.path.exists(path):
            with open(path, "w"):
                pass
            self._add_line(path, "#!/bin/sh")
            owner = pwd.getpwnam("diradmin")
            # Use the account's real primary group; the uid and gid of an
            # account are not guaranteed to be numerically equal.
            os.chown(path, owner.pw_uid, owner.pw_gid)
            os.chmod(path, 0o700)
        self._add_line(path, content)

    def _delete_hook(self, hook, content):
        """Remove ``content`` from hook script ``hook`` if the file exists."""
        path = os.path.join(HOOKS_DIR, hook)
        if os.path.exists(path):
            self._remove_line(path, content)

    @base.ensure_valid_panel()
    async def enable_imunify_plugin(self, name=None):
        """Set up group, sudoers entries, admin members and hooks.

        Admin names rejected by validation are logged and skipped.
        ``name`` is accepted but not used here.
        """
        os.system("/usr/sbin/groupadd -f {}".format(SUDO_GROUP))
        self._add_line("/etc/sudoers", SUDO_LINE)
        self._add_line("/etc/sudoers", SUDO_TTY_LINE)
        for user in self._get_admins():
            try:
                await self.add_sudouser(user)
            except DirectAdminException as exc:
                logger.warning(
                    "Skipping invalid sudouser entry %r: %s", user, exc
                )
        for hook, command in self._SUDOUSER_HOOKS:
            self._create_hook(hook, command)

    @base.ensure_valid_panel()
    async def disable_imunify_plugin(self, plugin_name=None):
        """Undo enable_imunify_plugin(): sudoers, members, group, hooks.

        Admin names rejected by validation are logged and skipped.
        ``plugin_name`` is accepted but not used here.
        """
        self._remove_line("/etc/sudoers", SUDO_LINE)
        self._remove_line("/etc/sudoers", SUDO_TTY_LINE)
        for user in self._get_admins():
            try:
                await self.delete_sudouser(user)
            except DirectAdminException as exc:
                logger.warning(
                    "Skipping invalid sudouser entry %r: %s", user, exc
                )
        os.system("/usr/sbin/groupdel {}".format(SUDO_GROUP))
        for hook, command in self._SUDOUSER_HOOKS:
            self._delete_hook(hook, command)

    async def get_users(self) -> List[str]:
        """
        :return: list: list of directadmin users (deduplicated domain owners)
        """
        return list(set(get_user_domains().values()))

    async def get_user_domains(self):
        """
        :return: list: domains hosted on server via directadmin
        """
        return list(get_user_domains().keys())

    async def get_domain_to_owner(self):
        """
        :return: domain to list of users pairs (one owner per domain)
        """
        return {domain: [user] for domain, user in get_user_domains().items()}

    async def get_domains_per_user(self):
        """
        :return: user to list of domains pairs (a ``defaultdict(list)``)
        """
        user_to_domains = defaultdict(list)
        for domain, user in get_user_domains().items():
            user_to_domains[user].append(domain)
        return user_to_domains

    def basedirs(self) -> Set[str]:
        """Return the set containing only ``BASE_DIR``."""
        return {BASE_DIR}

    async def docroots_info(self) -> Dict:
        """Return raw document-root data, choosing the method by version.

        DirectAdmin 1.62.8 and newer is queried through the API; older
        versions through the ``--DocumentRoot`` binary option.
        """
        if await get_directadmin_version() >= Version("1.62.8"):
            return await self.docroots_info_new()
        return await self.docroots_info_legacy()

    async def docroots_info_new(self) -> Dict:
        """Fetch document roots from the ``CMD_API_DOMAIN`` endpoint.

        The binary's ``--root-auth-url`` output supplies both the endpoint
        and the credentials, which are sent as HTTP Basic auth.

        :raises PanelException: if the binary exits non-zero or the
            request fails.
        """
        cmd = ["/usr/local/directadmin/directadmin", "--root-auth-url"]
        with timeit("Call DA binary to obtain auth URL", logger):
            retcode, stdout, stderr = await run(cmd)
        if retcode != 0:
            raise PanelException(
                f"Failed to obtain auth URL. Unexpected return code {retcode}."
                f" stdout={stdout!r}, stderr={stderr!r}"
            )
        parsed_url = urllib.parse.urlparse(stdout.decode().strip())
        # Split on the LAST "@": the host part cannot contain one, while
        # the credentials might.
        basic_auth, domain = parsed_url.netloc.rsplit("@", 1)
        basic_auth = base64.standard_b64encode(basic_auth.encode()).decode()
        # The credentials are dropped from the URL before it is logged
        # and requested.
        document_roots_url = "/".join(
            [
                parsed_url._replace(netloc=domain).geturl(),
                "CMD_API_DOMAIN?json=yes&action=document_root_all",
            ]
        )
        logger.info("Document roots URL: %s", document_roots_url)
        request = urllib.request.Request(
            document_roots_url,
            headers={"Authorization": f"Basic {basic_auth}"},
            method="GET",
        )
        # The HTTP call is blocking, so it runs in the default executor.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self._do_request, request)

    async def docroots_info_legacy(self) -> Dict:
        """Fetch document roots from the ``--DocumentRoot`` binary option.

        :raises PanelException: if the return code is neither 0 nor 1, or
            if stdout is not valid JSON.
        """
        cmd = [
            "/usr/local/directadmin/directadmin",
            "--DocumentRoot",
        ]
        with timeit("Call DA binary to obtain all docroots", logger):
            ret, out, err = await run(cmd)
        if ret != 0 and ret != 1:
            raise PanelException(
                "Failed to obtain document roots. Unexpected return code"
                f" {ret}. stdout={out!r}, stderr={err!r}"
            )
        try:
            output = json.loads(out.decode())
        except json.JSONDecodeError as e:
            raise PanelException(
                f"Failed to obtain document roots. Failed to decode json {e}."
            ) from e
        return output

    @staticmethod
    def parse_document_root_output(output) -> Dict:
        """Flatten raw document-root data into ``{public_html: domain}``.

        Subdomain document roots are mapped to the name of the domain
        they belong to. Entries with an empty ``public_html`` are skipped.
        """
        ret = {}
        for userdata in output["users"].values():
            for domainname, domaindata in userdata["domains"].items():
                if domaindata.get("public_html"):
                    ret[domaindata["public_html"]] = domainname
                for sub_data in domaindata.get("subdomains", {}).values():
                    if sub_data.get("public_html"):
                        ret[sub_data["public_html"]] = domainname
        return ret

    async def list_docroots(self) -> Dict[str, str]:
        """Return the ``{document root: domain}`` mapping for the server."""
        info = await self.docroots_info()
        return self.parse_document_root_output(info)

    async def get_user_details(self) -> Dict[str, Dict[str, str]]:
        """Return per-user details keyed by username.

        Each entry normally has ``locale``, ``email``, ``parent``,
        ``suspended`` and ``level``. If reading a user's details fails,
        the failure is logged and that entry holds only empty ``email``
        and ``locale`` values.
        """
        res = {}
        usernames = await self.get_users()
        admins = set(self._get_admins())
        resellers = set(self._get_resellers())
        for username in usernames:
            try:
                parsed_config = self.get_user_details_for_username(username)
                level = base.UserLevel.REGULAR_USER
                if username in resellers:
                    level = base.UserLevel.RESSELER
                # Checked last so that admin takes precedence over reseller.
                if username in admins:
                    level = base.UserLevel.ADMIN
                res[username] = {
                    "locale": parsed_config.get("language", ""),
                    "email": parsed_config.get("email", ""),
                    "parent": parsed_config.get("creator", ""),
                    "suspended": parsed_config.get("suspended") == "yes",
                    "level": int(level),
                }
            except Exception as e:
                res[username] = {
                    "email": "",
                    "locale": "",
                }
                logger.warning(
                    "Failed to get_user_details: %s %s", username, e
                )
        return res

    def get_user_details_for_username(self, username) -> Dict[str, str]:
        """
        Implementation taken from
        https://github.com/patchman-cloudlinux/patchman2-client/blob/05f54db63639b939c055a9543e82bc9690559965/src/platform/platforms/directadmin.cpp#L315
        directadmin::get_user_details

        Parse ``<USERS_CONF_DIR>/<username>/user.conf`` and return its
        key/value pairs.
        """
        user_conf_str = Path(
            USERS_CONF_DIR, f"{username}/user.conf"
        ).read_text()
        # user.conf holds plain key=value data, so "%" must be read
        # literally instead of being treated as interpolation syntax.
        parsed_config = configparser.ConfigParser(interpolation=None)
        # The file has no section header; add one so configparser accepts it.
        user_conf_str = "[top]\n" + user_conf_str
        parsed_config.read_string(user_conf_str)
        parsed_config = parsed_config["top"]
        return parsed_config

    @retry_on(
        PanelException,
        on_error=backoff_sleep,
        timeout=HTTP_REQUEST_RETRY_TIMEOUT,
    )
    def _do_request(self, request: urllib.request.Request) -> Any:
        """Perform ``request`` and return the decoded JSON body.

        Wrapped in ``retry_on`` for ``PanelException`` (presumably retried
        with back-off until the timeout; confirm against ``retry_on``).

        :raises PanelException: on a non-200 status, or on a network,
            timeout, decoding or JSON error.
        """
        try:
            with urllib.request.urlopen(
                request, timeout=Core.DEFAULT_SOCKET_TIMEOUT
            ) as response:
                if response.status != 200:
                    raise PanelException(
                        "status code is {}".format(response.status)
                    )
                return json.loads(response.read().decode())
        except (
            UnicodeDecodeError,
            http.client.HTTPException,
            json.JSONDecodeError,
            socket.timeout,
            urllib.error.URLError,
        ) as e:
            raise PanelException from e

    async def panel_user_link(self, username) -> str:
        """
        Returns panel url
        :return: str (always empty in this implementation)
        """
        return ""

    @classmethod
    async def get_user_domains_details(
        cls, username: str
    ) -> list[base.DomainData]:
        """Return an empty list; no details are provided here."""
        return []
# Directory Contents
# Dirs: 1 × Files: 3