PHP 8.3.31
Preview: wordpress_security_plugin.py Size: 6.77 KB
/proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/simple_rpc/wordpress_security_plugin.py

import logging
import os
import pwd

from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import (
    CommonEndpoints,
    RootEndpoints,
    bind,
)
from defence360agent.utils import Scope, is_root_user
from defence360agent.contracts.messages import MessageType
from defence360agent.model.wordpress_incident import get_wordpress_incidents
from defence360agent.wordpress.site_repository import (
    get_installed_sites_paginated,
)
from defence360agent.wordpress.utils import get_domain_paths

logger = logging.getLogger(__name__)


def get_user_id_and_site_for_query(
    user: str | None = None, site_search: str | None = None
) -> tuple[int | None, str | None]:
    """
    Determine the user_id and site_path for filtering WordPress incidents.

    The result depends on who runs the current process:

    1. Root user: no user filter by default (user_id is None). When
       ``user`` is given, it is resolved to a uid via the passwd database.
    2. Non-root user: ``user`` is ignored and the uid of the current
       process is returned, so only the caller's own incidents match.

    In both cases ``site_search`` is returned unchanged.

    NOTE(review): earlier documentation described a third "proxy service"
    context (both arguments required, ValueError otherwise). No such
    validation exists in this function - confirm whether it is enforced
    elsewhere.

    Args:
        user: Username to filter by (honoured for root only)
        site_search: Site path to filter by

    Returns:
        Tuple of (user_id, site_path); user_id is None when no user filter
        applies

    Raises:
        KeyError: If the specified user doesn't exist
    """
    if not is_root_user():
        # Non-root callers are always pinned to their own uid.
        return os.getuid(), site_search

    logger.debug("Root user querying incidents, user filter: %s", user)
    if user is None:
        # Root can see all incidents by default
        return None, site_search

    try:
        user_id = pwd.getpwnam(user).pw_uid
    except KeyError as err:
        logger.warning("User not found: %s", user)
        # Re-raise with a readable message; chain explicitly so the
        # traceback does not read as a failure inside the handler.
        raise KeyError(f"User '{user}' not found") from err

    logger.debug("Filtering incidents for user %s (uid=%d)", user, user_id)
    return user_id, site_search


class WordpressEndpoints(RootEndpoints):
    """
    ``wordpress-plugin`` commands bound on ``RootEndpoints``.

    Every command forwards a ``MessageType.WordpressPluginAction`` message,
    carrying the requested action name, to ``self._sink``.
    """

    SCOPE = Scope.AV_IM360

    @bind("wordpress-plugin", "install-on-new-sites")
    async def wordpress_plugin_install(self):
        """Send the ``install_on_new_sites`` plugin action to the sink."""
        action = "install_on_new_sites"
        message = MessageType.WordpressPluginAction(action=action)
        await self._sink.process_message(message)

    @bind("wordpress-plugin", "tidy-up")
    async def wordpress_plugin_tidy_up(self):
        """Send the ``tidy_up`` plugin action to the sink."""
        action = "tidy_up"
        message = MessageType.WordpressPluginAction(action=action)
        await self._sink.process_message(message)

    @bind("wordpress-plugin", "update")
    async def wordpress_plugin_update(self):
        """Send the ``update_existing`` plugin action to the sink."""
        action = "update_existing"
        message = MessageType.WordpressPluginAction(action=action)
        await self._sink.process_message(message)

    @bind("wordpress-plugin", "install-and-update")
    async def wordpress_plugin_install_and_update(self):
        """Send the ``install_and_update`` plugin action to the sink."""
        action = "install_and_update"
        message = MessageType.WordpressPluginAction(action=action)
        await self._sink.process_message(message)


class WordpressCommonEndpoints(CommonEndpoints):
    """``wordpress-plugin`` listing commands bound on ``CommonEndpoints``."""

    SCOPE = Scope.AV_IM360

    @bind("wordpress-plugin", "list-incidents")
    async def wordpress_plugin_list_incidents(
        self,
        user: str | None = None,
        site_search: str | None = None,
        limit: int = 50,
        offset: int = 0,
        by_abuser_ip: str | None = None,
        by_country_code: str | None = None,
        by_domain: str | None = None,
        search: str | None = None,
        since: int | None = None,
        to: int | None = None,
        order_by: list | None = None,
    ) -> list[dict]:
        """
        List WordPress security incidents.

        ``user`` and ``site_search`` are resolved into the actual user id /
        site path filter by ``get_user_id_and_site_for_query``; every other
        argument is handed to ``get_wordpress_incidents`` unchanged.

        Args:
            user: Username to filter by
            site_search: Site path to filter by
            limit: Maximum number of incidents to return
            offset: Number of incidents to skip
            by_abuser_ip: Filter by attacker IP address
            by_country_code: Filter by country code
            by_domain: Filter by domain
            search: Search across multiple fields
            since: Filter by timestamp >= this value (unix timestamp)
            to: Filter by timestamp <= this value (unix timestamp)
            order_by: List of fields to order by (e.g., ['timestamp-', 'severity-'])

        Returns:
            List of incident dictionaries, reshaped for the UI: ``retries``
            is renamed to ``times`` and ``country`` becomes
            ``{"code": <country>}`` (or stays None).

        Raises:
            ValidationError: If the specified user doesn't exist
        """
        try:
            user_id, site_path = get_user_id_and_site_for_query(
                user, site_search
            )
        except KeyError as e:
            # str(KeyError(msg)) is repr(msg), which would wrap the text in
            # an extra pair of quotes; take the message argument itself.
            message = str(e.args[0]) if e.args else str(e)
            raise ValidationError(message) from e

        incidents = get_wordpress_incidents(
            limit=limit,
            offset=offset,
            user_id=user_id,
            by_abuser_ip=by_abuser_ip,
            by_country_code=by_country_code,
            by_domain=by_domain,
            search=search,
            site_search=site_path,
            since=since,
            to=to,
            order_by=order_by,
        )

        # Fields transformation for UI
        for incident in incidents:
            incident["times"] = incident.pop("retries")
            country = incident.pop("country")
            incident["country"] = (
                None if country is None else {"code": country}
            )

        return incidents

    @bind("wordpress-plugin", "list-sites")
    async def list_sites(
        self,
        limit: int = 50,
        offset: int = 0,
        user: str | None = None,
    ) -> tuple[int, list[dict]]:
        """
        List WordPress sites with Imunify plugin installed.

        Without ``user`` (None or empty) no uid filter is applied; with
        ``user`` only sites stored for that user's uid are returned.

        NOTE(review): nothing in this method restricts a non-root caller
        to their own sites; presumably the RPC layer supplies ``user`` for
        non-root callers - confirm.

        Args:
            limit: Maximum number of sites to return
            offset: Number of sites to skip
            user: Username whose sites should be listed

        Returns:
            Tuple of (max_count, items) where each item holds the site's
            ``domain`` and ``docroot``. An unknown ``user`` yields
            ``(0, [])`` instead of an error.
        """
        uid = None
        if user:
            try:
                uid = pwd.getpwnam(user).pw_uid
            except KeyError:
                # Unknown user: report an empty listing rather than fail.
                return 0, []

        max_count, sites = get_installed_sites_paginated(
            uid=uid, limit=limit, offset=offset
        )

        # Get docroot to domain mapping from control panel
        docroot_domains = await get_domain_paths()

        def primary_domain(site):
            # First control-panel domain wins; fall back to stored domain.
            domains = docroot_domains.get(site.docroot, [])
            return domains[0] if domains else site.domain

        items = [
            {"domain": primary_domain(site), "docroot": site.docroot}
            for site in sites
        ]

        return max_count, items

Directory Contents

Dirs: 3 × Files: 13

Name Size Perms Modified Actions
schema DIR
- drwxr-xr-x 2026-06-08 20:23:14
Edit Download
- drwxr-xr-x 2026-06-08 20:23:14
Edit Download
- drwxr-xr-x 2026-06-08 20:23:14
Edit Download
1.49 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
7.07 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
11.05 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
2.82 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
1.47 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
2.70 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
317 B lrw-r--r-- 2026-05-26 21:20:44
Edit Download
4.32 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
1.95 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
7.06 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
6.77 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
10.09 KB lrw-r--r-- 2026-05-26 21:20:44
Edit Download
15.72 KB lrw-r--r-- 2026-05-26 21:20:45
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).