RabbitMQ
和 Kafka
一样作为一种分布式发布订阅消息系统,是Rabbit公司使用 Erlang
语言开发的高可用、消息可靠性高的消息队列组件,支持 AMQP
、XMPP
、SMTP
和 STOMP
协议。
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司 | Rabbit | Apache | 阿里巴巴 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议支持 | AMQP、XMPP、SMTP、STOPMP | OpenWire、STOMP、REST、XMPP、AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ 安装
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq \
--hostname rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-d \
rabbitmq:3.8-management
或 docker-compose.yaml
rabbitmq:
image: rabbitmq:3.8-management
container_name: rabbitmq
restart: always
hostname: rabbitmq
ports:
- 5672:5672
- 15672:15672
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: 123456
浏览器打开 http://127.0.0.1:15672
进入 RabbitMQ
控制台
简单使用
go get github.com/rabbitmq/amqp091-go
publisher.go
package main
import (
"context"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
func Consumer() {
conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// china.news 是交换机和队列绑定的Routing Key
q, _ := ch.QueueDeclare("china.news", false, false, false, false, nil)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// amq.topic: exchange name
ch.PublishWithContext(ctx, "amq.topic", q.Name, false, false, amqp.Publishing{
Body: []byte("hello world"),
})
}
consumer.go
package main
import (
amqp "github.com/rabbitmq/amqp091-go"
"log"
)
func Consumer() {
conn, _ := amqp.Dial("amqp://admin:123456@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// topic.queue1: queue name
q, _ := ch.QueueDeclare("topic.queue", false, false, false, false, nil)
msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil)
forever := make(chan struct{}, 0)
go func() {
for msg := range msgs {
log.Printf("Received a message: %s", msg.Body)
}
}()
<-forever
}