引言
RocketMQ是一款高性能分布式消息中间件,它是阿里巴巴开源的,具有可靠、可扩展、高效等特点。本篇博客将介绍如何使用RocketMQ的入门样例,帮助读者快速上手RocketMQ。
环境准备
- JDK 1.8及以上
- Maven
- RocketMQ 4.4.0及以上
安装RocketMQ
首先,我们需要下载并安装RocketMQ。
- 访问RocketMQ官方网站,下载最新的RocketMQ版本。
- 解压下载的压缩包,并将解压后的文件夹重命名为
rocketmq
。 - 打开终端进入解压后的文件夹,执行以下命令启动RocketMQ:
./bin/mqnamesrv
和./bin/mqbroker -n localhost:9876
.
编写Producer样例
接下来,我们将编写一个Producer样例来发送消息到RocketMQ。
首先,使用Maven创建一个新的Java项目。在项目的pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.4.0</version>
</dependency>
然后,在项目中创建一个新的Java类,命名为ProducerExample
。在该类中,我们将编写发送消息的代码:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ProducerExample {
public static void main(String[] args) {
// 实例化一个生产者组
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
try {
// 启动生产者
producer.start();
// 创建并发送一条消息
Message message = new Message("topic", "tag", "Hello, RocketMQ!".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(message);
System.out.println("消息发送成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭生产者
producer.shutdown();
}
}
}
在上面的样例中,我们首先创建了一个生产者组producer_group
。然后,通过setNamesrvAddr
方法设置了NameServer的地址。接下来,我们启动生产者,并创建一个消息对象Message
,并发送到指定的主题topic
和标签tag
。最后,我们关闭生产者。
编写Consumer样例
接下来,我们将编写一个Consumer样例来接收RocketMQ中的消息。
在同一个Java项目中创建一个新的Java类,命名为ConsumerExample
。在该类中,我们将编写接收消息的代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ConsumerExample {
public static void main(String[] args) {
// 实例化一个消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
try {
// 订阅指定的主题和标签
consumer.subscribe("topic", "tag");
// 注册并启动消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
for (MessageExt msg : msgs) {
System.out.println("接收到消息:" + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者
consumer.start();
System.out.println("消费者已启动");
} catch (MQClientException e) {
e.printStackTrace();
}
}
}
在上述样例中,我们首先创建了一个消费者组consumer_group
。然后,通过setNamesrvAddr
方法设置了NameServer的地址。接下来,我们订阅指定的主题topic
和标签tag
,并注册了一个消息监听器。当消息到达Consumer时,消息会被传递给监听器进行处理。最后,我们启动消费者。
运行样例
在终端中切换到项目的根目录下,执行以下命令来分别编译并运行Producer和Consumer样例:
mvn compile exec:java -Dexec.mainClass="com.example.ProducerExample"
mvn compile exec:java -Dexec.mainClass="com.example.ConsumerExample"
可以看到,在Producer中,我们成功发送了一条消息到RocketMQ,而在Consumer中,我们收到了该消息。
结论
本篇博客介绍了如何使用RocketMQ的入门样例,通过编写简单的Producer和Consumer样例,您可以快速上手RocketMQ,并开始在您的项目中使用它。希望本文对您有所帮助!
参考链接:
本文来自极简博客,作者:梦幻星辰,转载请注明原文链接:RocketMQ入门样例