Introduction to RabbitMQ
Message queues are a common building block in projects, especially for coordination and communication among multiple machines and service units. RabbitMQ is an open-source message broker written in Erlang and built on AMQP. It has been adopted by many large projects; for example, OpenStack uses RabbitMQ to pass internal command messages.
Deploying RabbitMQ
We use Ubuntu Server 14.04.1 as the deployment environment. Why Ubuntu rather than CentOS? Mainly because installation on Ubuntu is convenient, especially for newer software versions. Installing RabbitMQ on Ubuntu takes a single command.
# Install RabbitMQ
aptitude install -y rabbitmq-server
# Install pika, a third-party Python client for RabbitMQ, used by the
# Python test clients below. There are three Python RabbitMQ libraries:
# pika, py-amqp, and py-amqplib. pika is the one the RabbitMQ team
# recommends, so that is what we use.
aptitude install -y python-pika
Start the RabbitMQ service
# Start RabbitMQ
/etc/init.d/rabbitmq-server start
# Check RabbitMQ's current runtime status
rabbitmqctl status
Basic RabbitMQ concepts and test code
Official examples:
http://www.rabbitmq.com/getstarted.html
We walk through RabbitMQ's six messaging patterns.
1. "Hello World"
This model is easy to understand: P is the producer and C is the consumer, and messages reach the consumer through a queue. Below are the producer code send.py and the consumer code receive.py.
Producer: send.py
#!/usr/bin/env python
import pika

# Connect to the RabbitMQ server running locally
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Declare a queue named hello
channel.queue_declare(queue='hello')
# Publish a message
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Consumer: receive.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
channel.start_consuming()
Running the test programs
# Run the producer
$ python send.py
[x] Sent 'Hello World!'
# Run the consumer
$ python receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Hello World!'
# Use rabbitmqctl to list the current queues;
# the hello queue is visible
$ rabbitmqctl list_queues
Listing queues ...
hello 0
task_queue 0
...done.
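Stripped of the broker, this first model is just a FIFO queue decoupling two parties. A minimal in-memory sketch, with hypothetical helper names that only stand in for the pika calls:

```python
from collections import deque

# The queue decouples producer from consumer: a message waits,
# in FIFO order, until a consumer takes it.
hello = deque()

def send(queue, body):
    queue.append(body)       # stands in for basic_publish with routing_key='hello'

def receive(queue):
    return queue.popleft()   # stands in for the consumer's callback firing

send(hello, 'Hello World!')
assert receive(hello) == 'Hello World!'
```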
2. "Work queues"
The second model improves on the first: multiple consumers take messages from the same queue. It also introduces some advanced properties, such as queue and message durability. The code and its output come first; the analysis follows.
Producer: new_task.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make the message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()
Consumer: worker.py
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))  # simulate work: one second per dot
    print " [x] Done"
    ch.basic_ack(delivery_tag=method.delivery_tag)  # acknowledge explicitly

channel.basic_qos(prefetch_count=1)  # give a worker at most one unacked message
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
Running the test programs
# Start 2 workers
# worker1
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
# worker2
$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
# Start the producer and send 4 different messages
$ python new_task.py 123
$ python new_task.py 456
$ python new_task.py 789
$ python new_task.py 012
# worker1's output
[*] Waiting for messages. To exit press CTRL+C
[x] Received '123'
[x] Done
[x] Received '789'
[x] Done
# worker2's output
[*] Waiting for messages. To exit press CTRL+C
[x] Received '456'
[x] Done
[x] Received '012'
[x] Done
# The messages are dispatched to the 2 consumers in round-robin fashion
To make messaging reliable in RabbitMQ, both the queue and the messages must be made durable, so that neither is lost if RabbitMQ crashes or exits abnormally:
1. declare the queue with durable=True
2. publish messages with delivery_mode = 2
When several consumers read the same queue and one of them crashes before processing its messages and replying with an ack, RabbitMQ does not rely on a message timeout: once the dead consumer's connection or channel closes, the broker requeues its unacknowledged messages and redelivers them to a remaining healthy consumer.
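The dispatch and recovery behaviour described above can be sketched without a broker. This is an illustrative model, not pika code: round-robin assigns each message to one worker in turn, and a crashed worker's unacked messages are requeued for the survivors.

```python
from collections import deque

def dispatch(messages, n_workers):
    """Round-robin: each message goes to exactly one worker, in turn."""
    queues = [deque() for _ in range(n_workers)]
    for i, msg in enumerate(messages):
        queues[i % n_workers].append(msg)
    return queues

w1, w2 = dispatch(['123', '456', '789', '012'], 2)
assert list(w1) == ['123', '789']   # worker1's share, as in the run above
assert list(w2) == ['456', '012']   # worker2's share

# A worker that dies before acking loses nothing: when its connection
# closes, the broker requeues the unacked message for another consumer.
unacked = w1.popleft()              # worker1 crashed while holding '123'
w2.append(unacked)                  # broker redelivers it to worker2
assert unacked in w2
```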
3. "Publish/Subscribe"
The publish/subscribe pattern differs from the work-queues model: in model 2 each message is round-robined to exactly one consumer, while here every subscriber receives every message. It introduces an exchange, the blue X in the diagram. RabbitMQ has several exchange types: direct, topic, headers, and fanout. This example uses fanout, which copies each message the producer P publishes into every queue bound to the exchange (later examples cover the other types). Each consumer C creates its own anonymous queue and binds it to the exchange; from then on it receives the producer's messages through the exchange. The code, its output, and the analysis follow.
Producer: emit_log.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print " [x] Sent %r" % (message,)
connection.close()
Consumer: receive_logs.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', type='fanout')
result = channel.queue_declare(exclusive=True)  # anonymous queue, removed on disconnect
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
Running the test programs
# Run 2 subscribers
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
$ python receive_logs.py
[*] Waiting for logs. To exit press CTRL+C
# Run the publisher
$ python emit_log.py
# Both subscribers receive the message at the same time and print it
[*] Waiting for logs. To exit press CTRL+C
[x] 'info: Hello World!'
[*] Waiting for logs. To exit press CTRL+C
[x] 'info: Hello World!'
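The fanout behaviour, one published message copied into every bound queue, can be modelled in a few lines. This is an illustrative sketch, not the pika API:

```python
from collections import deque

class FanoutExchange:
    """Fanout: every bound queue receives a copy of each published message."""
    def __init__(self):
        self.queues = []
    def bind(self):
        q = deque()              # like an exclusive, auto-named queue
        self.queues.append(q)
        return q
    def publish(self, body):
        for q in self.queues:    # routing_key is ignored by a fanout exchange
            q.append(body)

logs = FanoutExchange()
sub1, sub2 = logs.bind(), logs.bind()
logs.publish('info: Hello World!')
assert sub1.popleft() == 'info: Hello World!'
assert sub2.popleft() == 'info: Hello World!'   # both subscribers got a copy
```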
4. "Routing"
Routing builds on publish/subscribe so that different consumers receive only the messages carrying a given key. It also uses an exchange, with the type attribute set to direct, and both publisher and subscriber specify a routing_key as the routing condition. The code first:
Publisher: emit_log_direct.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()
Subscriber: receive_logs_direct.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
    sys.exit(1)
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
Running the test programs
$ python receive_logs_direct.py info error debug
[*] Waiting for logs. To exit press CTRL+C
$ python emit_log_direct.py info "info message"
$ python emit_log_direct.py test "test message"
$ python emit_log_direct.py debug "debug message"
[*] Waiting for logs. To exit press CTRL+C
[x] 'info':'info message'
[x] 'debug':'debug message'
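What the direct exchange does in this run, delivering only on an exact binding-key match and silently dropping the 'test' message, can be sketched as follows (an illustrative model, not pika code):

```python
from collections import defaultdict, deque

class DirectExchange:
    """Direct: a message goes to every queue whose binding key equals its routing_key."""
    def __init__(self):
        self.bindings = defaultdict(list)   # binding key -> bound queues
    def bind(self, queue, key):
        self.bindings[key].append(queue)
    def publish(self, routing_key, body):
        for q in self.bindings.get(routing_key, []):
            q.append(body)      # with no matching binding, the message is dropped

logs = DirectExchange()
q = deque()
for key in ('info', 'error', 'debug'):      # the bindings from the run above
    logs.bind(q, key)
logs.publish('info', 'info message')
logs.publish('test', 'test message')        # no binding for 'test': dropped
logs.publish('debug', 'debug message')
assert list(q) == ['info message', 'debug message']
```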
5. "Topics"
In the Routing model, delivery is controlled entirely by an exact routing_key, but sometimes we cannot name the key precisely. It is more flexible to subscribe by matching patterns against the key: for Linux syslog, say, we might want every message whose key starts with debug, or with info, and so on. RabbitMQ's Topics model provides exactly this.
Publisher: emit_log_topic.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()
Subscriber: receive_logs_topic.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
Running the test programs
$ python receive_logs_topic.py info.* error.* debug.*
[*] Waiting for logs. To exit press CTRL+C
$ python emit_log_topic.py debug.message
$ python emit_log_topic.py test.message
$ python emit_log_topic.py info.message
[*] Waiting for logs. To exit press CTRL+C
[x] 'debug.message':'Hello World!'
[x] 'info.message':'Hello World!'
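The binding keys above use topic wildcards: '*' matches exactly one dot-separated word and '#' matches zero or more words. A small matcher reproducing RabbitMQ's documented semantics, written purely for illustration:

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the topic binding_key.
    '*' matches exactly one word; '#' matches zero or more words."""
    def match(b, r):
        if not b:
            return not r                    # both exhausted: match
        if b[0] == '#':
            # '#' absorbs zero or more of the remaining words
            return any(match(b[1:], r[i:]) for i in range(len(r) + 1))
        if not r:
            return False                    # pattern left over, key exhausted
        return (b[0] == '*' or b[0] == r[0]) and match(b[1:], r[1:])
    return match(binding_key.split('.'), routing_key.split('.'))

# The bindings and messages from the run above:
assert topic_matches('debug.*', 'debug.message')
assert topic_matches('info.*', 'info.message')
assert not topic_matches('info.*', 'test.message')   # no binding matched 'test.message'
assert topic_matches('#', 'anything.at.all')          # '#' alone matches everything
```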
6. "RPC"
RPC is remote procedure call, a common technique. The official RabbitMQ docs do not seem to say much about concurrency here, e.g. what happens when several clients issue RPC calls at once; in this example the server takes requests off the queue serially (prefetch_count=1), processes each one, and returns the result to the client. A correlation_id property pairs each reply with its request. On to the code:
Server: rpc_server.py
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)
    print " [.] fib(%s)" % (n,)
    response = fib(n)
    # Reply on the client's callback queue, echoing its correlation_id
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print " [x] Awaiting RPC requests"
channel.start_consuming()
Client: rpc_client.py
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
        self.channel = self.connection.channel()
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # Accept only the reply that matches our outstanding request
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()
print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)
Running the test programs
# Run the server
$ python rpc_server.py
[x] Awaiting RPC requests
[.] fib(30)
# Run the client
$ python rpc_client.py
[x] Requesting fib(30)
[.] Got 832040
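The role of correlation_id deserves a closer look: the client's callback queue may hold replies to older requests, and the id is how the client picks out the reply for the request it just sent. An in-memory sketch of that matching step, illustrative rather than real pika code:

```python
import uuid

# The client's callback queue, modelled as a plain list of reply messages.
callback_queue = []

def rpc_server_reply(corr_id, result):
    # The server echoes the request's correlation_id into its reply
    callback_queue.append({'correlation_id': corr_id, 'body': str(result)})

corr_id = str(uuid.uuid4())
rpc_server_reply('stale-id', 999)       # leftover reply from an older request
rpc_server_reply(corr_id, 832040)       # the reply to our request

response = None
for msg in callback_queue:
    if msg['correlation_id'] == corr_id:   # ignore anything that isn't ours
        response = int(msg['body'])
assert response == 832040
```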