"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
import asyncio
import json
import logging
import pwd
import uuid
from collections import defaultdict
from functools import lru_cache, wraps
from itertools import islice
from pathlib import Path
from typing import Any, Callable, Coroutine
from urllib.parse import parse_qsl, urlencode, urlparse, urlunparse
import diskcache
from defence360agent.internals.iaid import IndependentAgentIDAPI
from defence360agent.subsys.panels.base import PanelException
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import (
log_error_and_ignore,
safe_fileops,
)
from imav.malwarelib.api.imunify_patch_subscription import (
ImunifyPatchSubscriptionAPI,
)
from imav.malwarelib.api.vulnerability import VulnerabilityAPI
from imav.malwarelib.config import VulnerabilityHitStatus
from imav.malwarelib.model import VulnerabilityHit
# Name of the per-user dotfile (placed in the user's home directory) that
# stores the Imunify Patch user id.
IMUNIFY_PATCH_ID_FILE = ".imunify_patch_id"
PURCHASE_URL_MAX_WEBSITES = 5  # Take only first 5 domains for better display in UI. Not used anywhere else.
# Cap on how many vulnerabilities group_by_severity() examines.
MAX_SEVERITY_COUNT = 10
logger = logging.getLogger(__name__)
class PurchaseUrlCache:
    """Decorator that memoizes async call results on disk.

    An instance wraps an async function taking a username and caches its
    non-None results in a diskcache.Cache with a per-entry TTL.  The
    backing cache is created lazily on first access, so merely importing
    this module never touches the cache directory.

    Usage:
        @purchase_url_cache(ttl=3600)
        async def get_purchase_url(username: str) -> str | None:
            ...
    """

    CACHE_DIR = "/var/imunify360/.cache/purchase_url"
    DEFAULT_TTL = 3600  # 1 hour in seconds
    KEY_PREFIX = "purchase_url"
    TIMEOUT = 60  # disk operation timeout
    SIZE_LIMIT = 10 * 1024 * 1024  # 10 MB

    def __init__(self):
        # Backing store, created on first use (see the `cache` property).
        self._cache: diskcache.Cache | None = None
        # One lock per username so concurrent misses for the same user
        # trigger a single underlying call.
        self._locks: defaultdict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

    @property
    def cache(self) -> diskcache.Cache:
        """Return the diskcache.Cache, creating it on first access."""
        if self._cache is None:
            self._cache = diskcache.Cache(
                self.CACHE_DIR,
                timeout=self.TIMEOUT,
                size_limit=self.SIZE_LIMIT,
            )
        return self._cache

    @staticmethod
    def _make_key(username: str) -> str:
        """Return the cache key used for *username*."""
        return f"{PurchaseUrlCache.KEY_PREFIX}:{username}"

    def _get(self, username: str) -> str | None:
        """Return the cached value for *username*, or None on miss/error."""
        try:
            stored = self.cache.get(self._make_key(username))
        except Exception as e:
            logger.warning("Failed to read from purchase URL cache: %s", e)
            return None
        return None if stored is None else str(stored)

    def _set(self, username: str, value: str, ttl: int) -> None:
        """Persist *value* for *username*, expiring after *ttl* seconds."""
        try:
            self.cache.set(self._make_key(username), value, expire=ttl)
        except Exception as e:
            logger.warning("Failed to write to purchase URL cache: %s", e)

    def clear(self, username: str | None = None) -> None:
        """Clear the purchase URL cache.

        Args:
            username: If provided, clear only the cache entry for this user.
                If None, clear all cache entries.
        """
        try:
            if username is not None:
                self.cache.delete(self._make_key(username))
            else:
                self.cache.clear()
        except Exception as e:
            logger.warning("Failed to clear purchase URL cache: %s", e)

    def __call__(
        self,
        ttl: int = DEFAULT_TTL,
    ) -> Callable[
        [Callable[[str], Coroutine[Any, Any, str | None]]],
        Callable[[str], Coroutine[Any, Any, str | None]],
    ]:
        """Return a decorator that caches async function results.

        Args:
            ttl: Time-to-live for cache entries in seconds
                (defaults to DEFAULT_TTL, 3600 seconds).

        Returns:
            A decorator for async functions of one username argument.
        """

        def decorator(
            func: Callable[[str], Coroutine[Any, Any, str | None]],
        ) -> Callable[[str], Coroutine[Any, Any, str | None]]:
            @wraps(func)
            async def wrapper(username: str) -> str | None:
                # Fast path: a hit needs no lock at all.
                hit = self._get(username)
                if hit is not None:
                    logger.info(
                        "Cache hit for username: %s, cached value: %s",
                        username,
                        hit,
                    )
                    return hit
                # Miss: serialize concurrent callers for this username.
                async with self._locks[username]:
                    # Re-check: another coroutine may have filled the
                    # cache while we waited for the lock.
                    hit = self._get(username)
                    if hit is not None:
                        logger.info(
                            "Cache hit (after lock) for username: %s, cached"
                            " value: %s",
                            username,
                            hit,
                        )
                        return hit
                    fresh = await func(username)
                    if fresh is not None:
                        logger.info(
                            "Cache miss for username: %s, result: %s",
                            username,
                            fresh,
                        )
                        self._set(username, fresh, ttl)
                    return fresh

            return wrapper

        return decorator


purchase_url_cache = PurchaseUrlCache()
class ImunifyPatchIdError(Exception):
    """Raised when the per-user Imunify Patch id file cannot be resolved
    or created (missing user/home directory, or a write failure)."""

    pass
class DiskQuotaError(Exception):
    """Raised when creating the id file fails with "Disk quota exceeded"."""

    pass
# Type alias for the Imunify Patch user id (generated as uuid4().hex).
ImunifyPatchUserId = str
async def ensure_id_file(username: str) -> ImunifyPatchUserId:
    """Return the user's Imunify Patch id, creating the id file if needed.

    Resolves (and creates, when absent) the id file in the user's home
    directory, then returns the id stored in it.  When the file holds no
    usable id, a fresh one is generated, written to the file, and
    returned.

    Args:
        username (str): The username for which to ensure the ID file.

    Returns:
        ImunifyPatchUserId: The Imunify Patch user ID.
    """
    id_file = await _get_id_file(username)
    existing_id = _read_id_file(id_file)
    if existing_id:
        return existing_id
    new_id = _generate_id()
    await _write_id_file(id_file, new_id)
    return new_id
@log_error_and_ignore()
async def get_imunify_patch_id(username: str) -> ImunifyPatchUserId | None:
    """Return the user's Imunify Patch id under a per-user lock.

    Returns None (with a warning) when the id file cannot be ensured
    because the user's disk quota is exhausted.
    """
    async with get_lock(username):
        try:
            patch_id = await ensure_id_file(username)
        except DiskQuotaError as e:
            logger.warning("Unable to ensure %s user id file %s", username, e)
            return None
        return patch_id
@lru_cache(maxsize=None)
def get_lock(username: str):
    """Return the asyncio.Lock dedicated to *username*.

    lru_cache keys on the username argument, so each distinct user gets
    exactly one Lock instance for the lifetime of the process.
    """
    return asyncio.Lock()
async def _get_id_file(username: str) -> Path:
    """Return the path to the user's Imunify Patch id file, creating it
    in the user's home directory if it does not exist.

    Args:
        username: Name of an existing system user.

    Returns:
        Path to the (possibly just-created) id file.

    Raises:
        ImunifyPatchIdError: If the user or their home directory does not
            exist, or the file cannot be created.
        DiskQuotaError: If creation fails because the disk quota is
            exhausted.
    """
    try:
        user_pwd = pwd.getpwnam(username)
    except KeyError as e:
        logger.error(f"No such user: {username}")
        raise ImunifyPatchIdError(f"No such user {username}") from e
    id_file = Path(user_pwd.pw_dir) / IMUNIFY_PATCH_ID_FILE
    if not id_file.exists():
        if not id_file.parent.exists():
            logger.error(f"No such user homedir: {id_file.parent}")
            raise ImunifyPatchIdError(
                f"No such user homedir: {id_file.parent}"
            )
        try:
            await safe_fileops.touch(str(id_file))
        # PermissionError is a subclass of OSError, so one clause covers
        # both (the original (PermissionError, OSError) tuple was redundant).
        except OSError as e:
            # Quota failures get a dedicated exception so callers can
            # downgrade them to a warning instead of an error.
            if "Disk quota exceeded" in str(e):
                raise DiskQuotaError from e
            logger.error(
                "Unable to put %s in user home dir %s",
                IMUNIFY_PATCH_ID_FILE,
                e,
            )
            raise ImunifyPatchIdError from e
    return id_file
def _generate_id() -> ImunifyPatchUserId:
    """Return a freshly generated Imunify Patch id (uuid4 as hex)."""
    return uuid.uuid4().hex
def _read_id_file(id_file: Path) -> ImunifyPatchUserId | None:
    """
    Return the Imunify Patch ID stored in `id_file`.

    The file is scanned bottom-up; the last non-blank, non-comment line
    wins.  Returns `None` when nothing usable is found.
    """
    with id_file.open("r") as f:
        raw_lines = f.readlines()
    for raw_line in reversed(raw_lines):
        if raw_line and not raw_line.startswith("#"):
            candidate = raw_line.strip()
            if candidate:
                return candidate
    logger.warning(f"Cannot parse {id_file}, file is corrupted or empty")
    return None
async def _write_id_file(id_file: Path, _id: ImunifyPatchUserId) -> None:
    """Write Imunify Patch id `_id` to `id_file` with a do-not-edit header.

    Args:
        id_file: Destination path in the user's home directory.
        _id: The id to store.

    Raises:
        ImunifyPatchIdError: If the file cannot be written.
    """
    text = (
        "# DO NOT EDIT\n"
        "# This file contains Imunify Patch id unique to this user\n"
        "\n"
        f"{_id}\n"
    )
    try:
        await safe_fileops.write_text(str(id_file), text)
    # PermissionError is a subclass of OSError, so a single clause suffices
    # (the original (OSError, PermissionError) tuple was redundant).
    except OSError as e:
        logger.error(
            "Unable to write %s in user home dir: %s", IMUNIFY_PATCH_ID_FILE, e
        )
        raise ImunifyPatchIdError from e
@purchase_url_cache(ttl=3600)
async def get_imunify_patch_purchase_url(username: str) -> str | None:
    """Get the Imunify Patch purchase URL for a given user.

    This function builds a purchase URL with user-specific parameters.
    Results are cached on disk for the decorator's ttl (3600 s = 1 hour;
    see PurchaseUrlCache.DEFAULT_TTL).

    Imunify Patch purchase URL template:
    https://www.cloudlinux.com/purchase-imunify-patch?iaid=
    &imunify_patch_user_id=&server_ip=12.23.34.45
    &username=johndoe&websites=example.com,anotherexample.com
    Defined in Jira ticket: https://cloudlinux.atlassian.net/browse/DEF-32303

    Args:
        username: The username to generate the purchase URL for.

    Returns:
        The purchase URL string, or None if not eligible.
    """
    purchase_eligibility = (
        await ImunifyPatchSubscriptionAPI.get_purchase_eligibility()
    )
    if not purchase_eligibility.purchase_url:
        return None
    iaid = IndependentAgentIDAPI.get_iaid()
    imunify_patch_user_id = await get_imunify_patch_id(username)
    panel_manager = HostingPanel()
    server_ip = panel_manager.get_server_ip()
    user_domains = (await panel_manager.get_domains_per_user()).get(
        username, []
    )
    total_websites = len(user_domains)
    try:
        domain_paths = (await panel_manager.get_domain_paths()).items()
    except PanelException as e:
        logger.error("Error fetching domain paths: %s", e)
        # Fall back to an empty items() view so domain_paths is always an
        # iterable of (domain, paths) pairs; the previous bare {} fallback
        # would have yielded keys, not pairs, had it ever been non-empty.
        domain_paths = {}.items()
    user_domain_paths = {
        domain: paths
        for domain, paths in domain_paths
        if domain in user_domains
    }
    hits = VulnerabilityHit.select().where(
        (VulnerabilityHit.user == username)
        & (VulnerabilityHit.status == VulnerabilityHitStatus.VULNERABLE)
    )
    # One entry per (hit, matching docroot) pair; a domain may appear more
    # than once when several hits live under its document roots.
    vulnerable_domains = [
        domain
        for hit in hits
        for domain, doc_roots in user_domain_paths.items()
        for path in doc_roots
        if hit.orig_file.startswith(path)
    ]
    vulnerabilities = group_by_severity(
        await VulnerabilityAPI.get_details(
            VulnerabilityHit.get_vulnerabilities_ids(
                [hit.as_dict() for hit in hits]
            )
        )
    )
    url_args = {
        "iaid": iaid,
        "imunify_patch_user_id": imunify_patch_user_id or "",
        "subscription_target_id": imunify_patch_user_id or "",
        "server_ip": server_ip,
        "username": username,
        # Truncated domain list for UI display; the full count goes in
        # total_websites.
        "websites": ",".join(user_domains[:PURCHASE_URL_MAX_WEBSITES]),
        "total_websites": total_websites,
        "vulnerable_domains": len(vulnerable_domains),
        # sort_keys keeps the serialized form stable for caching/comparison.
        "vulnerabilities": json.dumps(vulnerabilities, sort_keys=True),
    }
    return build_purchase_url(purchase_eligibility.purchase_url, url_args)
def build_purchase_url(base_url: str, params: dict) -> str:
    """Merge `params` into `base_url`'s query string and return the URL.

    Query parameters already present in `base_url` are kept (blank values
    included) but overwritten on key collision; entries of `params` whose
    value is None are skipped.
    """
    parts = urlparse(base_url)
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    for key, value in params.items():
        if value is not None:
            query[key] = value
    return urlunparse(parts._replace(query=urlencode(query, doseq=True)))
def group_by_severity(
    vulnerabilities: dict[str, Any],
    limit: int = MAX_SEVERITY_COUNT,
) -> dict[str, dict[str, int]]:
    """Count vulnerabilities per application, bucketed by severity.

    Only the first `limit` entries of `vulnerabilities` are examined, and
    only the recognized severity levels are counted.

    Args:
        vulnerabilities: Mapping of vulnerability id to details; each
            value must carry an "app" key and may carry "severity".
        limit: Maximum number of vulnerabilities to examine
            (defaults to MAX_SEVERITY_COUNT).

    Returns:
        {app_name: {severity: count}} built from plain dicts, matching
        the annotated return type.
    """
    counts: defaultdict = defaultdict(lambda: defaultdict(int))
    for item in islice(vulnerabilities.values(), limit):
        app_name = item["app"]
        severity = item.get("severity", "UNKNOWN")
        if severity in ("HIGH", "MEDIUM", "LOW", "UNKNOWN"):
            counts[app_name][severity] += 1
    # Convert the nested defaultdicts to plain dicts so callers (and the
    # declared return type dict[str, dict[str, int]]) are not handed
    # defaultdicts that silently create entries on lookup.
    return {app: dict(by_severity) for app, by_severity in counts.items()}