🚦 差分隐私在运营指标:ABP 的 DP 计数器与噪声预算
📚 目录
- 🚦 差分隐私在运营指标:ABP 的 DP 计数器与噪声预算
- 0. TL;DR 🚀
- 📈 一图看懂(写入→发布→预算→加噪)
- 1. 背景与边界 🧭
- 2. DP 工程口径 📐
- 3. 目标架构(ABP 集成)🏗️
- 4. 数据/配置模型 🗂️
- 5. 关键实现 🧩
- 5.1 接口统一(会计以窗口起点为主键)
- 5.2 随机源与噪声(开区间采样 + 可缓存正态)
- 5.3 窗口对齐(上一窗,UTC)与预算键
- 5.4 Redis 会计(Lua 原子消费 + TTL 对齐“窗口剩余秒数” + 配置化上限)
- 5.5 发布 Worker(上一窗发布 + 幂等锁 + 小样本抑制 + 预算校验 + 加噪)
- 5.6 比例/均值指标(分子/分母分别加噪 + 小样本抑制)
- 6. 灰度与回滚 🎛️
- 7. 可视化与审计 📊
- ⏱️ 窗口与 TTL 对齐示意
- 8. 评测脚本 🧪
- 9. 上线步骤 ✅
- 10. 仓库结构建议 🧱
- 11. 常见坑 🧨
0. TL;DR 🚀
- 👤 隐私单元(Privacy Unit)选择 用户/设备级,写路径先按隐私单元聚合并剪裁贡献上界 K。
- ➕ 计数/求和 → Laplace(尺度
b = Δ/ε
);📈 比例/均值 → Gaussian(生产建议解析高斯校准 σ;示例以经典上界为保守缺省)。 - 🧩 后处理不降隐私:结果可做非负截断、舍入、分桶、平滑等业务约束。
- 📒 预算账本:按“租户×指标×窗口开始时刻(UTC)”累计 ε/δ;Key 与窗口严格对齐;Lua 原子消费;TTL=窗口剩余秒数;超额硬阻断。
- 🔒 发布幂等:周期任务在每个窗口结束时发布上一窗(UTC);用 Redis SET NX 建立
released
锁,保证只发布一次(并可 Upsert 落库)。 - 🧪 灰度与 A/B:按租户/功能开关;以 MAPE / P95 误差阈值放量;随时回滚。
📈 一图看懂(写入→发布→预算→加噪)
1. 背景与边界 🧭
- 🔐 与加密/RLS:加密/RLS 决定“谁能看真值”;差分隐私(DP)约束“公开后能泄露多少”。
- ✅ 适用:DAU/留存、功能调用次数、错误率、漏斗流量等群体指标;⛔ 不用于强一致计费或逐个体对账。
- ⚠️ 风险与对策:合谋/跨窗差分/滑动窗反推 → 隐私预算、最小样本门槛、发布限频、审计追溯。
2. DP 工程口径 📐
-
📏 敏感度 Δ:单个隐私单元在单窗口的最大贡献(经 K 剪裁后)。
-
🔊 Laplace:
b = Δ/ε
;❄️ Gaussian:σ = f(ε, δ, Δ)
(生产建议“解析高斯”或接入 OpenDP 校准;示例用经典上界)。 -
🧮 组合/会计:
- 拉普拉斯:ε 线性加和(基础组合)。
- 高斯:用 RDP 按 α 网格累加,再对给定 δ 反推
ε*(δ) = min_α {RDP(α) + ln(1/δ)/(α-1)}
,展示“已用/剩余”。
-
🧯 后处理不伤隐私:非负截断、舍入、分桶、平滑等结果级操作不降低 DP 保证。
3. 目标架构(ABP 集成)🏗️
Abp.DpMetrics 模块(应用层)
IDpCounter.Increment(tenant, metric, unitId, amount=1)
:写路径,按隐私单元聚合→剪裁 K→入库。DpPublishWorker
:周期发布;按窗口结束发布上一窗(UTC);发布幂等锁→预算校验→加噪→后处理→审计落表。IPrivacyAccountant
(Redis/DB 实现):存(tenant, metric, windowStartUtc)
的 ε/δ 用量与上限;提供Lua 原子 TryConsume。IBudgetPolicyProvider
:预算上限(ε/δ cap)配置化,支持按租户/指标/窗口粒度下发。IReleaseLog
:存噪声参数、ε/δ、机制、真值哈希、seed 哈希、审批单。
组件关系图
4. 数据/配置模型 🗂️
5. 关键实现 🧩
代码为最小可运行模板,重点展示口径与关键边界处理;生产可替换为解析高斯校准、完善 RDP 会计、指标门槛配置中心化等。
5.1 接口统一(会计以窗口起点为主键)
public interface IDpCounter {Task IncrementAsync(string tenantId, string metric, string privacyUnitId, int amount = 1);
}public interface IPrivacyAccountant {Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window,double epsilon, double? delta);Task<(double usedEps, double? usedDelta, double capEps, double? capDelta)>GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window);
}public interface IBudgetPolicyProvider {(double capEps, double? capDel) GetCaps(string tenantId, string metric, TimeSpan window);
}public enum DpMechanism { Laplace, Gaussian }
5.2 随机源与噪声(开区间采样 + 可缓存正态)
public interface IRandomSource { double NextUnit(); }public sealed class CryptoRandom : IRandomSource {private readonly System.Security.Cryptography.RandomNumberGenerator _rng= System.Security.Cryptography.RandomNumberGenerator.Create();public double NextUnit(){Span<byte> b = stackalloc byte[8]; _rng.GetBytes(b);ulong u = BitConverter.ToUInt64(b);return (u >> 11) * (1.0 / (1UL << 53)); // [0,1)}
}public static class RandUtil {public static double Open01(IRandomSource r) {double u; do { u = r.NextUnit(); } while (u <= double.Epsilon || u >= 1.0 - double.Epsilon);return u; // (0,1)}
}public sealed class GaussianSampler {private readonly IRandomSource _r; private bool _has; private double _cache;public GaussianSampler(IRandomSource r){ _r = r; }public double Next(){if (_has){ _has=false; return _cache; }var u1 = RandUtil.Open01(_r); var u2 = RandUtil.Open01(_r);var mag = Math.Sqrt(-2.0 * Math.Log(u1));var z0 = mag * Math.Cos(2.0 * Math.PI * u2);var z1 = mag * Math.Sin(2.0 * Math.PI * u2);_cache = z1; _has = true; return z0; // 产出两个,缓存一个}
}public static class DpNoise {public static double Laplace(double x, double b, IRandomSource r){double u = RandUtil.Open01(r) - 0.5; // (-0.5,0.5)double noise = -b * Math.Sign(u) * Math.Log(1 - 2 * Math.Abs(u));return Math.Max(0, x + noise); // 后处理:非负}public static double ClassicGaussianSigma(double deltaF, double eps, double delta){return Math.Sqrt(2 * Math.Log(1.25 / delta)) * deltaF / eps; // 经典上界}
}
5.3 窗口对齐(上一窗,UTC)与预算键
static (DateTimeOffset Start, DateTimeOffset End) PrevWindowUtc(IClock clock, TimeSpan w){var now = clock.Now.ToUniversalTime();long sec = (long)w.TotalSeconds;long end = (now.ToUnixTimeSeconds() / sec) * sec; // 当前窗结束(UTC)long start = end - sec; // 上一窗 [start,end)return (DateTimeOffset.FromUnixTimeSeconds(start), DateTimeOffset.FromUnixTimeSeconds(end));
}
static string BudgetKey(string tenant, string metric, TimeSpan w, DateTimeOffset windowStartUtc){return $"dp:budget:{tenant}:{metric}:{(int)w.TotalSeconds}:{windowStartUtc.ToUnixTimeSeconds()}";
}
5.4 Redis 会计(Lua 原子消费 + TTL 对齐“窗口剩余秒数” + 配置化上限)
public sealed class RedisPrivacyAccountant : IPrivacyAccountant {private readonly IConnectionMultiplexer _redis; private readonly IBudgetPolicyProvider _caps;public RedisPrivacyAccountant(IConnectionMultiplexer redis, IBudgetPolicyProvider caps){ _redis = redis; _caps = caps; }private const string LUA = @"local key = KEYS[1]local epsUse = tonumber(ARGV[1])local delUse = (ARGV[2] ~= '' and tonumber(ARGV[2]) or nil)local capEps = tonumber(ARGV[3])local capDel = (ARGV[4] ~= '' and tonumber(ARGV[4]) or nil)local ttlSec = tonumber(ARGV[5])local used = redis.call('GET', key)local usedEps, usedDel = 0, 0if used thenlocal i = string.find(used, '|')usedEps = tonumber(string.sub(used, 1, i-1))usedDel = tonumber(string.sub(used, i+1))endif usedEps + epsUse > capEps then return 0 endif capDel and delUse and (usedDel + delUse > capDel) then return 0 endusedEps = usedEps + epsUseusedDel = usedDel + (delUse or 0)redis.call('SET', key, usedEps .. '|' .. usedDel)if ttlSec > 0 then redis.call('EXPIRE', key, ttlSec) endreturn 1";public async Task<bool> TryConsumeAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window, double epsilon, double? delta){var db = _redis.GetDatabase();var key = BudgetKey(tenantId, metric, window, windowStartUtc);var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);var nowUtc = DateTimeOffset.UtcNow;var we = windowStartUtc + window;int ttlSec = (int)Math.Max(1, (we - nowUtc).TotalSeconds); // TTL 对齐窗口结束var result = (long)await db.ScriptEvaluateAsync(LUA,new RedisKey[]{ key },new RedisValue[]{epsilon,delta.HasValue ? (RedisValue)delta.Value : RedisValue.EmptyString,capEps,capDel.HasValue ? (RedisValue)capDel.Value : RedisValue.EmptyString,ttlSec});return result == 1L;}public async Task<(double, double?, double, double?)> GetUsageAsync(string tenantId, string metric, DateTimeOffset windowStartUtc, TimeSpan window){var db = _redis.GetDatabase();var key = BudgetKey(tenantId, metric, window, windowStartUtc);var s = await db.StringGetAsync(key);double usedEps = 0, usedDelRaw = 0;if (!s.IsNullOrEmpty){var parts = ((string)s!).Split('|');usedEps = double.Parse(parts[0], System.Globalization.CultureInfo.InvariantCulture);usedDelRaw= double.Parse(parts[1], System.Globalization.CultureInfo.InvariantCulture);}var (capEps, capDel) = _caps.GetCaps(tenantId, metric, window);double? usedDel = capDel.HasValue ? usedDelRaw : (double?)null;return (usedEps, usedDel, capEps, capDel);}var caps = _caps.GetCaps(tenantId, metric, window);return (usedEps, double.IsNaN(usedDel)? null : usedDel, caps.capEps, caps.capDel);}
}
5.5 发布 Worker(上一窗发布 + 幂等锁 + 小样本抑制 + 预算校验 + 加噪)
public sealed class DpPublishWorker : AsyncPeriodicBackgroundWorkerBase, ITransientDependency {private readonly IClock _clock; private readonly IDpRepository _repo;private readonly IPrivacyAccountant _acct; private readonly IRandomSource _rng = new CryptoRandom();private readonly GaussianSampler _gau; private readonly IConnectionMultiplexer _redis;public DpPublishWorker(AbpTimer timer, IClock clock, IDpRepository repo, IPrivacyAccountant acct, IConnectionMultiplexer redis): base(timer){ _clock = clock; _repo = repo; _acct = acct; _gau = new GaussianSampler(_rng); _redis = redis; Timer.Period = 60_000; }protected override async Task DoWorkAsync(PeriodicBackgroundWorkerContext context){var ct = context.CancellationToken;foreach (var def in await _repo.ListMetricDefinitionsAsync()){ct.ThrowIfCancellationRequested();var (ws, we) = PrevWindowUtc(_clock, def.Window); // 发布上一窗(UTC)// 幂等发布锁:只允许一个实例发布该窗var db = _redis.GetDatabase();int ttlSec = (int)Math.Max(1, (we - _clock.Now.ToUniversalTime()).TotalSeconds);var relKey = $"dp:released:{def.TenantId}:{def.Name}:{(int)def.Window.TotalSeconds}:{ws.ToUnixTimeSeconds()}";bool acquired = await db.StringSetAsync(relKey, "1", TimeSpan.FromSeconds(ttlSec), when: When.NotExists);if (!acquired) continue;var (trueCount, distinctUnits) = await _repo.AggregateAsync(def, ws, we);if (distinctUnits < def.MinDistinctUnits){ await _repo.LogSuppressedAsync(def, ws, we); continue; }var ok = await _acct.TryConsumeAsync(def.TenantId, def.Name, ws, def.Window, def.Epsilon, def.Delta);if (!ok){ await _repo.LogBudgetExceededAsync(def, ws, we); continue; }double noisy;if (def.Mechanism == DpMechanism.Laplace){var b = def.SensitivityDelta / def.Epsilon;noisy = DpNoise.Laplace(trueCount, b, _rng);} else {if (!def.Delta.HasValue) { await _repo.LogErrorAsync(def, ws, we, "Gaussian requires delta"); continue; }var sigma = DpNoise.ClassicGaussianSigma(def.SensitivityDelta, def.Epsilon, def.Delta.Value);noisy = Math.Max(0, trueCount + _gau.Next() * sigma);}await _repo.PublishAsync(def, ws, we, noisy, Hash64Stable(trueCount), def.Epsilon, def.Delta, def.Mechanism.ToString());}}
5.6 比例/均值指标(分子/分母分别加噪 + 小样本抑制)
// 伪代码:同窗同预算,或将 ε 在分子/分母间按重要性拆分
public async Task<double?> ReleaseRatioAsync(MetricDefinition numDef, MetricDefinition denDef, DateTimeOffset ws, DateTimeOffset we){var (num, nu) = await _repo.AggregateAsync(numDef, ws, we);var (den, du) = await _repo.AggregateAsync(denDef, ws, we);var minU = Math.Min(nu, du);if (minU < Math.Min(numDef.MinDistinctUnits, denDef.MinDistinctUnits)) return null; // 小样本抑制// 分子与分母分别预算校验与加噪(略,等同计数)var noisyNum = /* Laplace/Gaussian */ 0.0;var noisyDen = /* Laplace/Gaussian */ 1.0;if (noisyDen <= 0) return null; // 防爆炸var ratio = noisyNum / noisyDen;return Math.Clamp(ratio, 0, 1);
}
6. 灰度与回滚 🎛️
- 🔀 特性开关:按租户启用“DP化指标”。数值型特性可存“灰度比例”,由调用方按此值自定义分流;或在网关层做百分比分流。
- 🧪 A/B 验证:监控 MAPE / P95;达标→放量;不达标→回滚到“内部仅真值”。
7. 可视化与审计 📊
- 📉 面板:ε/δ 消耗曲线、剩余额度、发布热力图、小样本抑制/超额次数、失败原因分布(超 ε/超 δ)。
- 🧾 审计:租户、指标、窗口(UTC)、真值哈希、噪声参数、ε/δ、机制、seedHash、审批单号。
- 🆘 运维:超额硬阻断、速率限制、异常告警(深夜暴增、跨租户)。
⏱️ 窗口与 TTL 对齐示意
8. 评测脚本 🧪
# eval_dp_counter.py(Laplace 误差曲线)
import numpy as npdef laplace_noise(b, size=1, rng=None):rng = rng or np.random.default_rng(2025)u = rng.random(size=size)u = np.clip(u, np.finfo(float).eps, 1-np.finfo(float).eps) - 0.5 # (-0.5,0.5)return -b * np.sign(u) * np.log(1 - 2 * np.abs(u))def eval_mape_p95(true_counts, epsilon, delta_f=1.0, trials=2000):b = delta_f / epsilonrows = []for n in true_counts:preds = np.clip(n + laplace_noise(b, trials), 0, None)mape = np.mean(np.abs(preds - n) / max(1, n))p95 = np.percentile(np.abs(preds - n), 95)rows.append((n, mape, p95))return rowsif __name__ == '__main__':Ns = [50, 100, 300, 1000, 3000, 10000]for eps in [0.3, 0.5, 1.0, 2.0]:print(eps, eval_mape_p95(Ns, eps))
经验近似:Laplace 的
E(|噪声|)≈b
,P95(|噪声|)≈2.996·b
,可用于看板阈值与业务影响初判。
9. 上线步骤 ✅
- 盘点指标 → 划分“公开/内审”;
- 选隐私单元与剪裁 K;
- 为公开集合设预算(每窗 ε/δ 上限、最小样本、发布频率),由 IBudgetPolicyProvider 配置化下发;
- 接入
Abp.DpMetrics
,启用 Redis & Background Worker; - 启用发布幂等锁 与 Lua 原子预算消费;
- 灰度:设置租户特性与分流比例;
- A/B:看板跟踪 MAPE/P95 与预算曲线,达标放量;
- 演练:预算见底/拒绝服务/回滚 SOP;
- 合规:导出审计日志与预算账本快照(UTC)。
10. 仓库结构建议 🧱
/src/Abp.DpMetrics/ // 模块源码(接口/机制/会计/Worker)
/src/Abp.DpMetrics.Tests/ // xUnit:Laplace 误差、预算原子消费并发、窗口边界、幂等锁
/tools/eval/ // 评测脚本
/tools/dashboard/ // ε 账本 & 误差曲线仪表盘模板
/docker/docker-compose.yml // Redis(可选)
docker-compose(可选)
version: "3.9"
services:redis:image: redis:7-alpineports: ["6379:6379"]command: ["redis-server","--appendonly","yes"]
appsettings.json(片段)
{"Redis": { "Configuration": "localhost:6379" },"DistributedCache": { "KeyPrefix": "demo:dp:" }
}
11. 常见坑 🧨
- “解析高斯”误称:若未接入解析校准,应标注为“经典高斯上界”。
- 预算键未对齐窗:用 TTL 滑动计窗会与日报/小时看板错位;务必以
windowStartUtc
为主键,TTL=窗口剩余秒数。 - 上一窗发布 & 幂等:避免按
floor(now,w)
导致跨点漏发/重复发;用released
锁或 DB Upsert。 - 预算原子性:并发发布须用 Lua 原子或分布式锁;推荐 Lua。
- 随机源与开区间:避免
log(0)
边界;Box–Muller 产出双样本需缓存。 - 比例/均值抑制:小样本或分母过小应抑制结果;必要时加无偏修正。
- 审计哈希稳定:固定序列化精度/编码,跨平台一致。
- UTC 一致性:窗口、键、审计时间统一用 UTC,避免时区与夏令时影响。