全部学科
NodeJS全栈
nodejs
Python全栈
python
小程序首页
📅 2026-05-21 8 分钟 ✍️ juanwangdev

死锁检测与处理

死锁指两个或多个事务互相等待对方释放锁,导致无限期阻塞。数据库会自动检测并回滚其中一个事务,但业务层需做好重试与日志。

死锁场景识别

典型死锁案例

两个事务以不同顺序更新相同行:

Go
事务 A: 锁定行 1 -> 尝试锁定行 2
事务 B: 锁定行 2 -> 尝试锁定行 1
Go
// 错误示例:未固定锁定顺序
func TransferBad(db *gorm.DB, id1, id2 uint, amount float64) error {
    return db.Transaction(func(tx *gorm.DB) error {
        var a, b User
        // 按参数顺序锁定,可能产生死锁
        tx.Clauses(clause.Locking{Strength: "UPDATE"}).First(&a, id1)
        tx.Clauses(clause.Locking{Strength: "UPDATE"}).First(&b, id2)
        // ... 业务逻辑
        return nil
    })
}

正确锁定顺序

统一按主键排序锁定,消除循环等待:

Go
func TransferSafe(db *gorm.DB, id1, id2 uint, amount float64) error {
    return db.Transaction(func(tx *gorm.DB) error {
        // 按 ID 升序锁定,避免交叉等待
        ids := []uint{id1, id2}
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })

        var users []User
        tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            Where("id IN ?", ids).Order("id ASC").Find(&users)

        // ... 业务逻辑
        return nil
    })
}

死锁产生的四个必要条件:互斥、占有等待、不可抢占、循环等待。打破任一条件即可避免死锁。

死锁错误识别

数据库返回错误

不同数据库死锁错误标识不同:

Go
import (
    "errors"
    "strings"

    "gorm.io/driver/mysql"
    "gorm.io/gorm"
)

func IsDeadlockError(err error) bool {
    if err == nil {
        return false
    }

    errStr := strings.ToLower(err.Error())

    // MySQL 死锁错误
    // Error 1213: Deadlock found when trying to get lock
    if strings.Contains(errStr, "deadlock") {
        return true
    }

    // PostgreSQL 死锁错误
    // ERROR: deadlock detected (SQLSTATE 40P01)
    if strings.Contains(errStr, "deadlock detected") || strings.Contains(errStr, "40p01") {
        return true
    }

    // SQLite 数据库忙
    if strings.Contains(errStr, "database is locked") {
        return true
    }

    return false
}

重试机制

指数退避重试

死锁被数据库回滚后,应用层应自动重试:

Go
type DeadlockRetryConfig struct {
    MaxRetries  int
    InitialBackoff time.Duration
    MaxBackoff     time.Duration
    Multiplier     float64
}

var DefaultRetryConfig = DeadlockRetryConfig{
    MaxRetries:     3,
    InitialBackoff: 50 * time.Millisecond,
    MaxBackoff:     500 * time.Millisecond,
    Multiplier:     2.0,
}

func WithDeadlockRetry(db *gorm.DB, fn func(*gorm.DB) error, cfg *DeadlockRetryConfig) error {
    if cfg == nil {
        cfg = &DefaultRetryConfig
    }

    var lastErr error
    backoff := cfg.InitialBackoff

    for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
        err := db.Transaction(fn)

        if err == nil {
            return nil
        }

        if !IsDeadlockError(err) {
            return err // 非死锁错误,直接返回
        }

        lastErr = err
        log.Printf("死锁检测:第 %d 次重试,错误: %v", attempt+1, err)

        // 指数退避等待
        if attempt < cfg.MaxRetries {
            time.Sleep(backoff)
            backoff = time.Duration(float64(backoff) * cfg.Multiplier)
            if backoff > cfg.MaxBackoff {
                backoff = cfg.MaxBackoff
            }
        }
    }

    return fmt.Errorf("死锁重试已达最大次数 %d: %w", cfg.MaxRetries, lastErr)
}

使用示例

Go
func ProcessOrder(db *gorm.DB, orderID uint) error {
    return WithDeadlockRetry(db, func(tx *gorm.DB) error {
        var order Order
        if err := tx.Clauses(clause.Locking{Strength: "UPDATE"}).
            First(&order, orderID).Error; err != nil {
            return err
        }

        order.Status = "paid"
        return tx.Save(&order).Error
    }, nil)
}

超时控制

事务执行时间过长可能间接导致死锁概率上升:

Go
func TransactionWithTimeout(db *gorm.DB, timeout time.Duration, fn func(*gorm.DB) error) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    tx := db.WithContext(ctx)
    return tx.Transaction(fn)
}

// 使用示例
err := TransactionWithTimeout(db, 5*time.Second, func(tx *gorm.DB) error {
    // 超过 5 秒自动取消
    return processComplexBusiness(tx)
})

if errors.Is(err, context.DeadlineExceeded) {
    log.Error("事务超时")
}

死锁日志记录

结构化日志

记录死锁关键信息,便于排查:

text
type DeadlockLog struct {
    Timestamp    time.Time `json:"timestamp"`
    TransactionID string   `json:"transaction_id"`
    Table        string    `json:"table"`
    Query        string    `json:"query"`
    Error        string    `json:"error"`
    RetryCount   int       `json:"retry_count"`
}

func LogDeadlock(tx *gorm.DB, err error, retryCount int) {
    log := DeadlockLog{
        Timestamp:    time.Now(),
        TransactionID: uuid.New().String(),
        Error:        err.Error(),
        RetryCount:   retryCount,
    }

    // 从 Statement 获取 SQL 信息
    if tx.Statement.Table != "" {
        log.Table = tx.Statement.Table
    }

    // 结构化日志输出
    logJSON, _ := json.Marshal(log)
    log.Printf("DEADLOCK: %s", string(logJSON))

    // 可推送到监控系统
    // metrics.Increment("deadlock_count")
}

注意事项

  • 重试幂等性:重试的业务逻辑必须幂等,否则可能造成数据重复处理
  • 重试次数限制:死锁重试次数不宜过多,一般 3-5 次,否则应降级处理
  • 退避时间合理:退避时间太短无法缓解冲突,太长影响响应时间
  • 监控告警:死锁频率突增应触发告警,可能暗示业务逻辑设计问题
  • 预防优于处理:通过固定锁定顺序、缩短事务时间、降低锁粒度来预防死锁

要点总结

  • 死锁由交叉锁定不同行引起,统一锁定顺序可有效预防
  • 使用字符串匹配识别不同数据库的死锁错误
  • 业务层实现指数退避重试,重试次数一般 3-5 次
  • 事务超时控制可间接降低死锁概率
  • 必须记录结构化死锁日志,便于排查和监控告警

存放路径:D:\git2\jwdev\articles\GORM\专家\高级并发与锁机制\死锁检测与处理.md

📝 发现内容有误?点击此处直接编辑

← 上一篇 悲观锁实现
下一篇 → GORM 内核架构解析
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库