Infrastructure Scripts

1. crowdsec-cf-sync.py

Location: /usr/local/bin/crowdsec-cf-sync.py Systemd service: crowdsec-cf-sync.service Logs: /var/log/crowdsec/cf-sync.log

Installation

cp crowdsec-cf-sync.py /usr/local/bin/
chmod +x /usr/local/bin/crowdsec-cf-sync.py
systemctl enable crowdsec-cf-sync
systemctl start crowdsec-cf-sync

Features

  • Syncs active CrowdSec bans → Cloudflare IP Access Rules (tag crowdsec-local-ban)
  • Reports banned IPs → AbuseIPDB (48h window, deduplicated)
  • Repeat-offender escalation: 1st CrowdSec ban handles | 2nd → 24h | 3rd+ → 7d (7d window)
  • ModSecurity score ≥ 5 → immediate 2h CF ban (tag modsec-ban) + AbuseIPDB report
  • Automatic /24 ban: 2+ IPs from same block in 7d → CF + CrowdSec ban 24h (tag crowdsec-cidr-ban)

Bug fixes (April 2026)

  • cs_origin vs origin in get_recent_local_bans() — the JSON field is cs_origin
  • REST API pagination /v1/decisions?limit=1000 misses cscli bans → replaced with cscli decisions list --origin
  • Itemsitems (lowercase) in CrowdSec allowlist JSON parsing

⚠️ Important: real tokens (CF_API_TOKEN, CF_ZONE_ID, CS_API_KEY, ABUSEIPDB_KEY) must be stored in /etc/secrets/ with chmod 600 and loaded via environment variables, not hardcoded in the script. The VOTRE_* values below are placeholders.

Source code

#!/usr/bin/env python3
"""
CrowdSec → Cloudflare IP Sync + AbuseIPDB Reporter + Recidivist Escalation
+ ModSecurity → CF Ban immediate (2h) + Automatic /24 ban

1. Syncs active CrowdSec bans → Cloudflare IP Access Rules
2. Reports new banned IPs (48h) → AbuseIPDB
3. Escalates recidivist bans: 1st → CrowdSec handles | 2nd → 24h | 3rd+ → 7d
4. ModSecurity score ≥ 5 → immediate 2h CF ban + AbuseIPDB report
5. /24 auto-ban if 2+ distinct IPs from same /24 in 7d → 24h

- Runs every 60 seconds
"""

import ipaddress
import json
import logging
import re
import subprocess
import time
import urllib.request
import urllib.error
import urllib.parse
from pathlib import Path
from datetime import datetime, timezone, timedelta

# ── Configuration ─────────────────────────────────────────────────
CF_API_TOKEN    = "VOTRE_CF_API_TOKEN"
CF_ZONE_ID      = "VOTRE_CF_ZONE_ID"
CS_API_KEY      = "VOTRE_CS_API_KEY"
ABUSEIPDB_KEY   = "VOTRE_ABUSEIPDB_KEY"
ABUSEIPDB_URL   = "https://api.abuseipdb.com/api/v2/report"
INTERVAL        = 60
NOTE_TAG        = "crowdsec-local-ban"
NOTE_TAG_MODSEC = "modsec-ban"
NOTE_TAG_CIDR   = "crowdsec-cidr-ban"
LOCAL_ORIGINS   = {"crowdsec", "cscli"}
DECISIONS_LOG   = Path("/var/log/crowdsec/decisions.log")
NGINX_ERROR_LOG = Path("/var/log/nginx/error.log")
CF_LOG_FILE     = Path("/var/log/crowdsec/cf-sync.log")
ABUSE_STATE     = Path("/var/log/crowdsec/abuseipdb-reported.json")
RECIDIV_STATE   = Path("/var/log/crowdsec/recidivists.json")
MODSEC_STATE    = Path("/var/log/crowdsec/modsec-banned.json")
CIDR_STATE      = Path("/var/log/crowdsec/cidr-banned.json")
LOOKBACK_HOURS  = 48
RECIDIV_WINDOW  = 7
CIDR_WINDOW     = 7
MODSEC_SCORE_MIN    = 5
MODSEC_BAN_SECS     = 7200
CIDR_BAN_DURATION   = "24h"
CIDR_THRESHOLD      = 2

RECIDIV_ESCALATION = {
    0: None,
    1: "24h",
}
RECIDIV_DEFAULT = "168h"

SCENARIO_CATEGORIES = {
    "http-sensitive-files":   "21",
    "http-probing":           "21",
    "http-scan":              "21,19",
    "http-bad-user-agent":    "21",
    "http-wordpress-scan":    "21",
    "http-crawl-non_statics": "21",
    "http-exploit":           "21",
    "vpatch-env-access":      "21",
    "vpatch-git-config":      "21",
    "ssh-bf":                 "22",
    "ssh-slow-bf":            "22",
    "ssh-time-based-bf":      "22",
    "ssh-refused-conn":       "22",
    "ssh-cve":                "22",
    "default":                "21",
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler(CF_LOG_FILE), logging.StreamHandler()]
)
log = logging.getLogger(__name__)


def get_crowdsec_allowlist() -> set:
    try:
        result = subprocess.run(
            ["cscli", "allowlists", "inspect", "my_allowlist", "-o", "json"],
            capture_output=True, text=True, timeout=15
        )
        if result.returncode != 0:
            return set()
        data = json.loads(result.stdout)
        items = data.get("items", []) or []
        return {item.get("value", "") for item in items if item.get("value")}
    except Exception as e:
        log.warning("Error reading CrowdSec allowlist: %s", e)
        return set()


def is_allowlisted(ip_str: str, cs_allowlist: set) -> bool:
    if ip_str in cs_allowlist:
        return True
    try:
        ip_obj = ipaddress.ip_address(ip_str)
        for entry in cs_allowlist:
            try:
                net = ipaddress.ip_network(entry, strict=False)
                if ip_obj in net:
                    return True
            except ValueError:
                pass
    except ValueError:
        pass
    return False


def cf_request(method: str, path: str, data=None) -> dict:
    url = f"https://api.cloudflare.com/client/v4{path}"
    body = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(
        url, data=body, method=method,
        headers={
            "Authorization": f"Bearer {CF_API_TOKEN}",
            "Content-Type":  "application/json",
        }
    )
    try:
        with urllib.request.urlopen(req, timeout=15) as resp:
            result = json.loads(resp.read().decode())
            if not result.get("success"):
                raise RuntimeError(f"CF API error: {result.get('errors')}")
            return result
    except urllib.error.HTTPError as e:
        body = e.read().decode()
        raise RuntimeError(f"HTTP {e.code} on {method} {path}: {body}") from e


def get_active_bans() -> set:
    """Active local bans (crowdsec + cscli) via cscli decisions list."""
    bans = set()
    for origin in LOCAL_ORIGINS:
        try:
            result = subprocess.run(
                ["cscli", "decisions", "list", "--origin", origin, "-o", "json"],
                capture_output=True, text=True, timeout=15
            )
            if result.returncode != 0 or not result.stdout.strip():
                continue
            alerts = json.loads(result.stdout)
            if not isinstance(alerts, list):
                continue
            for alert in alerts:
                for dec in alert.get("decisions") or []:
                    if (dec.get("type") == "ban"
                            and dec.get("scope", "").lower() == "ip"
                            and dec.get("value")):
                        bans.add(dec["value"])
        except Exception as e:
            log.warning("Error cscli decisions list --origin %s: %s", origin, e)
    return bans


def main():
    log.info("=== CrowdSec CF Sync started (interval=%ds) ===", INTERVAL)
    cs_allowlist = get_crowdsec_allowlist()
    log.info("CrowdSec allowlist: %d entries", len(cs_allowlist))

    while True:
        try:
            active_bans = get_active_bans()
            log.info("CrowdSec: %d active bans", len(active_bans))
            # ... (full logic: CF sync, AbuseIPDB, recidivists, ModSec, CIDR)
        except Exception as e:
            log.error("Sync error: %s", e, exc_info=True)
        time.sleep(INTERVAL)


if __name__ == "__main__":
    main()

💡 Note: condensed version above for article readability. The full script (~700 lines) includes detailed handling of recidivists, ModSecurity, and /24 bans. Available on request or in the private repo.


Location: /var/www/grav/user/themes/quark/js/cookie-banner.js

Features

  • Cookie: arleo_cookie_consent — flags Secure + SameSite=Strict — 365 days duration
  • Fixed banner at bottom of page, dark background, OK button
  • Link to /privacy-policies
  • No external dependencies (~4kb)
  • Accessible (ARIA role=dialog)
  • Disappears with opacity animation on OK click

Source code

(function () {
    'use strict';

    var COOKIE_NAME = 'arleo_cookie_consent';
    var COOKIE_DURATION = 365; // days

    function getCookie(name) {
        var match = document.cookie.match(new RegExp('(^| )' + name + '=([^;]+)'));
        return match ? match[2] : null;
    }

    function setCookie(name, value, days) {
        var expires = new Date();
        expires.setTime(expires.getTime() + days * 24 * 60 * 60 * 1000);
        document.cookie = name + '=' + value
            + '; expires=' + expires.toUTCString()
            + '; path=/'
            + '; SameSite=Strict'
            + '; Secure';
    }

    function createBanner() {
        var banner = document.createElement('div');
        banner.id = 'arleo-cookie-banner';
        banner.setAttribute('role', 'dialog');
        banner.setAttribute('aria-live', 'polite');
        banner.setAttribute('aria-label', 'Cookie consent');
        banner.innerHTML =
            '<div class="arleo-cookie-inner">' +
                '<p class="arleo-cookie-text">' +
                    'This site uses session cookies necessary for its operation. ' +
                    '<a href="https://www.arleo.eu/en/privacy-policies" rel="noopener noreferrer">Learn more</a>' +
                '</p>' +
                '<button id="arleo-cookie-accept" aria-label="Accept cookies">OK</button>' +
            '</div>';

        // ... (inline CSS to avoid a 2nd HTTP request)

        document.head.appendChild(style);
        document.body.appendChild(banner);

        document.getElementById('arleo-cookie-accept').addEventListener('click', function () {
            setCookie(COOKIE_NAME, 'accepted', COOKIE_DURATION);
            banner.style.transition = 'opacity 0.3s';
            banner.style.opacity = '0';
            setTimeout(function () {
                if (banner.parentNode) {
                    banner.parentNode.removeChild(banner);
                }
            }, 300);
        });
    }

    function init() {
        if (getCookie(COOKIE_NAME) === 'accepted') {
            return;
        }
        if (document.readyState === 'loading') {
            document.addEventListener('DOMContentLoaded', createBanner);
        } else {
            createBanner();
        }
    }

    init();
})();

3. cloudflare-allowlist-update.py

Hourly cron — Syncs the Cloudflare allowed_ip list with BetterStack + Cloudflare IPs, then syncs the CrowdSec allowlist.

Cron installation

0 * * * * /usr/bin/python3 /usr/local/bin/cloudflare-allowlist-update.py >> /var/log/cloudflare-allowlist.log 2>&1

Synced sources

SourceURL
BetterStackhttps://uptime.betterstack.com/ips.txt
Cloudflare IPv4https://www.cloudflare.com/ips-v4/
Cloudflare IPv6https://www.cloudflare.com/ips-v6/

Main logic

def main():
    # 1. Fetch IPs already in the Cloudflare list
    existing_cf = get_existing_cf_ips(account_id, list_id)

    # 2. For each source, fetch + diff + add
    for source_name, url in SOURCES.items():
        fetched = fetch_ips(url, source_name)
        new_ips = fetched - existing_cf
        if new_ips:
            add_cf_ips(account_id, list_id, new_ips, source_name)

    # 3. Sync to CrowdSec allowlist
    existing_cs = get_crowdsec_allowlist()
    new_cs_ips = all_cf_ips - existing_cs
    if new_cs_ips:
        add_crowdsec_ips(new_cs_ips, "sync-cloudflare-allowlist")

4. cloudflare-cleanup-ip-rules.py

One-time use — Deletes all Cloudflare IP rules except those tagged easycron. Used to clean up 49,974 obsolete rules (Fail2Ban/UFW/ModSec) in March 2026.

python3 cloudflare-cleanup-ip-rules.py

Logic:

  1. Paginated retrieval of all rules via Cloudflare API
  2. Filtering: keeps anything containing easycron in the note
  3. Deletes the rest (rate limit ~1200 req/min, sleep 0.05s between each)

5. Maintenance commands

# Sync service status and logs
systemctl status crowdsec-cf-sync
journalctl -fu crowdsec-cf-sync | grep -E "bans|Added|CIDR|ModSec|RECIDIVIST"

# Show recidivists
cat /var/log/crowdsec/recidivists.json | python3 -c "
import json,sys
d=json.load(sys.stdin)
for ip,v in sorted(d.items(), key=lambda x: x[1]['count'], reverse=True):
    print(f'{ip:20} count={v[\"count\"]} last={v[\"last_seen\"]}')
"

# Check active CrowdSec bans
cscli decisions list --origin cscli
cscli decisions list --origin crowdsec

# Check IPs in Cloudflare by tag
# Via CF dashboard → Security → IP Access Rules

6. Security — Secrets storage

⚠️ Important: all real tokens have been removed from this article. The values VOTRE_CF_API_TOKEN, VOTRE_CF_ZONE_ID, VOTRE_CS_API_KEY, VOTRE_ABUSEIPDB_KEY are placeholders. In production, these tokens are stored in /etc/secrets/ with chmod 600 and loaded via os.getenv() at script startup.

# Create secrets directory
sudo mkdir -p /etc/secrets
sudo chown root:root /etc/secrets
sudo chmod 700 /etc/secrets

# Tokens file
sudo tee /etc/secrets/crowdsec-cf-sync.env <<'EOF'
CF_API_TOKEN=...
CF_ZONE_ID=...
CS_API_KEY=...
ABUSEIPDB_KEY=...
EOF
sudo chmod 600 /etc/secrets/crowdsec-cf-sync.env

Then in the systemd service:

[Service]
EnvironmentFile=/etc/secrets/crowdsec-cf-sync.env
ExecStart=/usr/bin/python3 /usr/local/bin/crowdsec-cf-sync.py