Python微服务架构实践
微服务架构将系统拆分为独立服务,每个服务可独立部署和扩展。
微服务核心概念
微服务特征
Python
# Defining traits of a microservice, keyed by a short identifier.
MICROSERVICE_CHARACTERISTICS = dict(
    single_responsibility='每个服务只做一件事',
    independent_deploy='独立部署、独立扩展',
    own_database='每个服务独立数据库',
    loose_coupling='服务间松耦合',
    api_communication='通过API或消息通信',
    fault_isolation='故障隔离,不影响其他服务',
)
服务拆分原则
Python
def determine_service_boundary():
    """Return the example service catalogue: service name -> responsibility.

    Dimensions to weigh when drawing service boundaries:
      1. Business capability (split by business function)
      2. Domain boundary (DDD bounded context)
      3. Team structure (Conway's law)
      4. Data boundary (data ownership)
    """
    catalogue = [
        ('user-service', '用户管理'),
        ('order-service', '订单处理'),
        ('product-service', '产品管理'),
        ('payment-service', '支付处理'),
        ('notification-service', '消息通知'),
    ]
    return dict(catalogue)
服务实现
Flask微服务示例
Python
# user_service/app.py
from flask import Flask, jsonify, request
# Flask application object for the user service.
app = Flask(__name__)
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    """Look up one user by id; return it as JSON, or a 404 error body."""
    found = user_repository.find(user_id)
    if found:
        return jsonify(found.to_dict())
    return jsonify({'error': 'User not found'}), 404
@app.route('/users', methods=['POST'])
def create_user():
    """Create a user from a JSON body containing ``name`` and ``email``.

    Returns:
        The created user as JSON with status 201, or a 400 error body when
        the payload is not a JSON object or lacks a required field.
    """
    # silent=True yields None instead of raising on a missing or malformed
    # JSON body, so the client gets a structured 400 rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    missing = [field for field in ('name', 'email') if field not in data]
    if missing:
        return jsonify({'error': f"Missing fields: {', '.join(missing)}"}), 400

    user = User.create(data['name'], data['email'])
    user_repository.save(user)
    return jsonify(user.to_dict()), 201
@app.route('/health')
def health():
    """Liveness endpoint: always reports a healthy status."""
    payload = {'status': 'healthy'}
    return jsonify(payload)
# Run the Flask app on port 5001 when this file is executed as a script.
if __name__ == '__main__':
    app.run(port=5001)
FastAPI微服务示例
Python
# order_service/app.py
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel
# FastAPI application object for the order service.
app = FastAPI()
class CreateOrderRequest(BaseModel):
    """Request body for creating an order."""

    # Id of the user the order is created for.
    user_id: int
    # Products to order; the element type is not constrained here.
    products: list
@app.get('/orders/{order_id}')
async def get_order(order_id: int):
    """Return the order with the given id.

    Raises:
        HTTPException: 404 when the repository yields no order.
    """
    order = order_repo.find(order_id)
    # Without this guard a missing order is served as a 200 response with a
    # null body. Assumes find() returns None when nothing matches — TODO
    # confirm against the repository implementation.
    if order is None:
        raise HTTPException(404, 'Order not found')
    return order
@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    """Create and persist an order after confirming the user exists.

    Raises:
        HTTPException: 404 when the user service does not return the user.
    """
    # Validate against the user service first so that no order is stored
    # for an unknown user.
    user = await user_client.get_user(request.user_id)
    if not user:
        raise HTTPException(404, 'User not found')

    order = Order.create(request.user_id, request.products)
    order_repo.save(order)
    return order
@app.get('/health')
async def health():
    """Liveness endpoint: always reports a healthy status."""
    status_body = {'status': 'healthy'}
    return status_body
服务间通信
HTTP同步调用
Python
import aiohttp
class UserClient:
    """HTTP client for the user service.

    The aiohttp session is created lazily on the first request instead of in
    ``__init__``, so the client can be constructed outside a running event
    loop. Release it with ``await client.close()`` or by using the client as
    an async context manager (``async with UserClient() as client``).
    """

    def __init__(self, base_url='http://user-service:5001'):
        self.base_url = base_url
        # Populated by _get_session() on first use; None while no session
        # is open.
        self.session = None

    def _get_session(self):
        """Return the open session, creating a new one if there is none."""
        if self.session is None or self.session.closed:
            self.session = aiohttp.ClientSession()
        return self.session

    async def get_user(self, user_id):
        """Fetch a user by id.

        Returns:
            The decoded JSON body when the response status is 200,
            otherwise None.
        """
        url = f'{self.base_url}/users/{user_id}'
        async with self._get_session().get(url) as resp:
            if resp.status == 200:
                return await resp.json()
            return None

    async def close(self):
        """Close the underlying session if one was opened; safe to repeat."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
# 在订单服务中使用
@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    """Create an order for an existing user.

    Raises:
        HTTPException: 404 when the user service does not return the user.
    """
    user_client = UserClient()
    try:
        # Request/response call to the user service.
        user = await user_client.get_user(request.user_id)
    finally:
        # A client is created per request, so its HTTP session must be
        # released here or it leaks.
        await user_client.close()

    if not user:
        raise HTTPException(404, 'User not found')
    order = Order.create(user['id'], request.products)
    return order
消息队列异步通信
Python
import pika
import json
class MessageQueue:
    """Message queue client built on a pika blocking connection.

    Usable as a context manager so the connection is closed on exit.
    """

    def __init__(self, host='rabbitmq'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

    def publish(self, queue, message):
        """Declare ``queue`` and publish ``message`` to it, JSON-encoded.

        The message is sent through the default exchange with the queue
        name as routing key.
        """
        self.channel.queue_declare(queue)
        self.channel.basic_publish(
            exchange='',
            routing_key=queue,
            body=json.dumps(message),
        )

    def subscribe(self, queue, callback):
        """Declare ``queue``, register ``callback`` and start consuming.

        This call blocks in ``start_consuming()``. The consumer is
        registered without auto-ack, so presumably ``callback`` must
        acknowledge each message itself — confirm against the pika version
        in use.
        """
        self.channel.queue_declare(queue)
        self.channel.basic_consume(
            queue=queue,
            on_message_callback=callback,
        )
        self.channel.start_consuming()

    def close(self):
        """Close the connection if it is still open."""
        if self.connection.is_open:
            self.connection.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
# 订单服务发布事件
# Shared message queue client, created at import time with the default host.
mq = MessageQueue()
@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    """Persist a new order and publish an ``order_created`` event.

    ``request`` is annotated with the request model so that FastAPI parses
    it from the JSON body, as the other order handlers do.
    """
    order = Order.create(request.user_id, request.products)
    order_repo.save(order)

    # Publish the order-created event to the 'order_events' queue.
    mq.publish('order_events', {
        'type': 'order_created',
        'order_id': order.id,
        'user_id': order.user_id,
    })
    return order
# 通知服务订阅事件
def handle_order_event(ch, method, properties, body):
    """Consumer callback: notify the user when an order was created.

    Args:
        ch: Channel the message arrived on; used to acknowledge it.
        method: Delivery metadata; supplies ``delivery_tag``.
        properties: Message properties (unused).
        body: JSON-encoded event.
    """
    event = json.loads(body)
    # .get() lets events without a 'type' field be skipped instead of
    # raising KeyError.
    if event.get('type') == 'order_created':
        send_notification(event['user_id'], 'Order created')
    # Acknowledge only after handling succeeded, so a failure above leaves
    # the message unacknowledged.
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Start consuming 'order_events'; subscribe() blocks in start_consuming().
mq.subscribe('order_events', handle_order_event)
API网关
简单网关实现
Python
from flask import Flask, request, jsonify
import requests
# Flask application object for the API gateway.
app = Flask(__name__)
# Gateway routing table: URL path prefix -> base URL of the backing service.
SERVICE_MAP = dict([
    ('/users', 'http://user-service:5001'),
    ('/orders', 'http://order-service:5002'),
    ('/products', 'http://product-service:5003'),
])
@app.route('/<path:path>', methods=['GET', 'POST', 'PUT', 'DELETE'])
def proxy(path):
    """Forward the request to the service that owns the first path segment.

    The ``path`` converter is required so that nested URLs such as
    ``/users/1`` reach this view; a plain ``<path>`` matches one segment only.

    Returns:
        The upstream JSON body and status code; 404 when no service is
        mapped to the path; 502 when the upstream call fails or its
        response is not JSON.
    """
    # Match on the whole first segment so that e.g. 'usersfoo' is not
    # routed to the user service.
    first_segment = path.split('/', 1)[0]
    service_url = SERVICE_MAP.get(f'/{first_segment}')
    if service_url is None:
        return jsonify({'error': 'Not found'}), 404

    try:
        resp = requests.request(
            method=request.method,
            url=f'{service_url}/{path}',
            # Keep repeated query parameters when forwarding.
            params=request.args.to_dict(flat=False),
            # silent=True: body-less requests (GET/DELETE) yield None
            # instead of an error.
            json=request.get_json(silent=True),
            # Never let a hung backend block the gateway indefinitely.
            timeout=10,
        )
    except requests.RequestException:
        return jsonify({'error': 'Upstream service unavailable'}), 502

    try:
        payload = resp.json()
    except ValueError:
        return jsonify({'error': 'Invalid upstream response'}), 502
    return jsonify(payload), resp.status_code
Kong网关配置
YAML
# Kong configuration example.
# NOTE(review): nesting indentation is missing in this copy; restore it and
# confirm which scope the `plugins` entry belongs to before using this file.
_format_version: "1.1"
services:
- name: user-service
url: http://user-service:5001
routes:
- name: user-route
paths:
- /users
- name: order-service
url: http://order-service:5002
routes:
- name: order-route
paths:
- /orders
plugins:
- name: rate-limiting
config:
minute: 100
policy: local
服务发现
简单服务发现
Python
import consul
class ServiceRegistry:
    """Service registration and discovery backed by a Consul agent."""

    def __init__(self, consul_host='localhost'):
        self.client = consul.Consul(host=consul_host)

    def register(self, service_name, service_id, address, port):
        """Register a service instance with an HTTP check on ``/health``.

        The check polls ``http://{address}:{port}/health`` every 10 seconds.
        """
        health_url = f'http://{address}:{port}/health'
        self.client.agent.service.register(
            name=service_name,
            service_id=service_id,
            address=address,
            port=port,
            check=consul.Check.http(health_url, interval='10s'),
        )

    def discover(self, service_name):
        """Return the base URL of the first passing instance, or None.

        When the service entry carries no address of its own, the address
        of the node it runs on is used instead, so the URL never has an
        empty host.
        """
        _, entries = self.client.health.service(service_name, passing=True)
        if not entries:
            return None

        entry = entries[0]
        service = entry['Service']
        address = service.get('Address') or entry.get('Node', {}).get('Address')
        if not address:
            return None
        return f'http://{address}:{service["Port"]}'

    def deregister(self, service_id):
        """Remove the service instance with the given id from the agent."""
        self.client.agent.service.deregister(service_id)
# Register this instance when the service starts.
registry = ServiceRegistry()
registry.register('user-service', 'user-1', 'localhost', 5001)
# Discover a service before calling it; discover() returns None when no
# instance is found.
user_url = registry.discover('user-service')
配置管理
集中配置
Python
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
    """Service configuration with defaults; ``.env`` is the configured env file."""

    # --- Service identity ---
    service_name: str = 'user-service'
    service_port: int = 5001

    # --- Database connection ---
    db_host: str = 'localhost'
    db_port: int = 5432
    db_name: str = 'users'

    # --- Downstream services ---
    order_service_url: str = 'http://order-service:5002'

    class Config:
        env_file = '.env'
# Module-level settings instance, built once at import time.
settings = Settings()
# Alternatively, fetch configuration from a configuration center.
class ConfigCenter:
    """Read and watch configuration values in a Consul key/value store."""

    def __init__(self, consul_client):
        self.client = consul_client

    def get_config(self, key):
        """Return the stored value for ``key``, or None when nothing is stored."""
        _, value = self.client.kv.get(key)
        return value['Value'] if value else None

    def watch_config(self, key, callback):
        """Watch ``key`` forever, calling ``callback`` with each new value.

        Never returns. The callback runs once for the value present at
        start, and afterwards only when the index reported by the store has
        changed, so a query that returns with the same index does not
        re-deliver an unchanged value.
        """
        last_index = None
        while True:
            index, value = self.client.kv.get(key, index=last_index)
            if index == last_index:
                # Same index as before: nothing changed, query again.
                continue
            last_index = index
            if value:
                callback(value['Value'])
健康检查与容错
健康检查
Python
from flask import Flask, jsonify
from datetime import datetime
# Flask application object exposing the health and readiness endpoints.
app = Flask(__name__)
@app.route('/health')
def health():
    """Aggregate dependency checks into one health report.

    Responds 200 with status 'healthy' when every check reports healthy,
    otherwise 503 with status 'degraded'. The body also carries a timestamp
    and the individual check results.
    """
    checks = {}
    checks['database'] = check_database()
    checks['cache'] = check_cache()
    checks['message_queue'] = check_message_queue()

    any_failing = any(c['status'] != 'healthy' for c in checks.values())
    if any_failing:
        overall, http_status = 'degraded', 503
    else:
        overall, http_status = 'healthy', 200

    report = jsonify({
        'status': overall,
        'timestamp': datetime.now().isoformat(),
        'checks': checks,
    })
    return report, http_status
def check_database():
    """Ping the database and report the outcome as a status dict.

    Returns:
        ``{'status': 'healthy'}`` when the ping succeeds, otherwise
        ``{'status': 'unhealthy', 'error': 'Connection failed'}``.
    """
    try:
        db.ping()
    # Catch Exception rather than using a bare except, so SystemExit and
    # KeyboardInterrupt still propagate.
    except Exception:
        return {'status': 'unhealthy', 'error': 'Connection failed'}
    return {'status': 'healthy'}
@app.route('/ready')
def readiness():
    """Readiness endpoint: always reports that the service is ready."""
    body = {'status': 'ready'}
    return jsonify(body)
重试与熔断
Python
import asyncio
from functools import wraps
def retry(max_attempts=3, delay=1):
    """Build a decorator that retries an async callable on any Exception.

    Args:
        max_attempts: Total number of calls to make; must be at least 1.
        delay: Base pause in seconds. The pause grows linearly: ``delay``
            after the first failure, ``2 * delay`` after the second, etc.

    Raises:
        ValueError: If ``max_attempts`` is below 1. Previously such a value
            made the wrapped function silently return None without ever
            being called.
    """
    if max_attempts < 1:
        raise ValueError('max_attempts must be at least 1')

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception:
                    # Out of attempts: surface the last error unchanged.
                    if attempt == max_attempts:
                        raise
                    await asyncio.sleep(delay * attempt)
        return wrapper
    return decorator
class CircuitBreakerOpenError(Exception):
    """Raised by CircuitBreaker.call while the circuit is open.

    Subclasses Exception, so existing ``except Exception`` handlers still
    catch it, while callers can now tell it apart from real call failures.
    """


class CircuitBreaker:
    """Circuit breaker for async calls.

    States:
        closed: calls pass through; failures are counted.
        open: calls are rejected until ``recovery_timeout`` seconds have
            elapsed since the circuit opened.
        half-open: a call is let through; success closes the circuit, and
            a failure re-opens it.
    """

    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.state = 'closed'
        self.last_failure_time = None

    async def call(self, func, *args, **kwargs):
        """Await ``func(*args, **kwargs)`` under circuit-breaker protection.

        Raises:
            CircuitBreakerOpenError: If the circuit is open and the recovery
                timeout has not elapsed yet.
            Exception: Whatever ``func`` raises, after recording the failure.
        """
        if self.state == 'open':
            if not self._should_attempt_recovery():
                raise CircuitBreakerOpenError('Circuit breaker is open')
            self.state = 'half-open'

        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Reset the failure count and close the circuit."""
        self.failures = 0
        self.state = 'closed'

    def _on_failure(self):
        """Count a failure; open the circuit once the threshold is reached."""
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.state = 'open'
            self.last_failure_time = datetime.now()

    def _should_attempt_recovery(self):
        """Return True when the recovery timeout has elapsed."""
        # No recorded opening time (e.g. state set by hand): allow a trial
        # call instead of failing with TypeError on the subtraction below.
        if self.last_failure_time is None:
            return True
        elapsed = (datetime.now() - self.last_failure_time).total_seconds()
        return elapsed >= self.recovery_timeout
# Circuit breaker shared by every call made through call_user_service.
breaker = CircuitBreaker()


@retry(max_attempts=3)
async def call_user_service(user_id):
    """Fetch a user through the shared breaker, with up to three attempts."""
    fetched = await breaker.call(user_client.get_user, user_id)
    return fetched
分布式追踪
OpenTelemetry集成
Python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger import JaegerExporter
# Configure tracing: install a tracer provider and get this module's tracer.
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)
# Exporter pointed at the Jaeger agent on host 'jaeger', port 6831.
jaeger_exporter = JaegerExporter(
agent_host_name='jaeger',
agent_port=6831
)
# Attach the exporter to the provider through a batching span processor.
trace.get_tracer_provider().add_span_processor(
BatchSpanProcessor(jaeger_exporter)
)
@app.post('/orders')
async def create_order(request: CreateOrderRequest):
    """Create an order, tracing the user lookup and the save as child spans.

    Raises:
        HTTPException: 404 when the user service does not return the user.
    """
    with tracer.start_as_current_span('create_order') as span:
        span.set_attribute('user_id', request.user_id)

        with tracer.start_as_current_span('call_user_service'):
            user = await user_client.get_user(request.user_id)
        # The lookup result was previously ignored; reject unknown users as
        # the other order handlers do.
        if not user:
            raise HTTPException(404, 'User not found')

        with tracer.start_as_current_span('save_order'):
            # Same Order.create(user_id, products) call shape as the other
            # order handlers — NOTE(review): confirm Order.create's signature.
            order = Order.create(request.user_id, request.products)
            order_repo.save(order)

        span.set_attribute('order_id', order.id)
        return order
微服务架构检查清单
| 检查项 | 说明 |
|---|---|
| 服务边界 | 按业务能力拆分、职责单一 |
| API网关 | 统一入口、路由、认证 |
| 服务发现 | 动态注册发现、健康检查 |
| 配置管理 | 集中配置、动态更新 |
| 通信机制 | HTTP同步、消息异步 |
| 容错设计 | 重试、熔断、降级 |
| 分布式追踪 | 调用链追踪、性能分析 |
注意:微服务增加运维复杂度,评估团队规模和技术能力后再采用。
要点总结
- 服务拆分:按业务能力或领域边界,职责单一、独立部署
- 服务间通信:HTTP同步适合查询,消息队列适合事件驱动
- API网关:统一入口、路由分发、认证授权、限流熔断
- 服务发现:Consul注册发现、健康检查自动剔除
- 配置管理:集中配置中心、环境分离、动态更新
- 容错设计:重试机制、熔断器保护、降级策略
存放路径:articles/PYTHON/专家/架构与设计/微服务架构Python.md
📝 发现内容有误?点击此处直接编辑