RabbitMQ:在消息森林中漫步

2020.07.30 14:07:42
4
阅读约3分钟

引言 #

Message Queue:翻译为消息队列,通过典型的生产者消费者模型,生产者不断向消息队列中产生消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送何接受,没有业务逻辑的侵入,轻松的实现系统之间的解耦,别名为消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

MQ有哪些 #

ActiveMQ、RabbitMQ、Kafka,阿里的RockeMq

不同的特点 #

  1. ActiveMQ是一个完全支持JMS规范的消息中间件,丰富的api,多种集群架构模式让它在业界成位老牌的消息中间件,在中小型企业比较受欢迎。
  2. Kafka追求高吞吐量,不支持事务,对消息的重复,丢失,错误没有严格的要求。
  3. RocketMQ是阿里开源的中间件。
  4. RabbitMQ基于AMQP协议,面向消息、队列、路由、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性,稳定性和可靠性要求较高的场景,对性能和吞吐量的要求在其次。 img

docker安装rabbitmq #

建议在linux上进行安装

docker pull rabbitmq:3.7.18-management

启动容器

docker run -d --name rabbitmq3.7.18 -p 5672:5672 -p 15672:15672 -v `pwd`/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=heshenghao 2a0e4a1d8b3e

说明:

-d 后台运行容器;

–name 指定容器名;

-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);

-v 映射目录或文件;

–hostname 主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);

-e 指定环境变量;(RABBITMQ_DEFAULT_VHOST:默认虚拟机名;RABBITMQ_DEFAULT_USER:默认的用户名;RABBITMQ_DEFAULT_PASS:默认用户名的密码)

2a0e4a1d8b3e为镜像id

启动完成后可以打开15672端口查看。

img

springboot中使用mq #

添加依赖 #

     <dependency>
             <groupId>org.springframework.amqp</groupId>
              <artifactId>spring-rabbit</artifactId>
              <version>2.2.2.RELEASE</version>
        </dependency>

配置文件 #

spring:
  rabbitmq:
    host: 106.54.247.224
    username: admin
    password: heshenghao
    port: 5672
    virtual-host: my_vhost

其中virtual-host比较重要,与启动容器时的参数一致。

测试类 #

创建队列 #

   @Test
    public void create(){
        amqpAdmin.declareQueue(new Queue("amqp.queue"));//创建队列
        System.out.println("队列创建成功");
        amqpAdmin.declareExchange(new DirectExchange("ampq.exchage"));//创建直连型交换机
        System.out.println("交换机创建成功");
        amqpAdmin.declareBinding(new Binding("amqp.queue",
                Binding.DestinationType.QUEUE,"ampq.exchage","amqp",null));//绑定信息
        System.out.println("队列与交换机的绑定创建成功");
    }

img

消息发送 #

    @Test
    public void sendMessage(){
		"Map<String,Object> map = new HashMap<>();"
		map.put("msg","我的测试直连消息队列");
		"map.put("date",new Date());"
		rabbitTemplate.convertAndSend("ampq.exchage","amqp",map);
    }

消息接收 #

   /**
     * 接收消息队列中的数据
     */
    @Test
    public void receiverMsg(){
        Object obj = rabbitTemplate.receiveAndConvert("amqp.queue");
        System.out.println(obj.getClass());
        System.out.println(obj);
    }

img

stomp #

开启插件 #

进入容器内部

. /rabbitmq- plugins enable rabbitmq_web_stomp

开启插件

![img](https://shuxie.oss-cn-hangzhou.aliyuncs.com/post/1/2020-08-02/image.png23833?x-oss-process=style/post-picture)

前端安装stompjs

npm install stompjs
npm install --save net

开启响应的容器端口,以及修改安全组。
在vue中监听消息队列

<template>
  <div class="bg-white h-100">
    <el-container class="pt-3 px-3">
      <div>
        <el-button type="primary" @click="doConnect">开启连接</el-button>
      </div>
      <div class="px-3">
        <el-button type="primary" @click="disconnect">关闭连接</el-button>
      </div>
    </el-container>
  </div>
</template>
<script lang="ts">
import { Vue, Component, Watch } from 'nuxt-property-decorator'
import stomp from 'stompjs'
import config from '~/plugins/config/website.js'
import { connect } from 'net'
@Component({
  components: {},
})
export default class index extends Vue {
  client: any = null

  doConnect() {
    this.client = stomp.client(config.RTMQ_SERVICE)
    this.connect()
  }

  connect() {
    this.client.connect(
      'admin',
      'heshenghao',
      this.on_connect,
      this.on_error,
      'my_vhost'
    )
    // this.client.heartbeat.incoming = 5000
    // this.client.heartbeat.outgoing = 5000
    console.log('>>>成功连接服务器')
  }

  on_connect() {
    console.log(234)
    this.client.subscribe('/queue/amqp.queue', (data: any) => {
      this.$notify({
        title: '成功',
        message: data.body,
        type: 'success',
      })
    })
  }

  disconnect() {
    this.client.disconnect(() => {
      this.$notify({
        title: '成功',
        message: 'See you next time!',
        type: 'success',
      })
    })
  }

  on_error() {
    console.log('MQ 连接失败,5秒后重连')
    // window.setTimeout(() => {
    //   this.doConnect()
    // }, 5000)
  }
}
</script>
<style lang='scss' scoped>
</style>

img

阅读:4 . 字数:800 发布于 14 天前
本作品系 原创 , 采用 《署名-非商业性使用-禁止演绎 4.0 国际》许可协议
推荐阅读
资源
  • 创意设计
  • 书栈网
  • 帮助中心
  • 声望与权限
  • 服务中心
  • 合作
  • 关于我
  • 广告投放
  • 职位发布
  • 联系我们
  • 关注
  • 技术日志
  • 运营日志
  • ❤ 609 天
  • 条款
  • 服务条款
  • 隐私政策
  • 下载 App
  • Copyright © 2018-2020 Siques