全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python微服务架构实践

微服务架构将系统拆分为独立服务,每个服务可独立部署和扩展。

微服务核心概念

微服务特征

Python
MICROSERVICE_CHARACTERISTICS = {
    'single_responsibility': '每个服务只做一件事',
    'independent_deploy': '独立部署、独立扩展',
    'own_database': '每个服务独立数据库',
    'loose_coupling': '服务间松耦合',
    'api_communication': '通过API或消息通信',
    'fault_isolation': '故障隔离,不影响其他服务'
}

服务拆分原则

Python
def determine_service_boundary():
    # 拆分维度:
    # 1. 业务能力(按业务功能)
    # 2. 领域边界(DDD限界上下文)
    # 3. 团队结构(康威定律)
    # 4. 数据边界(数据所有权)

    services = {
        'user-service': '用户管理',
        'order-service': '订单处理',
        'product-service': '产品管理',
        'payment-service': '支付处理',
        'notification-service': '消息通知'
    }

    return services

服务实现

Flask微服务示例

Python
# user_service/app.py
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = user_repository.find(user_id)
    if not user:
        return jsonify({'error': 'User not found'}), 404
    return jsonify(user.to_dict())

@app.route('/users', methods=['POST'])
def create_user():
    data = request.get_json()
    user = User.create(data['name'], data['email'])
    user_repository.save(user)
    return jsonify(user.to_dict()), 201

@app.route('/health')
def health():
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(port=5001)

FastAPI微服务示例

Python
# order_service/app.py
from fastapi import FastAPI, Depends
from pydantic import BaseModel

app = FastAPI()

class CreateOrderRequest(BaseModel):
    user_id: int
    products: list

@app.get('/orders/{order_id}')
async def get_order(order_id: int):
    order = order_repo.find(order_id)
    return order

@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    order = Order.create(request.user_id, request.products)
    order_repo.save(order)

    # 调用用户服务验证
    user = await user_client.get_user(request.user_id)

    return order

@app.get('/health')
async def health():
    return {'status': 'healthy'}

服务间通信

HTTP同步调用

Python
import aiohttp

class UserClient:
    "用户服务客户端"

    def __init__(self, base_url='http://user-service:5001'):
        self.base_url = base_url
        self.session = aiohttp.ClientSession()

    async def get_user(self, user_id):
        async with self.session.get(f'{self.base_url}/users/{user_id}') as resp:
            if resp.status == 200:
                return await resp.json()
            return None

    async def close(self):
        await self.session.close()

# 在订单服务中使用
@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    user_client = UserClient()

    # 同步调用用户服务
    user = await user_client.get_user(request.user_id)
    if not user:
        raise HTTPException(404, 'User not found')

    order = Order.create(user['id'], request.products)
    return order

消息队列异步通信

Python
import pika
import json

class MessageQueue:
    "消息队列客户端"

    def __init__(self, host='rabbitmq'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

    def publish(self, queue, message):
        self.channel.queue_declare(queue)
        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=json.dumps(message)
        )

    def subscribe(self, queue, callback):
        self.channel.queue_declare(queue)
        self.channel.basic_consume(
            queue=queue,
            on_message_callback=callback
        )
        self.channel.start_consuming()

# 订单服务发布事件
mq = MessageQueue()

@app.post('/orders')
async def create_order(request):
    order = Order.create(request.user_id, request.products)
    order_repo.save(order)

    # 发布订单创建事件
    mq.publish('order_events', {
        'type': 'order_created',
        'order_id': order.id,
        'user_id': order.user_id
    })

    return order

# 通知服务订阅事件
def handle_order_event(ch, method, properties, body):
    event = json.loads(body)
    if event['type'] == 'order_created':
        send_notification(event['user_id'], 'Order created')

mq.subscribe('order_events', handle_order_event)

API网关

简单网关实现

Python
from flask import Flask, request, jsonify
import requests

app = Flask(__name__)

SERVICE_MAP = {
    '/users': 'http://user-service:5001',
    '/orders': 'http://order-service:5002',
    '/products': 'http://product-service:5003'
}

@app.route('/<path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(path):
    # 路由到对应服务
    for prefix, service_url in SERVICE_MAP.items():
        if path.startswith(prefix.lstrip('/')):
            target_url = f'{service_url}/{path}'
            resp = requests.request(
                method=request.method,
                url=target_url,
                json=request.get_json()
            )
            return jsonify(resp.json()), resp.status_code

    return jsonify({'error': 'Not found'}), 404

Kong网关配置

YAML
# Kong配置示例
_format_version: "1.1"

services:
  - name: user-service
    url: http://user-service:5001
    routes:
      - name: user-route
        paths:
          - /users

  - name: order-service
    url: http://order-service:5002
    routes:
      - name: order-route
        paths:
          - /orders

plugins:
  - name: rate-limiting
    config:
      minute: 100
      policy: local

服务发现

简单服务发现

Python
import consul

class ServiceRegistry:
    "服务注册与发现"

    def __init__(self, consul_host='localhost'):
        self.client = consul.Consul(host=consul_host)

    def register(self, service_name, service_id, address, port):
        "注册服务"
        self.client.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            check=consul.Check.http(f'http://{address}:{port}/health', interval='10s')
        )

    def discover(self, service_name):
        "发现服务"
        _, services = self.client.health.service(service_name, passing=True)
        if services:
            service = services[0]['Service']
            return f'http://{service["Address"]}:{service["Port"]}'
        return None

    def deregister(self, service_id):
        "注销服务"
        self.client.agent.service.deregister(service_id)

# 服务启动时注册
registry = ServiceRegistry()
registry.register('user-service', 'user-1', 'localhost', 5001)

# 发现服务调用
user_url = registry.discover('user-service')

配置管理

集中配置

Python
import os
from pydantic import BaseSettings

class Settings(BaseSettings):
    "服务配置"

    # 服务信息
    service_name: str = 'user-service'
    service_port: int = 5001

    # 数据库配置
    db_host: str = 'localhost'
    db_port: int = 5432
    db_name: str = 'users'

    # 外部服务
    order_service_url: str = 'http://order-service:5002'

    class Config:
        env_file = '.env'

settings = Settings()

# 或从配置中心获取
class ConfigCenter:
    def __init__(self, consul_client):
        self.client = consul_client

    def get_config(self, key):
        _, value = self.client.kv.get(key)
        return value['Value'] if value else None

    def watch_config(self, key, callback):
        "监听配置变化"
        index = None
        while True:
            index, value = self.client.kv.get(key, index=index)
            if value:
                callback(value['Value'])

健康检查与容错

健康检查

Python
from flask import Flask, jsonify
from datetime import datetime

app = Flask(__name__)

@app.route('/health')
def health():
    "健康检查"
    checks = {
        'database': check_database(),
        'cache': check_cache(),
        'message_queue': check_message_queue()
    }

    all_healthy = all(c['status'] == 'healthy' for c in checks.values())

    return jsonify({
        'status': 'healthy' if all_healthy else 'degraded',
        'timestamp': datetime.now().isoformat(),
        'checks': checks
    }), 200 if all_healthy else 503

def check_database():
    try:
        db.ping()
        return {'status': 'healthy'}
    except:
        return {'status': 'unhealthy', 'error': 'Connection failed'}

@app.route('/ready')
def readiness():
    "就绪检查"
    return jsonify({'status': 'ready'})

重试与熔断

Python
import asyncio
from functools import wraps

def retry(max_attempts=3, delay=1):
    "重试装饰器"
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    await asyncio.sleep(delay * (attempt + 1))
        return wrapper
    return decorator

class CircuitBreaker:
    "熔断器"

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = 'closed'
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        if self.state == 'open':
            if self._should_attempt_recovery():
                self.state = 'half-open'
            else:
                raise Exception('Circuit breaker is open')

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failures = 0
        self.state = 'closed'

    def _on_failure(self):
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            self.last_failure_time = datetime.now()

    def _should_attempt_recovery(self):
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout

breaker = CircuitBreaker()

@retry(max_attempts=3)
async def call_user_service(user_id):
    return await breaker.call(user_client.get_user, user_id)

分布式追踪

OpenTelemetry集成

Python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger import JaegerExporter

# 配置追踪
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

jaeger_exporter = JaegerExporter(
    agent_host_name='jaeger',
    agent_port=6831
)
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(jaeger_exporter)
)

@app.post('/orders')
async def create_order(request):
    with tracer.start_as_current_span('create_order') as span:
        span.set_attribute('user_id', request.user_id)

        with tracer.start_as_current_span('call_user_service'):
            user = await user_client.get_user(request.user_id)

        with tracer.start_as_current_span('save_order'):
            order = Order.create(request)
            order_repo.save(order)

        span.set_attribute('order_id', order.id)
        return order

微服务架构检查清单

检查项说明
服务边界按业务能力拆分、职责单一
API网关统一入口、路由、认证
服务发现动态注册发现、健康检查
配置管理集中配置、动态更新
通信机制HTTP同步、消息异步
容错设计重试、熔断、降级
分布式追踪调用链追踪、性能分析

注意:微服务增加运维复杂度,评估团队规模和技术能力后再采用。

要点总结

  • 服务拆分:按业务能力或领域边界,职责单一、独立部署
  • 服务间通信:HTTP同步适合查询,消息队列适合事件驱动
  • API网关:统一入口、路由分发、认证授权、限流熔断
  • 服务发现:Consul注册发现、健康检查自动剔除
  • 配置管理:集中配置中心、环境分离、动态更新
  • 容错设计:重试机制、熔断器保护、降级策略

存放路径articles/PYTHON/专家/架构与设计/微服务架构Python.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python依赖注入模式
下一篇 → Python架构演进与迁移
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库