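Background
We run a PC FPS title. Cheat authors often disguise a DDoS attack as a "player disconnect" to fool customer support. Triage used to mean pulling CDN logs, aligning timestamps, and comparing them by hand, which took about 2 hours on average to pinpoint the cause. Now edge-node logs are shipped into Kafka in real time, a roughly 30-line Python script consumes them and feeds Grafana, and within 5 minutes we can reconstruct who was knocked offline, when, and by which burst of traffic.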
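1. Data flow
Edge node (Nginx) → filebeat → Kafka → Python replay script → Grafana (Loki)
- Edge nodes run the Nginx stream module with a custom log format:
$time_iso8601|$remote_addr|$bytes_sent|$bytes_received|$proxy_host
- filebeat reads /var/log/nginx/stream.log directly and publishes to a topic named game_traffic;
- The Python script consumes the topic with confluent-kafka, computes "anomaly windows" in real time, and pushes them to Loki;
- In Grafana, ops clicks "replay" and drags the timeline back 30 min, like watching a recording.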
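To make the log format concrete, here is a minimal sketch of turning one pipe-delimited line into a typed record. The `StreamRecord` type, the `parse_line` helper, and the sample line are illustrative assumptions, not part of the original script.

```python
import datetime
from typing import NamedTuple


class StreamRecord(NamedTuple):
    """Hypothetical container for one line of the stream access log."""
    ts: datetime.datetime
    src: str
    bytes_sent: int
    bytes_received: int
    dest: str


def parse_line(line: str) -> StreamRecord:
    """Split a line on '|' in the field order given by the log format."""
    ts, src, sent, received, dest = line.rstrip("\n").split("|")
    return StreamRecord(
        ts=datetime.datetime.fromisoformat(ts),  # e.g. 2024-05-01T12:00:00+08:00
        src=src,
        bytes_sent=int(sent),
        bytes_received=int(received),
        dest=dest,
    )


# Made-up sample line; the address is from a documentation-only range.
sample = "2024-05-01T12:00:00+08:00|203.0.113.7|1024|2048|game-backend"
record = parse_line(sample)
print(record.src, record.bytes_sent + record.bytes_received)
```

A line with a missing or extra field raises `ValueError` at the unpacking step, which is probably the behavior you want while the format is still settling.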
2. Replay script (replay.py)
#!/usr/bin/env python3
# pip install confluent-kafka python-dateutil requests
import collections
import datetime
import json
import time

import requests
from confluent_kafka import Consumer

BOOTSTRAP = 'kafka.example.com:9092'
LOKI_URL = 'https://loki.example.com/loki/api/v1/push'
WINDOW = datetime.timedelta(seconds=10)
# Alert when upstream + downstream bytes inside the window exceed 100 MB.
ALERT_THRESHOLD = 100_000_000

consumer = Consumer({
    'bootstrap.servers': BOOTSTRAP,
    'group.id': 'replay',
    'auto.offset.reset': 'latest',
})
consumer.subscribe(['game_traffic'])

# Keeps at most the 1000 most recent lines; under heavy traffic older lines
# are dropped even if they still fall inside the 10-second window.
window = collections.deque(maxlen=1000)


def push_loki(stream, labels):
    """Push one JSON-encoded log line to Loki under the given stream labels."""
    payload = {"streams": [{
        "stream": labels,
        "values": [[str(time.time_ns()), json.dumps(stream)]],
    }]}
    requests.post(LOKI_URL, json=payload, timeout=3)


while True:
    msg = consumer.poll(1)
    if msg is None:
        continue
    if msg.error():
        print(msg.error())
        continue
    # Filebeat's Kafka output wraps each line in a JSON event by default;
    # the raw log line is in the "message" field.
    line = json.loads(msg.value())['message']
    ts, src, up, down, dest = line.split('|')
    now = datetime.datetime.fromisoformat(ts)
    window.append((now, int(up) + int(down)))
    # Sliding window: evict entries older than 10 seconds, then total the rest.
    cutoff = now - WINDOW
    while window and window[0][0] < cutoff:
        window.popleft()
    total = sum(b for _, b in window)
    if total > ALERT_THRESHOLD:
        push_loki({"src": src, "dest": dest, "bytes": total},
                  {"job": "game_traffic", "alert": "ddos"})
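The windowing logic in the loop can be pulled into a small class so it can be exercised without Kafka or Loki. The sketch below is one possible variant, not the script's own code: `TrafficWindow` is a hypothetical name, it keeps a running total instead of re-summing on every message, and it drops the 1000-entry cap so a burst is not silently truncated. It assumes timestamps arrive in roughly ascending order.

```python
import collections
import datetime


class TrafficWindow:
    """Hypothetical helper: total bytes seen within the last `span_seconds`."""

    def __init__(self, span_seconds=10):
        self.span = datetime.timedelta(seconds=span_seconds)
        self.events = collections.deque()  # (timestamp, byte_count), oldest first
        self.total = 0

    def add(self, ts, nbytes):
        """Record one log line and return the byte total inside the window."""
        self.events.append((ts, nbytes))
        self.total += nbytes
        cutoff = ts - self.span
        # Evict everything strictly older than the cutoff.
        while self.events and self.events[0][0] < cutoff:
            _, old = self.events.popleft()
            self.total -= old
        return self.total


window = TrafficWindow(span_seconds=10)
start = datetime.datetime(2024, 1, 1, 12, 0, 0)
for offset, nbytes in [(0, 40), (5, 70), (12, 5)]:
    print(offset, window.add(start + datetime.timedelta(seconds=offset), nbytes))
```

Without a cap, memory grows with the number of lines per 10 seconds, so a very large flood may still call for an upper bound or per-second bucketing.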
3. Rollout steps
- On the edge nodes, add to the Nginx stream config:
log_format stream '$time_iso8601|$remote_addr|$bytes_sent|$bytes_received|$proxy_host';
access_log /var/log/nginx/stream.log stream;
- Add to filebeat.yml:
filebeat.inputs:
  - type: log
    paths: ["/var/log/nginx/stream.log"]
    fields_under_root: true
    fields:
      topic: game_traffic
output.kafka:
  hosts: ["kafka.example.com:9092"]
  topic: '%{[topic]}'
- Start the script and keep it running under supervisor or systemd:
python3 replay.py &
- In Grafana, create a Loki data source and query
{job="game_traffic"} | json | alert="ddos"
to watch the attack curve in real time;
- For replay, drag the time picker to 30 s before the anomaly and step through to see which traffic peak lines up with which batch of player disconnects.
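The same replay window can also be fetched outside Grafana through Loki's `query_range` HTTP endpoint, which takes `start` and `end` as nanosecond Unix timestamps. The sketch below only builds the query parameters. The `replay_query_params` helper, the 60-second look-ahead, and the limit of 1000 are assumptions for illustration.

```python
import datetime

ALERT_QUERY = '{job="game_traffic"} | json | alert="ddos"'


def replay_query_params(incident, before_s=30, after_s=60, query=ALERT_QUERY):
    """Build query_range parameters for a window around `incident`.

    `incident` should be a timezone-aware datetime. The window starts
    `before_s` seconds before it, matching the "30 s before the anomaly"
    habit; `after_s` is an assumed look-ahead.
    """
    start = incident - datetime.timedelta(seconds=before_s)
    end = incident + datetime.timedelta(seconds=after_s)

    def to_ns(t):
        # Whole seconds scaled to nanoseconds, avoiding float rounding.
        return str(int(t.timestamp()) * 1_000_000_000)

    return {
        "query": query,
        "start": to_ns(start),
        "end": to_ns(end),
        "direction": "forward",  # oldest first, so it reads like a replay
        "limit": "1000",
    }


incident = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
params = replay_query_params(incident)
print(params["start"], params["end"])
```

These parameters could then be passed to something like `requests.get("https://loki.example.com/loki/api/v1/query_range", params=params, timeout=3)`, with the host taken from the push URL used by the script.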
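4. Results
- Two weeks after launch, customer-support ticket volume dropped 60%, and cheat authors found that the "disconnect" trick no longer worked;
- Ops went from "2 h of digging through logs" to "5 min and two clicks in Grafana", and can finally enjoy gaming on weekends.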