MQ-JMS介绍

The Java Message Service (JMS) API is a messaging standard that allows application components based on the Java Platform Enterprise Edition (Jave EE) to create, send, receive, and read messages. It enables distributed communication that is loosely coupled, reliable, and asynchronous.


JMS(Java Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

什么是JMS?

JMS即Java消息服务(Java Message Service),是指两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持JAVA应用程序开发。在J2EE中,当两个应用程序使用JMS进行通信时,它们之间并不是直接相连的,而是通过一个共同的消息收发服务连接起来,可以达到解耦的效果。

JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。

为什么需要JMS

在JAVA中,如果两个应用程序之间对各自都不了解,甚至这两个程序可能部署在不同的大洲上,那么它们之间如何发送消息呢?举个例子,一个应用程序A部署在印度,另一个应用程序部署在美国,然后每当A触发某件事后,B想从A获取一些更新信息。当然,也有可能不止一个B对A的更新信息感兴趣,可能会有N个类似B的应用程序想从A中获取更新的信息。

在这种情况下,JAVA提供了最佳的解决方案-JMS,完美解决了上面讨论的问题。

JMS同样适用于基于事件的应用程序,如聊天服务,它需要一种发布事件机制向所有与服务器连接的客户端发送消息。JMS与RMI不同,发送消息的时候,接收者不需要在线。服务器发送了消息,然后就不管了;等到客户端上线的时候,能保证接收到服务器发送的消息。这是一个很强大的解决方案,能处理当今世界很多普遍问题。

JMS的优势

  • Asynchronous(异步):
    JMS is asynchronous by default. So to receive a message, the client is not required to send the request. The message will arrive automatically to the client as they become available.(JMS 原本就是一个异步的消息服务,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端)
  • Reliable(可靠):
    JMS provides the facility of assurance that the message will delivered once and only once. You know that duplicate messages create problems. JMS helps you avoiding such problems.(JMS保证消息只会递送一次。大家都遇到过重复创建消息问题,而JMS能帮你避免该问题。)

JMS的两种模型

JMS有两种模型:

  • Point-to-Point(P2P):点对点模型
  • Publish/Subscribe(Pub/Sub):订阅发布模型
    在JMS API出现之前,大部分产品使用“点对点”和“发布/订阅”中的任一方式来进行消息通讯。JMS定义了这两种消息发送模型的规范,它们相互独立。任何JMS的提供者可以实现其中的一种或两种模型,这是它们自己的选择。JMS规范提供了通用接口保证我们基于JMS API编写的程序适用于任何一种模型。

1.P2P模型

在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。

这种模式的特点:

  • 只有一个消费者将获得消息。
  • 生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
  • 每一个成功处理的消息都由接收者签收。

P2P模型图

涉及到的概念

  • 消息队列(Queue)
  • 发送者(Sender)
  • 接收者(Receiver)
  • 每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

P2P的特点

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列
  • 接收者在成功接收消息之后需向队列应答成功,如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。

2.Pub/Sub模型

发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。

这种模式的特点:

  • 多个消费者可以获得消息
  • 在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

Pub/Sub模式图

涉及到概念

  • 主题(Topic)
  • 发布者(Publisher)
  • 订阅者(Subscriber)
  • 客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

Pub/Sub的特点

  • 一个消息可以传递个多个订阅者(即:一个消息可以有多个接受方)
  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
  • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

JMS接收消息

在JMS中,消息的产生和消费是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

  • 同步(Synchronous)
    在同步消费信息模式模式中,订阅者/接收方通过调用 receive()方法来接收消息。在receive()方法中,线程会阻塞直到消息到达或者到指定时间后消息仍未到达。
  • 异步(Asynchronous)
    使用异步方式接收消息的话,消息订阅者需注册一个消息监听者,类似于事件监听器,只要消息到达,JMS服务提供者会通过调用监听器的onMessage()递送消息。

JMS编程模型

在JMS编程模型中涉及到的概念有:

  1. 管理对象(Administered objects)-连接工厂(ConnectionFactory)和目的地(Destination)
  2. 连接对象(Connection)
  3. 会话(Session)
  4. 消息生产者(MessageProducer)
  5. 消息消费者(MessageConsumer)
  6. 消息监听者(MessageListener)

一种典型的JMS 程序需要经过下列几个步骤:

  1. 通过 JNDI 查找 ConnectionFactory。
  2. 用 ConnectionFactory 创建一个 Connection。
  3. 用 Connection 创建一个或多个 Session。
  4. 用 Session 和 Destination 创建所需的 MessageProducer 和 MessageConsumer。
  5. MessageProducer 生产消息至 Destination 或者 MessageConsumer 从 Destination 消费消息。。

0.Broker

先简单介绍JMS的broker,可以把JMS Brokers看做服务器端,这个服务器可以独立运行,也可以随着其他容器,并且它是以内嵌的方式运行。
例如下面是ActiveMQ创建broker的一种方法:

1
2
3
4
5
6
7
public void createBroker() throws Exception {  
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
broker.addConnector("tcp://localhost:61616");
broker.start();
}

此外,也可以通过BrokerFactory来创建broker,例如:

1
BrokerService broker = BrokerFactory.createBroker(new URI(someURI));

通过ActiveMQConnectionFactory还可以隐含创建内嵌的broker。

1.JMS管理对象

管理对象(Administered objects)是预先配置的JMS对象,由系统管理员为使用JMS的客户端创建,主要有两个被管理的对象:

  • 连接工厂(ConnectionFactory)
  • 目的地(Destination)

这两个管理对象由JMS系统管理员通过使用Application Server管理控制台创建,存储在应用程序服务器的JNDI名字空间或JNDI注册表。

2.连接工厂(ConnectionFactory)

ConnectionFactory是用来创建连接
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

例如: ActiveMQ 提供的ActiveMQConnectionFactory。注意(要初始化 JMS,则需要使用连接工厂。客户端通过创建ConnectionFactory建立到 ActveMQ 的连接,一个连接工厂封装了一组连接配置参数,这组参数在配置ActiveMQ时已经定义,例如brokerURL参数,此参数传入的是ActiveMQ服务地址和端口,支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为 tcp://localhost:61613。

ActiveMQConnectionFactory构造方法:

ActiveMQConnectionFactory();
ActiveMQConnectionFactory(String brokerURL);
ActiveMQConnectionFactory(String userName, String password, String brokerURL); 
ActiveMQConnectionFactory(String userName, String password, URI brokerURL);
ActiveMQConnectionFactory(URI brokerURL); //其中 brokerURL为ActiveMQ服务地址和端口。 

例如:

1
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192. 168.0.135:61616");

或者

1
2
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); 
  connectionFactory. setBrokerURL("tcp://192.168.0.135:61616");

3.目的地(Destination)

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。
所以,Destination实际上就是两种类型的对象:Queue、Topic。可以通过JNDI来查找Destination。

4.连接(Connection)

Connection表示在客户端和JMS系统之间建立的虚拟连接(对TCP/IP socket的包装)。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

注意:当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或多个Session。当一个程序执行完成后,必须关闭之前创建的Connection,否则 ActiveMQ不能释放资源,关闭一个Connection同样也关闭了 Session,MessageProducer和MessageConsumer。

5.会话(Session)

Session是我们操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当我们需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

6.消息的生产者(MessageProducer)

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

7.消息消费者(MessageConsumer)

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

8.消息监听者(MessageListener)

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

JMS消息的组成

JMS中的消息传递系统中传递的东西就是消息(Message),JMS消息虽然格式简单但是非常灵活, JMS消息由三部分组成:

1.消息头

消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其中一些字段的值,其它值则由JMS提供程序填写。

  • JMSDestination:由Send方法设置。指定消息的目的地,由JMS提供程序填写
  • JMSDeliveryMode:由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。
  • JMSMessageID: Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写
  • JMSTimeStamp: Send方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写
  • JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID
  • JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段
  • JMSRedelivered:由JMS提供程序设置。指出该消息先前被发送过
  • JMSType:由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求
  • JMSExpiration:Send方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息
  • JMSPriority:Send方法设置。包含客户端在发送消息时所设置有限级值

2.消息属性

属性(property)支持把可选头字段添加到消息。如果你的应用程序需要使用非标准头字段对消息编目和分类,可以添加一个属性到消息以实现这个编目和分类。提供 setProperty(…) 和 getProperty(…) 方法以设置和获取各种 Java 类型的属性,包括:Boolean、Byte、Double、Float、Int、Long、Short、String以及Object。
每一属性均由字符串名字和相关的值组成:

1
2
3
TextMessage msg = tsession.createTextMessage();
msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);
String customer = msg.getStringProperty(“CUSTOMER_NAME”);

3.消息体

消息体包含了消息的核心数据。JMS API定义了五种消息类型:
TextMessage、MapMessage、BytesMessage、StreamMessage、ObjectMessage
我们可以以不同的类型发送和接受消息,选择最合适的消息类型可以使JMS最有效的处理消息。

文本消息(TextMessage)

将数据作为简单字符串存放在主体中(XML就可以作为字符串发)

1
2
TextMessage msg = session.createTextMessage();
msg.setText(text);

有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响移植性,只自己定义了两个方法setText(String s)、getText()

映射表消息(MapMessage)

使用一张映射表来存放其主体内容,参照Jms API

1
2
3
4
5
6
MapMessage msg = session.createMapMessage();
msg.setString(“CUSTOMER_NAME”,”John”);
msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);
int age = msg.getInt(“CUSTOMER_AGE”);

字节消息(BytesMessage)

将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有消息格式保持一致等,参照Jms API:

1
2
3
4
5
6
byte[] data;
BytesMessage msg = session.createBytesMessage();
msg.wirte(data);

byte[] msgData = new byte[256];
int bytesRead = msg.readBytes(msgData);

流消息(StreamMessage)

用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。
使用这种消息格式时,收发双发事先协商好字段的顺序,以保证写读顺序相同,参照Jms API:

1
2
3
4
5
6
StringMessage msg = session.createStreamMessage();
msg.writeString(“John”);
msg.writeInt(12);

String s = msg.readString();
int age = msg.readInt();

对象消息(ObjectMessage)

用于往消息中写入可序列化的对象。
消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个集合写入消息。

客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写
自己只单独定义了两个方法:getObject()setObject(Serializable s)
ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效。
Message只读时被set抛出MessageNotWriteableException
set和get时,如果对象序列化失败抛出MessageFormatException

消息的消费

JMS的消息者可以通过两种方式来消费消息:

  • 同步:订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞(等待)。
  • 异步:订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

1.消息发出去后的确认模式

应用程序创建的会话有一般有5 种确认模式(非事务)。五种确认模式说明:

  • AUTO_ACKNOWLEDGE:自动确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收。
  • CLIENT_ACKNOWLEDGE:客户端确认模式。会话对象依赖于应用程序对被接收的消息调用一个acknowledge()方法。一旦这个方法被调用,会话会确认最后一次确认之后所有接收到的消息。这种模式允许应用程序以一个调用来接收,处理并确认一批消息。注意:在管理控制台中,如果连接工厂的Acknowledge Policy(确认方针)属性被设置为”Previous”(提前),但是你希望为一个给定的会话确认所有接收到的消息,那么就用最后一条消息来调用acknowledge()方法。
  • DUPS_OK_ACKNOWLEDGE:允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。在需要考虑资源使用时,这种模式非常有效。注意:如果你的应用程序无法处理重复的消息的话,你应该避免使用这种模式。如果发送消息的初始化尝试失败,那么重复的消息可以被重新发送。
  • NO_ACKNOWLEDGE:不确认模式。不确认收到的消息是需要的。消息发送给一个NO_ACKNOWLEDGE 会话后,它们会被WebLogic 服务器立即删除。在这种模式下,将无法重新获得已接收的消息,而且可能导致下面的结果:1. 消息可能丢失;和(或者)另一种情况:2. 如果发送消息的初始化尝试失败,会出现重复消息被发送的情况。
  • MULTICAST_NO_ACKNOWLEDGE:IP组播下的不确认模式,同样无需确认。发送给一个MULTICAST_NO_ACKNOWLEDGE会话的消息, 会共享之前所述的NO_ACKNOWLEDGE 确认模式一样的特征。这种模式支持希望通过IP 组播方式进行消息通信的应用程序,而且无需依赖会话确认提供的服务质量。注意:如果你的应用程序无法处理消息的丢失或者重复,那么你应该避免使用这种模式。如果发送消息的初始化尝试失败的话,重复的消息可能会被再次发送。

注:在上表的5 种确认模式中,AUTO_ACKNOWLEDGE ,DUPS_OK_ACKNOWLEDGE 和 CLIENT_ACKNOWLEDGE 是JMS 规范定义的,NO_ACKNOWLEDGE 和MULTICAST_NO_ACKNOWLEDGE是WebLogic JMS 提供的。)

2.JMS 支持以下两种消息提交模式:

  • PERSISTENT。指示 JMS provider 持久保存消息,以保证消息不会因为 JMS provider 的失败而丢失。(当服务器重启之后,之前的发送PERSISTENT消息,会获取到!
  • NON_PERSISTENT。不要求 JMS provider 持久保存消息。(当服务器重启之后,之前的发送NON_PERSISTENT消息,会获取不到!
------ 本文完 ------