全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python 日志最佳实践

生产环境日志需要考虑性能、可追溯性和可分析性。

结构化日志

使用 JSON 格式便于日志分析系统处理。

Python
import logging
import json
from logging import Formatter

class JsonFormatter(Formatter):
    def format(self, record):
        log_obj = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'message': record.getMessage(),
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        if record.exc_info:
            log_obj['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_obj)

# 配置结构化日志
logger = logging.getLogger('app')
handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger.addHandler(handler)

logger.info("用户登录", extra={'user_id': 123, 'ip': '192.168.1.1'})
# 输出: {"timestamp": "...", "level": "INFO", "message": "用户登录", ...}

extra 字段扩展

Python
import logging

logger = logging.getLogger('app')

# 添加业务字段
logger.info("订单创建",
    extra={
        'order_id': 'ORD001',
        'user_id': 123,
        'amount': 99.99,
        'trace_id': 'abc123'
    }
)

# 自定义格式包含 extra
formatter = logging.Formatter(
    '%(asctime)s [%(trace_id)s] %(levelname)s %(message)s'
)

合理分级使用

Python
import logging

logger = logging.getLogger('app')

# DEBUG:详细调试信息(生产环境关闭)
logger.debug("SQL查询: SELECT * FROM users WHERE id=123")

# INFO:关键业务事件
logger.info("用户登录成功", extra={'user_id': 123})

# WARNING:潜在问题
logger.warning("内存使用率过高: 85%")

# ERROR:错误但系统仍运行
logger.error("支付接口调用失败", extra={'order_id': 'ORD001'})

# CRITICAL:严重错误影响服务
logger.critical("数据库连接池耗尽")

日志轮转

Python
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# 按大小轮转
handler = RotatingFileHandler(
    'app.log',
    maxBytes=10*1024*1024,  # 10MB
    backupCount=5           # 保留5个备份
)

# 按时间轮转
handler = TimedRotatingFileHandler(
    'app.log',
    when='midnight',  # 每天轮转
    interval=1,
    backupCount=30    # 保留30天
)

logger = logging.getLogger('app')
logger.addHandler(handler)

异步日志写入

Python
import logging
import threading
from queue import Queue

class AsyncLogHandler(logging.Handler):
    def __init__(self, handler):
        super().__init__()
        self.handler = handler
        self.queue = Queue()
        self.thread = threading.Thread(target=self._worker, daemon=True)
        self.thread.start()

    def _worker(self):
        while True:
            record = self.queue.get()
            self.handler.emit(record)

    def emit(self, record):
        self.queue.put(record)

# 使用异步处理器
async_handler = AsyncLogHandler(logging.FileHandler('app.log'))
logger.addHandler(async_handler)

日志聚合配置

Python
import logging
import logging.config

# 集中式日志系统配置(如 ELK)
LOGGING_CONFIG = {
    'version': 1,
    'formatters': {
        'json': {
            'class': 'app.JsonFormatter'
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'formatter': 'json',
            'level': 'INFO'
        },
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': '/var/log/app/app.log',
            'maxBytes': 10485760,
            'backupCount': 10,
            'formatter': 'json',
            'level': 'DEBUG'
        }
    },
    'loggers': {
        'app': {
            'handlers': ['console', 'file'],
            'level': 'DEBUG',
            'propagate': False
        }
    }
}

logging.config.dictConfig(LOGGING_CONFIG)

Trace ID 追踪

Python
import logging
import uuid
from contextvars import ContextVar

# 请求追踪 ID
trace_id_var = ContextVar('trace_id', default='')

def set_trace_id():
    trace_id = str(uuid.uuid4())[:8]
    trace_id_var.set(trace_id)
    return trace_id

class TraceFormatter(logging.Formatter):
    def format(self, record):
        record.trace_id = trace_id_var.get()
        return super().format(record)

formatter = TraceFormatter('[%(trace_id)s] %(levelname)s %(message)s')

# 在请求处理中
trace_id = set_trace_id()
logger.info("处理请求开始")

敏感信息过滤

Python
import logging
import re

class SensitiveFilter(logging.Filter):
    SENSITIVE_PATTERNS = [
        (r'password=\S+', 'password=***'),
        (r'token=\S+', 'token=***'),
        (r'\b\d{16,19}\b', 'CARD_NUMBER'),  # 银行卡号
    ]

    def filter(self, record):
        msg = record.getMessage()
        for pattern, replacement in self.SENSITIVE_PATTERNS:
            msg = re.sub(pattern, replacement, msg)
        record.msg = msg
        return True

logger = logging.getLogger('app')
logger.addFilter(SensitiveFilter())
logger.info("用户登录 password=secret123")  # 输出: password=***

性能优化要点

优化项建议
异步写入使用 QueueHandler
格式化简洁格式减少开销
级别过滤生产环境设置 INFO 或 WARNING
文件轮转防止单个文件过大
批量写入高频日志考虑批量提交

要点总结

  • 结构化 JSON 格式便于日志分析
  • extra 字段添加业务上下文信息
  • DEBUG 用于调试,INFO 用于关键事件
  • RotatingFileHandler 实现日志轮转
  • 异步写入避免日志阻塞业务
  • Trace ID 实现请求全链路追踪
  • SensitiveFilter 过滤敏感信息
  • 生产环境使用 INFO 或 WARNING 级别

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python 断言与测试断言
下一篇 → Python 测试覆盖率
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库