RocketMQ入门样例

梦幻星辰 2024-08-20 ⋅ 14 阅读

引言

RocketMQ是一款高性能分布式消息中间件,它是阿里巴巴开源的,具有可靠、可扩展、高效等特点。本篇博客将介绍如何使用RocketMQ的入门样例,帮助读者快速上手RocketMQ。

环境准备

  • JDK 1.8及以上
  • Maven
  • RocketMQ 4.4.0及以上

安装RocketMQ

首先,我们需要下载并安装RocketMQ。

  1. 访问RocketMQ官方网站,下载最新的RocketMQ版本。
  2. 解压下载的压缩包,并将解压后的文件夹重命名为rocketmq
  3. 打开终端进入解压后的文件夹,执行以下命令启动RocketMQ:./bin/mqnamesrv./bin/mqbroker -n localhost:9876.

编写Producer样例

接下来,我们将编写一个Producer样例来发送消息到RocketMQ。

首先,使用Maven创建一个新的Java项目。在项目的pom.xml文件中添加以下依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

然后,在项目中创建一个新的Java类,命名为ProducerExample。在该类中,我们将编写发送消息的代码:

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class ProducerExample {
    public static void main(String[] args) {
        // 实例化一个生产者组
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");
        
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        
        try {
            // 启动生产者
            producer.start();
            
            // 创建并发送一条消息
            Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(message);
            
            System.out.println("消息发送成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭生产者
            producer.shutdown();
        }
    }
}

在上面的样例中,我们首先创建了一个生产者组producer_group。然后,通过setNamesrvAddr方法设置了NameServer的地址。接下来,我们启动生产者,并创建一个消息对象Message,并发送到指定的主题topic和标签tag。最后,我们关闭生产者。

编写Consumer样例

接下来,我们将编写一个Consumer样例来接收RocketMQ中的消息。

在同一个Java项目中创建一个新的Java类,命名为ConsumerExample。在该类中,我们将编写接收消息的代码:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class ConsumerExample {
    public static void main(String[] args) {
        // 实例化一个消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
        
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        
        try {
            // 订阅指定的主题和标签
            consumer.subscribe("topic", "tag");
            
            // 注册并启动消息监听器
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                // 消息处理逻辑
                for (MessageExt msg : msgs) {
                    System.out.println("接收到消息:" + new String(msg.getBody()));
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
            
            // 启动消费者
            consumer.start();
            
            System.out.println("消费者已启动");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
}

在上述样例中,我们首先创建了一个消费者组consumer_group。然后,通过setNamesrvAddr方法设置了NameServer的地址。接下来,我们订阅指定的主题topic和标签tag,并注册了一个消息监听器。当消息到达Consumer时,消息会被传递给监听器进行处理。最后,我们启动消费者。

运行样例

在终端中切换到项目的根目录下,执行以下命令来分别编译并运行Producer和Consumer样例:

mvn compile exec:java -Dexec.mainClass="com.example.ProducerExample"
mvn compile exec:java -Dexec.mainClass="com.example.ConsumerExample"

可以看到,在Producer中,我们成功发送了一条消息到RocketMQ,而在Consumer中,我们收到了该消息。

结论

本篇博客介绍了如何使用RocketMQ的入门样例,通过编写简单的Producer和Consumer样例,您可以快速上手RocketMQ,并开始在您的项目中使用它。希望本文对您有所帮助!

参考链接:


全部评论: 0

    我有话说: