概述

JMS(Java Message Service) 是Java平台上有关面向消息中间件的技术规范。它提供标准的产生、发送、接收消息的接口简化企业应用的开发。它支持两种消息通信模型:点到点(point-to-point)(P2P)模型和发布/订阅(Pub/Sub)模型。

消息中间件

(1)ActiveMQ

         ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完 全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。我们在本次课程中介绍 ActiveMQ 的使 用。 

(2)RabbitMQ 

        AMQP 协议的领导实现,支持多种场景。淘宝的 MySQL 集群内部有使用它进行通讯, OpenStack 开源云平台的通信组件,最先在金融行业得到运用。 

(3)ZeroMQ 

        史上最快的消息队列系统 

(4)Kafka 

        Apache 下的一个子项目 。特点:高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率;完全的分布式系统。适合处理海量数据。


正文格式

  • TextMessage--一个字符串对象

  • MapMessage--一套名称-值对

  • ObjectMessage--一个序列化的 Java 对象

  • BytesMessage--一个字节的数据流

  • StreamMessage -- Java 原始值的数据流

  • XMLMessage:一个XML类型的消息。

最常用的是TextMessage和ObjectMessage。

JMS 消息传递类型

对于消息的传递有两种类型:

Queue,一种是点对点的,即一个生产者和一个消费者一一对应;

Topic,另一种是发布/ 订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收;

ActiveMQ使用

消息生产者(点对点)

//1.创建连接工厂
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.30.155:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息生产者
MessageProducer producer = session.createProducer(queue);
//7.创建消息
TextMessage textMessage = session.createTextMessage("点对点生产者的消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();

消息消费者(点对点)

//1.创建连接工厂
ConnectionFactory connectionFactory=new
ActiveMQConnectionFactory("tcp://192.168.30.155:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建队列对象
Queue queue = session.createQueue("test-queue");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(queue);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
    TextMessage textMessage=(TextMessage)message;
    try {
        System.out.println("接收到消息:"+textMessage.getText());
    } catch (JMSException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();

消息生产者(发布订阅)

//1.创建连接工厂
ConnectionFactory connectionFactory=new
ActiveMQConnectionFactory("tcp://192.168.30.155:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
Topic topic = session.createTopic("test-topic");
//6.创建消息生产者
MessageProducer producer = session.createProducer(topic);
//7.创建消息
TextMessage textMessage = session.createTextMessage("发布订阅生产者消息");
//8.发送消息
producer.send(textMessage);
//9.关闭资源
producer.close();
session.close();
connection.close();

消息消费者(发布订阅)

//1.创建连接工厂
ConnectionFactory connectionFactory=new
ActiveMQConnectionFactory("tcp://192.168.30.155:61616");
//2.获取连接
Connection connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取 session (参数 1:是否启动事务,参数 2:消息确认模式)
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建主题对象
//Queue queue = session.createQueue("test-queue");
Topic topic = session.createTopic("test-topic");
//6.创建消息消费
MessageConsumer consumer = session.createConsumer(topic);
//7.监听消息
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
    TextMessage textMessage=(TextMessage)message;
        try {
            System.out.println("接收到消息:"+textMessage.getText());
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
});
//8.等待键盘输入
System.in.read();
//9.关闭资源
consumer.close();
session.close();
connection.close();

session 的两个参数:

第 1 个参数 是否使用事务

第 2 个参数 消息的确认模式

  • SESSION_TRANSACTED = 0 事务提交并确认

  • AUTO_ACKNOWLEDGE = 1 自动确认

  • CLIENT_ACKNOWLEDGE = 2 客户端手动确认

  • DUPS_OK_ACKNOWLEDGE = 3 自动批量确认



Q.E.D.