Harmonisation des logs nginx et CrowdSec dans BetterStack via Vector

⚡ En bref
Deux problèmes coexistaient dans BetterStack : les logs mcp-oauth.access.log arrivaient en JSON brut illisible, et les logs CrowdSec produisaient des doublons visuels. Ce travail uniformise tous les logs pour qu’ils s’affichent comme des tags cliquables structurés, avec timestamps corrects et sans champs parasites.
🧠 Pourquoi
BetterStack affiche les logs sous forme de tags surlignés cliquables dans le Live Tail lorsque les champs JSON sont correctement structurés. Avant ce travail, l’observation était dégradée sur deux fronts :
- Les logs
mcp-oauth.access.logarrivaient en JSON brut illisible (format personnalisé incompatible avec le parser nginx de Vector) — les champsnginx.client,nginx.path,nginx.statusn’étaient pas extraits - Les logs CrowdSec et CF WAF arrivaient en texte plat avec des doublons (
Ban ban | ... | Ban ban)
L’objectif était d’uniformiser tous les logs dans BetterStack pour qu’ils s’affichent comme les logs nginx standard :
host:NUC8i3BEH platform:Nginx nginx.status:200 nginx.method:GET nginx.path:/ping nginx.client:91.98.38.26🔧 Ce qui a été fait
Problème 1 — mcp-oauth.access.log (format JSON non standard)
Diagnostic
Le fichier mcp-oauth.access.log utilisait un format JSON personnalisé produit par nginx, incompatible avec le parser regex de Vector. Deux problèmes secondaires identifiés :
- Le champ
dtutilisait le timestamp d’ingestion Vector plutôt que l’heure réelle de la requête nginx (~5 secondes de décalage) - Le champ
level: nullétait systématiquement présent, polluant l’histogramme de sévérité BetterStack
Correction dans Vector (/etc/vector/vector.yaml)
Ajout d’une branche de détection JSON au début du transform remap existant, avant les tentatives de regex :
if contains(string(.file) ?? "", "mcp-oauth") {
parsed, err = parse_json(.message)
if err == null && is_object(parsed) {
.nginx = {}
.nginx.client = parsed.real_ip
.nginx.method = parsed.method
.nginx.path = parsed.uri
.nginx.status = to_int(parsed.status) ?? null
.nginx.size = to_int(parsed.bytes_sent) ?? null
.nginx.agent = parsed.http_user_agent
.nginx.limit_req_status = parsed.limit_req_status
.nginx.request = join!([string(parsed.method) ?? "", " ", string(parsed.uri) ?? ""])
.platform = "Nginx"
# Utiliser l'heure réelle nginx, pas le timestamp Vector
parsed_time, err = parse_timestamp(parsed.time, "%+")
if err == null {
.dt = format_timestamp!(parsed_time, "%+")
} else {
.dt = del(.timestamp)
}
del(.timestamp)
del(.message)
del(.source_type)
return .
}
}Fix du level: null dans le transform principal :
.level = del(.nginx.severity)
if is_null(.level) { del(.level) }Résultat
host:NUC8i3BEH platform:Nginx nginx.status:200 nginx.method:POST
nginx.path:/oauth-mcp/mcp nginx.client:160.79.106.35
nginx.agent:Claude-User nginx.limit_req_status:PASSEDProblème 2 — Logs CrowdSec et CF WAF (texte plat avec doublons)
Diagnostic
Les scripts crowdsec-poller.py et crowdsec-cf-sync.py produisaient un champ message redondant avec des doublons visuels dans BetterStack :
Ban ban | 47.82.11.22 | http:scan | dur:3h59m35s | origin:CAPI
Alert banned | 77.74.177.114 | cloudflare-waf/4-hits | | cloudflare-waf/4-hits
CF WAF ban: 77.74.177.114 | 4 hits | 4hSolution — Objet imbriqué cs{}
La solution finale est un objet JSON imbriqué cs{} dans les payloads. BetterStack navigue nativement dans les objets imbriqués avec la notation pointée dans le Logs message format — exactement comme il gère l’objet nginx{}.
Structure finale — crowdsec-poller.py :
import socket
record = {
"dt": dt_str,
"host": socket.gethostname(),
"platform": "CrowdSec",
"cs": {
"event_type": "decision",
"ip": decision_ip,
"type": "ban",
"scenario": "http:scan",
"duration": "4h",
"origin": "CAPI",
"scope": "Ip",
"simulated": False
}
# pas de champ "message"
}Structure finale — crowdsec-cf-sync.py → send_to_betterstack() :
import socket
payload = {
"dt": timestamp,
"host": socket.gethostname(),
"platform": "CFWaf",
"cs": {
"ip": ip,
"hits": hit_count,
"action": cf_action,
"duration": duration,
"recidive": recidive_count,
"uris": uris,
"source": "cloudflare_waf"
}
# pas de champ "message"
}Correction dans Vector
Le transform crowdsec_decisions_flatten parse le JSON, supprime les champs parasites et conserve l’objet cs{} intact :
parsed, err = parse_json(.message)
if err != null { abort }
. = merge(., parsed) ?? .
del(.source_type)
del(.file)
del(.message)
# NE PAS supprimer .host — fourni par les scripts PythonUn filter transform crowdsec_decisions_filter bloque les logs sans IP et les alertes trop verbeuses :
exists(.cs) && (get(., ["cs", "ip"]) != Ok(null)) && (get(., ["cs", "event_type"]) != Ok("alert"))Le sink pointe sur ce filter :
crowdsec_betterstack_sink:
inputs:
- "crowdsec_decisions_filter"Configuration BetterStack — Logs message format
Dans Sources → crowdsec-decisions → Advanced settings → Logs message format :
{host} {platform} {cs.type} {cs.ip} {cs.scenario} {cs.duration} {cs.origin} {cs.action} {cs.hits} {cs.uris} {cs.source}Note : BetterStack supporte la notation pointée
{cs.ip}pour naviguer dans les objets imbriqués. Pour les champs dont le nom contient littéralement un point, utiliser la syntaxe{["champ.nom"]}.
Résultat final dans le Live Tail
Décision CrowdSec CAPI :
host:NUC8i3BEH platform:CrowdSec cs.type:ban cs.ip:47.82.11.22
cs.scenario:http:scan cs.duration:3h cs.origin:CAPIBan CF WAF :
host:NUC8i3BEH platform:CFWaf cs.ip:4.197.75.18 cs.action:block
cs.duration:24h cs.hits:50 cs.uris:["/wp-admin.php","/shell.php"]Architecture finale du pipeline Vector
/var/log/nginx/*.log
↓
better_stack_nginx_parser (remap)
├── branche mcp-oauth → objet nginx{} + timestamp réel nginx
└── branche standard → regex nginx classiques
↓
better_stack_nginx_filter (filter)
↓
BetterStack nginx-status (s2315113)
/var/log/crowdsec/decisions.log
↓
crowdsec_decisions_source (file)
↓
crowdsec_decisions_flatten (remap) — parse JSON, del(.message)
↓
crowdsec_decisions_filter (filter) — exists(.cs) && ip présente && pas alert
↓
BetterStack crowdsec-decisions (s2328224)Fichiers modifiés
| Fichier | Modifications |
|---|---|
/etc/vector/vector.yaml | Branche mcp-oauth, fix level null, transform CrowdSec, filter CrowdSec |
/usr/local/bin/crowdsec-poller.py | Objet cs{} imbriqué, ajout host, suppression message |
/usr/local/bin/crowdsec-cf-sync.py | Objet cs{} imbriqué, ajout host, suppression message, platform: CFWaf |
Commandes utiles
# Valider la config Vector sans redémarrer
sudo vector validate /etc/vector/vector.yaml
# Redémarrer Vector
sudo systemctl restart vector
# Logs Vector en temps réel (sans le bruit host_metrics)
sudo journalctl -fu vector | grep -v "host_metrics"
# Injecter un log CrowdSec de test
echo '{"dt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'","host":"NUC8i3BEH","platform":"CrowdSec","cs":{"event_type":"decision","ip":"1.2.3.4","type":"ban","scenario":"http:scan","duration":"4h","origin":"CAPI"}}' | sudo tee -a /var/log/crowdsec/decisions.log
# Injecter un log CF WAF de test
echo '{"dt":"'$(date -u +%Y-%m-%dT%H:%M:%SZ)'","host":"NUC8i3BEH","platform":"CFWaf","cs":{"ip":"1.2.3.4","hits":5,"action":"block","duration":"4h","source":"cloudflare_waf","uris":["/shell.php"]}}' | sudo tee -a /var/log/crowdsec/decisions.log🏁 Conclusion
Cette harmonisation transforme des logs illisibles en données structurées et exploitables dans BetterStack. Les tags cliquables dans le Live Tail permettent de filtrer instantanément par IP, scénario ou action — ce qui était impossible avec les blobs JSON bruts ou les messages texte dupliqués. La correction du timestamp nginx élimine les décalages de ~5 secondes qui faussaient l’ordre chronologique.
Pour aller plus loin :
- 💡 Ajouter des alertes BetterStack sur des patterns
cs.iprécurrents pour détecter les campagnes d’attaque coordonnées - 💡 Créer un dashboard BetterStack avec répartition par
cs.originpour visualiser la proportion CAPI/cscli/nginx-bouncer