GO使用RabbitMQ实现消息队列

发布日期: 2023-12-29 14:45:22 作者: Stephen 评论: 0

RabbitMQKafka 一样作为一种分布式发布订阅消息系统,是Rabbit公司使用 Erlang 语言开发的高可用、消息可靠性高的消息队列组件,支持 AMQPXMPPSMTPSTOMP 协议。

RabbitMQ ActiveMQ RocketMQ Kafka
公司 Rabbit Apache 阿里巴巴 Apache
开发语言 Erlang Java Java Scala&Java
协议支持 AMQP、XMPP、SMTP、STOPMP OpenWire、STOMP、REST、XMPP、AMQP 自定义协议 自定义协议
可用性 一般
单机吞吐量 一般 非常高
消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
消息可靠性 一般 一般

RabbitMQ 安装

docker run \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=123456 \
    --name rabbitmq \
    --hostname rabbitmq \
    -p 5672:5672 \
    -p 15672:15672 \
    -d \
    rabbitmq:3.8-management

docker-compose.yaml

rabbitmq:
    image: rabbitmq:3.8-management
    container_name: rabbitmq
    restart: always
    hostname: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: 123456

浏览器打开 http://127.0.0.1:15672 进入 RabbitMQ 控制台

简单使用

go get github.com/rabbitmq/amqp091-go

publisher.go

package main

import (
    "context"
    "time"
    amqp "github.com/rabbitmq/amqp091-go"
)

func Consumer() {
    conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672/")
    defer conn.Close()

    ch, _ := conn.Channel()
    defer ch.Close()

    // china.news 是交换机和队列绑定的Routing Key
    q, _ := ch.QueueDeclare("china.news", false, false, false, false, nil)

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

// amq.topic: exchange name
    ch.PublishWithContext(ctx, "amq.topic", q.Name, false, false, amqp.Publishing{
        Body: []byte("hello world"),
    })
}

consumer.go

package main

import (
    amqp "github.com/rabbitmq/amqp091-go"
    "log"
)

func Consumer() {
    conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672/")
    defer conn.Close()

    ch, _ := conn.Channel()
    defer ch.Close()

// topic.queue1: queue name
    q, _ := ch.QueueDeclare("topic.queue", false, false, false, false, nil)

    msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)

    forever := make(chan struct{}, 0)

    go func() {
        for msg := range msgs {
            log.Printf("Received a message: %s", msg.Body)
        }
    }()

    <-forever
}

快来抢沙发