全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-19 10 分钟 ✍️ juanwangdev

Python输入验证与清洗

输入验证是防止注入攻击的核心手段。建立分层验证策略,确保数据安全进入系统。

验证层次架构

Python
输入数据
    ↓
┌─────────────────────┐
│ 1. 类型验证         │  ← 确保正确的数据类型
└─────────────────────┘
    ↓
┌─────────────────────┐
│ 2. 格式验证         │  ← 符合预期的格式规范
└─────────────────────┘
    ↓
┌─────────────────────┐
│ 3. 范围验证         │  ← 在允许的范围内
└─────────────────────┘
    ↓
┌─────────────────────┐
│ 4. 内容验证         │  ← 不包含危险内容
└─────────────────────┘
    ↓
┌─────────────────────┐
│ 5. 业务验证         │  ← 符合业务规则
└─────────────────────┘

类型验证

Python
from typing import Any, Union

class TypeValidator:
    "类型验证器"

    @staticmethod
    def validate_type(value: Any, expected_type: Union[type, tuple]) -> Any:
        "验证类型"
        if not isinstance(value, expected_type):
            raise TypeError(
                f"期望类型 {expected_type}, 实际 {type(value).__name__}"
            )
        return value

    @staticmethod
    def validate_int(value: Any) -> int:
        "验证整数"
        try:
            if isinstance(value, bool):  # bool不是int(虽然isinstance返回True)
                raise TypeError("bool不是有效整数")
            return int(value)
        except (TypeError, ValueError):
            raise ValueError("无效整数")

    @staticmethod
    def validate_str(value: Any) -> str:
        "验证字符串"
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return TypeValidator.validate_type(value, str)

    @staticmethod
    def validate_list(value: Any, item_type: type = None) -> list:
        "验证列表"
        TypeValidator.validate_type(value, list)
        if item_type:
            for item in value:
                TypeValidator.validate_type(item, item_type)
        return value

validator = TypeValidator()
safe_int = validator.validate_int("123")  # 123
safe_list = validator.validate_list([1, 2, 3], int)

格式验证

Python
import re
from datetime import datetime

class FormatValidator:
    "格式验证器"

    # 预定义模式
    EMAIL_PATTERN = re.compile(
        r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    )
    PHONE_PATTERN = re.compile(r'^\+?[1-9]\d{1,14}$')
    USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]{3,20}$')
    DATE_PATTERN = re.compile(r'^\d{4}-\d{2}-\d{2}$')
    URL_PATTERN = re.compile(
        r'^https?://[a-zA-Z0-9.-]+(:\d+)?(/.*)?$'
    )

    @staticmethod
    def validate_email(email: str) -> str:
        "验证邮箱"
        email = email.strip().lower()
        if not FormatValidator.EMAIL_PATTERN.match(email):
            raise ValueError("无效邮箱格式")
        return email

    @staticmethod
    def validate_phone(phone: str) -> str:
        "验证电话"
        phone = phone.strip()
        if not FormatValidator.PHONE_PATTERN.match(phone):
            raise ValueError("无效电话格式")
        return phone

    @staticmethod
    def validate_username(name: str) -> str:
        "验证用户名"
        name = name.strip()
        if not FormatValidator.USERNAME_PATTERN.match(name):
            raise ValueError("用户名: 3-20字符,字母数字下划线")
        return name

    @staticmethod
    def validate_date(date_str: str) -> datetime:
        "验证日期"
        if not FormatValidator.DATE_PATTERN.match(date_str):
            raise ValueError("日期格式: YYYY-MM-DD")

        try:
            return datetime.strptime(date_str, '%Y-%m-%d')
        except ValueError:
            raise ValueError("无效日期")

    @staticmethod
    def validate_url(url: str, allowed_schemes: list = None) -> str:
        "验证URL"
        from urllib.parse import urlparse

        parsed = urlparse(url)

        allowed = allowed_schemes or ['http', 'https']
        if parsed.scheme not in allowed:
            raise ValueError(f"仅允许协议: {allowed}")

        if not parsed.hostname:
            raise ValueError("缺少主机名")

        return url

format_validator = FormatValidator()
safe_email = format_validator.validate_email("test@example.com")

范围验证

Python
from typing import Any

class RangeValidator:
    "范围验证器"

    @staticmethod
    def validate_int_range(value: int, min_val: int, max_val: int) -> int:
        "验证整数范围"
        if value < min_val or value > max_val:
            raise ValueError(f"值必须在 {min_val}{max_val} 之间")
        return value

    @staticmethod
    def validate_str_length(value: str, min_len: int, max_len: int) -> str:
        "验证字符串长度"
        length = len(value)
        if length < min_len or length > max_len:
            raise ValueError(f"长度必须在 {min_len}{max_len} 之间")
        return value

    @staticmethod
    def validate_list_length(value: list, min_len: int, max_len: int) -> list:
        "验证列表长度"
        length = len(value)
        if length < min_len:
            raise ValueError(f"至少需要 {min_len} 个元素")
        if length > max_len:
            raise ValueError(f"最多允许 {max_len} 个元素")
        return value

    @staticmethod
    def validate_in_set(value: Any, allowed: set) -> Any:
        "验证值在允许集合内"
        if value not in allowed:
            raise ValueError(f"值必须是: {allowed}")
        return value

range_validator = RangeValidator()
safe_age = range_validator.validate_int_range(25, 0, 150)
safe_choice = range_validator.validate_in_set("option1", {"option1", "option2"})

内容清洗

Python
import re
from html.parser import HTMLParser

class ContentSanitizer:
    "内容清洗器"

    @staticmethod
    def remove_html_tags(text: str) -> str:
        "移除HTML标签"
        class HTMLTagRemover(HTMLParser):
            def __init__(self):
                super().__init__()
                self.result = []

            def handle_data(self, data):
                self.result.append(data)

        remover = HTMLTagRemover()
        remover.feed(text)
        return ''.join(remover.result)

    @staticmethod
    def sanitize_html(text: str) -> str:
        "转义HTML特殊字符"
        replacements = {
            '<': '&lt;',
            '>': '&gt;',
            '&': '&amp;',
            '"': '&quot;',
            "'": '&#x27;',
        }

        for char, replacement in replacements.items():
            text = text.replace(char, replacement)

        return text

    @staticmethod
    def sanitize_sql(text: str) -> str:
        "清洗SQL危险字符"
        # 注意:这不是替代参数化查询的方法
        dangerous_chars = ["'", ";", "--", "/*", "*/"]
        result = text

        for char in dangerous_chars:
            result = result.replace(char, '')

        return result

    @staticmethod
    def sanitize_shell_arg(arg: str) -> str:
        "清洗shell参数"
        # 只允许安全字符
        if not re.match(r'^[\w\-\.\/]+$', arg):
            raise ValueError("参数包含危险字符")
        return arg

    @staticmethod
    def sanitize_filename(filename: str) -> str:
        "清洗文件名"
        # 移除路径遍历
        filename = filename.replace('..', '')
        filename = filename.replace('/', '')
        filename = filename.replace('\\', '')

        # 只保留安全字符
        filename = re.sub(r'[^\w\-\.]', '', filename)

        if not filename:
            raise ValueError("无效文件名")

        return filename

    @staticmethod
    def remove_null_bytes(text: str) -> str:
        "移除空字节"
        return text.replace('\x00', '')

sanitizer = ContentSanitizer()
safe_text = sanitizer.sanitize_html("<script>alert('xss')</script>")
# 输出: &lt;script&gt;alert('xss')&lt;/script&gt;

白名单验证

Python
from typing import Any

class WhitelistValidator:
    "白名单验证器"

    def __init__(self):
        self.allowed_fields = {}
        self.allowed_values = {}

    def add_allowed_field(self, name: str, validators: list):
        "添加允许的字段"
        self.allowed_fields[name] = validators

    def add_allowed_value(self, name: str, values: set):
        "添加允许的值"
        self.allowed_values[name] = values

    def validate_dict(self, data: dict) -> dict:
        "验证字典数据"
        # 检查未知字段
        unknown = set(data.keys()) - set(self.allowed_fields.keys())
        if unknown:
            raise ValueError(f"未知字段: {unknown}")

        # 验证每个字段
        result = {}
        for name, value in data.items():
            validators = self.allowed_fields[name]

            for validator in validators:
                value = validator(value)

            # 白名单值检查
            if name in self.allowed_values:
                if value not in self.allowed_values[name]:
                    raise ValueError(f"{name}: 不允许的值")

            result[name] = value

        return result

# 使用示例
whitelist = WhitelistValidator()
whitelist.add_allowed_field('action', [
    lambda v: str(v).strip(),
    lambda v: v if v in {'create', 'read', 'update', 'delete'} else None
])
whitelist.add_allowed_field('id', [
    lambda v: int(v),
    lambda v: v if v > 0 else None
])

safe_data = whitelist.validate_dict({'action': 'create', 'id': '1'})

组合验证器

Python
from typing import Any, Callable, List

class CompositeValidator:
    "组合验证器"

    def __init__(self):
        self.validators: List[Callable] = []

    def add(self, validator: Callable) -> 'CompositeValidator':
        "添加验证器"
        self.validators.append(validator)
        return self

    def validate(self, value: Any) -> Any:
        "执行所有验证"
        result = value
        for validator in self.validators:
            result = validator(result)
        return result

# 预定义验证器工厂
class ValidatorFactory:
    "验证器工厂"

    @staticmethod
    def email() -> CompositeValidator:
        return CompositeValidator() \
            .add(lambda v: str(v).strip()) \
            .add(lambda v: v.lower()) \
            .add(FormatValidator.validate_email)

    @staticmethod
    def username() -> CompositeValidator:
        return CompositeValidator() \
            .add(lambda v: str(v).strip()) \
            .add(FormatValidator.validate_username)

    @staticmethod
    def age() -> CompositeValidator:
        return CompositeValidator() \
            .add(TypeValidator.validate_int) \
            .add(lambda v: RangeValidator.validate_int_range(v, 0, 150))

    @staticmethod
    def safe_text(max_length: int = 1000) -> CompositeValidator:
        return CompositeValidator() \
            .add(lambda v: str(v)) \
            .add(ContentSanitizer.remove_html_tags) \
            .add(ContentSanitizer.sanitize_html) \
            .add(lambda v: RangeValidator.validate_str_length(v, 0, max_length))

# 使用
email_validator = ValidatorFactory.email()
username_validator = ValidatorFactory.username()

safe_email = email_validator.validate(" TEST@Example.COM ")
safe_name = username_validator.validate("alice123")

数据验证装饰器

Python
from functools import wraps

def validate_input(**validators):
    "输入验证装饰器"
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 验证关键字参数
            for param, validator in validators.items():
                if param in kwargs:
                    kwargs[param] = validator(kwargs[param])

            return func(*args, **kwargs)
        return wrapper
    return decorator

# 使用示例
@validate_input(
    email=ValidatorFactory.email().validate,
    age=ValidatorFactory.age().validate,
    username=ValidatorFactory.username().validate
)
def register_user(email, age, username):
    return {"email": email, "age": age, "username": username}

result = register_user(
    email="user@example.com",
    age="25",
    username="alice"
)

JSON输入验证

text
import json

class JSONInputValidator:
    "JSON输入验证器"

    def __init__(self, schema: dict):
        self.schema = schema

    def validate(self, data: str) -> dict:
        "验证JSON字符串"
        # 解析JSON
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError:
            raise ValueError("无效JSON格式")

        # 验证结构
        return self._validate_schema(parsed, self.schema)

    def _validate_schema(self, data: dict, schema: dict) -> dict:
        "验证数据结构"
        result = {}

        for field, rules in schema.items():
            if field not in data:
                if rules.get('required', False):
                    raise ValueError(f"缺少必需字段: {field}")
                result[field] = rules.get('default')
                continue

            value = data[field]
            type_ = rules.get('type')

            # 类型验证
            if type_ == 'int':
                value = TypeValidator.validate_int(value)
            elif type_ == 'str':
                value = TypeValidator.validate_str(value)
            elif type_ == 'email':
                value = FormatValidator.validate_email(value)

            # 范围验证
            if 'min' in rules and 'max' in rules:
                value = RangeValidator.validate_int_range(
                    value, rules['min'], rules['max']
                )

            result[field] = value

        return result

# Schema示例
user_schema = {
    'name': {'type': 'str', 'required': True},
    'email': {'type': 'email', 'required': True},
    'age': {'type': 'int', 'min': 0, 'max': 150, 'required': False},
}

validator = JSONInputValidator(user_schema)
safe_data = validator.validate('{"name": "Alice", "email": "alice@example.com"}')

要点总结

  1. 分层验证:类型→格式→范围→内容→业务
  2. 白名单优先:只允许已知安全的输入
  3. 内容清洗:移除/转义危险字符
  4. 组合验证器:灵活组合验证逻辑
  5. 验证与清洗分离:验证拒绝无效,清洗处理有效

📝 发现内容有误?点击此处直接编辑

← 上一篇 Python敏感数据处理
下一篇 → Python DevOps与自动化
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库