
RabbitMQ Deployment and Basic Usage

2017-05-08 19:43:49

RabbitMQ Introduction

Message queues are a common building block in project development, especially when multiple machines or service units need to coordinate and communicate. RabbitMQ is an open-source message queue written in Erlang and built on top of AMQP. It has been adopted by many large projects; OpenStack, for example, uses RabbitMQ to pass its internal command messages.

Deploying RabbitMQ

We use Ubuntu Server 14.04.1 as the deployment environment. Why Ubuntu rather than CentOS? Mainly because installation on Ubuntu is convenient, especially for newer software versions. Installing RabbitMQ on Ubuntu takes a single command.

# Install RabbitMQ
aptitude install -y rabbitmq-server

# Install pika, a third-party Python client library for RabbitMQ, used by the Python test clients below.
# There are three Python RabbitMQ libraries: pika, py-amqp and py-amqplib.
# pika is the one recommended by the RabbitMQ team, so we use pika.
aptitude install -y python-pika

Starting the RabbitMQ service

# Start RabbitMQ
/etc/init.d/rabbitmq-server start

# Check RabbitMQ's current runtime status
rabbitmqctl status
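Before moving on to the examples, it can be handy to confirm that the broker is reachable from Python. The following is a minimal sketch, not part of the original post, assuming pika is installed and the broker is listening on localhost with its default settings:

#!/usr/bin/env python
# Minimal connectivity check (sketch): open a connection and a channel,
# then close them again. Assumes a default broker on localhost.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
print ' [x] Connected to RabbitMQ and opened a channel'
connection.close()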

RabbitMQ Basic Concepts and Test Code

Official examples: http://www.rabbitmq.com/getstarted.html. We walk through RabbitMQ's six communication models.

1.”Hello World”


This model is easy to understand: P is the producer and C is the consumer, and the queue delivers the produced messages to the consumer. The producer code send.py and the consumer code receive.py are listed below.

Producer send.py

#!/usr/bin/env python
import pika

# Connect to the RabbitMQ broker running on localhost
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare a queue named "hello"
channel.queue_declare(queue='hello')

# Publish a message to the hello queue through the default exchange
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()

Consumer receive.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

# Callback invoked for every message delivered from the queue
def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

channel.basic_consume(callback, queue='hello', no_ack=True)

channel.start_consuming()

Running the test programs
# Run the producer
$ python send.py
 [x] Sent 'Hello World!'

# Run the consumer
$ python receive.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

# Use rabbitmqctl to list the current queues; the hello queue shows up
$ rabbitmqctl list_queues
Listing queues ...
hello   0
task_queue      0
...done.
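Queue depth can also be checked from Python itself. As a hedged aside (not in the original post), a passive queue_declare returns the current message count, assuming the hello queue already exists:

#!/usr/bin/env python
# Sketch: ask the broker how many messages are sitting in the hello queue.
# passive=True only checks the queue; it fails if the queue does not exist.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
result = channel.queue_declare(queue='hello', passive=True)
print ' [x] hello queue currently holds %d messages' % result.method.message_count
connection.close()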

2.”Work queues”


The second model improves on the first: several consumers take messages from the same queue. It also introduces a few advanced properties, namely queue and message durability. The code and its output come first, followed by the analysis.

Producer new_task.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the task_queue queue. In RabbitMQ a queue's properties cannot be
# changed once it has been created, so we cannot reuse the hello queue from
# the first example. The new durable=True property makes task_queue survive
# a RabbitMQ crash or abnormal shutdown.
channel.queue_declare(queue='task_queue', durable=True)

# delivery_mode=2 marks the message itself as persistent
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print " [x] Sent %r" % (message,)
connection.close()

Consumer worker.py
#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep(body.count('.'))
    print " [x] Done"
    # Acknowledge the message explicitly; RabbitMQ removes a message from the
    # queue only after the consumer has confirmed it was processed
    ch.basic_ack(delivery_tag=method.delivery_tag)

# prefetch_count limits how many unacknowledged messages each consumer may
# hold at a time; it can be tuned to balance work between consumers
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='task_queue')

channel.start_consuming()

Running the test programs
# Start 2 workers
# worker 1
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

# worker 2
$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C

# Start the producer and send 4 different messages
$ python new_task.py 123
$ python new_task.py 456
$ python new_task.py 789
$ python new_task.py 012

# Output of worker 1
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '123'
 [x] Done
 [x] Received '789'
 [x] Done

# Output of worker 2
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '456'
 [x] Done
 [x] Received '012'
 [x] Done

# The messages are delivered round-robin across the two consumers

To make messaging reliable in RabbitMQ, both the queue and the messages must be persisted, so that neither is lost when RabbitMQ crashes or exits abnormally: 1. declare the queue with durable=True; 2. publish messages with delivery_mode=2.

When several consumers read from the same queue and one of them crashes before finishing its messages and sending the ack, RabbitMQ notices that the consumer's channel or connection has died and redelivers the unacknowledged messages to another healthy consumer.
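To see this redelivery in action, one can run a deliberately misbehaving worker alongside a normal one. The sketch below is not part of the original post; it assumes the same task_queue and local broker as worker.py:

#!/usr/bin/env python
# crash_worker.py (sketch): take one message from task_queue and exit without
# sending an ack. When this process dies and its connection closes, RabbitMQ
# requeues the unacknowledged message and delivers it to another live worker.
import sys
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print " [x] Got %r, exiting WITHOUT ack" % (body,)
    sys.exit(1)  # the connection closes and the message is requeued

channel.basic_consume(callback, queue='task_queue')  # manual ack mode (no_ack defaults to False)
channel.start_consuming()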

3.”Publish/Subscribe”


The publish/subscribe model differs from the work-queue model above: in model 2 each message is delivered to exactly one consumer in round-robin fashion, whereas here every subscriber receives a copy of the message. It introduces an exchange, the blue X in the diagram. RabbitMQ exchanges come in several types: direct, topic, headers and fanout. This example uses fanout (later examples cover some of the other types), which copies every message produced by P into all of the queues bound to the exchange. Each consumer C creates its own anonymous queue and binds it to the exchange, and from then on it receives the producer's messages through that binding. The code, a test run and the analysis follow.

Producer emit_log.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange named logs with type=fanout
channel.exchange_declare(exchange='logs', type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
# The producer publishes to the exchange instead of directly to a queue
channel.basic_publish(exchange='logs', routing_key='', body=message)
print " [x] Sent %r" % (message,)
connection.close()

Consumer receive_logs.py
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare the exchange
channel.exchange_declare(exchange='logs', type='fanout')

# Each consumer declares its own anonymous, exclusive queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# Bind the queue to the exchange
channel.queue_bind(exchange='logs', queue=queue_name)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r" % (body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

Running the test programs
# Run 2 subscribers
$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

$ python receive_logs.py
 [*] Waiting for logs. To exit press CTRL+C

# Run the publisher
$ python emit_log.py

# Both subscribers receive the publisher's message and print it
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info: Hello World!'

 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info: Hello World!'
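Of the four exchange types listed above, the post demonstrates fanout here and direct and topic below, but never returns to headers. As a hedged aside (not part of the original post), a headers exchange routes on message header values rather than the routing key; a minimal sketch, written against the same old-style pika API used throughout this post, might look like this, where the names header_logs and severity are purely illustrative:

#!/usr/bin/env python
# Sketch only: route by message headers instead of routing_key.
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# type='headers' matches the old pika API used in this post;
# newer pika versions spell the argument exchange_type='headers'
channel.exchange_declare(exchange='header_logs', type='headers')

# A consumer would bind with x-match rules instead of a routing key, e.g.:
# channel.queue_bind(exchange='header_logs', queue=queue_name,
#                    arguments={'x-match': 'any', 'severity': 'error'})

# The publisher attaches headers to the message properties
channel.basic_publish(exchange='header_logs',
                      routing_key='',
                      body='disk almost full',
                      properties=pika.BasicProperties(headers={'severity': 'error'}))
connection.close()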

4.”Routing”


Routing builds on publish/subscribe so that different consumers receive only the messages carrying the keys they asked for. It again requires an exchange, this time with the type property set to direct, and both the publisher and the subscribers specify a routing_key that serves as the routing condition. Code first.

Publisher emit_log_direct.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange with type=direct
channel.exchange_declare(exchange='direct_logs', type='direct')

# The routing key is taken from the command line; it defaults to 'info'
severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
# The publisher specifies the routing_key when sending the message
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print " [x] Sent %r:%r" % (severity, message)
connection.close()

Subscriber receive_logs_direct.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange with type=direct
channel.exchange_declare(exchange='direct_logs', type='direct')

# Declare an anonymous, exclusive queue for this subscriber
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# The routing keys are given on the command line
severities = sys.argv[1:]
if not severities:
    print >> sys.stderr, "Usage: %s [info] [warning] [error]" % (sys.argv[0],)
    sys.exit(1)

# Bind the queue to the exchange once per routing key
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

Running the test programs
# Run the subscriber with the routing keys info, error and debug
$ python receive_logs_direct.py info error debug
 [*] Waiting for logs. To exit press CTRL+C

# Run the publisher with a routing key and a message body
$ python emit_log_direct.py info "info message"
$ python emit_log_direct.py test "test message"
$ python emit_log_direct.py debug "debug message"

# The subscriber only receives the messages carrying the info and debug keys
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'info':'info message'
 [x] 'debug':'debug message'

5.”Topics”


In the Routing model, delivery is controlled by an exact routing_key. Sometimes we cannot pin the routing_key down precisely, and it is more flexible to subscribe by pattern-matching on the key, for example matching syslog-style keys that contain debug or info. The Topics model in RabbitMQ provides exactly that, using two wildcards in binding keys (illustrated by the small sketch after this list):

  • * (star) matches exactly one word

  • # (hash) matches zero or more words
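To make the wildcard semantics concrete, here is a small illustration that is not part of the original post: it emulates AMQP topic matching with a regular expression (simplified, so it does not cover the corner case where # also swallows the dot in front of it):

#!/usr/bin/env python
# Sketch: emulate the topic-exchange wildcards to show what a binding key matches.
import re

def topic_matches(binding_key, routing_key):
    # '*' matches exactly one dot-separated word, '#' matches zero or more words
    pattern = re.escape(binding_key)
    pattern = pattern.replace(r'\*', r'[^.]+').replace(r'\#', r'.*')
    return re.match('^' + pattern + '$', routing_key) is not None

print topic_matches('kern.*', 'kern.critical')       # True: '*' covers one word
print topic_matches('kern.*', 'kern.disk.critical')  # False: '*' covers only one word
print topic_matches('kern.#', 'kern.disk.critical')  # True: '#' covers several words
print topic_matches('#', 'anonymous.info')           # True: '#' matches everything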

Publisher emit_log_topic.py

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange with type=topic
channel.exchange_declare(exchange='topic_logs', type='topic')

# The routing_key property is used again, but here it is a dot-separated
# string that the subscribers' binding patterns are matched against
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message)
print " [x] Sent %r:%r" % (routing_key, message)
connection.close()

Subscriber receive_logs_topic.py
#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# Declare an exchange with type=topic
channel.exchange_declare(exchange='topic_logs', type='topic')

# Declare an anonymous, exclusive queue for this subscriber
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    print >> sys.stderr, "Usage: %s [binding_key]..." % (sys.argv[0],)
    sys.exit(1)

# Bind the queue to the exchange once per binding pattern
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print ' [*] Waiting for logs. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] %r:%r" % (method.routing_key, body,)

channel.basic_consume(callback, queue=queue_name, no_ack=True)

channel.start_consuming()

Running the test programs
# Run the subscriber with the binding patterns to match
$ python receive_logs_topic.py info.* error.* debug.*
 [*] Waiting for logs. To exit press CTRL+C

# Run the publisher with various routing keys; only matching keys are delivered
$ python emit_log_topic.py debug.message
$ python emit_log_topic.py test.message
$ python emit_log_topic.py info.message

# The subscriber only receives the messages it is interested in
 [*] Waiting for logs. To exit press CTRL+C
 [x] 'debug.message':'Hello World!'
 [x] 'info.message':'Hello World!'

6.”RPC”


RPC (remote procedure call) is a common technique. The RabbitMQ documentation does not say much about how concurrency is handled: when several clients issue RPC calls at the same time, the server takes the requests off the queue one by one, processes them serially and sends the replies back to the clients. In the RPC pattern a correlation_id property is used to match each reply to the request that produced it. On to the code.

Server rpc_server.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch, method, props, body):
    n = int(body)

    print " [.] fib(%s)" % (n,)
    response = fib(n)

    # Reply to the queue named in reply_to, echoing the correlation_id so the
    # client can match the response to its request
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')

print " [x] Awaiting RPC requests"
channel.start_consuming()

Client rpc_client.py
#!/usr/bin/env python
import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        # Anonymous, exclusive callback queue for the replies
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        # Only accept the reply that carries our correlation_id
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print " [x] Requesting fib(30)"
response = fibonacci_rpc.call(30)
print " [.] Got %r" % (response,)

Running the test programs
# Run the server
$ python rpc_server.py
 [x] Awaiting RPC requests
 [.] fib(30)

# Run the client
$ python rpc_client.py
 [x] Requesting fib(30)
 [.] Got 832040

