Python开发【第十首】:RabbitMQ队列

configparser模块

configparser用于拍卖特定格式的文书,其面目是使用open来操作文件。

文件a.txt

  1. [section1]

  2. k1 = 123

  3. k2:v2

  4.  

  5. [section2]

  6. k1 = 234

加载文件a.txt

  1. import configparser

  2.  

  3. config = configparser.ConfigParser()

  1. config.read(‘a.txt’,encoding=’utf-8′)

本着文本有关操作

  1. import configparser

  2.  

  3. config = configparser.ConfigParser()

  1. config.read(‘a.txt’,encoding=’utf-8′)
  1.  

  2. #得有节点

  3. ret = config.sections()

  4. print(ret)

  5.  

  6. #取指定节点下之享有键值对

  7. ret = config.items(‘section1’)

  8. print(ret)

  9.  

  10. #博指定节点下具有的键

  11. ret = config.options(‘section1’)

  1. print(ret)

  2.  

  3. #取得指定节点下指定key的值

  4.  

  5. ret = config.get(‘section1′,’k1’)

  6. #转换成int

  7. # ret = config.getint(‘section1′,’k1’)

  1. #转换成float

  2. # ret =
    config.getfloat(‘section1′,’k1’)

  3. #转换成boolean

  4. # ret =
    config.getboolean(‘section1′,’k1’)

  5.  

  6. print(ret)

  7.  

  8. #检查、删除、设置指定组内的键值对

  1.  

  2. #检查

  3. has_opt =
    config.has_option(‘section1’)

  4. print(has_opt)

  5.  

  6. #补给加节点

  7. config.add_section(‘SEC_1’)

  8. config.write(open(‘a.txt’,’w’))

  9.  

  10. #删去节点

  11. config.remove_section(‘SEC_1’)

  1. config.write(open(‘a.txt’,’w’))

  2.  

  3. #自我批评、删除、设置指定组内的键值对

  1.  

  2. #检查

  3. has_opt =
    config.has_option(‘section1′,’k1’)

  4. print(has_opt)

  5.  

  6. #删除

  7. config.remove_option(‘section1′,’k1’)

  1. config.write(open(‘a.txt’,’w’))

  2.  

  3. #设置

  4. config.set(‘section1′,’k10′,’123’)

  5. config.write(open(‘a.txt’,’w’))

简介

RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的正规化兑现。

XML模块

xml是实现不同语言或程序中进行数据交换的说道。

文件xml_text.xml

  1. <data>

  2.     <country name=”Liechtenstenin”>

  3.         <rank update=”yes”>2</rank>

  4.         <year>2023</year>

  1.         <gdppc>141100</gbppc>
  1.         <neighbor direction=”E” name=”Austria”/>

  2.         <neighbor direction=”W” name=”Switzerland”/>

  3.     </country>

  4.     <country name=”Sinagapore”>

  5.         <rank update=”yes”>5</rank>

  6.         <year>2026</year>

  1.         <gdppc>59900</gdppc>
  1.         <neighbor direction=”N” name=”Malaysia”/>

  2.     </country>

  3.     <country name=”Panama”>

  4.         <rank update=”yes”>69</rank>

  5.         <year>2026</year>

  1.         <gdppc>13600</gdppc>
  1.         <neighbor direction=”W” name=”Costa Rica”/>

  2.         <neighbor direction=”E” name=”Costa Rica”/>

  3.     </country>

  4. </data>

安装

率先安装erlang环境。

官网:http://www.erlang.org/

Windows版下载地址:http://erlang.org/download/otp\_win64\_20.0.exe

Linux版:yum安装

xml相关操作

解析xml文件

  1. from xml.etree import ElementTree as ET

  2. #直白解析xml文件

  3. tree = ET.parse(‘xml_test.xml’)

  1.  

  2. #获取xml文件的根节点

  3. root = tree.getroot()

  4. print(root)

  5. print(root.tag)

  6. print(root.attrib)

  7. “””

  8. 输出:

  9. <Element ‘data’ at
    0x00000000006D0688>

  10. data

  11. {‘title_2’: ‘test_2’, ‘title_1’:
    ‘test_1’}

  12. “””

剖析字符串

  1. from xml.etree import ElementTree as ET

  2.  

  3. #打开文件,读取xml内容

  4. str_xml =
    open(‘xm_test.xml’,’r’).read()

  5.  

  6. #将字符串解析成xml特殊目标,root代指xml文件的根节点

  1. root = ET.XML(str_xml)

遍历xml文档的拥有情节

  1. from xml.etree import ElementTree as ET

  2. #直解析xml文件

  3. tree = ET.parse(‘xml_test.xml’)

  1.  

  2. #落xml文件的根节点

  3. root = tree.getroot()

  4. #遍历xml文档的次重叠

  5. for
    child in root:

  6.     #仲重叠节点的竹签号以及标签属性

  1.     print(child.tag,child.attrib)
  1.     #遍历xml文档的老三交汇

  2.     for i in
    child:

  3.         #其三重叠节点的签号与情节

  1.         print(i.tag,i.text)

  2. “””

  3. 输出:

  4. country {‘name’: ‘Liechtenstenin’}

  1. rank 2

  2. year 2023

  3. gdppc 141100

  4. neighbor None

  5. neighbor None

  6. country {‘name’: ‘Sinagapore’}

  7. rank 5

  8. year 2026

  9. gdppc 59900

  10. neighbor None

  11. country {‘name’: ‘Panama’}

  12. rank 69

  13. year 2026

  14. gdppc 13600

  15. neighbor None

  16. neighbor None

  17. “””

修改xml

浅析字符串方式展开改动

  1. from xml.etree import ElementTree as ET

  2. str_xml =
    open(‘xml_test.xml’,’r’).read()

  3. root = ET.XML(str_xml)

  4.  

  5. #取得顶层标签

  6. print(root.tag)

  7.  

  8. #巡回所有的year节点

  9. for
    node in root.iter(‘year’):

  10.     #拿year节点中之始末从加一

  11.     new_year = int(node.text) + 1

  12.     node.text = str(new_year)

  13.  

  14.     #设置属性

  15.     node.set(‘name’,’alex’)

  16.     node.set(‘age’,’19’)

  17.  

  18.     #剔除属性

  19.     del node.attrib[‘name’]

  20.  

  21. #创新文件

  22. tree = ET.ElementTree(root)

  23. tree.write(“new_xml_test.xml”,encoding=’utf-8′)

分析文件方式进行改动

直调用tree.write写副即可。

  1. from xml.etree import ElementTree as ET

  2. tree = ET.parse(‘xml_test.xml’)

  1. root = tree.getroot()

  2.  

  3. “””

  4. 操作

  5. “””

  6.  

  7. #履新文件

  8. tree.write(“new_xml_test2.xml”,encoding=’utf-8′)

加加节点

  1. from xml.etree import ElementTree as ET

  2. tree = ET.parse(‘xml_test.xml’)

  1. root = tree.getroot()

  2.  

  3. ele = ET.Element(‘Alex’,{‘k1′:’v1’})

  1. ele.text = “test”

  2. root.append(ele)

  3.  

  4. tree.write(“new.xml”,encoding=’utf-8′)

结果new.xml:

  1. <data title_1=”test_1″ title_2=”test_2″>

  2.  

  3. “””

  4. 原内容

  5. “””

  6.  

  7. <Alex k1=”v1″>test</Alex></data>

出于原生保存xml时默认无缩进,如果如装缩进需要修改保存方法。

  1. from xml.etree import ElementTree as ET

  2. from xml.dom import minidom

  3.  

  4. def prettify(elem):

  5.     “””

  6.     将节点换成为字符串,并上加缩进

  1.     :param elem:

  2.     :return:

  3.     “””

  4.     rough_string =
    ET.tostring(elem,’utf-8′)

  5.     reparsed =
    minidom.parseString(rough_string)

  6.     return reparsed.toprettyxml(indent=”\t”)

  7.  

  8. #缔造根节点

  9. root = ET.Element(“family”)

  10.  

  11. #缔造大儿子

  12. # son1 =
    ET.Element(‘son’,{‘name’:’儿1′})

  13. son1 =
    root.makeelement(‘son’,{‘name’:’儿1′})

  14. #创小儿子

  15. # son2 =
    ET.Element(‘son’,{‘name’:’儿2′})

  16. son2 =
    root.makeelement(‘son’,{‘name’:’儿2′})

  17.  

  18. #以儿子被创造2个孙子

  19. # grandson1 =
    ET.Element(‘grandson’,{‘name’:’儿11′})

  20. grandson1 =
    root.makeelement(‘grandson’,{‘name’:’儿11′})

  21. # grandson2 =
    ET.Element(‘grandon’,{‘name’:’儿12′})

  22. grandson2 =
    root.makeelement(‘grandon’,{‘name’:’儿12′})

  23.  

  24. son1.append(grandson1)

  25. son2.append(grandson2)

  26.  

  27. #把幼子上加到根节点

  28. root.append(son1)

  29. root.append(son2)

  30.  

  31. raw_str = prettify(root)

  32.  

  33. f =
    open(‘family.xml’,’w’,encoding=’utf-8′)

  34. f.write(raw_str)

  35. f.close()

family.xml:

  1. <?xml version=”1.0″ ?>

  2. <family>

  3.    <son name=”儿1″>

  4.       <grandson name=”儿11″/>

  5.    </son>

  6.    <son name=”儿2″>

  7.       <grandon name=”儿12″/>

  8.    </son>

  9. </family>

Windows安装步骤

率先步运行

语言 1

第二步

语言 2

第三步

语言 3

第四步

语言 4

第五步

语言 5

Erlang安装好。

然后安装RabbitMQ,首先下载RabbitMQ的Windows版本。

官网:http://www.rabbitmq.com/

Windows版下载地址:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.10/rabbitmq-server-3.6.10.exe

开拓安装程序,按照下面步骤安装。

语言 6

语言 7

语言 8

语言 9

语言 10

RabbitMQ安装完成。

开始菜单中进入管理工具。

语言 11

语言 12

运行命令

  1. rabbitmq-plugins enable
    rabbitmq_management

语言 13

查看RabbitMQ服务是否启动。

语言 14

语言 15

时至今日全部装置好。

shutil

高等的文书、文件夹、压缩包处理模块。

shutil.copyfileobj(ferc,fdst[,length])

将文件内容拷贝到其它一个文书中

  1. import shutil

  2.  

  3. shutil.copyfileobj(open(‘old.xml’,’r’),open(‘new.xml’,’w’))

shuit.copyfile(src,dst)

拷贝文件

  1. shutil.copyfile(‘f1.log’,’f2.log’)

Linux安装步骤

安装erlang。

  1. yum -y install erlang

安装RabbitMQ。

  1. wget https://github.com/rabbitmq/rabbitmq-server/archive/rabbitmq\_v3\_6\_10.tar.gz
  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

RabbitMQ安装失败,报错如下。

  1. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  2. error: Failed dependencies:

  3.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

  4.         socat is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

由是yum安装的erlang版本太没有,这里提供的RabbitMQ是时髦版本3.6.10,所欲的erlang版本最低为R16B-03,否则编译时以败,也就是上述荒唐。

重新安装erlang。

  1. wget http://erlang.org/download/otp\_src\_20.0.tar.gz
  1. tar xvzf otp_src_20.0.tar.gz

  2. cd otp_src_20.0

  3. ./configure

  4. make && make install

重新安装erlang完毕。

运行erlang。

  1. erl

  2. Erlang/OTP 20 [erts-9.0] [source]
    [64-bit] [smp:1:1] [ds:1:1:10] [async-threads:10] [hipe]
    [kernel-poll:false]

  3.  

  4. Eshell V9.0 (abort with ^G)

安装socat。

  1. yum install -y socat

再度安装RabbitMQ。

  1. rpm -ivh
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

  2. warning:
    rabbitmq-server-3.6.10-1.el6.noarch.rpm: Header V4 RSA/SHA512
    Signature, key ID 6026dfca: NOKEY

  3. error: Failed dependencies:

  4.         erlang >= R16B-03 is needed by
    rabbitmq-server-3.6.10-1.el6.noarch

上述错误信息显示安装失败,因为rabbitMQ的因关系所导致,所以一旦不经意依赖,执行以下命令。

  1. rpm -ivh –nodeps
    rabbitmq-server-3.6.10-1.el6.noarch.rpm

装成功。

启动、停止RabbitMQ。

  1. rabbitmq-server start     #启动
  1. rabbitmq-server stop     #停止
  1. rabbitmq-server restart    #重启

 

zipfile、tarfile

zipfile创建压缩包

  1. import zipfile

  2.  

  3. #压缩

  4. z = zipfile.ZipFile(‘test.zip’,’a’)

  1. z.write(‘new.xml’)

  2. z.write(‘family.xml’)

  3. z.close

zipfile解压压缩包

  1. #解压

  2. z = zipfile.ZipFile(‘test.zip’,’r’)

  1.  

  2. #解压全部

  3. # z.extractall()

  4.  

  5. #解压单个文件

  6. z.extract(“new.xml”)

  7.  

  8. #得到压缩包的积极分子

  9. for
    item in z.namelist():

  10.     print(item)

  11.  

  12. z.close()

tarfile创建压缩包

  1. import tarfile

  2.  

  3. #压缩

  4. tar = tarfile.open(“test.tar”,’w’)

  5. #arcname重命名

  6. tar.add(‘test.py’,arcname=’test_1.py’)

  1. tar.add(‘xml_test.py’,arcname=’xml_test.py’)
  1. tar.close()

tarfile解压压缩包

  1. tar = tarfile.open(‘test.tar’,’r’)
  1.  

  2. #而是安装解压路径

  3. # tar.extractall()

  4.  

  5. for
    item in tar.getmembers():

  6.     print(item,type(item))

  7.  

  8. obj = tar.getmember(“test_1.py”)

  9. print(obj,type(obj))

  10. tar.extract(obj)

  11.  

  12. tar.close()

RabbitMQ使用

心想事成最简易的阵通信

语言 16

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’)

  2. print(“[x] Sent ‘hello word!'”)

  3. connection.close()

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.  

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=True
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

系统命令

no_ack分析

no_ack属性是当调用Basic.Consume方法时得以装的一个根本参数。no_ack的用途是保message被consumer成功处理了。这里成功的觉察是,在安了no_ack=false的场面下,只要consumer手动应答了Basic.Ack,就到底其成拍卖了。

call

抱状态码,0正常。

  1. import subprocess

  2.  

  3. #shell=False命令传入方式也列表

  4. ret = subprocess.call([“ls”,”-l”],shell=False)

  5.  

  6. #shell=True命令传入方式呢字符串

  1. ret = subprocess.call(“ls -l”,shell=True)

no_ack=true(此时啊全自动回复)

于这种情况下,consumer会在收取及Basic.Deliver+Content-Header+Content-Body之后,立即回复Ack,而之Ack是TCP协议被的Ack。此Ack的死灰复燃不体贴consumer是否对接到及的数额进行了拍卖,当然也非关注处理多少所要的耗时。

check_call

执行命令,如果执行状态码是0,则回回0,否则抛出异常。

  1. ret = subprocess.check_call(“ls -l”,shell=True)

no_ack=False(此时也手动应答)

每当这种情景下,要求consumer在拍卖终结接收到之Basic.Deliver+Content-Header+Content-Body之后才回复Ack,而者Ack是AMQP协议中之Basic.Ack。此Ack的回复和工作处理有关,所以具体的复时间该使取决于业务处理的耗时。

check_output

执行命令,如果状态码是0,则赶回执行结果,否则抛出异常。

  1. ret = subprocess.check_output(“ls -l”,shell=True)

总结

Basic.Ack发给RabbitMQ以告,可以用相应message从RabbitMQ的信从缓存中移除。

Basic.Ack未为consumer发给RabbitMQ前出现了好,RabbitMQ发现和拖欠consumer对应的连续为断开,将该该message以轮询方式发送给其它consumer(需要有多个consumer订阅同一个queue)。

在no_ack=true的情状下,RabbitMQ看message一旦被deliver出去后即既给确认了,所以会就将缓存中的message删除,因此在consumer异常时会见造成信息丢失。

出自consumer的Basic.Ack与发送给Producer的Basic.Ack没有一直关联。

函数式编程和面向对象编程实现发送邮件功能。

函数实现:

  1. def mail(email,message):

  2.     print(“发送”)

  3.    return True

  4.  

  5. mail(“xxxx.@126.com”,”hello”)

面向对象实现:

  1. class
    Foo:

  2.     #方法

  3.    def mail(self,email,message):

  1.        print(“发送”)

  2.       return True

  3.  

  4. #调用

  5. #1、创建对象,类名()

  6. obj = Foo()

  7. #2、通过对象去实施措施

  8. obj.mail(“xxxx.@126.com”,”hello”)

信持久化

类似以及目标

1、创建类:

class 类名:

def 方法名(self,xxx):

pass

2、创建对象

对象 = 类名()

3、通过对象执行方

对象.方法名(xxx)

函数式

def fetch(host,username,passwd,sql):

pass

def create(host,username,passwd,sql):

pass

def remove(host,username,passwd,sql):

pass

def modify(host,username,passwd,sql):

pass

fetch(…)

面向对象:

class SQLHelper:

def fetch(self,host,username,passwd,sql):

pass

def create(self,host,username,passwd,sql):

pass

def remove(self,host,username,passwd,nid):

pass

def modify(self,host,username,passwd,name):

pass

obj = SQLHelper()

obj.fetch(…)

面向对象优化:

class SQLHelper:

def fetch(self, sql):

pass

def create(self,sql):

pass

def remove(self,nid):

pass

def modify(self,name):

pass

obj = SQLHelper()

obj.hhost = “xxx.xxx.xxx”

obj.uusername = “xxxx”

obj.passwd = “xxxx”

obj.fetch(“sql”)

  1. class
    SQLHelper:

  2.     def fetch(self, sql):

  3.         #链接数据库

  4.         print(self.hhost)

  5.         print(self.uusername)

  6.         print(self.passwd)

  7.         print(sql)

  8.     def create(self,sql):

  9.         pass

  10.     def remove(self,nid):

  11.         pass

  12.     def modify(self,name):

  13.         pass

  14. obj = SQLHelper()

  15. obj.hhost = “xxx.xxx.xxx”

  16. obj.uusername = “xxxx”

  17. obj.passwd = “xxxx”

  18. obj.fetch(“select * from A”)

  19. 输出:

  20. xxx.xxx.xxx

  21. xxxx

  22. xxxx

  23. select * from A

acknowledgment消息持久化

no-ack=False,如果consumer挂掉了,那么RabbitMQ会重新以该任务添加到队中。

扭动调函数着

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

receive端(consumer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello’)

  1.  

  2. # 定义回调函数

  3. def
    callback(ch,method,properties,body):

  4.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. #
    no_ack=False表示花费完以后不主动将状态通知RabbitMQ

  3. channel.basic_consume(callback,

  1.                       queue=’hello’,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

嘿时以面向对象?

当一些函数具有同等参数时,可以使用面向对象的主意,将参数值一次性打包到目标吃,方便以后失去对象中取值。

durable消息持久化

producer发送消息时挂掉了,consumer接收信息不时挂掉了,以下办法会给RabbitMQ重新以该消息添加至队中。

回调函数吃

  1. ch.basic_ack(delivery_tag=method.delivery_tag)

basic_consume中

  1. no_ack=False

basic_publish中添加参数

  1. properties=pika.BasicProperties(delivery_mode=2)

channel.queue_declare中添加参数

  1. channel.queue_declare(queue=’hello’,durable=True)

send端(producer)

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. # 声明queue

  4. channel.queue_declare(queue=’hello’,durable=True)

  1.  

  2. channel.basic_publish(exchange=”,

  1.                       routing_key=’hello’,
  1.                       body=’hello
    word’,

  2.                       properties=pika.BasicProperties(delivery_mode=2))

  1. print(“[x] Sent ‘hello word!'”)

  2. connection.close()

receive端(consumer)与acknowledgment消息持久化中receive端(consumer)相同。

self是什么?

self是一个python自动传值的参数,那个目标实施方,self就是何许人也。

obj1 = SQLHelper()

obj1.hhost = “1xxx.xxx.xxx”

obj1.uusername = “xxxx”

obj1.passwd = “xxxx”

obj1.fetch(“sql”) #self==obj1

 

obj2 = SQLHelper()

obj2.hhost = “2xxx.xxx.xxx”

obj2.uusername = “xxxx”

obj2.passwd = “xxxx”

obj2.fetch(“sql”) #self==obj2

消息分发

默认消息队列里的数目是准顺序分发及各个消费者,但是多数情况下,消息队列后端的消费者服务器的处理能力是无相同的,这虽会现出局部服务器闲置时间比较丰富,资源浪费之景象。那么,我们不怕需要改变默认的信息队列获取顺序。可以当逐一消费者端配置prefetch_count=1,意思就是是喻RabbitMQ在斯消费者当前信息还从来不拍卖完的时节便甭还发新信息了。

语言 17

买主端

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4. __author__ = ‘Golden’

  5. #!/usr/bin/env python3

  6. # -*- coding:utf-8 -*-

  7.  

  8. import pika,time

  9.  

  10. connection =
    pika.BlockingConnection(pika.ConnectionParameters(‘localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’hello2′,durable=True)

  1.  

  2. def
    callback(ch,method,properties,body):

  3.     print(‘–>’,ch,method,properties)

  1.     print(“[x] Received %s” % body)

  2.     time.sleep(30)

  3.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(callback,
  1.                       queue=’hello2′,
  1.                       no_ack=False
  1.                       )

  2.  

  3. print(‘[*] waiting for messages.To exit press CTRL+C’)

  1. channel.start_consuming()

生产者端不更换。

构造方法

类似中生一个异常的法__init__,类()自动为执行。

  1. class
    SQLHelper:

  2.     def __init__(self,a1,a2,a3):

  1.         self.hhost = a1

  2.         self.uusername = a2

  3.         self.passwd = a3

  4.         print(“自动执行init”)

  5.     def fetch(self, sql):

  6.         #链接数据库

  7.         print(self.hhost)

  8.         print(self.uusername)

  9.         print(self.passwd)

  10.         print(sql)

  11.     def create(self,sql):

  12.         pass

  13.     def remove(self,nid):

  14.         pass

  15.     def modify(self,name):

  16.         pass

  17. obj1 = SQLHelper(“1xxx.xxx.xxx”,”xxxx”,”xxxx”)

  18. obj1.fetch(“select * from A”)

  19. obj2 = SQLHelper(“2xxx.xxx.xxx”,”xxxx”,”xxxx”)

  20. obj2.fetch(“select * form A”)

  21. 输出:

  22. 自行执行init

  23. 1xxx.xxx.xxx

  24. xxxx

  25. xxxx

  26. select * from A

  27. 自动执行init

  28. 2xxx.xxx.xxx

  29. xxxx

  30. xxxx

  31. select * form A

面向对象三那个特点:封装、继承、多态。

信披露与订阅(publish\subscribe)

揭晓与订阅与简短的消息队列区别在于,发布以及订阅会将信息发送给有的订阅者,而消息队列中之多少让消费一样赖就是消失。所以,RabbitMQ实现发布以及订阅时,会呢各一个订阅者创建一个队,而发布者发布信息时,会用信息放置在装有相关队列中。类似广播的职能,这时候就要用到exchange。Exchange在概念的时候是产生品种的,以控制到底是怎么样Queue符合条件,可以接受信息。

fanout:所有bind到此exchange的queue都好接信息。

direct:通过routingKey和exchange决定的谁唯一的queue可以收到信息。

topic:所有符合routingKey(可以是一个表达式)的routingKey所bind的queue可以接过信息。

表达式符号说明

#:一个或多单字符

*:任何字符

例如:#.a会匹配a.a,aa.a,aaa.a等。

*.a会匹配a.a,b.a,c.a等。

注意:使用RoutingKey为#,Exchange Type也topic的早晚相对于采取fanout。

heaers:通过headers来支配把消息发给哪些queue。

语言 18

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1.  

  2. message = ”.join(sys.argv[1:]) or
    ‘info:Hello World!’

  3. channel.basic_publish(exchange=’logs’,

  1.                       routing_key=”,
  1.                       body=message)
  1.  

  2. print(‘[x] Send %r’ % message)

  1. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2. channel.exchange_declare(exchange=’logs’,type=’fanout’)

  1. #
    不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在行使此queue的消费者断开后,自动将queue删除
  1. result =
    channel.queue_declare(exclusive=True)

  2. queue_name = result.method.queue

  1. channel.queue_bind(exchange=’logs’,queue=queue_name)
  1. print(‘[*]Waiting for logs.To exit press CTRL+C’)

  2. def
    callback(ch,method,properties,body):

  3.     print(‘[*] %s’%body)

  4.  

  5. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

封装

面向对象的主次设计着,某个类将所需要的数额(类的习性)和对数据的操作(类的行事)全部且封闭装于接近中,分别称为类的分子变量和办法(成员函数)。这种将成员变量和分子函数封装在共同的编程特性称为封装。

  1. class
    c1:

  2.     def __init__(self,name,obj):

  1.         self.name = name

  2.         self.obj = obj

  3.  

  4. class
    c2:

  5.     def __init__(self,name,age):

  1.         self.name = name

  2.         self.age = age

  3.  

  4.     def show(self):

  5.         print(self.name)

  6.  

  7. class
    c3:

  8.     def __init__(self,a1):

  9.         self.money = 123

  10.         self.aaa = a1

  11.  

  12.  

  13. c2_obj = c2(‘aa’,11)

  14.  

  15. c1_obj = c1(“alex”,c2_obj)

  16. print(c1_obj.obj.age)

  17.  

  18. c3_obj = c3(c1_obj)

  19. print(c3_obj.aaa.obj.age)

  20. 输出:

  21. 11

  22. 11

要害字发送(echange type=direct)

发送信息时明显指定某个队列并向里面发送信息,RabbitMQ还支持因重点字发送,即行绑定关键字,发送者将数据依据重大字发送到消息exchange,exchange根据重点字判定该拿数据发送到哪个队。

语言 19

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. # severity = ‘error’

  3. severity = sys.argv[1] if len(sys.argv) > 1 else ‘info’

  4. # message = ‘Hello World!’

  5. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  6.  

  7. channel.basic_publish(exchange=’direct_logs’,

  1.                       routing_key=severity,
  1.                       body=message)
  1. print(‘[x] Send %r:%r’ %
    (severity,message))

  2. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’direct_logs’,

  1.                          type=’direct’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. severities = sys.argv[1:]

  3. if not
    severities:

  4.     sys.stderr.write(‘Usage: %s
    [info] [warning] [error]\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    severity in severities:

  8.     channel.queue_bind(exchange=’direct_logs’,

  1.                        queue=queue_name,
  1.                        routing_key=severity)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[*] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

启动subscriber1

  1. python3 direct_subscriber.py warning

启动subscriber2

  1. python3 direct_subscriber.py error

启动publisher1

  1. python3 direct_publisher.py info

启动publisher2

  1. python3 direct_publisher.py warning

启动publisher3

  1. python3 direct_publisher.py error

结果

语言 20

继承

延续是个别独八九不离十或多个类之间的父子关系,子进程继续父亲进程的有所国有实例变量和法。继承实现了代码的用。重用已经有的数据以及作为,减少代码的重新编写,python在类名后用一对准圆括号表示继续关系,括号中的切近表示父类,如果父类定义了__init__主意,则子类必须出示地调用父类的__init__法,如果子类需要扩大父类的作为,可以增长__init__道的参数。

张冠李戴匹配(exchange type=topic)

每当topic类型下,可以于队列绑定几只模糊的主要字,发送者将数据发送到exchange,exchange将盛传”路由于值”和”关键字”进行匹配,匹配成功则用数据发送至指定队列。

语言 21

*:匹配任意一个字符

#:匹配任意个字符

publisher

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info’

  3. message = ”.join(sys.argv[2:]) or
    ‘Hello World!’

  4. channel.basic_publish(exchange=’topic_logs’,

  1.                       routing_key=routing_key,
  1.                       body=message)
  1.  

  2. print(‘[x] Sent %r:%r’ %
    (routing_key,message))

  3. connection.close()

subscriber

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,sys

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.exchange_declare(exchange=’topic_logs’,

  1.                          type=’topic’)
  1.  

  2. result =
    channel.queue_declare(exclusive=True)

  3. queue_name = result.method.queue

  1.  

  2. binding_keys = sys.argv[1:]

  3. if not
    binding_keys:

  4.     sys.stderr.write(‘Usage: %s
    [binding_key]…\n’ % sys.argv[0])

  5.     sys.exit(1)

  6.  

  7. for
    binding_key in binding_keys:

  1.     channel.queue_bind(exchange=’topic_logs’,
  1.                        queue=queue_name,
  1.                        routing_key=binding_key)
  1.  

  2. print(‘[*] Waiting for logs.To exit press CTRL+C’)

  3.  

  4. def
    callback(ch,method,properties,body):

  5.     print(‘[x] %r:%r’ %
    (method.routing_key,body))

  6.  

  7. channel.basic_consume(callback,

  1.                       queue=queue_name,
  1.                       no_ack=True)
  1.  

  2. channel.start_consuming()

测试

语言 22

单继承

  1. class
    F1:#父类、基类

  2.     def show(self):

  3.         print(‘show’)

  4.  

  5. #F2继承F1

  6. class
    F2(F1):#子类、派生类

  7.     def bar(self):

  8.         print(‘bar’)

  9.  

  10. obj = F2()

  11. obj.bar()

  12. obj.show()

  13. 输出:

  14. bar

  15. show

 

  1. class
    F1:#父类、基类

  2.     def show(self):

  3.         print(‘show’)

  4.  

  5.     def foo(self):

  6.         print(self.name)

  7.  

  8. #F2继承F1

  9. class
    F2(F1):#子类、派生类

  10.     def __init__(self,name):

  1.         self.name = name

  2.  

  3.     def bar(self):

  4.         print(‘bar’)

  5.  

  6.     def show(self):#友善的优先级更胜

  1.         print(‘F2 show’)

  2.  

  3. obj = F2(‘alex’)

  4. obj.bar()

  5. obj.show()

  6. obj.foo()

  7. 输出:

  8. bar

  9. F2 show

  10. alex

 

  1. class
    S1:

  2.     def F1(self):

  3.         self.F2()

  4.  

  5.     def F2(self):

  6.         print(‘S1.F2()’)

  7.  

  8. class
    S2(S1):

  9.     def F3(self):

  10.         self.F1()

  11.  

  12.     def F2(self):

  13.         print(‘S2.F2()’)

  14.  

  15. obj = S2()

  16. obj.F3()

  17. 输出:

  18. S2.F2()

长途过程调用(RPC)

RPC(Remote Procedure Call
Protocol)远程过程调用协议。在一个特大型的商家,系统由大大小小的服务做,不同之团体维护不同之代码,部署在不同的服务器。但是以做开发之早晚屡次要因此到任何组织的计,因为既闹矣贯彻。但是这些劳务配置在不同之服务器,想只要调用就用网络通信,这些代码繁琐且复杂,一不小心便会好没用。PRC协议定义了统筹,其它的柜都叫闹了不同的实现。比如微软的wcf,以及WebApi。

每当RabbitMQ中RPC的贯彻是深粗略快捷之,现在客户端、服务端都是信息发布者与信息接受者。

语言 23

率先客户端通过RPC向服务端发生请求。correlation_id:请求标识,erply_to:结果返回队列。(我此出一些数量要你让自身处理一下,correlation_id是本身呼吁标识,你处理好后把结果回到到erply_to队列)

服务端拿到要,开始拍卖并回。correlation_id:客户端请求标识。(correlation_id这是您的乞求标识,还让你。这时候客户端用自己之correlation_id与服务端返回的correlation_id进行对比,相同则接受。)

rpc_server

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,time

  6.  

  7. connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1. channel = connection.channel()

  2.  

  3. channel.queue_declare(queue=’rpc_queue’)

  1. def fib(n):

  2.     if
    n == 0:

  3.         return 0

  4.     elif n == 1:

  5.         return 1

  6.     else:

  7.         return fib(n-1) + fib(n-2)

  8.  

  9. def on_request(ch,method,props,body):

  1.     n = int(body)

  2.     print(‘[.] fib(%s)’ % n)

  3.     response = fib(n)

  4.     ch.basic_publish(exchange=”,

  1.                      routing_key=props.reply_to,
  1.                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
  1.                      body =
    str(response))

  2.     ch.basic_ack(delivery_tag=method.delivery_tag)

  1.  

  2. channel.basic_qos(prefetch_count=1)

  1. channel.basic_consume(on_request,queue=’rpc_queue’)
  1.  

  2. print(‘[x] Awaiting RPC requests’)

  1. channel.start_consuming()

rpc_client

  1. __author__ = ‘Golden’

  2. #!/usr/bin/env python3

  3. # -*- coding:utf-8 -*-

  4.  

  5. import pika,uuid

  6.  

  7. class
    FibonacciRpcClient(object):

  8.     def __init__(self):

  9.         self.connection =
    pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))

  1.         self.channel =
    self.connection.channel()

  2.         result =
    self.channel.queue_declare(exclusive=True)

  3.         self.callback_queue =
    result.method.queue

  4.         self.channel.basic_consume(self.on_response,no_ack=True,

  1.                                    queue=self.callback_queue)
  1.  

  2.     def
    on_response(self,ch,method,props,body):

  3.         if self.corr_id ==
    props.correlation_id:

  4.             self.response = body

  1.  

  2.     def call(self,n):

  3.         self.response = None

  4.         self.corr_id =
    str(uuid.uuid4())

  5.         self.channel.basic_publish(exchange=”,

  1.                                    routing_key=’rpc_queue’,
  1.                                    properties=pika.BasicProperties(
  1.                                        reply_to=self.callback_queue,
  1.                                        correlation_id=self.corr_id,),
  1.                                    body=str(n))
  1.         while self.response is None:

  2.             self.connection.process_data_events()

  1.         return int(self.response)

  2.  

  3. fibonacci_rpc = FibonacciRpcClient()

  1.  

  2. print(‘[x] Requesting fib(10)’)

  1. response = fibonacci_rpc.call(10)
  1. print(‘[.] Got %r ‘ % response)

 

多继承

  1. class
    C1:

  2.     def f1(self):

  3.         pass

  4.  

  5. class
    C2:

  6.     def f2(self):

  7.         pass

  8.  

  9. class
    C3(C2,C1):

  10.     pass

  11.  

  12. obj = C3()

 

  1. class
    C1:

  2.     def f1(self):

  3.         print(“C1.f1()”)

  4.  

  5. class
    C2:

  6.     def f1(self):

  7.         print(“C2.f1()”)

  8.  

  9. class
    C3(C2,C1):

  10.     pass

  11.  

  12. obj = C3()

  13. obj.f1()

  14. 输出:

  15. C2.f1()

 

  1. class
    C0:

  2.     def f1(self):

  3.         print(“C0.f1()”)

  4.  

  5. class
    C1():

  6.     def f1(self):

  7.         print(“C1.f1()”)

  8.  

  9. class
    C2(C0):

  10.     def f2(self):

  11.         print(“C2.f1()”)

  12.  

  13. class
    C3(C2,C1):

  14.     pass

  15.  

  16. obj = C3()

  17. obj.f1()

  18. 输出:

  19. C0.f1()

 

  1. class
    C_2:

  2.     def f1(self):

  3.         print(“C_2.f1()”)

  4.  

  5. class
    C_1(C_2):

  6.     def f1(self):

  7.         print(“C_1.f1()”)

  8.  

  9. class
    C0(C_2):

  10.     def f2(self):

  11.         print(“C0.f1()”)

  12.  

  13. class
    C1(C_1):

  14.     def f1(self):

  15.         print(“C1.f1()”)

  16.  

  17. class
    C2(C0):

  18.     def f2(self):

  19.         print(“C2.f1()”)

  20.  

  21. class
    C3(C2,C1):

  22.     pass

  23.  

  24. obj = C3()

  25. obj.f1()

  26. 输出:

  27. C1.f1()

累总结

对继往开来其艺术(属性)可能定义在现阶段类似,也说不定出自于基类,所以在法调用时即便需对现阶段看似与基类进行搜寻以确定方法所在的职。而寻的逐条就是所谓的主意分析顺序(MRO、Method
Resolution
Order)。对于单继承来说,MRO一般比较简单,而对多延续来说,MRO比较复杂。下面就是第二种为主的后续模式解析。

语言 24

语言 25

发表评论

电子邮件地址不会被公开。 必填项已用*标注

网站地图xml地图