技术文章
当前位置:首页 > Python技术文章 > Python中线程的MQ消息队列实现及优缺点介绍

Python中线程的MQ消息队列实现及优缺点详解

  • 发布时间:
  • 作者:码农之家原创
  • 点击:149

这篇文章主要知识点是关于Python、实现、队列、关于Python如何操作消息队列(RabbitMQ)的方法教程 的内容,如果大家想对相关知识点有系统深入的学习,可以参阅以下电子书

数据结构与算法:Python语言实现
  • 类型:数据结构算法大小:32.9 MB格式:PDF出版:机械工业出版社作者:迈克尔 T. 古德里奇
立即下载

更多Python相关的学习资源可以参阅 Python电子书程序设计电子书 等栏目。

Python中线程的MQ消息队列实现及优缺点介绍

消息队列(MQ,Message Queue)在消息数据传输中的保存作用为数据通信提供了保障和实时处理上的便利,这里我们就来看一下Python中线程的MQ消息队列实现以及消息队列的优点解析

 

“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。相信对任何架构或应用来说,消息队列都是一个至关重要的组件,下面是十个理由:

Python的消息队列示例:

1.threading+Queue实现线程队列

#!/usr/bin/env python
 
import Queue
import threading
import time
 
queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
 """没打印一个数字等待1秒,并发打印10个数字需要多少秒?"""
 def __init__(self, queue):
  threading.Thread.__init__(self)
  self.queue = queue
 
 def run(self):
  whileTrue:
   #消费者端,从队列中获取num
   num = self.queue.get()
   print "i'm num %s"%(num)
   time.sleep(1)
   #在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号
   self.queue.task_done()
 
start = time.time()
def main():
 #产生一个 threads pool, 并把消息传递给thread函数进行处理,这里开启10个并发
 for i in range(10):
  t = ThreadNum(queue)
  t.setDaemon(True)
  t.start()
  
 #往队列中填错数据 
 for num in range(10):
   queue.put(num)
 #wait on the queue until everything has been processed
 queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

运行结果:

i'm num 0
i'm num 1
i'm num 2
i'm num 3
i'm num 4
i'm num 5
i'm num 6
i'm num 7
i'm num 8
i'm num 9
Elapsed Time: 1.01399993896

解读:

具体工作步骤描述如下:

1,创建一个 Queue.Queue() 的实例,然后使用数据对它进行填充。

2,将经过填充数据的实例传递给线程类,后者是通过继承 threading.Thread 的方式创建的。

3,生成守护线程池。

4,每次从队列中取出一个项目,并使用该线程中的数据和 run 方法以执行相应的工作。

5,在完成这项工作之后,使用 queue.task_done() 函数向任务已经完成的队列发送一个信号。

6,对队列执行 join 操作,实际上意味着等到队列为空,再退出主程序。

在使用这个模式时需要注意一点:通过将守护线程设置为 true,程序运行完自动退出。好处是在退出之前,可以对队列执行 join 操作、或者等到队列为空。

2.多个队列

所谓多个队列,一个队列的输出可以作为另一个队列的输入

#!/usr/bin/env python
import Queue
import threading
import time
 
queue = Queue.Queue()
out_queue = Queue.Queue()
 
class ThreadNum(threading.Thread):
  def __init__(self, queue, out_queue):
    threading.Thread.__init__(self)
    self.queue = queue
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中取消息
      num = self.queue.get()
      bkeep = num
      
      #将bkeep放入队列中
      self.out_queue.put(bkeep)
 
      #signals to queue job is done
      self.queue.task_done()
 
class PrintLove(threading.Thread):
  def __init__(self, out_queue):
    threading.Thread.__init__(self)
    self.out_queue = out_queue
 
  def run(self):
    whileTrue:
      #从队列中获取消息并赋值给bkeep
      bkeep = self.out_queue.get()  
      keke = "I love " + str(bkeep)
      print keke,
      print self.getName()
      time.sleep(1)
 
      #signals to queue job is done
      self.out_queue.task_done()
 
start = time.time()
def main():
  #populate queue with data
  for num in range(10):
    queue.put(num)
    
  #spawn a pool of threads, and pass them queue instance
  for i in range(5):
    t = ThreadNum(queue, out_queue)
    t.setDaemon(True)
    t.start()
 
 
  for i in range(5):
    pl = PrintLove(out_queue)
    pl.setDaemon(True)
    pl.start()
 
  #wait on the queue until everything has been processed
  queue.join()
  out_queue.join()
 
main()
print "Elapsed Time: %s" % (time.time() - start)

运行结果:

I love 0 Thread-6
I love 1 Thread-7
I love 2 Thread-8
I love 3 Thread-9
I love 4 Thread-10
I love 5 Thread-7
I love 6 Thread-6
I love 7 Thread-9
I love 8 Thread-8
I love 9 Thread-10
Elapsed Time: 2.00300002098

 

解读:

ThreadNum 类工作流程

定义队列--->继承threading---->初始化queue---->定义run函数--->get queue中的数据---->处理数据---->put数据到另外一个queue-->发信号告诉queue该条处理完毕

 

main函数工作流程:

--->往自定义queue中扔数据

--->for循环确定启动的线程数---->实例化ThreadNum类---->启动线程并设置守护

--->for循环确定启动的线程数---->实例化PrintLove类--->启动线程并设置为守护

--->等待queue中的消息处理完毕后执行join。即退出主程序。

了解了MQ的大概实现以后,我们来总结一下消息队列的优点:

1. 解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息队列在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2. 冗余

有时在处理数据的时候处理过程会失败。除非数据被持久化,否则将永远丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。在被许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。

3. 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的;只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

4. 灵活性 & 峰值处理能力

当你的应用上了Hacker News的首页,你将发现访问流量攀升到一个不同寻常的水平。在访问量剧增的情况下,你的应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为 以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住增长的访问压力,而不是因为超出负荷的请求而完全崩溃。 请查看我们关于峰值处理能力的博客文章了解更多此方面的信息。

5. 可恢复性

当体系的一部分组件失效,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。而这种允许重试或者延后处理请求的能力通常是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。

6. 送达保证

消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列即可。在此基础上,IronMQ提供了一个"只送达一次"保证。无论有多少进 程在从队列中领取数据,每一个消息只能被处理一次。这之所以成为可能,是因为获取一个消息只是"预定"了这个消息,暂时把它移出了队列。除非客户端明确的 表示已经处理完了这个消息,否则这个消息会被放回队列中去,在一段可配置的时间之后可再次被处理。

7.排序保证

在许多情况下,数据处理的顺序都很重要。消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。IronMO保证消息浆糊通过FIFO(先进先出)的顺序来处理,因此消息在队列中的位置就是从队列中检索他们的位置。

8.缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行--写入队列的处理会尽可能的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流经过系统的速度。

9. 理解数据流

在一个分布式系统里,要得到一个关于用户操作会用多长时间及其原因的总体印象,是个巨大的挑战。消息系列通过消息被处理的频率,来方便的辅助确定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。

10. 异步通信

很多时候,你不想也不需要立即处理消息。消息队列提供了异步处理机制,允许你把一个消息放入队列,但并不立即处理它。你想向队列中放入多少消息就放多少,然后在你乐意的时候再去处理它们。

以上就是Python中线程的MQ消息队列实现及优缺点介绍的详细内容,更多请关注码农之家其它相关文章!

关于Python如何操作消息队列(RabbitMQ)的方法教程

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。下面这篇文章主要给大家介绍了关于利用Python操作消息队列RabbitMQ的方法教程,需要的朋友可以参考下。

前言

RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

应用场景:

RabbitMQ无疑是目前最流行的消息队列之一,对各种语言环境的支持也很丰富,作为一个.NET developer有必要学习和了解这一工具。消息队列的使用场景大概有3种:

1、系统集成,分布式系统的设计。各种子系统通过消息来对接,这种解决方案也逐步发展成一种架构风格,即“通过消息传递的架构”。

2、当系统中的同步处理方式严重影响了吞吐量,比如日志记录。假如需要记录系统中所有的用户行为日志,如果通过同步的方式记录日志势必会影响系统的响应速度,当我们将日志消息发送到消息队列,记录日志的子系统就会通过异步的方式去消费日志消息。

3、系统的高可用性,比如电商的秒杀场景。当某一时刻应用服务器或数据库服务器收到大量请求,将会出现系统宕机。如果能够将请求转发到消息队列,再由服务器去消费这些消息将会使得请求变得平稳,提高系统的可用性。

一、安装环境

首先是在 Linux 上安装 rabbitmq


# 环境为CentOS 7
yum install rabbitmq-server # 安装RabbitMQ
systemctl start rabbitmq-server # 启动
systemctl enable rabbitmq-server # 开机自启
systemctl stop firewall-cmd  # 临时关闭防火墙

然后用 pip 安装 Python3 的开发包


pip3 install pika

安装好软件之后可以访问http://115.xx.xx.xx:15672/来访问自带的 web 页面来查看和管理 RabbitMQ。默认管理员的用户密码都是guest

二、简单的向队列中加入消息


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:25
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Producer
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 提交消息
for i in range(10):
 channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
 print("sent...")
# 关闭连接
connection.close()

三、简单的从队列中获取消息


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 19:40
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Consumer
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ服务器
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 指定一个队列,如果该队列不存在则创建
channel.queue_declare(queue='test_queue')
# 定义一个回调函数
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 告诉RabbitMQ使用callback来接收信息
channel.basic_consume(callback, queue='test_queue', no_ack=False)
print('waiting...')
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理。按ctrl+c退出。
channel.start_consuming()

四、万一消费者掉线了

想象这样一种情况:

消费者从消息队列中获取了 n 条数据,正要处理呢结果宕机了,那该怎么办?在 RabbieMQ 中有一个 ACK 可以用来确认消费者处理结束。就有点类似网络中的 ACK,消费者每次从队列中获取了数据之后队列不会立刻将数据移除,而是等待对应的 ACK。消费者获取到数据并处理完成之后会向队列发送一个 ACK 包,通知 RabbitMQ 这堆消息已经处理妥当了,可以删除了,这时候 RabbitMQ 才会将数据从队列中移除。所以这种情况下即使消费者掉线也没有什么问题,数据依旧会在队列中存在,留给其他消费者处理。

在 Python 中这样实现:

消费者有这样一行代码channel.basic_consume(callback, queue='test_queue', no_ack=False) ,其中no_ack=False表示不发送确认包。将其修改为no_ack=True就会在每次处理完之后向 RabbitMQ 发送一个确认包,以确认消息处理完毕。

五、万一 RabbitMQ 宕机了呢

虽然有了 ACK 包,但是万一 RabbitMQ 挂了那数据还是会损失。所以我们可以给 RabbitMQ 设置一个数据持久化存储。RabbitMQ 会将数据持久化存储到磁盘上,保证下次再启动的时候队列还在。

在 Python 中这样实现:

我们声明一个队列是这样的channel.queue_declare(queue='test_queue') ,如果需要持久化一个队列可以这样声明channel.queue_declare(queue='test_queue', durable=True) 。不过这行直接放在代码中是不能执行的,因为以前已经有了一个名为test_queue的队列,RabbitMQ 不允许用不同的方式声明同一个队列,所以可以换一个队列名新建来指定数据持久化存储。不过如果只是这样声明的话,在 RabbitMQ 宕机重启后确实队列还在,不过队列里的数据就没有了。除非我们这样来声明队列channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode = 2,))

六、最简单的发布订阅

最简单的发布订阅在 RabbitMQ 中称之为Fanout模式。也就是说订阅者订阅某个频道,然后发布者向这个频道中发布消息,所有订阅者就都能接收到这条消息。不过因为发布者需要使用订阅者创建的随机队列所以需要先启动订阅者才能启动发布者。

发布者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:21
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Publisher
import pika
# 创建连接对象
connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))
# 创建频道对象
channel = connection.channel()
# 定义交换机,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
message = 'Hello Python'
# 将消息发送到交换机
channel.basic_publish(exchange='my_fanout', # 指定exchange
      routing_key='', # fanout下不需要配置,配置了也不会生效
      body=message)
connection.close()

订阅者代码:


#!/usr/bin/env python3
# coding=utf-8
# @Time : 2017/6/13 20:20
# @Author : Shawn
# @Blog : https://blog.just666.cn
# @Email : shawnbluce@gmail.com
# @purpose : RabbitMQ_Subscriber
import pika
credentials = pika.PlainCredentials('guest', 'guest')
# 连接到RabbitMQ
connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
channel = connection.channel()
# 定义交换机,进行exchange声明,exchange表示交换机名称,type表示类型
channel.exchange_declare(exchange='my_fanout',
       type='fanout')
# 随机创建队列
result = channel.queue_declare(exclusive=True) # exclusive=True表示建立临时队列,当consumer关闭后,该队列就会被删除
queue_name = result.method.queue
# 将队列与exchange进行绑定
channel.queue_bind(exchange='my_fanout',
     queue=queue_name)
# 定义回调方法
def callback(ch, method, properties, body):
 print(body.decode('utf-8'))
# 从队列获取信息
channel.basic_consume(callback,
      queue=queue_name,
      no_ack=True)
channel.start_consuming()

总结

以上就是关于Python如何操作消息队列(RabbitMQ)的方法教程的详细内容,更多请关注码农之家其它相关文章!

以上就是本次给大家分享的关于Python的全部知识点内容总结,大家还可以在下方相关文章里找到python里dict变成list的实例方、 python字符串与url编码转换、 Python3结合Dlib实现人脸识别、 等python文章进一步学习,感谢大家的阅读和支持。

上一篇:Python3.X线程中信号量的使用实例讲解

下一篇:Python+Socket实现基于TCP协议的客户与服务端中文自动回复聊天功能实现方法

展开 +

收起 -

相关电子书
学习笔记
网友NO.124218

ActiveMQ:使用Python访问ActiveMQ的方法

Windows 10家庭中文版,Python 3.6.4,stomp.py 4.1.21 ActiveMQ支持Python访问,提供了基于STOMP协议(端口为61613)的库。 ActiveMQ的官文Cross Language Clients中给出了更详细的介绍,并附有示例代码,如下图: 第一行为常规Python访问,第二行为使用Jython访问的方式,四个操作。 Python访问ActiveMQ需要使用stomp.py,见其官网。 下载官网的代码,解压,命令行进入其目录,使用pyhthon setup.py install即可安装好,然后就可以使用stomp.py了。 官方示例代码:给队列test发送一个消息,也可以把第7行的destination的“/queue/”去掉,只剩test。 import stompconn = stomp.Connection()conn.set_listener('', MyListener())conn.start()conn.connect('admin', 'password', wait=True)conn.send(body=' '.join(sys.argv[1:]), destination='/queue/test')conn.disconnect() 测试结果:test队列接收到消息数量增加了 stomp.Connection()默认是connect.StompConnection11(协议……

网友NO.296615

python如何安装rabbitmq

第一步:下载并安装Erlang 原因:RabbitMQ服务器端代码是使用Erlang语言编写的,它依赖于Erlang运行。 下载地址:http://www.erlang.org/downloads 根据本机系统的位数选择合适的版本。双击下载好的安装包,开始安装。 安装完成后要配置环境变量,在系统环境变量中新建。 变量名:ERLANG_HOME 变量值就是刚才erlang的安装地址,点击确定。 然后双击系统变量path 点击“新建”,将%ERLANG_HOME%\bin加入到path中。 最后windows键+R键,输入cmd,再输入erl,看到版本号就说明erlang安装成功了。 相关推荐:《Python视频教程》 第二步:下载并安装RabbitMQ 下载地址:http://www.rabbitmq.com/download.html 双击下载后的.exe文件,安装过程与erlang的安装过程相同。 RabbitMQ安装好后接下来安装RabbitMQ-Plugins插件。打开命令行cd,输入RabbitMQ的sbin目录。 我的目录是:D:\Program Files\RabbitMQ Server\rabbitmq_s……

网友NO.774721

python实现RabbitMQ的消息队列的示例代码

最近在研究redis做消息队列时,顺便看了一下RabbitMQ做消息队列的实现。以下是总结的RabbitMQ中三种exchange模式的实现,分别是fanout, direct和topic。 base.py: import pika# 获取认证对象,参数是用户名、密码。远程连接时需要认证credentials = pika.PlainCredentials("admin", "admin")# BlockingConnection(): 实例化连接对象# ConnectionParameters(): 实例化链接参数对象connection = pika.BlockingConnection(pika.ConnectionParameters( "192.168.0.102", 5672, "/", credentials))# 创建新的channel(通道)channel = connection.channel() fanout模式:向绑定到指定exchange的queue中发送消息,消费者从queue中取出数据,类似于广播模式、发布订阅模式。 绑定方式: 在接收端channel.queue_bind(exchange="logs", queue=queue_name) 代码: publisher.py: from base import channel, connection# 声明exchange, 不声明queuechannel.exchange_declare(exchange="logs", exchange_type="fanout") # 广播……

网友NO.999767

Python操作rabbitMQ的示例代码

引入 RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。 rabbitMQ是一款基于AMQP协议的消息中间件,它能够在应用之间提供可靠的消息传输。在易用性,扩展性,高可用性上表现优秀。使用消息中间件利于应用之间的解耦,生产者(客户端)无需知道消费者(服务端)的存在。而且两端可以使用不同的语言编写,大大提供了灵活性。 中文文档 安装 # 安装配置epel源 rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm # 安装erlang yum -y install erlang # 安装RabbitMQ yum -y install rabbitmq-server# 启动/停止 service rabbitmq-server start/stop rabbitMQ工作模型 简单模式 生产者 import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')print(" [x] Sent 'Hello……

<
1
>

Copyright 2018-2020 xz577.com 码农之家

电子书资源由网友、会员提供上传,本站记录提供者的基本信息及资源来路

鸣谢: “ 码小辫 ” 公众号提供回调API服务、“ 脚本CDN ”提供网站加速(本站寻求更多赞助支持)

版权投诉 / 书籍推广 / 赞助:520161757@qq.com

上传资源(网友、会员均可提供)

查看最新会员资料及资源信息