在了解ActiveMQ使用之间,我们先一起来了解一下什么是消息中间件?
消息中间件(MOM)
- MOM的基本功能:将信息以消息的形式,从一个应用程序传输到另一个或者多个应用程序。
-
MOM主要特点:
-
消息异步接受:类似于手机短信的行为,消息发送者不需要等待消息接受者的响应,减少软件多系统集成的耦合度;
-
消息可靠接受:确保消息在消息中间件可靠保存,只要接受方接受到消息后才可删除,多个消息也可以组成原子事务;
JMS的基本概念和模型
- JMS是什么?
JMS是java message service,java消息服务,是javaEE的一个技术。
- JMS规范
JMS定义了java中访问消息中间件的接口,但是没有实现,实现了JMS接口的消息中间件就称为JMS Provider,例如AvtiveMQ。
- JMS provider
实现了JMS接口和规范的消息中间件。
- JMS message
JMS消息,JMS消息有以下三部分组成:
1、消息头:每个消息头字段都有相应的getter和setter方法。
2、消息体:封装具体的消息数据。
3、消息属性:如果需要除消息头之外的值,那么都可以使用消息属性。
- JMS producer:消息生产者。
- JMS consumer:消息消费者。
消息的发送可以采用以下两种方式:
1、同步方式:通过调用的消费里的receive方法从目的地显式的提取消息,receive方法可以一直阻塞到消息到达。
2、异步方式:客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。
- JMS domains:消息传递域。
JMS规范中定义了两种消息传递域:点对点(point-to-point)消息传递域,和发布/订阅(publish/subscribe)消息传递域。
1、点对点消息传递域的特点如下:
(1)、每个消息只能有一个消费者,即每个消息只能被其中一个消费者消费。
(2)、消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息时是否处于运行状态,它都可以提取消息。
以下是点对点的模型图:
(中间的就是队列,client 1就是生产者,它向队列中发送消息,client 2是消费者,它负责从队列中提取消息,消息消费完成后,队列中的消息就消失了)
2、发布/订阅消息传递域的特点:
(1)、每个消息可以被多个消费者消费,前提是这些消息这已经对topic(主题进行了订阅)。
(2)、消息的生产者和消费者之间有时间上的相关性,订阅了该主题的消费者只能消费该主题发布的消息,如果此时该消费者没有设置消息持久化,且没有在线,
而主题已经完成了消息发布,那么该消费者不会收到那主题发布的消息。
以下是发布/订阅的模型图:
(topic为主题,client 1为生产者,它负责向主题发布消息,client 2和client 3均为订阅了该主题的消费者,但client 1发送消息,且client 2和3都在线,那么将可以收到消息)
- JMS中相关对象
- Connection Factory:JMS的连接工厂,ActiveMQ通过new ActiveMQConnectFactory("tcp:部署activemq服务器ip:61616");
- JMS Conection:JMS连接。通过连接工厂创建连接。
- JMS Session:是生产者和消费者消息的一个单线程上下文。用于创建生产者(session.createProducer(destination)、session.createProducer(destination,selector)方法创建,第二种方法后续会讲解。)、创建消费者(session.createConsumer(destination))、创建消息。
- Destination:消息发送到的目的地(为字符串)。
- Acknowledge:签收。在创建session时设置是否签收。connection.createSession(Session.CLIENT_ACKNOWLEDGE);
- Trancsaction:事务。在创建session时设置是否开启事务。connection.createSession(Boolean.TRUE)--表示开启。
JMS的消息结构
- JMS的消息由三部分组成:消息头、消息体、消息属性。
- 消息头包含了消息的识别信息和路由信息。
- 消息头包含的一些标准的属性如下:
- JMSDestinaction:消息发送的目的地,主要是queue和topic,自动分配。
- JMSDeliveryMode:传送模式。有两种:持久化和非持久化。自动分配。
- JMSExpiration:消息过期时间。等于Destination的send方法中的timeToLive值加上发送时刻的GET时间值,如果timeToLive值等于0,则JMSExpiration被设置为0,表示永不过期。如果发送后,在消费过期时间之后消息还没有被消费,那么消息就会消失。自动分配。
- JMSPriority:消息优先级。从0-9 十个级别,0-4是普通消息,5-9是加急消息,默认是4级。自动分配。
- JMSMessageID:唯一识别每个消息的标识,由JMS Provider产生, 自动分配。
- JMSTimestamp:JMS时间戳。一个JMS Provider在调用send()方法时自动设置,它是消息被发送和消费者实际接收的时间差,自动分配。
- JMSReplyTo:JMS消息回复,要求生产者发送消息,消费者消费消息后回复生产者消息已经被消费。
- JMSType:JMS消息类型。(TextMessage 主体包含字符串、BytesMessage 主体包含连续字节流、MapMessage 主体包含键值对、StreamMessage主体包含流、ObjectMessage 主体包含对象)
- JMSRedelivered:JMS消息重递,即消息已经发送出去,但是消费者收到消息,但未签收(acknowledged)可设置该模式,让消息重递。
- 消息体:JMS API定义了5中消息体格式,也叫消息类型,可以使用不同形式发送接口数据。包括:TextMessage、BytesMessage、ObjectMessage、MapMessage、StreamMessage。
- 消息属性,包括以下三种类型的属性:
- 应用程序设置和添加的属性,比如:
message.setProperty("username",username);
message.getProperty("username");等。
2.JMS定义的属性
使用“JMSX”作为属性名的前缀,通过connection.getMetaData().getJMSXPropertyNames();方法返回所有连接支持JMSX的属性名字。
3.JMS定义的属性如下:
1)、JMSXUserID:发送消息的用户标识,发送时提供商设置。
2)、JMSXAppID:发送消息的应用标识,发送时提供商设置。
3)、JMSXDeliveryCount:转发消息重试次数,第一次是1,第二次是2...,依次类推,发送时提供商设置。
4)、JMSXGroupID:消息所在消息组的标识,由客户端设置。
5)、JMSXGroupSeq:组内消息的序号,第一是1,第二是2....依此类推,由客户端设置。
6)、JMSXProducerTXID:产生消息是事务的事务标识,发送时提供商设置。
7)、JMSXConsumerTXID:消费消息的事务的事务标识,接收时提供商设置。
8)、JMSXState:消息状态,1(等待)、2(准备)、3(到期)、4(保留)。
ActiveMQ简介
- ActiveMQ是什么
ActiveMQ是有Apache推出的,一款开源的、完全支持JMS1.1和J2EE1.4规范的JMS Provider实现的消息中间件(Message Oriented Middleware, MOM)
- ActiveMQ能干什么
最主要的功能就是:实现了JMS Provider,用来帮助实现高可用、高性能、易用、可伸缩和安全的企业级面向消息服务的系统。
- ActiveMQ的优点
- 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务);
- 支持多种传输协议:in-VM、TCP、NIO、UDP、JGroups、JXTA;
- 很容易和Application Server集成使用
- 很容易和spring进行整合;
- 支持通过JDBC和journal提供高速的消息持久化;
- 支持Axis的整合;
ActiveMQ安装和基本使用
- 下载并安装ActiveMQ服务器端
- 从下载最新版本的ActiveMQ
- 直接解压,然后拷贝到你要安装的目录即可。
- 解压后目录结构如下:
bin (windows下面的bat和unix/linux下面的sh) 启动ActiveMQ的启动服务就在这里
conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
data (默认是空的)
docs (index,replease版本里面没有文档)
example (几个例子)
lib (activeMQ使用到的lib)
webapps (系统管理员控制台代码)
webapps-demo(系统示例代码)
activemq-all-5.8.0.jar (ActiveMQ的binary)
user-guide.html (部署指引)
LICENSE.txt
NOTICE.txt
README.txt
- 启动运行
- 普通启动;到ActiveMQ/bin下面,使用命令:./activemq start
- 启动并指定日志文件,使用命令:./activemq start > /tmp/activemqlog
- 检查是否已经启动
activemq默认采用61616端口提供JMS服务,采用8161端口提供管理控制台服务,执行以下命令以便检验是否已经成功启动activemq服务:
- 比如查看61616端口是否打开:netstat -an | grep 61616
- 也可以查看控制台输出或者日志文件;
- 还可以直接访问activemq的管理页面:http://ip:8161/admin/(ip为部署activemq的服务器ip),默认管理页面登录账号和密码都为admin;(注意,如果activemq的管理页面无法打开,可能是防火墙没有开闭,使用命令:systemctl status firewalld.service 查看防火墙状态,如果显示“Avtive:active (running) since Thu...”的字段表示防火墙状态在运行,使用命令:systemctl stop firewalld.service关闭防火墙即可。)
- 启动成功
可以访问管理员界面:,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。
其中在导航菜单中,Queues是队列方式消息。Topics是主题方式消息。Subscribers消息订阅监控查询。Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network是网络链接数监控。Send可以发送消息数据。
- 关闭activemq服务
- 可以使用命令:./activemq stop
- 或者使用命令:ps -ef | grep activemq,得到activemq的进程数,然后使用命令:kill -9 进程数来杀死进程。
ActiveMQ消息形式
1、ActiviteMQ消息有3种形式:
JMS 公共 | 点对点域 | 发布/订阅域 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Destination | Queue | Topic |
Session | QueueSession | TopicSession |
MessageProducer | QueueSender | TopicPublisher |
MessageConsumer | QueueReceiver | TopicSubscriber |
(1)、点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行
(2)、发布/订阅 方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
ActiveMQ接收和发送信息基本流程
- 流程图
发送消息的基本步骤:
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息
消息接收者从JMS接受消息的步骤
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。