Richard Dern

Une façon créative de bloquer les indésirables

Une façon créative de bloquer les indésirables

L’architecture de mon réseau parlera peut-être aux plus vieux, mais elle fera probablement sourciller les plus jeunes et les cloud-natives. Ces derniers disposent d’une infinité d’outils pour mener à bien le genre de tâche à laquelle je me suis attaqué dernièrement.

Ma solution intervient en complément d’autres mesures sécuritaires (notamment IPS/IDS).

Elle est rendue possible notamment par le fait que mon site est statique, sauf pour la page de recherche qui sera traitée plus loin.

J’ai décrit à ChatGPT 5.4 ce que je voulais, c’est lui qui a produit le code nix et python.

Description de mon architecture réseau

Derrière ma Freebox en mode bridge se trouve mon routeur sous OPNsense, sur lequel est installé le plugin Caddy en tant que reverse-proxy pour tout service web que je désire rendre accessible depuis Internet. Ces services sont hébergés sur différentes machines physiques de mon réseau, derrière le routeur.

Plus spécifiquement, mon site est hébergé sur une machine sobrement appelée server-main. Cette machine, sous NixOS, héberge en particulier un autre serveur Caddy, qui permet d’accéder à ce site.

Vue simplifiée de mon architecture réseau.

Objectifs

Contraintes

Principe de fonctionnement

L’instance de Caddy sur server-main est configurée pour enregistrer les journaux des erreurs spécifiques au blog. Cela va inclure les tentatives d’accès à des fichiers inexistants, mais aussi des choses plus exotiques ou délibérées, notamment la recherche de scripts d’administration.

Un script, exécuté périodiquement, va analyser ces logs, et en extraire des informations utiles à la prise de décision : doit-on considérer que telle adresse IP a un comportement suspect ? Si on décide qu’un client a un comportement inapproprié, le script contacte le serveur OPNsense afin d’ajouter à un alias du firewall l’adresse IP incriminée.

Enfin, le script génère un flux RSS de ces erreurs, de sorte à me remonter l’information.

Définition d’un comportement suspect

Compte tenu du fait que mon site est statique, toute tentative de lire un fichier .php est considérée comme un scan agressif. Pour la même raison, toute requête de type POST est un comportement considéré agressif, sauf s’il survient dans le cadre d’une recherche. Enfin, j’ai constaté plusieurs clients qui scannent mes fichiers à la recherche d’une installation de WordPress, ou d’autres applications populaires, par exemple des outils de gestion de base de données. Comme la structure de WordPress est bien connue, toute tentative d’accès à un de ses dossiers aboutira à un blocage du client.

J’ai constaté, pendant l’écriture de ce système, que la majorité des clients à comportement suspect proviennent des services de cloud offerts par Microsoft qui, décidément, équipe aussi bien les administrations que les bandits…

FAQ

Pourquoi ne pas utiliser fail2ban comme tout le monde ?

Parce que fail2ban répond surtout à un besoin local : lire des journaux et modifier des règles sur la machine concernée. Mon besoin est un peu différent : je veux décider à partir des journaux HTTP de server-main, mais appliquer le blocage à l’entrée de mon réseau, sur OPNsense, via son API.

Pourquoi des flux RSS ?

Pour éviter le temps réel, qui me noierait sous les notifications. Parce que j’utilise déjà les flux RSS. Et parce qu’un flux RSS me permet de convoyer plus d’informations qu’une petite notification.

Pourquoi pas une application web complète et pratique ?

Parce que je n’en ai pas besoin. J’ai juste besoin d’un script, pas d’une usine à gaz, même si l’on pourra objecter que mon script en est déjà une à sa manière.

Mise en place

Module nix

{ config, lib, pkgs, ... }:

let
  cfg = config.services.caddyErrorsRss;

  absolutePathType = lib.types.strMatching "^/.*";
  feedFileNameType = lib.types.strMatching "^[^/]+$";
  formatFeedHost = host:
    if lib.hasInfix ":" host && !(lib.hasPrefix "[" host && lib.hasSuffix "]" host) then
      "[${host}]"
    else
      host;

  stateDirectory = "caddy-errors-rss";
  stateDirectoryPath = "/var/lib/${stateDirectory}";
  defaultFeedDirectory = "/var/lib/${stateDirectory}-feed";
  feedFilePath = "${cfg.feedDirectory}/${cfg.feedFileName}";
  feedLink = "http://${formatFeedHost cfg.feedAddress}:${toString cfg.listenPort}/${cfg.feedFileName}";

  pythonEnv = pkgs.python3.withPackages (ps: with ps; [
    dnspython
    feedgen
    ipwhois
    requests
  ]);

  rssScript = pkgs.writeTextFile {
    name = "caddy-errors-rss.py";
    executable = true;

    # C'est là que j'ai placé le script python, mais rien n'empêche de le mettre ailleurs
    text = builtins.readFile ../scripts/caddy-errors-rss.py;
  };

  runtimeConfig = pkgs.writeText "caddy-errors-rss-config.json" (builtins.toJSON {
    logFile = cfg.logFile;
    outputFile = feedFilePath;
    cacheFile = "${stateDirectoryPath}/ip-cache.json";
    feedLink = feedLink;
    language = cfg.language;
    maxItems = cfg.analysis.maxItems;
    maxUrlsPerItem = cfg.analysis.maxUrlsPerItem;
    cacheTtlSeconds = cfg.analysis.cacheTtlSeconds;
    tailLines = cfg.analysis.tailLines;
    networkLookupTimeoutSeconds = cfg.analysis.networkLookupTimeoutSeconds;
    trustedBlockExcludeNets = cfg.blocking.trustedExcludeNets;
    allowedPostUris = cfg.blocking.allowedPostUris;
    suspiciousPathPrefixes = cfg.blocking.suspiciousPathPrefixes;
    opnsense = lib.optionalAttrs cfg.opnsense.enable {
      baseUrl = cfg.opnsense.apiBaseUrl;
      timeoutSeconds = cfg.opnsense.apiTimeoutSeconds;
      verifyTls = cfg.opnsense.verifyTls;
      alias = {
        name = cfg.opnsense.aliasName;
        type = cfg.opnsense.aliasType;
        description = cfg.opnsense.aliasDescription;
      };
      apiPaths = {
        aliasGetUuid = cfg.opnsense.apiPaths.aliasGetUuid;
        aliasAddItem = cfg.opnsense.apiPaths.aliasAddItem;
        aliasSetItem = cfg.opnsense.apiPaths.aliasSetItem;
        aliasReconfigure = cfg.opnsense.apiPaths.aliasReconfigure;
        aliasUtilList = cfg.opnsense.apiPaths.aliasUtilList;
        aliasUtilAdd = cfg.opnsense.apiPaths.aliasUtilAdd;
      };
    };
  });
in {
  options.services.caddyErrorsRss = {
    enable = lib.mkOption {
      type = lib.types.bool;
      default = true;
      description = "Enable generation of the Caddy error RSS feed.";
    };

    user = lib.mkOption {
      type = lib.types.str;
      default = "caddy";
      description = "User used by the RSS generation service.";
    };

    group = lib.mkOption {
      type = lib.types.str;
      default = "caddy";
      description = "Group used by the RSS generation service.";
    };

    logFile = lib.mkOption {
      type = absolutePathType;
      default = "/var/log/caddy/caddy-errors.json";
      description = "Absolute path to the Caddy JSON error log file.";
    };

    feedDirectory = lib.mkOption {
      type = absolutePathType;
      default = defaultFeedDirectory;
      description = "Directory where the generated RSS feed is written.";
    };

    feedFileName = lib.mkOption {
      type = feedFileNameType;
      default = "index.xml";
      description = "File name used for the generated RSS feed.";
    };

    feedAddress = lib.mkOption {
      type = lib.types.str;
      default = "127.0.0.1";
      description = "Host name or IP address advertised in the generated feed link.";
    };

    listenPort = lib.mkOption {
      type = lib.types.port;
      default = 30083;
      description = "Local Caddy port used to serve the generated feed.";
    };

    language = lib.mkOption {
      type = lib.types.str;
      default = "fr-FR";
      description = "Language advertised in the generated RSS feed.";
    };

    onBootDelay = lib.mkOption {
      type = lib.types.str;
      default = "2m";
      description = "Delay before the timer triggers after boot.";
    };

    refreshInterval = lib.mkOption {
      type = lib.types.str;
      default = "15m";
      description = "Interval between RSS feed refreshes.";
    };

    analysis = lib.mkOption {
      type = lib.types.submodule {
        options = {
          maxItems = lib.mkOption {
            type = lib.types.ints.positive;
            default = 500;
            description = "Maximum number of distinct RSS items kept in the feed.";
          };

          maxUrlsPerItem = lib.mkOption {
            type = lib.types.ints.positive;
            default = 25;
            description = "Maximum number of URLs shown per IP in each RSS item.";
          };

          cacheTtlSeconds = lib.mkOption {
            type = lib.types.ints.positive;
            default = 7 * 24 * 3600;
            description = "TTL of cached IP intelligence lookups, in seconds.";
          };

          tailLines = lib.mkOption {
            type = lib.types.ints.positive;
            default = 50000;
            description = "Number of log lines read from the end of the Caddy error log.";
          };

          networkLookupTimeoutSeconds = lib.mkOption {
            type = lib.types.ints.positive;
            default = 8;
            description = "Timeout used for DNS, RDAP and OPNsense lookups.";
          };
        };
      };
      default = { };
      description = "Feed generation and log analysis limits.";
    };

    blocking = lib.mkOption {
      type = lib.types.submodule {
        options = {
          trustedExcludeNets = lib.mkOption {
            type = lib.types.listOf lib.types.str;
            default = [ ];
            description = "Networks that must never be auto-blocked.";
          };

          allowedPostUris = lib.mkOption {
            type = lib.types.listOf lib.types.str;
            default = [ ];
            description = "POST URIs that should not trigger OPNsense auto-blocking.";
          };

          suspiciousPathPrefixes = lib.mkOption {
            type = lib.types.listOf lib.types.str;
            default = [ ];
            description = "Common probe path prefixes that should trigger OPNsense auto-blocking.";
          };
        };
      };
      default = { };
      description = "Auto-blocking heuristics and exclusions.";
    };

    opnsense = lib.mkOption {
      type = lib.types.submodule {
        options = {
          enable = lib.mkOption {
            type = lib.types.bool;
            default = false;
            description = "Enable OPNsense alias management for suspicious clients.";
          };

          aliasName = lib.mkOption {
            type = lib.types.str;
            default = "suspicious_caddy_clients";
            description = "OPNsense alias used for suspicious IP auto-blocking.";
          };

          aliasType = lib.mkOption {
            type = lib.types.enum [ "host" "network" "external" ];
            default = "host";
            description = "Alias type created on OPNsense when the alias is missing.";
          };

          aliasDescription = lib.mkOption {
            type = lib.types.str;
            default = "Automatically maintained alias for suspicious Caddy clients";
            description = "Description used when creating the OPNsense alias.";
          };

          apiBaseUrl = lib.mkOption {
            type = lib.types.nullOr lib.types.str;
            default = null;
            description = "Base URL of the OPNsense API.";
          };

          apiTimeoutSeconds = lib.mkOption {
            type = lib.types.ints.positive;
            default = 8;
            description = "Timeout used for OPNsense API requests.";
          };

          verifyTls = lib.mkOption {
            type = lib.types.bool;
            default = false;
            description = "Whether to verify the OPNsense API TLS certificate.";
          };

          apiKeyFile = lib.mkOption {
            type = lib.types.nullOr absolutePathType;
            default = null;
            description = "Absolute path to the OPNsense API key file.";
          };

          apiSecretFile = lib.mkOption {
            type = lib.types.nullOr absolutePathType;
            default = null;
            description = "Absolute path to the OPNsense API secret file.";
          };

          apiPaths = lib.mkOption {
            type = lib.types.submodule {
              options = {
                aliasGetUuid = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias/get_alias_u_u_i_d/{alias}";
                  description = "Path template used to retrieve an alias UUID by name.";
                };

                aliasAddItem = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias/add_item";
                  description = "Path used to create a new alias.";
                };

                aliasSetItem = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias/set_item/{uuid}";
                  description = "Path template used to update an existing alias.";
                };

                aliasReconfigure = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias/reconfigure";
                  description = "Path used to apply alias configuration changes.";
                };

                aliasUtilList = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias_util/list/{alias}";
                  description = "Path template used to list alias table members.";
                };

                aliasUtilAdd = lib.mkOption {
                  type = lib.types.str;
                  default = "/api/firewall/alias_util/add/{alias}";
                  description = "Path template used to add an address to an alias table.";
                };
              };
            };
            default = { };
            description = "OPNsense API paths used by the service.";
          };
        };
      };
      default = { };
      description = "OPNsense connectivity used for alias updates.";
    };
  };

  config = lib.mkIf cfg.enable {
    assertions = [
      {
        assertion = config.services.caddy.enable;
        message = "services.caddyErrorsRss requires services.caddy.enable = true.";
      }
      {
        assertion = !cfg.opnsense.enable || cfg.opnsense.apiBaseUrl != null;
        message = "services.caddyErrorsRss.opnsense.apiBaseUrl must be set when OPNsense support is enabled.";
      }
      {
        assertion = !cfg.opnsense.enable || cfg.opnsense.apiKeyFile != null;
        message = "services.caddyErrorsRss.opnsense.apiKeyFile must be set when OPNsense support is enabled.";
      }
      {
        assertion = !cfg.opnsense.enable || cfg.opnsense.apiSecretFile != null;
        message = "services.caddyErrorsRss.opnsense.apiSecretFile must be set when OPNsense support is enabled.";
      }
    ];

    # Le vhost caddy qui va permettre d'accéder au flux RSS
    services.caddy.virtualHosts = {
      ":${toString cfg.listenPort}".extraConfig = ''
        root * ${cfg.feedDirectory}
        file_server

        log {
          output discard
        }
      '';
    };

    systemd.tmpfiles.rules = [
      "d ${cfg.feedDirectory} 0755 ${cfg.user} ${cfg.group} -"
    ];

    systemd.services.caddy-errors-rss = {
      description = "Génération du flux RSS des erreurs Caddy";
      wants = [ "network-online.target" ];
      after = [ "network-online.target" "caddy.service" ];

      serviceConfig = {
        Type = "oneshot";
        User = cfg.user;
        Group = cfg.group;
        UMask = "0022";
        WorkingDirectory = stateDirectoryPath;
        StateDirectory = stateDirectory;

        ExecStart = "${pythonEnv}/bin/python3 ${rssScript} --config ${runtimeConfig}";

        LoadCredential = lib.optionals cfg.opnsense.enable [
          "opnsense_api_key:${cfg.opnsense.apiKeyFile}"
          "opnsense_api_secret:${cfg.opnsense.apiSecretFile}"
        ];

        ReadWritePaths = [ cfg.feedDirectory ];

        NoNewPrivileges = true;
        PrivateTmp = true;
        ProtectSystem = "strict";
        ProtectHome = true;
        ProtectKernelTunables = true;
        ProtectKernelModules = true;
        ProtectControlGroups = true;
        RestrictSUIDSGID = true;
        LockPersonality = true;
        MemoryDenyWriteExecute = true;
      };
    };

    systemd.timers.caddy-errors-rss = {
      description = "Reconstruction périodique du flux RSS des erreurs Caddy";
      wantedBy = [ "timers.target" ];
      timerConfig = {
        OnBootSec = cfg.onBootDelay;
        OnUnitActiveSec = cfg.refreshInterval;
        Persistent = true;
        Unit = "caddy-errors-rss.service";
      };
    };
  };
}

Script

from __future__ import annotations

import argparse
import datetime as dt
import hashlib
import ipaddress
import json
import os
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import quote
import xml.etree.ElementTree as ET

import dns.exception
import dns.reversename
import dns.resolver
import requests
from feedgen.feed import FeedGenerator
from ipwhois.exceptions import BaseIpwhoisException, IPDefinedError
from ipwhois import IPWhois


LOCAL_TZ = dt.datetime.now().astimezone().tzinfo or dt.timezone.utc


@dataclass(frozen=True)
class AliasSpec:
    """Décrit l'alias OPNsense maintenu par le service."""

    name: str
    type: str
    description: str


@dataclass(frozen=True)
class ApiPaths:
    """Stocke les modèles de chemins de l'API OPNsense utilisés par le client."""

    alias_get_uuid: str
    alias_add_item: str
    alias_set_item: str
    alias_reconfigure: str
    alias_util_list: str
    alias_util_add: str


@dataclass(frozen=True)
class OPNsenseConfig:
    """Regroupe les paramètres OPNsense consommés par le script."""

    base_url: str
    timeout_seconds: int
    verify_tls: bool
    alias: AliasSpec
    api_paths: ApiPaths


@dataclass(frozen=True)
class Config:
    """Représente la configuration d'exécution entièrement parsée."""

    log_file: Path
    output_file: Path
    cache_file: Path
    feed_link: str
    language: str
    max_items: int
    max_urls_per_item: int
    cache_ttl_seconds: int
    tail_lines: int
    network_lookup_timeout_seconds: int
    trusted_block_exclude_nets: tuple[ipaddress._BaseNetwork, ...]
    allowed_post_uris: frozenset[str]
    suspicious_path_prefixes: tuple[str, ...]
    opnsense: OPNsenseConfig | None


@dataclass(frozen=True)
class Record:
    """Représente une entrée d'erreur Caddy déjà parsée."""

    client_ip: str
    uri: str
    method: str
    status: int
    user_agent: str
    host: str
    full_url: str
    ts_epoch: float
    timestamp_iso: str


def require(condition: bool, message: str) -> None:
    """Lève une erreur d'exécution lorsqu'une condition attendue n'est pas satisfaite."""

    if not condition:
        raise RuntimeError(message)


def normalize_ip(value: str) -> str:
    """Normalise une IPv4 ou une IPv6 dans sa forme textuelle canonique."""

    raw = value.strip()
    if raw.startswith("[") and raw.endswith("]"):
        raw = raw[1:-1]
    if "%" in raw:
        raw = raw.split("%", 1)[0]
    return str(ipaddress.ip_address(raw))


def normalize_uri_path(uri: str) -> str:
    """Supprime la chaîne de requête et le fragment d'une URI."""

    return uri.split("#", 1)[0].split("?", 1)[0].strip()


def normalize_suspicious_path_prefix(prefix: str) -> str:
    """Normalise et valide un préfixe de chemin suspect."""

    normalized = normalize_uri_path(prefix).strip().lower().rstrip("/")
    require(normalized != "", "Un préfixe de chemin suspect ne peut pas être vide")
    require(normalized.startswith("/"), f"Préfixe de chemin suspect invalide: {prefix!r}")
    require(normalized != "/", "Le préfixe de chemin suspect '/' est trop large")
    return normalized


def path_matches_suspicious_prefix(path: str, prefix: str) -> bool:
    """Indique si un chemin correspond à un préfixe suspect avec borne raisonnable."""

    if path == prefix:
        return True
    if not path.startswith(prefix):
        return False
    return path[len(prefix)] in "/._-"


def uri_targets_suspicious_path(uri: str, suspicious_path_prefixes: tuple[str, ...]) -> bool:
    """Indique si une URI cible un chemin classiquement scanné."""

    path = normalize_uri_path(uri).lower()
    return any(
        path_matches_suspicious_prefix(path, prefix)
        for prefix in suspicious_path_prefixes
    )


def method_is_post(method: str) -> bool:
    """Indique si la méthode HTTP est POST."""

    return method.strip().upper() == "POST"


def uri_ends_with_php(uri: str) -> bool:
    """Indique si le chemin d'URI normalisé cible une ressource PHP."""

    return normalize_uri_path(uri).lower().endswith(".php")


def format_iso_local(ts_epoch: float) -> str:
    """Formate un timestamp Unix en ISO 8601 dans le fuseau local."""

    return dt.datetime.fromtimestamp(ts_epoch, tz=LOCAL_TZ).isoformat()


def read_json(path: Path) -> Any:
    """Lit et décode un document JSON depuis le disque."""

    with path.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def parse_config(raw: dict[str, Any]) -> Config:
    """Convertit la charge JSON générée en configuration typée."""

    trusted_nets = tuple(
        ipaddress.ip_network(value, strict=False)
        for value in raw["trustedBlockExcludeNets"]
    )
    suspicious_path_prefixes = tuple(
        normalize_suspicious_path_prefix(value)
        for value in raw["suspiciousPathPrefixes"]
    )

    opnsense_raw = raw.get("opnsense")
    opnsense = None
    if opnsense_raw:
        opnsense = OPNsenseConfig(
            base_url=opnsense_raw["baseUrl"].rstrip("/"),
            timeout_seconds=int(opnsense_raw["timeoutSeconds"]),
            verify_tls=bool(opnsense_raw["verifyTls"]),
            alias=AliasSpec(
                name=opnsense_raw["alias"]["name"],
                type=opnsense_raw["alias"]["type"],
                description=opnsense_raw["alias"]["description"],
            ),
            api_paths=ApiPaths(
                alias_get_uuid=opnsense_raw["apiPaths"]["aliasGetUuid"],
                alias_add_item=opnsense_raw["apiPaths"]["aliasAddItem"],
                alias_set_item=opnsense_raw["apiPaths"]["aliasSetItem"],
                alias_reconfigure=opnsense_raw["apiPaths"]["aliasReconfigure"],
                alias_util_list=opnsense_raw["apiPaths"]["aliasUtilList"],
                alias_util_add=opnsense_raw["apiPaths"]["aliasUtilAdd"],
            ),
        )

    return Config(
        log_file=Path(raw["logFile"]),
        output_file=Path(raw["outputFile"]),
        cache_file=Path(raw["cacheFile"]),
        feed_link=raw["feedLink"],
        language=raw["language"],
        max_items=int(raw["maxItems"]),
        max_urls_per_item=int(raw["maxUrlsPerItem"]),
        cache_ttl_seconds=int(raw["cacheTtlSeconds"]),
        tail_lines=int(raw["tailLines"]),
        network_lookup_timeout_seconds=int(raw["networkLookupTimeoutSeconds"]),
        trusted_block_exclude_nets=trusted_nets,
        allowed_post_uris=frozenset(raw["allowedPostUris"]),
        suspicious_path_prefixes=suspicious_path_prefixes,
        opnsense=opnsense,
    )


def load_config(path: Path) -> Config:
    """Charge et parse le fichier de configuration JSON."""

    return parse_config(read_json(path))


def load_cache(path: Path) -> dict[str, dict[str, Any]]:
    """Charge le cache de renseignements IP lorsqu'il existe."""

    if not path.exists():
        return {}

    payload = read_json(path)
    require(isinstance(payload, dict), f"Invalid cache payload in {path}")
    return payload


def save_cache(path: Path, cache: dict[str, dict[str, Any]]) -> None:
    """Écrit le cache de renseignements IP de manière atomique."""

    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    with tmp_path.open("w", encoding="utf-8") as handle:
        json.dump(cache, handle, ensure_ascii=False, sort_keys=True)
    tmp_path.replace(path)


def read_log_tail(path: Path, max_lines: int) -> list[str]:
    """Lit uniquement la fin du journal d'erreurs Caddy."""

    if not path.exists():
        raise FileNotFoundError(path)

    with path.open("r", encoding="utf-8", errors="strict") as handle:
        return list(deque(handle, maxlen=max_lines))


def extract_user_agent(headers: dict[str, Any]) -> str:
    """Extrait l'en-tête User-Agent d'une requête Caddy."""

    user_agent_value = headers.get("User-Agent") or headers.get("user-agent") or "-"
    if isinstance(user_agent_value, list):
        return user_agent_value[0] if user_agent_value else "-"
    require(isinstance(user_agent_value, str), "Invalid User-Agent header payload")
    return user_agent_value


def parse_timestamp(obj: dict[str, Any]) -> float:
    """Extrait un timestamp Unix depuis un objet de log Caddy."""

    ts_value = obj.get("ts")
    if isinstance(ts_value, (int, float)):
        return float(ts_value)
    if isinstance(ts_value, str):
        return float(ts_value)

    time_value = obj.get("time")
    require(isinstance(time_value, str), "Missing Caddy timestamp in log record")
    return dt.datetime.fromisoformat(time_value.replace("Z", "+00:00")).timestamp()


def parse_records(lines: list[str]) -> list[Record]:
    """Parse les lignes JSON de Caddy en enregistrements d'erreur triés."""

    records: list[Record] = []

    for raw_line in lines:
        line = raw_line.strip()
        if not line:
            continue

        obj = json.loads(line)
        require(isinstance(obj, dict), "Unexpected Caddy log payload")

        status = obj.get("status")
        require(isinstance(status, int), "Missing Caddy status in log record")
        if status < 400 or status >= 600:
            continue

        request = obj.get("request")
        require(isinstance(request, dict), "Missing Caddy request payload")

        headers = request.get("headers") or {}
        require(isinstance(headers, dict), "Invalid Caddy headers payload")

        raw_client_ip = request.get("client_ip") or request.get("remote_ip")
        require(isinstance(raw_client_ip, str) and raw_client_ip.strip(), "Missing client IP")
        client_ip = normalize_ip(raw_client_ip)
        uri = str(request.get("uri") or "/")
        method = str(request.get("method") or "-")
        host = str(request.get("host") or "")
        ts_epoch = parse_timestamp(obj)

        records.append(
            Record(
                client_ip=client_ip,
                uri=uri,
                method=method,
                status=status,
                user_agent=extract_user_agent(headers),
                host=host,
                full_url=f"https://{host}{uri}" if host else uri,
                ts_epoch=ts_epoch,
                timestamp_iso=format_iso_local(ts_epoch),
            )
        )

    return sorted(records, key=lambda item: item.ts_epoch, reverse=True)


def build_dedup_items(records: list[Record], max_items: int) -> list[Record]:
    """Conserve au plus un élément de flux par IP cliente et URI."""

    seen: set[tuple[str, str]] = set()
    dedup: list[Record] = []

    for record in records:
        key = (record.client_ip, record.uri)
        if key in seen:
            continue
        seen.add(key)
        dedup.append(record)
        if len(dedup) >= max_items:
            break

    return dedup


def build_ip_stats(records: list[Record]) -> dict[str, dict[str, Any]]:
    """Agrège les compteurs, dates et fréquences d'URL par IP."""

    stats: dict[str, dict[str, Any]] = {}

    for record in records:
        current = stats.setdefault(
            record.client_ip,
            {
                "count": 0,
                "first_seen": record.ts_epoch,
                "last_seen": record.ts_epoch,
                "urls": {},
            },
        )

        current["count"] += 1
        current["first_seen"] = min(current["first_seen"], record.ts_epoch)
        current["last_seen"] = max(current["last_seen"], record.ts_epoch)
        current["urls"][record.uri] = current["urls"].get(record.uri, 0) + 1

    for current in stats.values():
        urls_sorted = sorted(current["urls"].items(), key=lambda item: (-item[1], item[0]))
        current["urls_sorted"] = urls_sorted
        current["distinct_urls"] = len(urls_sorted)

    return stats


def is_excluded_from_blocking(ip: str, excluded_nets: tuple[ipaddress._BaseNetwork, ...]) -> bool:
    """Indique si une IP appartient à un réseau exclu du blocage."""

    parsed_ip = ipaddress.ip_address(ip)
    return any(parsed_ip in network for network in excluded_nets)


def build_blocking_reasons(
    records: list[Record],
    excluded_nets: tuple[ipaddress._BaseNetwork, ...],
    allowed_post_uris: frozenset[str],
    suspicious_path_prefixes: tuple[str, ...],
) -> dict[str, set[str]]:
    """Calcule les motifs de blocage détectés pour chaque IP cliente."""

    reasons_by_ip: dict[str, set[str]] = {}

    for record in records:
        if is_excluded_from_blocking(record.client_ip, excluded_nets):
            continue

        reasons = reasons_by_ip.setdefault(record.client_ip, set())

        if uri_ends_with_php(record.uri):
            reasons.add("url_php")
        if method_is_post(record.method) and normalize_uri_path(record.uri) not in allowed_post_uris:
            reasons.add("method_post")
        if uri_targets_suspicious_path(record.uri, suspicious_path_prefixes):
            reasons.add("path_probe")

    return reasons_by_ip


def format_blocking_reasons(reasons: set[str]) -> str:
    """Traduit les codes de blocage en libellés lisibles."""

    labels = []

    if "method_post" in reasons:
        labels.append("requête POST hors recherche")
    if "url_php" in reasons:
        labels.append("URL en .php")
    if "path_probe" in reasons:
        labels.append("chemin classiquement scanné (CMS/admin/fichiers sensibles)")

    return ", ".join(labels) if labels else "aucun"


def classify_ip(ip: str) -> dict[str, str]:
    """Décrit la version et la portée d'une adresse IP."""

    addr = ipaddress.ip_address(ip)

    if addr.is_private:
        scope = "privée"
    elif addr.is_loopback:
        scope = "loopback"
    elif addr.is_link_local:
        scope = "link-local"
    elif addr.is_multicast:
        scope = "multicast"
    elif addr.is_reserved:
        scope = "réservée"
    elif addr.is_global:
        scope = "publique routable"
    else:
        scope = "indéterminée"

    return {"version": "IPv6" if addr.version == 6 else "IPv4", "scope": scope}


def classify_network_profile(as_name: str, network: str, ptr: str, scope: str) -> str:
    """Estime si une IP publique semble résidentielle ou hébergée."""

    if scope != "publique routable":
        return "adresse non publique"

    signal = " ".join([as_name, network, ptr]).lower()
    hosting_keywords = [
        "cloud",
        "hosting",
        "datacenter",
        "server",
        "vps",
        "ovh",
        "amazon",
        "aws",
        "google",
        "azure",
        "microsoft",
        "digitalocean",
        "linode",
        "hetzner",
        "oracle",
        "scaleway",
        "ionos",
    ]
    residential_keywords = [
        "residential",
        "broadband",
        "dsl",
        "fiber",
        "fibre",
        "pool",
        "dynamic",
        "pppoe",
    ]

    if any(keyword in signal for keyword in hosting_keywords):
        return "hébergeur/datacenter probable"
    if any(keyword in signal for keyword in residential_keywords):
        return "accès résidentiel probable"
    return "profil réseau indéterminé"


class DNSLookup:
    """Effectue les requêtes DNS utilisées pour enrichir les IP clientes suspectes."""

    def __init__(self, timeout_seconds: int):
        """Construit un résolveur configuré avec le délai demandé."""

        self.resolver = dns.resolver.Resolver()
        self.resolver.timeout = timeout_seconds
        self.resolver.lifetime = timeout_seconds

    def _resolve_text(self, qname: Any, rdtype: str) -> list[str]:
        """Résout un jeu d'enregistrements et renvoie des réponses textuelles normalisées."""

        answers = self.resolver.resolve(qname, rdtype)
        return [answer.to_text().rstrip(".") for answer in answers]

    def reverse_ptr(self, ip: str) -> str:
        """Renvoie le PTR d'une IP, ou n/d s'il est absent."""

        try:
            pointer = dns.reversename.from_address(ip)
            return self._resolve_text(pointer, "PTR")[0]
        except dns.exception.DNSException:
            return "n/d"

    def forward_matches(self, host: str, ip: str, rrtype: str) -> str:
        """Vérifie qu'un nom d'hôte résout bien vers l'IP attendue."""

        if host == "n/d":
            return "n/d"

        try:
            values = {normalize_ip(value) for value in self._resolve_text(host, rrtype)}
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            return "non"
        except dns.exception.DNSException:
            return "indéterminé"

        return "oui" if ip in values else "non"

    def spamhaus(self, ip: str) -> str:
        """Interroge la DNSBL Spamhaus ZEN pour une IPv4 publique."""

        addr = ipaddress.ip_address(ip)
        if not addr.is_global:
            return "non applicable (IP non publique)"
        if addr.version == 6:
            return "non vérifiée (IPv6)"

        query = ".".join(reversed(ip.split("."))) + ".zen.spamhaus.org"
        try:
            listed = self._resolve_text(query, "A")
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            return "non listée"
        except dns.exception.DNSException:
            return "indisponible"

        return "listée (" + ", ".join(listed) + ")"


def extract_abuse_contact(payload: dict[str, Any]) -> str:
    """Extrait le contact abuse le plus pertinent d'une réponse RDAP."""

    for obj in payload.get("objects", {}).values():
        if not isinstance(obj, dict):
            continue

        roles = {str(role).lower() for role in obj.get("roles", []) if isinstance(role, str)}
        contact = obj.get("contact")
        if not isinstance(contact, dict):
            continue

        emails = contact.get("email") or []
        if not isinstance(emails, list):
            continue

        for entry in emails:
            if not isinstance(entry, dict):
                continue
            value = entry.get("value")
            if isinstance(value, str) and value:
                if "abuse" in roles:
                    return value

    for obj in payload.get("objects", {}).values():
        if not isinstance(obj, dict):
            continue
        contact = obj.get("contact")
        if not isinstance(contact, dict):
            continue
        emails = contact.get("email") or []
        if not isinstance(emails, list):
            continue
        for entry in emails:
            if isinstance(entry, dict) and isinstance(entry.get("value"), str):
                return entry["value"]

    return "n/d"


def query_ip_intel(
    ip: str,
    cache: dict[str, dict[str, Any]],
    now_ts: int,
    dns_lookup: DNSLookup,
    timeout_seconds: int,
    cache_ttl_seconds: int,
) -> dict[str, Any]:
    """Renvoie les renseignements DNS et RDAP d'une IP, en cache ou fraîchement récupérés."""

    cached = cache.get(ip)
    if isinstance(cached, dict):
        updated_at = cached.get("updated_at")
        if isinstance(updated_at, (int, float)) and (now_ts - int(updated_at) < cache_ttl_seconds):
            return cached

    ip_meta = classify_ip(ip)
    ptr = dns_lookup.reverse_ptr(ip)
    rr_type = "AAAA" if ip_meta["version"] == "IPv6" else "A"
    fcrdns = dns_lookup.forward_matches(ptr, ip, rr_type)
    spamhaus = dns_lookup.spamhaus(ip)

    intel = {
        "updated_at": now_ts,
        "version": ip_meta["version"],
        "scope": ip_meta["scope"],
        "ptr": ptr,
        "fcrdns": fcrdns,
        "asn": "n/d",
        "prefix": "n/d",
        "as_name": "n/d",
        "network": "n/d",
        "abuse_contact": "n/d",
        "spamhaus": spamhaus,
        "profile": "adresse non publique",
    }

    if ipaddress.ip_address(ip).is_global:
        try:
            rdap = IPWhois(ip, timeout=timeout_seconds).lookup_rdap(depth=1)
        except (IPDefinedError, BaseIpwhoisException, OSError):
            rdap = None

        if isinstance(rdap, dict):
            network = rdap.get("network") or {}

            intel["asn"] = rdap.get("asn") or "n/d"
            intel["prefix"] = rdap.get("asn_cidr") or network.get("cidr") or "n/d"
            intel["as_name"] = rdap.get("asn_description") or "n/d"
            intel["network"] = network.get("name") or network.get("handle") or "n/d"
            intel["abuse_contact"] = extract_abuse_contact(rdap)
            intel["profile"] = classify_network_profile(
                intel["as_name"],
                intel["network"],
                ptr,
                intel["scope"],
            )
        else:
            intel["profile"] = "profil réseau indéterminé"

    cache[ip] = intel
    return intel


def build_list(root: ET.Element, title: str, rows: list[tuple[str, str, bool]]) -> None:
    """Ajoute une section de liste HTML titrée à la description RSS."""

    title_paragraph = ET.SubElement(root, "p")
    ET.SubElement(title_paragraph, "strong").text = title
    items = ET.SubElement(root, "ul")

    for label, value, as_code in rows:
        item = ET.SubElement(items, "li")
        strong = ET.SubElement(item, "strong")
        strong.text = label
        if as_code:
            strong.tail = " "
            code = ET.SubElement(item, "code")
            code.text = value
        else:
            strong.tail = value


def build_urls_list(root: ET.Element, urls_sorted: list[tuple[str, int]], max_urls: int) -> None:
    """Ajoute à la description HTML la liste des URL et de leurs fréquences pour l'IP."""

    paragraph = ET.SubElement(root, "p")
    ET.SubElement(paragraph, "strong").text = "Toutes les URL en erreur pour cette IP (avec nombre d'erreurs)"

    items = ET.SubElement(root, "ul")
    visible_urls = urls_sorted[:max_urls]
    omitted_urls_count = max(0, len(urls_sorted) - len(visible_urls))

    if not visible_urls:
        ET.SubElement(items, "li").text = "Aucune URL"
        return

    for url, count in visible_urls:
        item = ET.SubElement(items, "li")
        code = ET.SubElement(item, "code")
        code.text = url
        code.tail = f" ({count})"

    if omitted_urls_count:
        item = ET.SubElement(items, "li")
        emphasis = ET.SubElement(item, "em")
        emphasis.text = (
            f"{omitted_urls_count} URL supplémentaires omises pour limiter la taille du flux"
        )


def build_description_html(
    record: Record,
    stats: dict[str, Any],
    intel: dict[str, Any],
    block_reasons: set[str],
    block_status: str,
    max_urls_per_item: int,
) -> str:
    """Construit le contenu HTML embarqué dans chaque élément RSS."""

    root = ET.Element("div")

    build_list(
        root,
        "Requête en erreur",
        [
            ("Méthode HTTP : ", record.method, True),
            ("Code de statut : ", str(record.status), True),
            ("Hôte demandé : ", record.host or "-", True),
            ("URL demandée : ", record.full_url, True),
            ("User-Agent : ", record.user_agent, True),
            ("IP cliente : ", record.client_ip, True),
            ("Date de la requête (heure locale) : ", record.timestamp_iso, False),
            (
                "Nombre total de requêtes en erreur (IP, fenêtre analysée) : ",
                str(stats.get("count", 1)),
                False,
            ),
            (
                "Nombre d'URL distinctes en erreur (IP) : ",
                str(stats.get("distinct_urls", 1)),
                False,
            ),
            (
                "Signal de blocage OPNsense observé pour cette IP : ",
                format_blocking_reasons(block_reasons),
                False,
            ),
            (
                "État d'ajout dans l'alias OPNsense : ",
                block_status or "non tenté",
                False,
            ),
        ],
    )

    build_list(
        root,
        "Renseignements réseau",
        [
            (
                "Type d'adresse : ",
                f"{intel.get('version', 'n/d')} - {intel.get('scope', 'n/d')}",
                False,
            ),
            ("PTR (reverse DNS) : ", intel.get("ptr", "n/d"), False),
            ("FCrDNS (PTR cohérent) : ", intel.get("fcrdns", "n/d"), False),
            ("ASN : ", intel.get("asn", "n/d"), False),
            ("Préfixe : ", intel.get("prefix", "n/d"), False),
            ("Nom AS : ", intel.get("as_name", "n/d"), False),
            ("Réseau / organisation : ", intel.get("network", "n/d"), False),
            ("Contact abuse : ", intel.get("abuse_contact", "n/d"), False),
            ("Spamhaus ZEN : ", intel.get("spamhaus", "inconnu"), False),
            ("Profil réseau : ", intel.get("profile", "indéterminé"), False),
            (
                "Première erreur observée (heure locale) : ",
                format_iso_local(stats.get("first_seen", record.ts_epoch)),
                False,
            ),
            (
                "Dernière erreur observée (heure locale) : ",
                format_iso_local(stats.get("last_seen", record.ts_epoch)),
                False,
            ),
        ],
    )

    build_urls_list(root, stats.get("urls_sorted", []), max_urls_per_item)
    return ET.tostring(root, encoding="unicode", method="html")


class OPNsenseAliasClient:
    """Gère l'alias OPNsense utilisé pour les IP clientes suspectes."""

    def __init__(self, config: OPNsenseConfig, credentials_directory: Path):
        """Initialise la session HTTP authentifiée vers OPNsense."""

        self.config = config
        self.alias_uuid: str | None = None
        self.known_alias_ips: set[str] | None = None

        key_path = credentials_directory / "opnsense_api_key"
        secret_path = credentials_directory / "opnsense_api_secret"
        api_key = key_path.read_text(encoding="utf-8").strip()
        api_secret = secret_path.read_text(encoding="utf-8").strip()
        require(api_key != "", f"Empty OPNsense API key in {key_path}")
        require(api_secret != "", f"Empty OPNsense API secret in {secret_path}")

        self.session = requests.Session()
        self.session.auth = (api_key, api_secret)
        self.session.verify = config.verify_tls

    def _build_url(self, template: str, **values: str) -> str:
        """Rend un modèle de chemin d'API en URL complète."""

        replacements = {key: quote(value, safe="") for key, value in values.items()}
        return self.config.base_url + template.format(**replacements)

    def _request_json(self, method: str, template: str, **kwargs: Any) -> dict[str, Any]:
        """Envoie une requête à l'API OPNsense et valide la forme de la réponse JSON."""

        response = self.session.request(
            method=method,
            url=self._build_url(template, **kwargs.pop("path_values", {})),
            timeout=self.config.timeout_seconds,
            **kwargs,
        )
        response.raise_for_status()
        payload = response.json()
        require(isinstance(payload, dict), "Unexpected OPNsense API payload")
        return payload

    def get_alias_uuid(self) -> str | None:
        """Recherche l'UUID de l'alias configuré, s'il existe."""

        payload = self._request_json(
            "GET",
            self.config.api_paths.alias_get_uuid,
            path_values={"alias": self.config.alias.name},
        )
        uuid = payload.get("uuid")
        return uuid if isinstance(uuid, str) and uuid else None

    def reconfigure(self) -> None:
        """Demande à OPNsense d'appliquer les changements d'alias en attente."""

        payload = self._request_json("POST", self.config.api_paths.alias_reconfigure)
        require(
            payload.get("status") in {"ok", "done"},
            f"OPNsense alias reconfigure failed: {payload}",
        )

    def ensure_alias_exists(self) -> None:
        """Crée l'alias configuré lorsqu'il n'existe pas encore."""

        if self.alias_uuid is not None:
            return

        uuid = self.get_alias_uuid()
        if uuid is None:
            self._request_json(
                "POST",
                self.config.api_paths.alias_add_item,
                json={
                    "alias": {
                        "enabled": "1",
                        "name": self.config.alias.name,
                        "type": self.config.alias.type,
                        "content": "",
                        "description": self.config.alias.description,
                    }
                },
            )
            uuid = self.get_alias_uuid()
            require(uuid is not None, f"Unable to create OPNsense alias {self.config.alias.name!r}")
            self.alias_uuid = uuid
            self._request_json(
                "POST",
                self.config.api_paths.alias_set_item,
                path_values={"uuid": uuid},
                json={
                    "alias": {
                        "enabled": "1",
                        "name": self.config.alias.name,
                        "type": self.config.alias.type,
                        "content": "",
                        "description": self.config.alias.description,
                    }
                },
            )
            self.reconfigure()
        else:
            self.alias_uuid = uuid

    def ensure_alias_snapshot(self) -> set[str]:
        """Récupère et met en cache les IP actuellement présentes dans la table d'alias."""

        if self.known_alias_ips is not None:
            return self.known_alias_ips

        self.ensure_alias_exists()
        payload = self._request_json(
            "GET",
            self.config.api_paths.alias_util_list,
            path_values={"alias": self.config.alias.name},
        )
        rows = payload.get("rows")
        require(isinstance(rows, list), "Unexpected OPNsense alias listing payload")

        snapshot: set[str] = set()
        for row in rows:
            require(isinstance(row, dict), "Unexpected OPNsense alias row payload")
            candidate = row.get("ip") or row.get("address") or row.get("item")
            if candidate:
                snapshot.add(normalize_ip(str(candidate)))

        self.known_alias_ips = snapshot
        return snapshot

    def add_ip_if_missing(self, ip: str) -> str:
        """Ajoute une IP à la table d'alias si elle n'y est pas déjà."""

        snapshot = self.ensure_alias_snapshot()
        if ip in snapshot:
            return "already_present"

        payload = self._request_json(
            "POST",
            self.config.api_paths.alias_util_add,
            path_values={"alias": self.config.alias.name},
            json={"address": ip},
        )
        require(payload.get("status") == "done", f"OPNsense alias add failed: {payload}")
        snapshot.add(ip)
        return "added"


def build_feed(
    config: Config,
    items: list[Record],
    ip_stats: dict[str, dict[str, Any]],
    blocking_reasons_by_ip: dict[str, set[str]],
    cache: dict[str, dict[str, Any]],
    opnsense_client: OPNsenseAliasClient | None,
) -> bytes:
    """Construit la charge RSS à partir des enregistrements parsés et des données d'enrichissement."""

    fg = FeedGenerator()
    fg.title("Flux RSS des erreurs Caddy")
    fg.description("Erreurs HTTP Caddy (déduplication par IP client + URL demandée)")
    fg.link(href=config.feed_link)
    fg.language(config.language)
    fg.generator("caddy-errors-rss")
    fg.lastBuildDate(dt.datetime.now(tz=LOCAL_TZ))

    dns_lookup = DNSLookup(config.network_lookup_timeout_seconds)
    blocking_state_by_ip: dict[str, str] = {}
    now_ts = int(dt.datetime.now(tz=dt.timezone.utc).timestamp())

    for item in items:
        block_reasons = blocking_reasons_by_ip.get(item.client_ip, set())
        should_try_block = bool(block_reasons)

        intel = query_ip_intel(
            item.client_ip,
            cache,
            now_ts,
            dns_lookup,
            config.network_lookup_timeout_seconds,
            config.cache_ttl_seconds,
        )
        stats = ip_stats.get(
            item.client_ip,
            {
                "count": 1,
                "distinct_urls": 1,
                "urls_sorted": [(item.uri, 1)],
                "first_seen": item.ts_epoch,
                "last_seen": item.ts_epoch,
            },
        )

        block_status = ""
        if should_try_block:
            if opnsense_client is None:
                block_status = "désactivé"
            else:
                if item.client_ip not in blocking_state_by_ip:
                    blocking_state_by_ip[item.client_ip] = opnsense_client.add_ip_if_missing(
                        item.client_ip
                    )
                block_status = blocking_state_by_ip[item.client_ip]

        title_prefix = "[Bloqué] " if block_status in {"added", "already_present"} else ""
        title = f"{title_prefix}[{item.client_ip}] [{item.method} {item.status}] {item.uri}"
        guid = hashlib.sha256(f"{item.client_ip}|{item.uri}".encode("utf-8")).hexdigest()

        entry = fg.add_entry(order="append")
        entry.title(title)
        entry.guid(guid, permalink=False)
        entry.pubDate(dt.datetime.fromtimestamp(item.ts_epoch, tz=LOCAL_TZ))
        entry.content(
            build_description_html(
                record=item,
                stats=stats,
                intel=intel,
                block_reasons=block_reasons,
                block_status=block_status,
                max_urls_per_item=config.max_urls_per_item,
            ),
            type="CDATA",
        )

    return fg.rss_str(pretty=True)


def parse_args() -> argparse.Namespace:
    """Parse les arguments de ligne de commande acceptés par le script."""

    parser = argparse.ArgumentParser(description="Generate an RSS feed from Caddy error logs.")
    parser.add_argument("--config", required=True, type=Path, help="Path to the JSON config file.")
    return parser.parse_args()


def main() -> None:
    """Exécute un cycle complet de génération du flux et écrit les résultats sur disque."""

    args = parse_args()
    config = load_config(args.config)

    config.output_file.parent.mkdir(parents=True, exist_ok=True)
    config.cache_file.parent.mkdir(parents=True, exist_ok=True)

    opnsense_client = None
    if config.opnsense is not None:
        credentials_directory = Path(os.environ["CREDENTIALS_DIRECTORY"])
        opnsense_client = OPNsenseAliasClient(config.opnsense, credentials_directory)

    cache = load_cache(config.cache_file)
    records = parse_records(read_log_tail(config.log_file, config.tail_lines))
    items = build_dedup_items(records, config.max_items)
    ip_stats = build_ip_stats(records)
    blocking_reasons_by_ip = build_blocking_reasons(
        records,
        config.trusted_block_exclude_nets,
        config.allowed_post_uris,
        config.suspicious_path_prefixes,
    )

    rss_payload = build_feed(
        config,
        items,
        ip_stats,
        blocking_reasons_by_ip,
        cache,
        opnsense_client,
    )

    tmp_output = config.output_file.with_suffix(config.output_file.suffix + ".tmp")
    tmp_output.write_bytes(rss_payload)
    tmp_output.replace(config.output_file)
    save_cache(config.cache_file, cache)


if __name__ == "__main__":
    main()

Configuration de mon virtualhost

{ config, lib, ... }:

{
  services.caddy = {
    # [...]

    # Le vhost de mon blog
    virtualHosts = {
      ":30080" = {
        extraConfig = ''
          # [...]

          # On log les erreurs à part
          log errors {
            output file ${config.services.caddyErrorsRss.logFile} {
              roll_size 15MiB
              roll_keep 14
              roll_keep_for 336h
            }
          }

          # [...]

          handle_errors {
            log_name errors

            # On n'oublie pas la gestion spécifiques des 404
            @notFound expression {http.error.status_code} == 404
            route @notFound {
              rewrite * /404.html
              file_server
            }
          }
        '';
      };
    };
  };
}

Configuration du module nix

{ ... }:

{
  imports = [ ./modules/caddy-errors.nix ];

  services.caddyErrorsRss = {
    # Chemin du fichier journal généré par Caddy
    logFile = "/var/log/caddy/richard-dern.fr-errors.log";
    # Adresse du serveur à joindre pour afficher le flux
    feedAddress = "10.0.3.1";
    # Port du vhost permettant d'accéder au flux
    listenPort = 30083;

    blocking = {
      trustedExcludeNets = [
        "10.0.0.0/8"
        # Mes préfixes IPv6
        "...:3210::/64"
        "...:3211::/64"
      ];
      allowedPostUris = [
        # C'est ici qu'on évite le blocage sur le formulaire de recherche
        "/api/search/indexes/blog_posts/search"
        "/indexes/blog_posts/search"
      ];
      # Et ici les URL caractéristiques.
      # À ne pas utiliser si votre blog utilise Wordpress...
      suspiciousPathPrefixes = [
        "/wp-admin"
        "/wp-login.php"
        "/wp-content"
        "/wp-includes"
        "/wp-json"
        "/wordpress"
        "/xmlrpc.php"
        "/wlwmanifest.xml"
        "/readme.html"
        "/license.txt"
        "/admin"
        "/administrator"
        "/adminer"
        "/phpmyadmin"
        "/pma"
        "/manager"
        "/manager/html"
        "/webadmin"
        "/server-status"
        "/phpinfo.php"
        "/drupal"
        "/sites/default"
        "/core/install.php"
        "/joomla"
        "/components"
        "/modules"
        "/plugins"
        "/templates"
        "/vendor/composer"
        "/.env"
        "/.git"
        "/.svn"
        "/.hg"
        "/.bzr"
        "/.DS_Store"
        "/.aws"
        "/.ssh"
        "/vendor/phpunit"
        "/artisan"
        "/storage"
        "/_ignition"
        "/telescope"
        "/cgi-bin"
        "/boaform"
        "/HNAP1"
        "/autodiscover"
        "/owa"
        "/remote"
        "/jmx-console"
        "/solr"
        "/actuator"
        "/jenkins"
        "/hudson"
      ];
    };

    opnsense = {
      enable = true;
      # Nom de l'alias dans OPNsense
      aliasName = "comportement_suspect";
      # Adresse d'accès à OPNsense
      apiBaseUrl = "https://...";
      apiKeyFile = "/etc/nixos/secrets/services/caddy-errors-rss/opnsense-api-key";
      apiSecretFile = "/etc/nixos/secrets/services/caddy-errors-rss/opnsense-api-secret";
    };
  };
}

La clé d’API et le secret sont obtenus depuis System, Access, Users dans OPNsense (c’est l’un des boutons en regard d’un utilisateur).

Résultat

Ce petit malotru vérifie si je fais tourner WordPress. Pour la dernière fois.

Chaque élément de flux me permet alors de décider si je dois bloquer une IP manuellement. Les éléments heuristiques indiqués dans la configuration permettent un blocage automatique, mais certains peuvent passer au travers : ces informations supplémentaires m’aident à décider si une requête est légitime ou pas.

Un effet de bord positif de ce que j’ai mis en place est que je peux voir des requêtes pointant vers des anciens URLs, légitimes mais qui ont changé au cours du temps. Cela me permet d’ajouter à Hugo des aliases pour ces URLs afin d’éviter le link-rot.

L’alias comportement_suspect se remplit tout seul désormais.

On peut voir alors l’alias se remplir, et le nombre de paquets matchés dans OPNsense. C’est efficace !

Conclusion

L’implémentation de cette solution a nécessité quelques efforts en amont pour un résultat immédiatement satisfaisant : je n’ai plus besoin d’avoir le nez ni dans les logs de Caddy ni dans le firewall d’OPNsense, mais j’ai quand même un visuel clair grâce aux flux RSS. Et j’ai la souplesse intrinsèque à une solution “fait-maison” ou “à l’ancienne”.

En outre, la solution de blocage intervient là où elle est supposée intervenir : elle bloque des cas concrets et tangibles de “tentatives d’infractions”, après l’IPS/IDS d’OPNsense. En d’autres termes, elle respecte la présomption d’innocence : elle ne bloque pas un bot sous prétexte que c’est une IA.

Enfin, comme je n’ai jamais adhéré au cloud, je voulais une solution locale, sur laquelle j’ai le contrôle. Autrement, j’aurais fait comme tout le monde, en passant par Cloudflare.