Preview: wordpress_security_plugin.py
Size: 6.77 KB
/opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/simple_rpc/wordpress_security_plugin.py
import logging
import os
import pwd
from defence360agent.rpc_tools import ValidationError
from defence360agent.rpc_tools.lookup import (
CommonEndpoints,
RootEndpoints,
bind,
)
from defence360agent.utils import Scope, is_root_user
from defence360agent.contracts.messages import MessageType
from defence360agent.model.wordpress_incident import get_wordpress_incidents
from defence360agent.wordpress.site_repository import (
get_installed_sites_paginated,
)
from defence360agent.wordpress.utils import get_domain_paths
logger = logging.getLogger(__name__)
def get_user_id_and_site_for_query(
user: str | None = None, site_search: str | None = None
) -> tuple[int | None, str | None]:
"""
Determine the user_id and site_path for filtering WordPress incidents.
Three calling contexts:
1. Root user: Can query all incidents or filter by specific user
2. Non-root user: Can only query their own incidents (user/site_search ignored)
3. Proxy service: Both user and site_search must be set, restricted to that site
Args:
user: Username to filter by
site_search: Site path to filter by
Returns:
Tuple of (user_id, site_path) to filter by, or (None, None) for all
Raises:
KeyError: If the specified user doesn't exist
ValueError: If proxy service call is missing required parameters
"""
current_uid = os.getuid()
if is_root_user():
# Root user can see all incidents or filter by user
logger.debug("Root user querying incidents, user filter: %s", user)
user_id = None # Root can see all incidents by default
if user is not None:
# Root user specified a username to filter by
try:
user_id = pwd.getpwnam(user).pw_uid
logger.debug(
"Filtering incidents for user %s (uid=%d)", user, user_id
)
except KeyError:
logger.warning("User not found: %s", user)
raise KeyError(f"User '{user}' not found")
return user_id, site_search
return current_uid, site_search
class WordpressEndpoints(RootEndpoints):
SCOPE = Scope.AV_IM360
@bind("wordpress-plugin", "install-on-new-sites")
async def wordpress_plugin_install(self):
await self._sink.process_message(
MessageType.WordpressPluginAction(action="install_on_new_sites")
)
@bind("wordpress-plugin", "tidy-up")
async def wordpress_plugin_tidy_up(self):
await self._sink.process_message(
MessageType.WordpressPluginAction(action="tidy_up")
)
@bind("wordpress-plugin", "update")
async def wordpress_plugin_update(self):
await self._sink.process_message(
MessageType.WordpressPluginAction(action="update_existing")
)
@bind("wordpress-plugin", "install-and-update")
async def wordpress_plugin_install_and_update(self):
await self._sink.process_message(
MessageType.WordpressPluginAction(action="install_and_update")
)
class WordpressCommonEndpoints(CommonEndpoints):
SCOPE = Scope.AV_IM360
@bind("wordpress-plugin", "list-incidents")
async def wordpress_plugin_list_incidents(
self,
user: str | None = None,
site_search: str | None = None,
limit: int = 50,
offset: int = 0,
by_abuser_ip: str | None = None,
by_country_code: str | None = None,
by_domain: str | None = None,
search: str | None = None,
since: int | None = None,
to: int | None = None,
order_by: list | None = None,
) -> list[dict]:
"""
List WordPress security incidents.
Three calling contexts:
1. Root user: Can query all incidents or filter by specific user
2. Non-root user: Can only query their own incidents
3. Proxy service: Both user and site_search must be set, restricted to that site
Args:
user: Username to filter by (root or proxy service)
site_search: Site path to filter by (proxy service only)
limit: Maximum number of incidents to return
offset: Number of incidents to skip
by_abuser_ip: Filter by attacker IP address
by_country_code: Filter by country code
by_domain: Filter by domain
search: Search across multiple fields
since: Filter by timestamp >= this value (unix timestamp)
to: Filter by timestamp <= this value (unix timestamp)
order_by: List of fields to order by (e.g., ['timestamp-', 'severity-'])
Returns:
List of incident dictionaries
Raises:
ValidationError: If the specified user doesn't exist
"""
try:
user_id, site_path = get_user_id_and_site_for_query(
user, site_search
)
except KeyError as e:
raise ValidationError(str(e)) from e
incidents = get_wordpress_incidents(
limit=limit,
offset=offset,
user_id=user_id,
by_abuser_ip=by_abuser_ip,
by_country_code=by_country_code,
by_domain=by_domain,
search=search,
site_search=site_path,
since=since,
to=to,
order_by=order_by,
)
# Fields transformation for UI
for incident in incidents:
incident["times"] = incident.pop("retries")
country = incident.pop("country")
incident["country"] = (
{"code": country} if country is not None else None
)
return incidents
@bind("wordpress-plugin", "list-sites")
async def list_sites(self, limit=50, offset=0, user=None):
"""
List WordPress sites with Imunify plugin installed.
For root users: returns all sites.
For non-root users: returns only sites belonging to that user.
"""
uid = None
if user:
try:
uid = pwd.getpwnam(user).pw_uid
except KeyError:
return 0, []
max_count, sites = get_installed_sites_paginated(
uid=uid, limit=limit, offset=offset
)
# Get docroot to domain mapping from control panel
docroot_domains = await get_domain_paths()
# Build result with primary domain resolution
items = []
for site in sites:
# Get domains from control panel, fall back to stored domain
domains = docroot_domains.get(site.docroot, [])
primary_domain = domains[0] if domains else site.domain
items.append(
{
"domain": primary_domain,
"docroot": site.docroot,
}
)
return max_count, items
Directory Contents
Dirs: 3 × Files: 13