Python安全编码最佳实践
遵循安全编码规范是预防漏洞的第一道防线。结合OWASP Top 10建立安全意识。
OWASP Top 10 对应防护
| OWASP漏洞 | Python防护要点 |
|---|---|
| A01:访问控制失效 | 权限验证、最小权限原则 |
| A02:加密失败 | 安全哈希、TLS配置 |
| A03:注入 | 参数化查询、输入验证 |
| A04:不安全设计 | 安全架构、威胁建模 |
| A05:安全配置错误 | 安全默认、禁用调试 |
| A06:脆弱组件 | 依赖审计、版本更新 |
| A07:身份认证失败 | 强密码、会话安全 |
| A08:数据完整性失败 | 数据验证、签名校验 |
| A09:日志监控失败 | 安全日志、监控告警 |
| A10:服务端请求伪造 | URL验证、白名单 |
输入验证
Python
import re
from typing import Any
class InputSanitizer:
"输入清理器"
@staticmethod
def sanitize_string(value: str, max_length: int = 1000) -> str:
"清理字符串输入"
if not isinstance(value, str):
raise TypeError("期望字符串")
# 截断长度
if len(value) > max_length:
value = value[:max_length]
# 移除危险字符(根据上下文调整)
return value.strip()
@staticmethod
def validate_int(value: Any, min_val: int, max_val: int) -> int:
"验证整数范围"
try:
num = int(value)
except (TypeError, ValueError):
raise ValueError("无效整数")
if num < min_val or num > max_val:
raise ValueError(f"范围: {min_val}-{max_val}")
return num
@staticmethod
def validate_email(email: str) -> str:
"验证邮箱"
pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(pattern, email):
raise ValueError("无效邮箱格式")
return email.lower()
@staticmethod
def validate_url(url: str, allowed_schemes: list = None) -> str:
"验证URL"
from urllib.parse import urlparse
allowed_schemes = allowed_schemes or ['http', 'https']
parsed = urlparse(url)
if parsed.scheme not in allowed_schemes:
raise ValueError(f"不允许的协议: {parsed.scheme}")
# 防止SSRF
if not parsed.hostname:
raise ValueError("无效主机名")
return url
sanitizer = InputSanitizer()
safe_email = sanitizer.validate_email(user_input)
安全配置管理
Python
import os
class SecurityConfig:
"安全配置"
# 安全默认值
DEBUG = False
SECRET_KEY = None # 必须从环境变量设置
ALLOWED_HOSTS = []
SESSION_COOKIE_SECURE = True
SESSION_COOKIE_HTTPONLY = True
CSRF_COOKIE_SECURE = True
def __init__(self):
self._validate()
def _validate(self):
"验证安全配置"
if os.environ.get('DEBUG', 'false').lower() == 'true':
self.DEBUG = True
# 开发模式警告
import warnings
warnings.warn("DEBUG模式启用,仅用于开发")
secret = os.environ.get('SECRET_KEY')
if not secret:
raise RuntimeError("SECRET_KEY未设置")
if len(secret) < 32:
raise RuntimeError("SECRET_KEY太短(至少32字符)")
self.SECRET_KEY = secret
hosts = os.environ.get('ALLOWED_HOSTS', '')
self.ALLOWED_HOSTS = hosts.split(',') if hosts else []
config = SecurityConfig()
安全会话管理
Python
import secrets
import time
from typing import Optional
class SecureSession:
"安全会话管理"
def __init__(self):
self.sessions = {}
self.session_timeout = 3600 # 1小时
def create_session(self, user_id: str) -> str:
"创建会话"
session_id = secrets.token_urlsafe(32)
self.sessions[session_id] = {
'user_id': user_id,
'created_at': time.time(),
'last_activity': time.time()
}
return session_id
def validate_session(self, session_id: str) -> Optional[str]:
"验证会话"
session = self.sessions.get(session_id)
if not session:
return None
# 检查过期
if time.time() - session['last_activity'] > self.session_timeout:
self.sessions.pop(session_id, None)
return None
# 更新活动时间
session['last_activity'] = time.time()
return session['user_id']
def destroy_session(self, session_id: str):
"销毁会话"
self.sessions.pop(session_id, None)
session_manager = SecureSession()
sid = session_manager.create_session('user_123')
密码安全
Python
import secrets
import hashlib
class PasswordPolicy:
"密码策略"
MIN_LENGTH = 8
REQUIRE_UPPER = True
REQUIRE_LOWER = True
REQUIRE_DIGIT = True
REQUIRE_SPECIAL = True
@staticmethod
def validate(password: str) -> bool:
"验证密码强度"
if len(password) < PasswordPolicy.MIN_LENGTH:
return False
checks = [
any(c.isupper() for c in password),
any(c.islower() for c in password),
any(c.isdigit() for c in password),
any(c in '!@#$%^&*()_+-=[]{}|;:,.<>?'
for c in password)
]
return all(checks)
@staticmethod
def generate(length: int = 16) -> str:
"生成安全密码"
import string
alphabet = string.ascii_letters + string.digits + '!@#$%^&*'
return ''.join(secrets.choice(alphabet) for _ in range(length))
# 使用(实际哈希见密码哈希文章)
if PasswordPolicy.validate(user_password):
# 安全存储
pass
else:
raise ValueError("密码不符合安全要求")
安全日志
Python
import logging
import re
class SecurityLogger:
"安全日志器"
SENSITIVE_FIELDS = ['password', 'token', 'secret', 'key']
def __init__(self, name: str):
self.logger = logging.getLogger(name)
self._setup()
def _setup(self):
"配置日志"
self.logger.setLevel(logging.INFO)
# 不记录到可能被访问的位置
handler = logging.FileHandler('/var/log/app/security.log')
handler.setFormatter(logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
))
self.logger.addHandler(handler)
def _sanitize(self, message: str) -> str:
"清理敏感信息"
for field in self.SENSITIVE_FIELDS:
pattern = rf'{field}\s*[=:]\s*\S+'
message = re.sub(
pattern,
f'{field}=[REDACTED]',
message,
flags=re.IGNORECASE
)
return message
def log_auth_success(self, user: str):
"记录认证成功"
self.logger.info(f"认证成功: user={user}")
def log_auth_failure(self, user: str, reason: str):
"记录认证失败"
self.logger.warning(f"认证失败: user={user}, reason={reason}")
def log_access_denied(self, user: str, resource: str):
"记录访问拒绝"
self.logger.warning(f"访问拒绝: user={user}, resource={resource}")
security_log = SecurityLogger('app')
security_log.log_auth_success('alice')
权限控制
Python
from functools import wraps
class AccessControl:
"访问控制"
ROLES = {
'admin': ['read', 'write', 'delete', 'manage'],
'editor': ['read', 'write'],
'viewer': ['read']
}
def __init__(self):
self.user_roles = {}
def assign_role(self, user: str, role: str):
"分配角色"
if role not in self.ROLES:
raise ValueError(f"无效角色: {role}")
self.user_roles[user] = role
def check_permission(self, user: str, action: str) -> bool:
"检查权限"
role = self.user_roles.get(user)
if not role:
return False
permissions = self.ROLES.get(role, [])
return action in permissions
def require_permission(self, action: str):
"权限装饰器"
def decorator(func):
@wraps(func)
def wrapper(user, *args, **kwargs):
if not self.check_permission(user, action):
raise PermissionError(
f"用户 {user} 无 {action} 权限"
)
return func(user, *args, **kwargs)
return wrapper
return decorator
ac = AccessControl()
ac.assign_role('alice', 'editor')
@ac.require_permission('write')
def edit_document(user, doc_id):
return f"编辑文档 {doc_id}"
edit_document('alice', 'doc_123') # 成功
# edit_document('bob', 'doc_123') # PermissionError
依赖安全审计
Python
import subprocess
import json
def audit_dependencies():
"审计依赖安全"
# 使用 pip-audit 或 safety
try:
result = subprocess.run(
['pip-audit', '--format', 'json'],
capture_output=True,
text=True,
shell=False
)
vulnerabilities = json.loads(result.stdout)
for vuln in vulnerabilities:
print(f"漏洞: {vuln['name']} {vuln['version']}")
print(f" ID: {vuln['id']}")
print(f" 严重性: {vuln['severity']}")
return len(vulnerabilities) == 0
except FileNotFoundError:
print("请安装 pip-audit: pip install pip-audit")
return False
# 或使用 requirements.txt 检查
def check_requirements():
"检查requirements安全性"
import pkg_resources
# 已知的恶意包名检测
malicious_packages = ['pyyaml-malicious', 'requests-malicious']
installed = [pkg.key for pkg in pkg_resources.working_set]
for pkg in installed:
if pkg in malicious_packages:
raise RuntimeError(f"检测到恶意包: {pkg}")
错误处理安全
Python
class SecureErrorHandler:
"安全错误处理"
def handle_error(self, error: Exception, user_context: bool = True):
"安全处理错误"
# 不向用户暴露内部错误细节
if user_context:
return self._user_message(error)
else:
return self._log_error(error)
def _user_message(self, error: Exception) -> str:
"用户友好消息"
# 通用错误消息,不暴露细节
generic_messages = {
'ValueError': '输入无效,请检查后重试',
'PermissionError': '权限不足',
'KeyError': '请求的资源不存在',
'ConnectionError': '服务暂时不可用',
}
error_type = type(error).__name__
return generic_messages.get(error_type, '操作失败,请稍后重试')
def _log_error(self, error: Exception):
"记录详细错误"
import traceback
import logging
logger = logging.getLogger('errors')
logger.error(
f"错误: {type(error).__name__}: {str(error)}\n"
f"堆栈: {traceback.format_exc()}"
)
handler = SecureErrorHandler()
try:
# 业务代码
pass
except Exception as e:
user_msg = handler.handle_error(e, user_context=True)
print(user_msg) # 不暴露细节
要点总结
- 输入验证是安全的第一道防线
- 安全配置使用安全默认,环境变量存储敏感信息
- 会话管理使用安全随机ID,设置合理过期时间
- 权限控制遵循最小权限原则
- 错误处理不向用户暴露内部细节
📝 发现内容有误?点击此处直接编辑