全部学科
Python全栈
python
NodeJS全栈
nodejs
小程序首页
📅 2026-05-22 8 分钟 ✍️ juanwangdev

任务处理耗时模拟

通过模拟不同耗时的任务,可以直观验证 RabbitMQ 工作队列的分发效果,下面梳理实现方法。

Maven 依赖

XML
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>

生产者 - 发送不同耗时任务

Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;

public class TaskProducerExample {
    private static final String QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 发送不同耗时的任务(数字表示模拟处理秒数)
            String[] tasks = {"task:1", "task:5", "task:2", "task:8", "task:3", "task:6"};
            for (String task : tasks) {
                channel.basicPublish("", QUEUE_NAME, 
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    task.getBytes(StandardCharsets.UTF_8));
                System.out.println("已发送: " + task);
            }
            
            System.out.println("所有任务已发送至队列");
        }
    }
}

消费者 - 模拟耗时处理

Java
import com.rabbitmq.client.*;
import java.nio.charset.StandardCharsets;

public class TaskConsumerExample {
    private static final String QUEUE_NAME = "task_queue";
    private static final String consumerName;

    static {
        // 通过命令行参数或环境变量区分不同消费者实例
        consumerName = System.getProperty("consumer.name", "C1");
    }

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            
            // 使用公平分发策略
            channel.basicQos(1);
            
            System.out.println("[" + consumerName + "] 等待任务...");
            
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                int sleepSeconds = parseSleepTime(message);
                
                System.out.println("[" + consumerName + "] 收到: " + message);
                System.out.println("[" + consumerName + "] 开始处理,预计耗时: " + sleepSeconds + "秒");
                
                try {
                    doWork(sleepSeconds);
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    System.out.println("[" + consumerName + "] 完成: " + message);
                } catch (Exception e) {
                    System.err.println("[" + consumerName + "] 失败: " + message);
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                }
            };
            
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});
        }
    }

    private static int parseSleepTime(String message) {
        try {
            String[] parts = message.split(":");
            return Integer.parseInt(parts[1]);
        } catch (Exception e) {
            return 1; // 默认1秒
        }
    }

    private static void doWork(int seconds) throws InterruptedException {
        for (int i = 0; i < seconds; i++) {
            Thread.sleep(1000);
            System.out.print(".");
        }
        System.out.println();
    }
}

运行验证方式

Bash
# 终端1:启动消费者1
java -Dconsumer.name=C1 -cp .:amqp-client-5.20.0.jar TaskConsumerExample

# 终端2:启动消费者2
java -Dconsumer.name=C2 -cp .:amqp-client-5.20.0.jar TaskConsumerExample

# 终端3:启动生产者发送任务
java -cp .:amqp-client-5.20.0.jar TaskProducerExample

观察结果

轮询分发模式下

text
C1 收到: task:1  (耗时1秒)
C2 收到: task:5  (耗时5秒)
C1 收到: task:2  (耗时2秒)
C2 收到: task:8  (耗时8秒)  <- C2 被长任务阻塞
C1 收到: task:3  (耗时3秒)
C2 收到: task:6  (耗时6秒)  <- C2 继续积压

公平分发模式下

text
C1 收到: task:1  (耗时1秒,快速完成)
C2 收到: task:5  (耗时5秒)
C1 收到: task:2  (耗时2秒,快速完成)
C1 收到: task:3  (耗时3秒,空闲继续)  <- C1 多处理任务
C2 收到: task:8  (耗时8秒完成后才接收)
C1 收到: task:6  (耗时6秒)            <- C1 承担更多工作

注意事项

验证公平分发效果时需确保 channel.basicQos(1) 已设置,否则退化为轮询分发。

消费者需通过不同进程启动(而非多线程),否则无法体现多消费者分发效果。

任务消息中的耗时数字仅用于模拟,实际场景中可替换为真实业务处理逻辑。

长任务场景建议配合消息超时和重试机制,避免任务无限积压。

要点总结

  • 任务耗时模拟通过 Thread.sleep 实现,用于验证不同分发策略效果。
  • 轮询分发下消息均匀分配,可能出现慢消费者积压问题。
  • 公平分发下空闲消费者优先接收,自动实现负载均衡。
  • 验证时需启动多个独立消费者进程,观察任务分配差异。
  • 公平分发需配合 basicQos(1) 和手动确认机制才能生效。

📝 发现内容有误?点击此处直接编辑

← 上一篇 连接异常处理
下一篇 → 工作队列基础
想查看更多题目和详细解析?
小程序提供完整的题库、模拟考试和详细解析
马上就来

长按或扫描二维码,立即体验

扫码体验小程序
马上就来
使用微信扫描二维码
立即体验完整题库