ActiveMQ-快速入门

简介

ActiveMQ 是一种开源的基于 JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

消息中间件

消息中间件是指两个系统或者两个客户端之间进行消息传送,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

通过提供消息传递和消息排队模型,他可以在分布式环境下扩展进程间的通信。

9a3a206583294410b98c27af6e09d192~tplv-k3u1fbpfcp-zoom-1

总结下来共有三个作用:异步化提升性能、降低耦合度、流量削峰。


消息中间件应用场景

  1. 异步通信
    在一些系统中的一些业务不需要立即处理消息。而消息队列提供了异步处理机制,允许用户把一个消息放入队列中,但是不会立即处理它。最后在需要的时候去处理即可。

  2. 缓冲(过载保护)
    在任何重要的系统中,都有着需要不同处理时间的元素。而消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲层有助于控制和优化数据流过系统的速度和时间,进而调节系统的响应时间。
    在访问量剧增的情况下,应用需要能够继续发挥作用,但这样的突发流量并不能预测。所以通过使用消息队列能够使得重要组件能够顶住突发的访问压力,而不会因为突发的超负荷负载导致系统崩溃。

  3. 解耦
    降低工程系统之间的强依赖程度,针对异构系统进行适配。
    在项目研发启动之初预测未来项目会碰到什么需求是极其困难的,而通过消息系统在处理过程中间插入一个隐含的、基于数据的接口层,这样两边的处理过程都需要实现这一接口。当应用发生变化时,就可以独立的扩展或修改两边的处理过程,只需要确保它们会遵守相同的接口约束即可。

  4. 冗余
    在某些情况下,消息队列处理数据的过程会失败,而这一过程除非通过数据被持久化,否则将会造成数据丢失。
    消息队列间数据进行持久化直到它们已经被处理,通过这一方式可以避免数据丢失风险。许多的消息队列采用的“插入-获取-删除”范式中,在把一个消息从队列中删除之前,都需要消息的消费系统明确的指出该消息已经被处理完毕,从而确保数据可以被安全的保存直到被使用完毕。

  5. 扩展
    由于消息队列解耦了消息消费系统的处理过程,所以增大消息入队和处理的频率时很容易的,只要另外增加处理过程即可。不需要改变代码,不需要调节参数,便于分布式扩容。

  6. 可恢复
    消息消费系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列的消息仍然可以在消费系统恢复之后被继续处理。

  7. 顺序保证
    在大多数场景下,数据处理的顺序性是很重要的。大部分的消息队列都是排序的,并且能够保证数据会按照生产系统插入消息的顺序被消费系统进行处理。

  8. 数据流处理
    分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析,而目前消息队列是完成此类数据收集任务的最好选择。


介绍

概念

ActiveMQ 是 Apache 出品,是最流行的、功能强大的即时通讯和集成模式的开源服务器。
ActiveMQ 是一个完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。
提供客户端支持跨语言和协议,带有易于在充分支持 JMS 1.1 和 1.4 使用 J2EE 企业集成模式和许多先进的功能。

消息模型

点对点模式(P2P)

  • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。
  • 接收者在成功接收消息之后需向队列应答成功。

1

发布订阅模式(Pub/Sub)

  • 每个消息可以有多个消费者。
  • 发布者和订阅者之间有时间上的依赖性。
    针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
  • 为了缓和这样严格的时间相关性,JMS 允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

2

消息类型

JMS 消息由以下三个部分组成:

  • 消息头
    每个消息头字段都有相应的 gettersetter 方法。
  • 消息属性
    如果需要除消息头字段以外的值,那么可以使用消息属性。
  • 消息体
    JMS 定义的消息类型有 TextMessageMapMessageBytesMessageStreamMessageObjectMessage

消息类型:

  • TextMessage: 文本消息
  • MapMessage: k/v
  • BytesMessage : 字节流
  • StreamMessage: java原始的数据流
  • ObjectMessage: 序列化的java对象

消息确认机制

消息只有在被确认之后,才会被认为成功地消费了。
消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息、消息被确认。
在事务性会话中,当一个事务被提交的时候,确认会自动发生。但是在非事务性会话中,消息何时被确认取决于创建会话时的应答模式。

  1. 优先级
    可以通过使用消息优先级来指示 JMS Provider 首先提交紧急的消息。优先级分 10 个级别,从 0 (最低) 到 9 (最高)。默认优先级为 4。

  2. 消息过期
    可以设置消息在一定时间后过期,默认是永不过期。

  3. 临时目的地
    通过会话创建临时目的地,该临时目的地会话上的消息仅限于创建连接的保持时间,且只有创建该临时目的地的连接上的消费者才能处理消息。

存储方式

  1. KahaDB 存储
    KahaDB 是默认的持久化策略,通过将所有的消息顺序添加到一个日志文件中,同时另外维护一个索引文件指向这些日志的存储地址,并且还有一个事务日志同于消息回复操作。这是一个专门针对消息持久化的解决方案。
    特性:
  • 日志形式存储消息。
  • 消息索引以 B-Tree 结构存储,可以快速更新。
  • 完全支持 JMS 事务。
  • 支持多种恢复机制。
  1. AMQ 方式
    在 5.3 版本之前,AMQ 也是一个文件型数据库,消息最终存储在文件中,同时内存中也有缓存数据。

  2. JDBC 存储
    使用 JDBC 持久化方式,数据库默认创建 3 个表,每个表分别是:

  • activemq_msgs: queue 和 topic 的消息都存储在这个表中。
  • activemq_acks: 存储持久订阅的信息和最后一个持久订阅接收的消息 ID。
  • activemq_lock: 跟 kahaDB 的 lock 文件类似,确保数据在某一时刻只有一个 Broker 在访问。
  1. LevelDB 存储
    LevelDB 的持久化性能高于 KahaDB。
    但是在 ActiveMQ 官网中对于 LevelDB 的表述:LevelDB 官方建议使用以及不再支持,推荐使用的是 KahaDB 。

  2. Memory 消息存储
    顾名思义就是基于内存的消息存储。
    persistent="false" 表示不设置持久化存储,直接存储到内存中,在 Broker 标签处设置。

通信协议

  1. TCP
    默认 Broker 配置,TCP 协议 Client 监听的端口是 61616。
    在网络传输数据前,必须要序列化数据,消息是通过一个叫 wire protocol 的序列化成字节流。默认情况下,ActiveMQ 把 wire protocol 叫做 OpenWire ,他的目的在于促使网络上的效率和数据快速交互。
    TCP 连接的 URI 的形式:
    1
    tcp://hostname:port?key=value&key=value
    TCP 传输的优点:
  • TCP 协议传输采用字节流方式传递,可靠性高,稳定性强。
  • 应用广泛,支持任何平台。
  1. NIO
    NIO 与 TCP 类似,但 NIO 更加侧重于底层的访问操作。
    NIO 连接的 URI 的形式:

    1
    nio://hostname:port?key=value
  2. UDP
    UDP 与 TCP 的区别在于其本身协议上的区别。
    UDP 连接的 URI 形式:

    1
    udp://hostname:port?key=value
  3. SSL
    ssl 连接的 URI 的形式:

    1
    ssl://hostname:port?key=value
  4. 更多协议
    更多协议


高级

安全机制

消息服务器 Broker 安全

  1. conf/activemq.xml 文件中的内添加访问密码。
  2. 在基于 JMS 开发的消息传送的地方进行添加密码修改。

Web 控制台安全

默认管理后台不对外公开,不能在外网访问。

主从集群

主从集群


实践

基本使用

  1. 消息生产者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    package com.dragon.activemq.demo;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    /**
    * @author vgbhfive
    * @version V0.0.1
    */
    public class MessageSender {

    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在 ActiveMQ 管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "vgbhfive.mq.queue";

    /**
    * <b>function:</b> 发送消息
    *
    * @param session
    * @param producer
    * @throws Exception
    */
    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
    for (int i = 0; i < SEND_NUM; i++) {
    String message = "发送消息第" + (i + 1) + "条";
    TextMessage text = session.createTextMessage(message);

    System.out.println(message);
    producer.send(text);
    }
    }

    public static void run() throws Exception {
    Connection connection = null;
    Session session = null;
    try {
    // 创建链接工厂
    ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
    // 通过工厂创建一个连接
    connection = factory.createConnection();
    // 启动连接
    connection.start();
    // 创建一个session会话
    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    // 创建一个消息队列
    Destination destination = session.createQueue(DESTINATION);
    // 创建消息制作者
    MessageProducer producer = session.createProducer(destination);
    // 设置持久化模式
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    sendMessage(session, producer);
    // 提交会话
    session.commit();

    } catch (Exception e) {
    throw e;
    } finally {
    // 关闭释放资源
    if (session != null) {
    session.close();
    }
    if (connection != null) {
    connection.close();
    }
    }
    }

    public static void main(String[] args) throws Exception {
    MessageSender.run();
    }

    }

    输出

    1
    2
    3
    4
    5
    发送消息第1
    发送消息第2
    发送消息第3
    发送消息第4
    发送消息第5
  2. 消息消费者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    package com.dragon.activemq.demo;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    /**
    * @author vgbhfive
    * @version V0.0.1
    */
    public class MessageReceiver {

    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在 ActiveMQ 管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "vgbhfive.mq.queue";

    public static void run() throws Exception {

    Connection connection = null;
    Session session = null;
    try {
    // 创建链接工厂
    ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection
    .DEFAULT_PASSWORD, BROKER_URL);
    // 通过工厂创建一个连接
    connection = factory.createConnection();
    // 启动连接
    connection.start();
    // 创建一个session会话
    session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    // 创建一个消息队列
    Destination destination = session.createQueue(DESTINATION);
    // 创建消息制作者
    MessageConsumer consumer = session.createConsumer(destination);

    while (true) {
    // 接收数据的时间(等待) 100 ms
    Message message = consumer.receive(1000 * 100);

    TextMessage text = (TextMessage) message;
    if (text != null) {
    System.out.println("接收:" + text.getText());
    } else {
    break;
    }
    }

    // 提交会话
    session.commit();
    } catch (Exception e) {
    throw e;
    } finally {
    // 关闭释放资源
    if (session != null) {
    session.close();
    }
    if (connection != null) {
    connection.close();
    }
    }
    }

    public static void main(String[] args) throws Exception {
    MessageReceiver.run();
    }
    }

    输出

    1
    2
    3
    4
    5
    接收:发送消息第1
    接收:发送消息第2
    接收:发送消息第3
    接收:发送消息第4
    接收:发送消息第5

更多基本使用

更多基本使用


问题

消息类型转换错误

1
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.activemq.command.ActiveMQTextMessage] to [cn.vgbhfive.activemq.demo.TestEmail] for org.springframework.jms.listener.adapter.AbstractAdaptableMessageListener$MessagingMessageConverterAdapter$LazyResolutionMessage@3189b1ff

解决方法:
在 application 类里加个 bean。

1
2
3
4
5
6
7
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}

发送消息之后不能消费

解决方法:
将 SpringBoot 里的消息加到 JMS 工厂。

1
2
3
4
5
6
7
@Bean // Serialize message content to json using TextMessage
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}

实体类序列化后报错

1
Caused by: org.springframework.jms.support.converter.MessageConversionException: Could not convert JMS message; nested exception is javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Forbidden class cn.vgbhfive.activemq.demo.TestEmail! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.

解决方法:
设置可以序列化的包列表, 加到 vm 启动参数里面!

1
2
3
#全部包
#org.apache.activemq.SERIALIZABLE_PACKAGES=*
org.apache.activemq.SERIALIZABLE_PACKAGES=cn.vgbhfive.activemq,cn.vgbhfive.demo

报错的详细讲解


总结

时常总结,时常更新,保持独立。


引用

ActiveMQ教程


个人备注

此博客内容均为作者学习所做笔记,侵删!
若转作其他用途,请注明来源!