全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python安全编码最佳实践

遵循安全编码规范是预防漏洞的第一道防线。结合OWASP Top 10建立安全意识。

OWASP Top 10 对应防护

OWASP漏洞Python防护要点
A01:访问控制失效权限验证、最小权限原则
A02:加密失败安全哈希、TLS配置
A03:注入参数化查询、输入验证
A04:不安全设计安全架构、威胁建模
A05:安全配置错误安全默认、禁用调试
A06:脆弱组件依赖审计、版本更新
A07:身份认证失败强密码、会话安全
A08:数据完整性失败数据验证、签名校验
A09:日志监控失败安全日志、监控告警
A10:服务端请求伪造URL验证、白名单

输入验证

Python
import re
from typing import Any

class InputSanitizer:
    "输入清理器"

    @staticmethod
    def sanitize_string(value: str, max_length: int = 1000) -> str:
        "清理字符串输入"
        if not isinstance(value, str):
            raise TypeError("期望字符串")

        # 截断长度
        if len(value) > max_length:
            value = value[:max_length]

        # 移除危险字符(根据上下文调整)
        return value.strip()

    @staticmethod
    def validate_int(value: Any, min_val: int, max_val: int) -> int:
        "验证整数范围"
        try:
            num = int(value)
        except (TypeError, ValueError):
            raise ValueError("无效整数")

        if num < min_val or num > max_val:
            raise ValueError(f"范围: {min_val}-{max_val}")

        return num

    @staticmethod
    def validate_email(email: str) -> str:
        "验证邮箱"
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        if not re.match(pattern, email):
            raise ValueError("无效邮箱格式")
        return email.lower()

    @staticmethod
    def validate_url(url: str, allowed_schemes: list = None) -> str:
        "验证URL"
        from urllib.parse import urlparse

        allowed_schemes = allowed_schemes or ['http', 'https']
        parsed = urlparse(url)

        if parsed.scheme not in allowed_schemes:
            raise ValueError(f"不允许的协议: {parsed.scheme}")

        # 防止SSRF
        if not parsed.hostname:
            raise ValueError("无效主机名")

        return url

sanitizer = InputSanitizer()
safe_email = sanitizer.validate_email(user_input)

安全配置管理

Python
import os

class SecurityConfig:
    "安全配置"

    # 安全默认值
    DEBUG = False
    SECRET_KEY = None  # 必须从环境变量设置
    ALLOWED_HOSTS = []

    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    CSRF_COOKIE_SECURE = True

    def __init__(self):
        self._validate()

    def _validate(self):
        "验证安全配置"
        if os.environ.get('DEBUG', 'false').lower() == 'true':
            self.DEBUG = True
            # 开发模式警告
            import warnings
            warnings.warn("DEBUG模式启用,仅用于开发")

        secret = os.environ.get('SECRET_KEY')
        if not secret:
            raise RuntimeError("SECRET_KEY未设置")
        if len(secret) < 32:
            raise RuntimeError("SECRET_KEY太短(至少32字符)")

        self.SECRET_KEY = secret

        hosts = os.environ.get('ALLOWED_HOSTS', '')
        self.ALLOWED_HOSTS = hosts.split(',') if hosts else []

config = SecurityConfig()

安全会话管理

Python
import secrets
import time
from typing import Optional

class SecureSession:
    "安全会话管理"

    def __init__(self):
        self.sessions = {}
        self.session_timeout = 3600  # 1小时

    def create_session(self, user_id: str) -> str:
        "创建会话"
        session_id = secrets.token_urlsafe(32)
        self.sessions[session_id] = {
            'user_id': user_id,
            'created_at': time.time(),
            'last_activity': time.time()
        }
        return session_id

    def validate_session(self, session_id: str) -> Optional[str]:
        "验证会话"
        session = self.sessions.get(session_id)

        if not session:
            return None

        # 检查过期
        if time.time() - session['last_activity'] > self.session_timeout:
            self.sessions.pop(session_id, None)
            return None

        # 更新活动时间
        session['last_activity'] = time.time()
        return session['user_id']

    def destroy_session(self, session_id: str):
        "销毁会话"
        self.sessions.pop(session_id, None)

session_manager = SecureSession()
sid = session_manager.create_session('user_123')

密码安全

Python
import secrets
import hashlib

class PasswordPolicy:
    "密码策略"

    MIN_LENGTH = 8
    REQUIRE_UPPER = True
    REQUIRE_LOWER = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True

    @staticmethod
    def validate(password: str) -> bool:
        "验证密码强度"
        if len(password) < PasswordPolicy.MIN_LENGTH:
            return False

        checks = [
            any(c.isupper() for c in password),
            any(c.islower() for c in password),
            any(c.isdigit() for c in password),
            any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?'
                for c in password)
        ]

        return all(checks)

    @staticmethod
    def generate(length: int = 16) -> str:
        "生成安全密码"
        import string
        alphabet = string.ascii_letters + string.digits + '!@#$%^&*'
        return ''.join(secrets.choice(alphabet) for _ in range(length))

# 使用(实际哈希见密码哈希文章)
if PasswordPolicy.validate(user_password):
    # 安全存储
    pass
else:
    raise ValueError("密码不符合安全要求")

安全日志

Python
import logging
import re

class SecurityLogger:
    "安全日志器"

    SENSITIVE_FIELDS = ['password', 'token', 'secret', 'key']

    def __init__(self, name: str):
        self.logger = logging.getLogger(name)
        self._setup()

    def _setup(self):
        "配置日志"
        self.logger.setLevel(logging.INFO)

        # 不记录到可能被访问的位置
        handler = logging.FileHandler('/var/log/app/security.log')
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def _sanitize(self, message: str) -> str:
        "清理敏感信息"
        for field in self.SENSITIVE_FIELDS:
            pattern = rf'{field}\s*[=:]\s*\S+'
            message = re.sub(
                pattern,
                f'{field}=[REDACTED]',
                message,
                flags=re.IGNORECASE
            )
        return message

    def log_auth_success(self, user: str):
        "记录认证成功"
        self.logger.info(f"认证成功: user={user}")

    def log_auth_failure(self, user: str, reason: str):
        "记录认证失败"
        self.logger.warning(f"认证失败: user={user}, reason={reason}")

    def log_access_denied(self, user: str, resource: str):
        "记录访问拒绝"
        self.logger.warning(f"访问拒绝: user={user}, resource={resource}")

security_log = SecurityLogger('app')
security_log.log_auth_success('alice')

权限控制

Python
from functools import wraps

class AccessControl:
    "访问控制"

    ROLES = {
        'admin': ['read', 'write', 'delete', 'manage'],
        'editor': ['read', 'write'],
        'viewer': ['read']
    }

    def __init__(self):
        self.user_roles = {}

    def assign_role(self, user: str, role: str):
        "分配角色"
        if role not in self.ROLES:
            raise ValueError(f"无效角色: {role}")
        self.user_roles[user] = role

    def check_permission(self, user: str, action: str) -> bool:
        "检查权限"
        role = self.user_roles.get(user)
        if not role:
            return False

        permissions = self.ROLES.get(role, [])
        return action in permissions

    def require_permission(self, action: str):
        "权限装饰器"
        def decorator(func):
            @wraps(func)
            def wrapper(user, *args, **kwargs):
                if not self.check_permission(user, action):
                    raise PermissionError(
                        f"用户 {user}{action} 权限"
                    )
                return func(user, *args, **kwargs)
            return wrapper
        return decorator

ac = AccessControl()
ac.assign_role('alice', 'editor')

@ac.require_permission('write')
def edit_document(user, doc_id):
    return f"编辑文档 {doc_id}"

edit_document('alice', 'doc_123')  # 成功
# edit_document('bob', 'doc_123')  # PermissionError

依赖安全审计

Python
import subprocess
import json

def audit_dependencies():
    "审计依赖安全"

    # 使用 pip-audit 或 safety
    try:
        result = subprocess.run(
            ['pip-audit', '--format', 'json'],
            capture_output=True,
            text=True,
            shell=False
        )

        vulnerabilities = json.loads(result.stdout)

        for vuln in vulnerabilities:
            print(f"漏洞: {vuln['name']} {vuln['version']}")
            print(f"  ID: {vuln['id']}")
            print(f"  严重性: {vuln['severity']}")

        return len(vulnerabilities) == 0

    except FileNotFoundError:
        print("请安装 pip-audit: pip install pip-audit")
        return False

# 或使用 requirements.txt 检查
def check_requirements():
    "检查requirements安全性"
    import pkg_resources

    # 已知的恶意包名检测
    malicious_packages = ['pyyaml-malicious', 'requests-malicious']

    installed = [pkg.key for pkg in pkg_resources.working_set]

    for pkg in installed:
        if pkg in malicious_packages:
            raise RuntimeError(f"检测到恶意包: {pkg}")

错误处理安全

Python
class SecureErrorHandler:
    "安全错误处理"

    def handle_error(self, error: Exception, user_context: bool = True):
        "安全处理错误"

        # 不向用户暴露内部错误细节
        if user_context:
            return self._user_message(error)
        else:
            return self._log_error(error)

    def _user_message(self, error: Exception) -> str:
        "用户友好消息"
        # 通用错误消息,不暴露细节
        generic_messages = {
            'ValueError': '输入无效,请检查后重试',
            'PermissionError': '权限不足',
            'KeyError': '请求的资源不存在',
            'ConnectionError': '服务暂时不可用',
        }

        error_type = type(error).__name__
        return generic_messages.get(error_type, '操作失败,请稍后重试')

    def _log_error(self, error: Exception):
        "记录详细错误"
        import traceback
        import logging

        logger = logging.getLogger('errors')
        logger.error(
            f"错误: {type(error).__name__}: {str(error)}\n"
            f"堆栈: {traceback.format_exc()}"
        )

handler = SecureErrorHandler()
try:
    # 业务代码
    pass
except Exception as e:
    user_msg = handler.handle_error(e, user_context=True)
    print(user_msg)  # 不暴露细节

要点总结

  1. 输入验证是安全的第一道防线
  2. 安全配置使用安全默认,环境变量存储敏感信息
  3. 会话管理使用安全随机ID,设置合理过期时间
  4. 权限控制遵循最小权限原则
  5. 错误处理不向用户暴露内部细节

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python安全的随机数生成
下一篇 → Python密码哈希与存储
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库