28 python 通過 amqp 消息隊列協議中的 qpid 實現數據通信
Python實戰-從菜鳥到大牛的進階之路 作者:極客學院 投票推薦 加入書簽 留言反饋
<h2>簡介:</h2>
這兩天看了消息隊列通信,打算在配置平台上應用起來。以前用過 zeromq 但是這東西太快了,還有就是 rabbitmq 有點大,新浪的朋友推薦了 qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。
amqp(消息隊列協議 advanced message queuing protocol)是一種消息協議 ,等同於 jms,但是 jms 隻是 java 平台的方案,amqp 是一個跨語言的協議。
amqp 不分語言平台,主流的語言都支持,運維這邊的 perl,python,ruby 更是支持,所以大家就放心用吧。
主流的消息隊列通信類型:
點對點:a 發消息給 b。廣播:a 發給所有其他人的消息組播:a 發給多個但不是所有其他人的消息。requester/response:類似訪問網頁的通信方式,客戶端發請求並等待,服務端回複該請求pub-sub:類似雜誌發行,出版雜誌的人並不知道誰在看這本雜誌,訂閱的人並不關心誰在發表這本雜誌。出版的人隻管將信息發布出去,訂閱的人也隻在需要的時候收到該信息。store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人並不一定在家等待,也並不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。其他通信模型。。。 </pre>
publisher --->exchange ---> messagequeue --->consumer
整個過程是異步的.publisher,consumer 相互不知道對方的存在,exchange 負責交換/路由,依靠 routing key,每個消息者有一個 routing key,每個 binding 將自已感興趣的 routingkey 告訴 exchange,以便 exchange 將相關的消息轉發給相應的 queue! <h2>幾個概念</h2>
幾個概念producer,routing key,exchange,binding,queue,consumer.producer: 消息的創建者,消息的發送者routing key:唯一用來映射消息該進入哪個隊列的標識exchange:負責消息的路由,交換binding:定義 queue 和 exchange 的映射關係queue:消息隊列consumer:消息的使用者exchange類型fan-out:類似於廣播方式,不管 routingkeydirect:根據 routingkey,進行關聯投寄topic:類似於 direct,但是支持多個 key 關聯,以組的方式投寄。 key以.來定義界限。類似於 usea.news,usea.weather.這兩個消息是一組的。 </pre>
圖片 28.1 pic
qpid
qpid 是 apache 開發的一款麵向對象的消息中間件,它是一個 amqp 的實現,可以和其他符合 amqp 協議的係統進行通信。qpid 提供了 c++/python/java/c# 等主流編程語言的客戶端庫,安裝使用非常方便。相對於其他的 amqp 實現,qpid 社區十分活躍,有望成為標準 amqp 中間件產品。除了符合 amqp 基本要求之外,qpid 提供了很多額外的 ha 特性,非常適於集群環境下的消息通信!
基本功能外提供以下特性:
采用 corosync(?)來保證集群環境下的 fault-tolerant(?) 特性
支持 xml 的 exchange,消息為 xml 時,彩用 xquery 過濾
支持 plugin
提供安全認證,可對 producer/consumer 提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數據目錄
--auth:不啟用安全身份認證
啟動後自動創建一些 exchange,amp.topic,amp.direct,amp.fanout
tools:
qpid-config:維護 queue,exchange,內部配置qpid-route:配置 broker federation(聯盟?集群?)qpid-tool:監控
咱們說完介紹了,這裏就趕緊測試下。
服務器端的安裝:
yum install qpid-cpp-serveryum install qpid-tools/etc/init.d/qpidd start </pre>
發布端的測試代碼:
圖片 28.2 pic
一些測試代碼轉自: ibm 的開發社區
#!/usr/bin/env python #xiaorui #轉自 ibm 開發社區import optparse, timefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warndef nameval(st): idx = st.find("=") if idx >= 0: name = st[0:idx] value = st[idx+1:] else: name = st value = none return name, valueparser = optparse.optionparser(usage="usage: %prog [options] address [ content ... ]", description="send messages to the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified broker (default %default)")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-c", "--count", type="int", default=1, help="stop after count messages have been sent, zero disables (default %default)")parser.add_option("-t", "--timeout", type="float", default=none, help="exit after the specified time")parser.add_option("-i", "--id", help="use the supplied id instead of generating one")parser.add_option("-s", "--subject", help="specify a subject")parser.add_option("-r", "--reply-to", help="specify reply-to address")parser.add_option("-p", "--property", dest="properties", action="append", default=, metavar="name=value", help="specify message property")parser.add_option("-m", "--map", dest="entries", action="append", default=, metavar="key=value", help="specify map entry for message body")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", debug)else: enable("qpid", warn)if opts.id is none: spout_id = str(uuid4)else: spout_id = opts.idif args: addr = args.pop(0)else: parser.error("address is required")content = noneif args: text = " ".join(args)else: text = noneif opts.entries: content = {} if text: content["text"] = text for e in opts.entries: name, val = nameval(e) content[name] = valelse: content = textconn = connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session snd = ssn.sender(addr) count = 0 start = time.time while (opts.count == 0 or count < opts.count) and (opts.timeout is none or time.time - start < opts.timeout): msg = message(subject=opts.subject, reply_to=opts.reply_to, content=content) msg.properties["spout-id"] = "%s:%s" % (spout_id, count) for p in opts.properties: name, val = nameval(p) msg.properties[name] = val snd.send(msg) count += 1 print msgexcept senderror, e: print eexcept keyboardinterrupt: passconn.close </pre>
客戶端的測試代碼:
圖片 28.3 pic
#!/usr/bin/env python #xiaorui ##轉自 ibm 開發社區import optparsefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warnparser = optparse.optionparser(usage="usage: %prog [options] address ...", description="drain messages from the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified broker (default %default)")parser.add_option("-c", "--count", type="int", help="number of messages to drain")parser.add_option("-f", "--forever", action="store_true", help="ignore timeout and wait forever")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-t", "--timeout", type="float", default=0, help="timeout in seconds to wait before exiting (default %default)")parser.add_option("-p", "--print", dest="format", default="%(m)s", help="format string for printing messages (default %default)")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", debug)else: enable("qpid", warn)if args: addr = args.pop(0)else: parser.error("address is required")if opts.forever: timeout = noneelse: timeout = opts.timeoutss formatter: def __init__(self, message): self.message = message self.environ = {"m": self.message, "p": self.message.properties, "c": self.message.content} def __getitem__(self, st): return eval(st, self.environ)conn = connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session rcv = ssn.receiver(addr) count = 0 while not opts.count or count < opts.count: try: msg = rcv.fetch(timeout=timeout) print opts.format % formatter(msg) count += 1 ssn.acknowledge except empty: breakexcept receivererror, e: print eexcept keyboardinterrupt: passconn.close </pre>
browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。
consume 模式的意思是,我瀏覽了一次後,刪除這一條。別人就訪問不到啦。
這個是瀏覽模式:
圖片 28.4 pic
sub-pub 通信的例子
pub-sub 是另一種很有用的通信模型。恐怕它的名字就源於出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。
服務端qpid-config add exchange topic news-service./spout news-service/news xiaorui客戶端:./drain -t 120 news-service/#.news </pre>
pub 端,創建 topic 點!
圖片 28.5 pic
sub端,也就是接收端!
圖片 28.6 pic <h2>總結:</h2>
qpid 挺好用的,比 rabbitmq 要輕型,比 zeromq 保險點! 各方麵的文檔也都很健全,值得一用。話說,這三個消息隊列我也都用過,最一開始用的是 redis 的 pubsub 做日誌收集和信息通知,後來在做集群相關的項目的時候,我自己搞了一套 zeromq 的分布式任務分發,和 saltstack 很像,當然了遠沒有萬人用的 salt 強大。 rabbitmq 的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關於 qpid 的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到 7k,不加持久化可以到 1500 左右。
本文出自 “峰雲,就她了。” 博客,謝絕轉載!
這兩天看了消息隊列通信,打算在配置平台上應用起來。以前用過 zeromq 但是這東西太快了,還有就是 rabbitmq 有點大,新浪的朋友推薦了 qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。
amqp(消息隊列協議 advanced message queuing protocol)是一種消息協議 ,等同於 jms,但是 jms 隻是 java 平台的方案,amqp 是一個跨語言的協議。
amqp 不分語言平台,主流的語言都支持,運維這邊的 perl,python,ruby 更是支持,所以大家就放心用吧。
主流的消息隊列通信類型:
點對點:a 發消息給 b。廣播:a 發給所有其他人的消息組播:a 發給多個但不是所有其他人的消息。requester/response:類似訪問網頁的通信方式,客戶端發請求並等待,服務端回複該請求pub-sub:類似雜誌發行,出版雜誌的人並不知道誰在看這本雜誌,訂閱的人並不關心誰在發表這本雜誌。出版的人隻管將信息發布出去,訂閱的人也隻在需要的時候收到該信息。store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人並不一定在家等待,也並不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。其他通信模型。。。 </pre>
publisher --->exchange ---> messagequeue --->consumer
整個過程是異步的.publisher,consumer 相互不知道對方的存在,exchange 負責交換/路由,依靠 routing key,每個消息者有一個 routing key,每個 binding 將自已感興趣的 routingkey 告訴 exchange,以便 exchange 將相關的消息轉發給相應的 queue! <h2>幾個概念</h2>
幾個概念producer,routing key,exchange,binding,queue,consumer.producer: 消息的創建者,消息的發送者routing key:唯一用來映射消息該進入哪個隊列的標識exchange:負責消息的路由,交換binding:定義 queue 和 exchange 的映射關係queue:消息隊列consumer:消息的使用者exchange類型fan-out:類似於廣播方式,不管 routingkeydirect:根據 routingkey,進行關聯投寄topic:類似於 direct,但是支持多個 key 關聯,以組的方式投寄。 key以.來定義界限。類似於 usea.news,usea.weather.這兩個消息是一組的。 </pre>
圖片 28.1 pic
qpid
qpid 是 apache 開發的一款麵向對象的消息中間件,它是一個 amqp 的實現,可以和其他符合 amqp 協議的係統進行通信。qpid 提供了 c++/python/java/c# 等主流編程語言的客戶端庫,安裝使用非常方便。相對於其他的 amqp 實現,qpid 社區十分活躍,有望成為標準 amqp 中間件產品。除了符合 amqp 基本要求之外,qpid 提供了很多額外的 ha 特性,非常適於集群環境下的消息通信!
基本功能外提供以下特性:
采用 corosync(?)來保證集群環境下的 fault-tolerant(?) 特性
支持 xml 的 exchange,消息為 xml 時,彩用 xquery 過濾
支持 plugin
提供安全認證,可對 producer/consumer 提供身份認證
qpidd --port --no-data-dir --auth
port:端口
--no-data-dir:不指定數據目錄
--auth:不啟用安全身份認證
啟動後自動創建一些 exchange,amp.topic,amp.direct,amp.fanout
tools:
qpid-config:維護 queue,exchange,內部配置qpid-route:配置 broker federation(聯盟?集群?)qpid-tool:監控
咱們說完介紹了,這裏就趕緊測試下。
服務器端的安裝:
yum install qpid-cpp-serveryum install qpid-tools/etc/init.d/qpidd start </pre>
發布端的測試代碼:
圖片 28.2 pic
一些測試代碼轉自: ibm 的開發社區
#!/usr/bin/env python #xiaorui #轉自 ibm 開發社區import optparse, timefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warndef nameval(st): idx = st.find("=") if idx >= 0: name = st[0:idx] value = st[idx+1:] else: name = st value = none return name, valueparser = optparse.optionparser(usage="usage: %prog [options] address [ content ... ]", description="send messages to the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified broker (default %default)")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-c", "--count", type="int", default=1, help="stop after count messages have been sent, zero disables (default %default)")parser.add_option("-t", "--timeout", type="float", default=none, help="exit after the specified time")parser.add_option("-i", "--id", help="use the supplied id instead of generating one")parser.add_option("-s", "--subject", help="specify a subject")parser.add_option("-r", "--reply-to", help="specify reply-to address")parser.add_option("-p", "--property", dest="properties", action="append", default=, metavar="name=value", help="specify message property")parser.add_option("-m", "--map", dest="entries", action="append", default=, metavar="key=value", help="specify map entry for message body")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", debug)else: enable("qpid", warn)if opts.id is none: spout_id = str(uuid4)else: spout_id = opts.idif args: addr = args.pop(0)else: parser.error("address is required")content = noneif args: text = " ".join(args)else: text = noneif opts.entries: content = {} if text: content["text"] = text for e in opts.entries: name, val = nameval(e) content[name] = valelse: content = textconn = connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session snd = ssn.sender(addr) count = 0 start = time.time while (opts.count == 0 or count < opts.count) and (opts.timeout is none or time.time - start < opts.timeout): msg = message(subject=opts.subject, reply_to=opts.reply_to, content=content) msg.properties["spout-id"] = "%s:%s" % (spout_id, count) for p in opts.properties: name, val = nameval(p) msg.properties[name] = val snd.send(msg) count += 1 print msgexcept senderror, e: print eexcept keyboardinterrupt: passconn.close </pre>
客戶端的測試代碼:
圖片 28.3 pic
#!/usr/bin/env python #xiaorui ##轉自 ibm 開發社區import optparsefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warnparser = optparse.optionparser(usage="usage: %prog [options] address ...", description="drain messages from the supplied address.")parser.add_option("-b", "--broker", default="localhost", help="connect to specified broker (default %default)")parser.add_option("-c", "--count", type="int", help="number of messages to drain")parser.add_option("-f", "--forever", action="store_true", help="ignore timeout and wait forever")parser.add_option("-r", "--reconnect", action="store_true", help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3, help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int", help="maximum number of reconnect attempts")parser.add_option("-t", "--timeout", type="float", default=0, help="timeout in seconds to wait before exiting (default %default)")parser.add_option("-p", "--print", dest="format", default="%(m)s", help="format string for printing messages (default %default)")parser.add_option("-v", dest="verbose", action="store_true", help="enable logging")opts, args = parser.parse_argsif opts.verbose: enable("qpid", debug)else: enable("qpid", warn)if args: addr = args.pop(0)else: parser.error("address is required")if opts.forever: timeout = noneelse: timeout = opts.timeoutss formatter: def __init__(self, message): self.message = message self.environ = {"m": self.message, "p": self.message.properties, "c": self.message.content} def __getitem__(self, st): return eval(st, self.environ)conn = connection(opts.broker, reconnect=opts.reconnect, reconnect_interval=opts.reconnect_interval, reconnect_limit=opts.reconnect_limit)try: conn.open ssn = conn.session rcv = ssn.receiver(addr) count = 0 while not opts.count or count < opts.count: try: msg = rcv.fetch(timeout=timeout) print opts.format % formatter(msg) count += 1 ssn.acknowledge except empty: breakexcept receivererror, e: print eexcept keyboardinterrupt: passconn.close </pre>
browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。
consume 模式的意思是,我瀏覽了一次後,刪除這一條。別人就訪問不到啦。
這個是瀏覽模式:
圖片 28.4 pic
sub-pub 通信的例子
pub-sub 是另一種很有用的通信模型。恐怕它的名字就源於出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。
服務端qpid-config add exchange topic news-service./spout news-service/news xiaorui客戶端:./drain -t 120 news-service/#.news </pre>
pub 端,創建 topic 點!
圖片 28.5 pic
sub端,也就是接收端!
圖片 28.6 pic <h2>總結:</h2>
qpid 挺好用的,比 rabbitmq 要輕型,比 zeromq 保險點! 各方麵的文檔也都很健全,值得一用。話說,這三個消息隊列我也都用過,最一開始用的是 redis 的 pubsub 做日誌收集和信息通知,後來在做集群相關的項目的時候,我自己搞了一套 zeromq 的分布式任務分發,和 saltstack 很像,當然了遠沒有萬人用的 salt 強大。 rabbitmq 的用法,更是看中他的安全和持久化,當然性能真的不咋地。
關於 qpid 的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到 7k,不加持久化可以到 1500 左右。
本文出自 “峰雲,就她了。” 博客,謝絕轉載!