消息中间件-2(ActiveMQ使用1)
创始人
2025-05-29 06:39:25

1、JMS和ActiveMQ

JMS(Java Message Service)是java平台上有关与面向消息中间件的技术规范,实际上是一套api接口,他便与消息系统中的Java应用程序进行消息交换,并且提供标准的产生、发送、接收消息的接口简化

JMS规范

1)连接工厂。
2)JMS连接。
3)JMS会话。
4)JMS目的。
5)JMS生产者和消费者。

JMS中消息的定义

消息头
消息属性
消息体

JMS消息模型

point to point(点对点)

在这里插入图片描述
每条队列里面的消息只能被一个消费者消费,消息一旦被消费,消息就不在消息队列中

Topic主题(发布订阅模式)

在这里插入图片描述
topic的消息会被所有的消费者消费

2、ActiveMQ安装、部署和运行

在这里插入图片描述

下载 Windows版 ActiveMQ,解压,运行bin目录下的activemq.bat即可。Linux下操作类似(进入bin目录运行./activemq start启动,./activemq stop关闭)。
下载地址:http://activemq.apache.org/activemq-580-release.html
运行后在浏览器中访问http://127.0.0.1:8161/admin,即可看到ActiveMQ的管理控制台
ActiveMQ中,61616为服务端口,8161为管理控制台端口。
启动成功界面如下:
在这里插入图片描述

3、使用ActiveMQ

1)原生API使用

详情见代码:no-spring下usemq包
1、pom文件配置

  org.apache.activemqactivemq-all5.9.0

2、消息生成端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 生产者端*/
public class JmsProducer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/*默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/*默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;/*发送数量*/private static final int SENDNUM = 3;public static void main(String[] args){/*连接工厂*/ConnectionFactory connectionFactory;/*连接*/Connection connection = null;/*会话*/Session session;/*消息的目的地*/Destination destination;/*消息的生产者*/MessageProducer messageProducer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try {/*** 通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*** 启动连接*/connection.start();/*** 创建session会话,* 第一个参数是否使用事务,当消息发送者向消息提供者发送消息时,消息发送者等待消息代理的确认* 没有回应则抛出异常,消息发送程序会处理这个错误* 第二个参数消息的确认模式:* AUTO_ACKNOWLEDGE:指定消息接收者在每次收到消息时自动发送确认,* 消息只向目标发送一次,但传输过程中可能因为错误而丢失消息(会通知消息提供者收到消息)* CLIENT_ACKNOWLEDGE:由消息接收者确认收到消息,通过调用消息的acknowledge()方法* DUPS_OK_ACKNOWLEDGE:指定消息提供者在消息接收者没有确认发送时重新发送消息* (这种确认不在乎接收者收到重复的消息)*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*** 创建一个名为HelloWorld的消息队列*///创建一个topic// destination = session.createTopic("HelloTopic");destination = session.createQueue("HelloActiveMqQueue");/*** 创建消息生产者*/messageProducer = session.createProducer(destination);/*** 循环发送消息*/for (int i = 0; i < SENDNUM; i++){String msg = "发送消息" + i + " " + System.currentTimeMillis();TextMessage textMessage = session.createTextMessage(msg);System.out.println("标准用法"+ msg);/*** 消息生产者发送消息*/messageProducer.send(textMessage);}}catch (Exception e){e.printStackTrace();}finally {//关闭mq连接if(connection != null){try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}}

3、同步消息消费端

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端-同步接受消息*/
public class JmsConsumer {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*/destination = session.createQueue("HelloActiveMqQueue");//创建一个topic// destination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);Message message;while ((message = messageConsumer.receive()) != null){System.out.println(((TextMessage)message).getText());}}catch (JMSException e){e.printStackTrace();}finally {if (connection != null) {try {connection.close();} catch (JMSException e) {e.printStackTrace();}}}}
}

4、异步消费端代码

package tan.usemq;import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;/*** 消费者端--异步接收消息*/
public class JmsConsumerAsyn {/*默认连接用户名*/private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;/* 默认连接密码*/private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;/* 默认连接地址*/private static final String BROKEURL= ActiveMQConnection.DEFAULT_BROKER_URL;public static void main(String[] args){/* 连接工厂*/ConnectionFactory connectionFactory;/* 连接*/Connection connection = null;/* 会话*/Session session;/* 消息的目的地*/Destination destination;/*消息的消费者*/MessageConsumer messageConsumer;/*实例化连接工厂*/connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);try{/*通过连接工厂获取连接*/connection = connectionFactory.createConnection();/*启动连接*/connection.start();/*创建session*/session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);/*创建一个名为HelloWorld的消息队列*///destination = session.createQueue("HelloActiveMqQueue");//创建一个topicdestination = session.createTopic("HelloTopic");/*创建消息消费者*/messageConsumer = session.createConsumer(destination);/*设置消费者监听器,监听消息*/messageConsumer.setMessageListener(new MessageListener() {public void onMessage(Message message) {try {System.out.println(((TextMessage) message).getText());} catch (JMSException e) {e.printStackTrace();}}});}catch (JMSException e){e.printStackTrace();}}
}

2)与Spring结合

详情见代码:am_with_spring_p;am_with_spring_c

3)与SpringBoot结合

详情见代码:am_with_springboot

4、请求应答模式(request-reponse模式)

在这里插入图片描述

5、用户注册的异步处理代码

在这里插入图片描述
在这里插入图片描述
1、串行实现
2、并行实现
3、使用MQ实现
4、使用MQ实现消息回调
详情见代码:asyncApp

相关内容

热门资讯

大洋生物进军关键战略材料 ... PEEK作为特种工程塑料中的“黄金材料”,长期被英国威格斯等国际巨头垄断。而今,随着核心技术壁垒的打...
泽连斯基:俄乌“和平计划”可以... 本文转自【央视新闻客户端】;总台记者当地时间14日获悉,乌克兰总统泽连斯基表示,旨在解决俄乌冲突的“...
关于开展我们的节日 中秋主题活... 中秋节是我国人民思念故乡,思念亲人的传统节日。下面太阳教育网为大家带来了开展中秋节庆祝活动的通知,希...
最新或2023(历届)关于开展... “九九”重阳节,是我国尊老敬老爱老的传统节日。下面小编为大家带来了开展重阳节活动的通知,希望对你有帮...
英文感谢信例范文两篇 升职感谢...   Appreciation letter  (One)   Dear Sharon,   I am...