跳至主要內容

数据脱敏与加密:保护用户隐私的工程实践

郑天祺大约 5 分钟安全数据脱敏与加密

前言

数据安全等于企业生命

在金融、支付、医疗等涉及用户隐私的行业,数据泄露可能导致:

  • 用户身份被冒用
  • 资金被盗用
  • 企业被罚款(GDPR最高罚款全球收入4%)

本文讲解两个防护层:脱敏(数据展示安全)和加密(数据存储安全)。


一、数据脱敏

1.1 定义

脱敏:在不影响数据分析的前提下,对敏感数据进行转换,使其无法反推真实值。

1.2 脱敏规则

class DataMasking:
    """数据脱敏规则库"""
    
    @staticmethod
    def mask_phone(phone: str) -> str:
        """
        脱敏手机号
        13912345678 → 139****5678
        """
        if len(phone) < 8:
            return phone
        return phone[:3] + '*' * (len(phone) - 7) + phone[-4:]
    
    @staticmethod
    def mask_email(email: str) -> str:
        """
        脱敏邮箱
        alice@example.com → a***@example.com
        """
        parts = email.split('@')
        if len(parts[0]) <= 1:
            return parts[0] + '@' + parts[1]
        return parts[0][0] + '*' * (len(parts[0]) - 1) + '@' + parts[1]
    
    @staticmethod
    def mask_id_card(id_card: str) -> str:
        """
        脱敏身份证
        110101199001011234 → 1101****9001****1234
        """
        if len(id_card) < 8:
            return id_card
        return id_card[:4] + '*' * (len(id_card) - 8) + id_card[-4:]
    
    @staticmethod
    def mask_bank_card(card: str) -> str:
        """
        脱敏银行卡
        6226092036151234 → 622609****151234
        """
        if len(card) < 8:
            return card
        return card[:6] + '*' * (len(card) - 10) + card[-4:]
    
    @staticmethod
    def mask_ip_address(ip: str) -> str:
        """
        脱敏IP地址
        192.168.1.100 → 192.168.1.*
        """
        parts = ip.split('.')
        if len(parts) == 4:
            return '.'.join(parts[:3]) + '.*'
        return ip
    
    @staticmethod
    def mask_amount(amount: float, precision: int = 2) -> str:
        """
        脱敏金额(用于非精确显示)
        12345.67 → 1234* (保留部分数字)
        """
        amount_str = str(int(amount))
        return amount_str[:-2] + '**' if len(amount_str) > 2 else '***'

1.3 应用场景

┌─────────────────────────┐
│ 日志系统                 │
│ ├─ 不记录密码           │
│ ├─ 脱敏手机号、身份证   │
│ └─ 脱敏IP地址           │
├─────────────────────────┤
│ API响应                 │
│ ├─ 列表查询:脱敏手机号│
│ ├─ 详情查询:脱敏身份证│
│ └─ 下载报表:脱敏银行卡│
├─────────────────────────┤
│ 监控告警                 │
│ ├─ 不记录完整密码       │
│ ├─ 脱敏关键参数         │
│ └─ 仅记录脱敏错误信息   │
└─────────────────────────┘

二、数据加密

2.1 对称加密 vs 非对称加密

┌─────────────────────────────────┐
│ 对称加密(AES)                 │
├─────────────────────────────────┤
│ 用途:数据库中的敏感字段        │
│ 加密密钥 = 解密密钥             │
│ 速度快,但密钥共享困难          │
│                                 │
│ 场景:用户密码、手机号          │
│ ├─ 密钥存在密钥管理系统(KMS)    │
│ └─ 应用无权访问原始密钥         │
└─────────────────────────────────┘

┌─────────────────────────────────┐
│ 非对称加密(RSA)               │
├─────────────────────────────────┤
│ 公钥加密,私钥解密              │
│ 速度慢,但密钥分离好            │
│                                 │
│ 场景:API通信、文件传输         │
│ ├─ 公钥给客户端(可公开)       │
│ └─ 私钥服务器保管(严格保护)   │
└─────────────────────────────────┘

2.2 字段级加密实现

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os

class FieldEncryption:
    """
    字段级加密
    
    在数据库存储前加密,取出时解密
    """
    
    def __init__(self, kms_client):
        """
        kms_client: 密钥管理系统客户端
        从KMS获取加密密钥,而不是硬编码
        """
        self.kms = kms_client
        self.cipher_suite = None
    
    def encrypt(self, plaintext: str, field_name: str) -> str:
        """
        加密敏感字段
        """
        # 1. 从KMS获取加密密钥(不缓存!每次都获取)
        key = self.kms.get_encryption_key(field_name)
        
        # 2. 加密
        cipher_suite = Fernet(key)
        ciphertext = cipher_suite.encrypt(plaintext.encode())
        
        # 3. 返回Base64编码的密文
        return ciphertext.decode()
    
    def decrypt(self, ciphertext: str, field_name: str) -> str:
        """
        解密敏感字段
        """
        # 1. 从KMS获取解密密钥
        key = self.kms.get_decryption_key(field_name)
        
        # 2. 解密
        cipher_suite = Fernet(key)
        plaintext = cipher_suite.decrypt(ciphertext.encode())
        
        return plaintext.decode()

# 在ORM中应用加密
class UserModel:
    """
    用户模型
    
    某些字段需要自动加密/解密
    """
    
    def __init__(self, encryption: FieldEncryption):
        self.encryption = encryption
    
    def save_user(self, user_id: str, phone: str, email: str):
        """
        保存用户时自动加密敏感字段
        """
        encrypted_phone = self.encryption.encrypt(phone, 'user.phone')
        encrypted_email = self.encryption.encrypt(email, 'user.email')
        
        # 存储到数据库
        db.execute("""
            INSERT INTO users (user_id, phone, email)
            VALUES (%s, %s, %s)
        """, user_id, encrypted_phone, encrypted_email)
    
    def get_user(self, user_id: str):
        """
        查询用户时自动解密敏感字段
        """
        row = db.fetchone("""
            SELECT user_id, phone, email FROM users WHERE user_id = %s
        """, user_id)
        
        user = {
            'user_id': row['user_id'],
            'phone': self.encryption.decrypt(row['phone'], 'user.phone'),
            'email': self.encryption.decrypt(row['email'], 'user.email')
        }
        
        return user

2.3 密钥管理(KMS)

class KMSClient:
    """
    密钥管理系统客户端
    
    所有加密密钥都应该由KMS中心管理,而不是硬编码在应用中
    """
    
    def __init__(self, kms_endpoint: str, api_key: str):
        self.endpoint = kms_endpoint
        self.api_key = api_key
        self.key_cache = {}  # 临时缓存(带TTL)
    
    def get_encryption_key(self, field_name: str, ttl_hours: int = 1):
        """
        获取加密密钥
        
        密钥应该定期轮换(密钥版本管理)
        """
        cache_key = f"key:{field_name}"
        
        # 检查缓存(避免每次都请求KMS,加快速度)
        if cache_key in self.key_cache:
            cached_key, timestamp = self.key_cache[cache_key]
            if time.time() - timestamp < ttl_hours * 3600:
                return cached_key
        
        # 从KMS获取最新密钥
        response = requests.post(
            f"{self.endpoint}/keys/get",
            headers={'Authorization': f'Bearer {self.api_key}'},
            json={'field_name': field_name, 'action': 'encrypt'}
        )
        
        key = response.json()['key']
        self.key_cache[cache_key] = (key, time.time())
        
        return key
    
    def get_decryption_key(self, field_name: str):
        """
        获取解密密钥
        
        注意:解密密钥应该包含密钥版本信息
        """
        return self.get_encryption_key(field_name)
    
    def rotate_key(self, field_name: str):
        """
        密钥轮换
        
        定期更换密钥(推荐:每90天)
        老密钥仍保留用于解密,但新加密用新密钥
        """
        response = requests.post(
            f"{self.endpoint}/keys/rotate",
            headers={'Authorization': f'Bearer {self.api_key}'},
            json={'field_name': field_name}
        )
        
        # 清除缓存,强制重新获取
        self.key_cache.pop(f"key:{field_name}", None)
        
        return response.json()['success']

2.4 国密算法(中国标准)

# 在中国和涉及跨境的系统中,某些法规要求使用国密算法
# 而不是RSA/AES

from gmssl import sm4

class SM4Encryption:
    """
    SM4国密加密(128位分组密码)
    
    类似AES,但是中国密码学标准
    """
    
    def __init__(self, key: bytes):
        """
        key: 16字节的加密密钥
        """
        self.sm4 = sm4.SM4()
        self.sm4.setKey(key)
    
    def encrypt(self, plaintext: str) -> str:
        """加密"""
        self.sm4.setMode(sm4.SM4_ENCRYPT)
        ciphertext = self.sm4.crypt_ecb(plaintext.encode())
        return ciphertext.hex()
    
    def decrypt(self, ciphertext: str) -> str:
        """解密"""
        self.sm4.setMode(sm4.SM4_DECRYPT)
        plaintext = self.sm4.crypt_ecb(bytes.fromhex(ciphertext))
        return plaintext.decode()

三、合规性要求

3.1 常见法规

┌─────────────────────────────────┐
│ GDPR(欧盟)                    │
│ ├─ 用户有权删除数据(被遗忘权) │
│ ├─ 数据泄露48小时内通知用户   │
│ └─ 违反最高罚款全球收入4%     │
├─────────────────────────────────┤
│ 个人信息保护法(中国)         │
│ ├─ 个人信息必须加密存储        │
│ ├─ 敏感个人信息双重加密        │
│ └─ 违反可能罚款50万-500万     │
├─────────────────────────────────┤
│ 支付卡行业标准(PCI-DSS)      │
│ ├─ 银行卡数据必须加密          │
│ ├─ 完整性检查(不能篡改)      │
│ └─ 定期安全审计                │
└─────────────────────────────────┘

3.2 审计日志

class AuditLog:
    """
    敏感数据访问审计
    
    记录谁在什么时候访问了什么数据
    """
    
    @staticmethod
    def log_access(user_id: str, resource: str, action: str, success: bool):
        """
        记录数据访问
        
        user_id: 谁访问的
        resource: 访问了什么数据
        action: 什么操作(read/write/delete)
        success: 是否成功
        """
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'success': success,
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent')
        }
        
        # 写入审计日志(不能删除或修改!)
        # 通常写入append-only log系统
        audit_db.insert('audit_logs', log_entry)

总结

数据安全的分层防护:

┌──────────────────────────────────┐
│ 脱敏                             │
│ └─ 展示层安全(用户看不到敏感字)│
├──────────────────────────────────┤
│ 加密                             │
│ └─ 存储层安全(数据库中加密存储) │
├──────────────────────────────────┤
│ 密钥管理                         │
│ └─ 密钥层安全(KMS中心管理)    │
├──────────────────────────────────┤
│ 访问控制                         │
│ └─ 权限层安全(谁能访问什么)    │
├──────────────────────────────────┤
│ 审计日志                         │
│ └─ 追踪层安全(记录所有访问)    │
└──────────────────────────────────┘

黄金法则

  1. 敏感数据必须加密存储
  2. 密钥由KMS中心管理
  3. 定期密钥轮换(90天)
  4. 访问审计完整记录
  5. 遵守法规要求
上次编辑于:
贡献者: zhengtianqi