数据脱敏与加密:保护用户隐私的工程实践
大约 5 分钟
前言
数据安全等于企业生命。
在金融、支付、医疗等涉及用户隐私的行业,数据泄露可能导致:
- 用户身份被冒用
- 资金被盗用
- 企业被罚款(GDPR最高罚款全球收入4%)
本文讲解两个防护层:脱敏(数据展示安全)和加密(数据存储安全)。
一、数据脱敏
1.1 定义
脱敏:在不影响数据分析的前提下,对敏感数据进行转换,使其无法反推真实值。
1.2 脱敏规则
class DataMasking:
"""数据脱敏规则库"""
@staticmethod
def mask_phone(phone: str) -> str:
"""
脱敏手机号
13912345678 → 139****5678
"""
if len(phone) < 8:
return phone
return phone[:3] + '*' * (len(phone) - 7) + phone[-4:]
@staticmethod
def mask_email(email: str) -> str:
"""
脱敏邮箱
alice@example.com → a***@example.com
"""
parts = email.split('@')
if len(parts[0]) <= 1:
return parts[0] + '@' + parts[1]
return parts[0][0] + '*' * (len(parts[0]) - 1) + '@' + parts[1]
@staticmethod
def mask_id_card(id_card: str) -> str:
"""
脱敏身份证
110101199001011234 → 1101****9001****1234
"""
if len(id_card) < 8:
return id_card
return id_card[:4] + '*' * (len(id_card) - 8) + id_card[-4:]
@staticmethod
def mask_bank_card(card: str) -> str:
"""
脱敏银行卡
6226092036151234 → 622609****151234
"""
if len(card) < 8:
return card
return card[:6] + '*' * (len(card) - 10) + card[-4:]
@staticmethod
def mask_ip_address(ip: str) -> str:
"""
脱敏IP地址
192.168.1.100 → 192.168.1.*
"""
parts = ip.split('.')
if len(parts) == 4:
return '.'.join(parts[:3]) + '.*'
return ip
@staticmethod
def mask_amount(amount: float, precision: int = 2) -> str:
"""
脱敏金额(用于非精确显示)
12345.67 → 1234* (保留部分数字)
"""
amount_str = str(int(amount))
return amount_str[:-2] + '**' if len(amount_str) > 2 else '***'
1.3 应用场景
┌─────────────────────────┐
│ 日志系统 │
│ ├─ 不记录密码 │
│ ├─ 脱敏手机号、身份证 │
│ └─ 脱敏IP地址 │
├─────────────────────────┤
│ API响应 │
│ ├─ 列表查询:脱敏手机号│
│ ├─ 详情查询:脱敏身份证│
│ └─ 下载报表:脱敏银行卡│
├─────────────────────────┤
│ 监控告警 │
│ ├─ 不记录完整密码 │
│ ├─ 脱敏关键参数 │
│ └─ 仅记录脱敏错误信息 │
└─────────────────────────┘
二、数据加密
2.1 对称加密 vs 非对称加密
┌─────────────────────────────────┐
│ 对称加密(AES) │
├─────────────────────────────────┤
│ 用途:数据库中的敏感字段 │
│ 加密密钥 = 解密密钥 │
│ 速度快,但密钥共享困难 │
│ │
│ 场景:用户密码、手机号 │
│ ├─ 密钥存在密钥管理系统(KMS) │
│ └─ 应用无权访问原始密钥 │
└─────────────────────────────────┘
┌─────────────────────────────────┐
│ 非对称加密(RSA) │
├─────────────────────────────────┤
│ 公钥加密,私钥解密 │
│ 速度慢,但密钥分离好 │
│ │
│ 场景:API通信、文件传输 │
│ ├─ 公钥给客户端(可公开) │
│ └─ 私钥服务器保管(严格保护) │
└─────────────────────────────────┘
2.2 字段级加密实现
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os
class FieldEncryption:
"""
字段级加密
在数据库存储前加密,取出时解密
"""
def __init__(self, kms_client):
"""
kms_client: 密钥管理系统客户端
从KMS获取加密密钥,而不是硬编码
"""
self.kms = kms_client
self.cipher_suite = None
def encrypt(self, plaintext: str, field_name: str) -> str:
"""
加密敏感字段
"""
# 1. 从KMS获取加密密钥(不缓存!每次都获取)
key = self.kms.get_encryption_key(field_name)
# 2. 加密
cipher_suite = Fernet(key)
ciphertext = cipher_suite.encrypt(plaintext.encode())
# 3. 返回Base64编码的密文
return ciphertext.decode()
def decrypt(self, ciphertext: str, field_name: str) -> str:
"""
解密敏感字段
"""
# 1. 从KMS获取解密密钥
key = self.kms.get_decryption_key(field_name)
# 2. 解密
cipher_suite = Fernet(key)
plaintext = cipher_suite.decrypt(ciphertext.encode())
return plaintext.decode()
# 在ORM中应用加密
class UserModel:
"""
用户模型
某些字段需要自动加密/解密
"""
def __init__(self, encryption: FieldEncryption):
self.encryption = encryption
def save_user(self, user_id: str, phone: str, email: str):
"""
保存用户时自动加密敏感字段
"""
encrypted_phone = self.encryption.encrypt(phone, 'user.phone')
encrypted_email = self.encryption.encrypt(email, 'user.email')
# 存储到数据库
db.execute("""
INSERT INTO users (user_id, phone, email)
VALUES (%s, %s, %s)
""", user_id, encrypted_phone, encrypted_email)
def get_user(self, user_id: str):
"""
查询用户时自动解密敏感字段
"""
row = db.fetchone("""
SELECT user_id, phone, email FROM users WHERE user_id = %s
""", user_id)
user = {
'user_id': row['user_id'],
'phone': self.encryption.decrypt(row['phone'], 'user.phone'),
'email': self.encryption.decrypt(row['email'], 'user.email')
}
return user
2.3 密钥管理(KMS)
class KMSClient:
"""
密钥管理系统客户端
所有加密密钥都应该由KMS中心管理,而不是硬编码在应用中
"""
def __init__(self, kms_endpoint: str, api_key: str):
self.endpoint = kms_endpoint
self.api_key = api_key
self.key_cache = {} # 临时缓存(带TTL)
def get_encryption_key(self, field_name: str, ttl_hours: int = 1):
"""
获取加密密钥
密钥应该定期轮换(密钥版本管理)
"""
cache_key = f"key:{field_name}"
# 检查缓存(避免每次都请求KMS,加快速度)
if cache_key in self.key_cache:
cached_key, timestamp = self.key_cache[cache_key]
if time.time() - timestamp < ttl_hours * 3600:
return cached_key
# 从KMS获取最新密钥
response = requests.post(
f"{self.endpoint}/keys/get",
headers={'Authorization': f'Bearer {self.api_key}'},
json={'field_name': field_name, 'action': 'encrypt'}
)
key = response.json()['key']
self.key_cache[cache_key] = (key, time.time())
return key
def get_decryption_key(self, field_name: str):
"""
获取解密密钥
注意:解密密钥应该包含密钥版本信息
"""
return self.get_encryption_key(field_name)
def rotate_key(self, field_name: str):
"""
密钥轮换
定期更换密钥(推荐:每90天)
老密钥仍保留用于解密,但新加密用新密钥
"""
response = requests.post(
f"{self.endpoint}/keys/rotate",
headers={'Authorization': f'Bearer {self.api_key}'},
json={'field_name': field_name}
)
# 清除缓存,强制重新获取
self.key_cache.pop(f"key:{field_name}", None)
return response.json()['success']
2.4 国密算法(中国标准)
# 在中国和涉及跨境的系统中,某些法规要求使用国密算法
# 而不是RSA/AES
from gmssl import sm4
class SM4Encryption:
"""
SM4国密加密(128位分组密码)
类似AES,但是中国密码学标准
"""
def __init__(self, key: bytes):
"""
key: 16字节的加密密钥
"""
self.sm4 = sm4.SM4()
self.sm4.setKey(key)
def encrypt(self, plaintext: str) -> str:
"""加密"""
self.sm4.setMode(sm4.SM4_ENCRYPT)
ciphertext = self.sm4.crypt_ecb(plaintext.encode())
return ciphertext.hex()
def decrypt(self, ciphertext: str) -> str:
"""解密"""
self.sm4.setMode(sm4.SM4_DECRYPT)
plaintext = self.sm4.crypt_ecb(bytes.fromhex(ciphertext))
return plaintext.decode()
三、合规性要求
3.1 常见法规
┌─────────────────────────────────┐
│ GDPR(欧盟) │
│ ├─ 用户有权删除数据(被遗忘权) │
│ ├─ 数据泄露48小时内通知用户 │
│ └─ 违反最高罚款全球收入4% │
├─────────────────────────────────┤
│ 个人信息保护法(中国) │
│ ├─ 个人信息必须加密存储 │
│ ├─ 敏感个人信息双重加密 │
│ └─ 违反可能罚款50万-500万 │
├─────────────────────────────────┤
│ 支付卡行业标准(PCI-DSS) │
│ ├─ 银行卡数据必须加密 │
│ ├─ 完整性检查(不能篡改) │
│ └─ 定期安全审计 │
└─────────────────────────────────┘
3.2 审计日志
class AuditLog:
"""
敏感数据访问审计
记录谁在什么时候访问了什么数据
"""
@staticmethod
def log_access(user_id: str, resource: str, action: str, success: bool):
"""
记录数据访问
user_id: 谁访问的
resource: 访问了什么数据
action: 什么操作(read/write/delete)
success: 是否成功
"""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'user_id': user_id,
'resource': resource,
'action': action,
'success': success,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent')
}
# 写入审计日志(不能删除或修改!)
# 通常写入append-only log系统
audit_db.insert('audit_logs', log_entry)
总结
数据安全的分层防护:
┌──────────────────────────────────┐
│ 脱敏 │
│ └─ 展示层安全(用户看不到敏感字)│
├──────────────────────────────────┤
│ 加密 │
│ └─ 存储层安全(数据库中加密存储) │
├──────────────────────────────────┤
│ 密钥管理 │
│ └─ 密钥层安全(KMS中心管理) │
├──────────────────────────────────┤
│ 访问控制 │
│ └─ 权限层安全(谁能访问什么) │
├──────────────────────────────────┤
│ 审计日志 │
│ └─ 追踪层安全(记录所有访问) │
└──────────────────────────────────┘
黄金法则:
- 敏感数据必须加密存储
- 密钥由KMS中心管理
- 定期密钥轮换(90天)
- 访问审计完整记录
- 遵守法规要求