Contenu

Harmonisation des logs nginx et CrowdSec dans BetterStack via Vector

⚡ En bref

Deux problèmes coexistaient dans BetterStack : les logs mcp-oauth.access.log arrivaient en JSON brut illisible, et les logs CrowdSec produisaient des doublons visuels. Ce travail uniformise tous les logs pour qu’ils s’affichent comme des tags cliquables structurés, avec timestamps corrects et sans champs parasites.

🧠 Pourquoi

BetterStack affiche les logs sous forme de tags surlignés cliquables dans le Live Tail lorsque les champs JSON sont correctement structurés. Avant ce travail, l’observation était dégradée sur deux fronts :

  • Les logs mcp-oauth.access.log arrivaient en JSON brut illisible (format personnalisé incompatible avec le parser nginx de Vector) — les champs nginx.client, nginx.path, nginx.status n’étaient pas extraits
  • Les logs CrowdSec et CF WAF arrivaient en texte plat avec des doublons (Ban ban | ... | Ban ban)

L’objectif était d’uniformiser tous les logs dans BetterStack pour qu’ils s’affichent comme les logs nginx standard :

host:NUC8i3BEH  platform:Nginx  nginx.status:200  nginx.method:GET  nginx.path:/ping  nginx.client:91.98.38.26

🔧 Ce qui a été fait

Problème 1 — mcp-oauth.access.log (format JSON non standard)

Diagnostic

Le fichier mcp-oauth.access.log utilisait un format JSON personnalisé produit par nginx, incompatible avec le parser regex de Vector. Deux problèmes secondaires identifiés :

  • Le champ dt utilisait le timestamp d’ingestion Vector plutôt que l’heure réelle de la requête nginx (~5 secondes de décalage)
  • Le champ level: null était systématiquement présent, polluant l’histogramme de sévérité BetterStack

Correction dans Vector (/etc/vector/vector.yaml)

Ajout d’une branche de détection JSON au début du transform remap existant, avant les tentatives de regex :

if contains(string(.file) ?? "", "mcp-oauth") {
  parsed, err = parse_json(.message)
  if err == null && is_object(parsed) {
    .nginx = {}
    .nginx.client           = parsed.real_ip
    .nginx.method           = parsed.method
    .nginx.path             = parsed.uri
    .nginx.status           = to_int(parsed.status) ?? null
    .nginx.size             = to_int(parsed.bytes_sent) ?? null
    .nginx.agent            = parsed.http_user_agent
    .nginx.limit_req_status = parsed.limit_req_status
    .nginx.request          = join!([string(parsed.method) ?? "", " ", string(parsed.uri) ?? ""])
    .platform               = "Nginx"

    # Utiliser l'heure réelle nginx, pas le timestamp Vector
    parsed_time, err = parse_timestamp(parsed.time, "%+")
    if err == null {
      .dt = format_timestamp!(parsed_time, "%+")
    } else {
      .dt = del(.timestamp)
    }
    del(.timestamp)
    del(.message)
    del(.source_type)
    return .
  }
}

Fix du level: null dans le transform principal :

.level = del(.nginx.severity)
if is_null(.level) { del(.level) }

Résultat

host:NUC8i3BEH  platform:Nginx  nginx.status:200  nginx.method:POST
nginx.path:/oauth-mcp/mcp  nginx.client:160.79.106.35
nginx.agent:Claude-User  nginx.limit_req_status:PASSED

Problème 2 — Logs CrowdSec et CF WAF (texte plat avec doublons)

Diagnostic

Les scripts crowdsec-poller.py et crowdsec-cf-sync.py produisaient un champ message redondant avec des doublons visuels dans BetterStack :

Ban ban | 47.82.11.22 | http:scan | dur:3h59m35s | origin:CAPI
Alert banned | 77.74.177.114 | cloudflare-waf/4-hits |  | cloudflare-waf/4-hits
CF WAF ban: 77.74.177.114 | 4 hits | 4h

Solution — Objet imbriqué cs{}

La solution finale est un objet JSON imbriqué cs{} dans les payloads. BetterStack navigue nativement dans les objets imbriqués avec la notation pointée dans le Logs message format — exactement comme il gère l’objet nginx{}.

Structure finale — crowdsec-poller.py :

import socket

record = {
    "dt": dt_str,
    "host": socket.gethostname(),
    "platform": "CrowdSec",
    "cs": {
        "event_type": "decision",
        "ip": decision_ip,
        "type": "ban",
        "scenario": "http:scan",
        "duration": "4h",
        "origin": "CAPI",
        "scope": "Ip",
        "simulated": False
    }
    # pas de champ "message"
}

Structure finale — crowdsec-cf-sync.pysend_to_betterstack() :

import socket

payload = {
    "dt": timestamp,
    "host": socket.gethostname(),
    "platform": "CFWaf",
    "cs": {
        "ip": ip,
        "hits": hit_count,
        "action": cf_action,
        "duration": duration,
        "recidive": recidive_count,
        "uris": uris,
        "source": "cloudflare_waf"
    }
    # pas de champ "message"
}

Correction dans Vector

Le transform crowdsec_decisions_flatten parse le JSON, supprime les champs parasites et conserve l’objet cs{} intact :

parsed, err = parse_json(.message)
if err != null { abort }
. = merge(., parsed) ?? .
del(.source_type)
del(.file)
del(.message)
# NE PAS supprimer .host — fourni par les scripts Python

Un filter transform crowdsec_decisions_filter bloque les logs sans IP et les alertes trop verbeuses :

exists(.cs) && (get(., ["cs", "ip"]) != Ok(null)) && (get(., ["cs", "event_type"]) != Ok("alert"))

Le sink pointe sur ce filter :

crowdsec_betterstack_sink:
  inputs:
    - "crowdsec_decisions_filter"

Configuration BetterStack — Logs message format

Dans Sources → crowdsec-decisions → Advanced settings → Logs message format :

{host} {platform} {cs.type} {cs.ip} {cs.scenario} {cs.duration} {cs.origin} {cs.action} {cs.hits} {cs.uris} {cs.source}

Note : BetterStack supporte la notation pointée {cs.ip} pour naviguer dans les objets imbriqués. Pour les champs dont le nom contient littéralement un point, utiliser la syntaxe {["champ.nom"]}.

Résultat final dans le Live Tail

Décision CrowdSec CAPI :

host:NUC8i3BEH  platform:CrowdSec  cs.type:ban  cs.ip:47.82.11.22
cs.scenario:http:scan  cs.duration:3h  cs.origin:CAPI

Ban CF WAF :

host:NUC8i3BEH  platform:CFWaf  cs.ip:4.197.75.18  cs.action:block
cs.duration:24h  cs.hits:50  cs.uris:["/wp-admin.php","/shell.php"]

Architecture finale du pipeline Vector

/var/log/nginx/*.log
        ↓
better_stack_nginx_parser (remap)
  ├── branche mcp-oauth → objet nginx{} + timestamp réel nginx
  └── branche standard  → regex nginx classiques
        ↓
better_stack_nginx_filter (filter)
        ↓
BetterStack nginx-status (s2315113)

/var/log/crowdsec/decisions.log
        ↓
crowdsec_decisions_source (file)
        ↓
crowdsec_decisions_flatten (remap) — parse JSON, del(.message)
        ↓
crowdsec_decisions_filter (filter) — exists(.cs) && ip présente && pas alert
        ↓
BetterStack crowdsec-decisions (s2328224)

Fichiers modifiés

FichierModifications
/etc/vector/vector.yamlBranche mcp-oauth, fix level null, transform CrowdSec, filter CrowdSec
/usr/local/bin/crowdsec-poller.pyObjet cs{} imbriqué, ajout host, suppression message
/usr/local/bin/crowdsec-cf-sync.pyObjet cs{} imbriqué, ajout host, suppression message, platform: CFWaf

Commandes utiles

# Valider la config Vector sans redémarrer
sudo vector validate /etc/vector/vector.yaml

# Redémarrer Vector
sudo systemctl restart vector

# Logs Vector en temps réel (sans le bruit host_metrics)
sudo journalctl -fu vector | grep -v "host_metrics"

# Injecter un log CrowdSec de test
echo '{"dt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'","host":"NUC8i3BEH","platform":"CrowdSec","cs":{"event_type":"decision","ip":"1.2.3.4","type":"ban","scenario":"http:scan","duration":"4h","origin":"CAPI"}}' | sudo tee -a /var/log/crowdsec/decisions.log

# Injecter un log CF WAF de test
echo '{"dt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'","host":"NUC8i3BEH","platform":"CFWaf","cs":{"ip":"1.2.3.4","hits":5,"action":"block","duration":"4h","source":"cloudflare_waf","uris":["/shell.php"]}}' | sudo tee -a /var/log/crowdsec/decisions.log

🏁 Conclusion

Cette harmonisation transforme des logs illisibles en données structurées et exploitables dans BetterStack. Les tags cliquables dans le Live Tail permettent de filtrer instantanément par IP, scénario ou action — ce qui était impossible avec les blobs JSON bruts ou les messages texte dupliqués. La correction du timestamp nginx élimine les décalages de ~5 secondes qui faussaient l’ordre chronologique.

Pour aller plus loin :

  • 💡 Ajouter des alertes BetterStack sur des patterns cs.ip récurrents pour détecter les campagnes d’attaque coordonnées
  • 💡 Créer un dashboard BetterStack avec répartition par cs.origin pour visualiser la proportion CAPI/cscli/nginx-bouncer