<h2>簡介:</h2>


    這兩天看了消息隊列通信,打算在配置平台上應用起來。以前用過 zeromq 但是這東西太快了,還有就是 rabbitmq 有點大,新浪的朋友推薦了 qpid,簡單輕便。自己總結了下文檔,大家可以瞅瞅。


    amqp(消息隊列協議 advanced message queuing protocol)是一種消息協議 ,等同於 jms,但是 jms 隻是 java 平台的方案,amqp 是一個跨語言的協議。


    amqp 不分語言平台,主流的語言都支持,運維這邊的 perl,python,ruby 更是支持,所以大家就放心用吧。


    主流的消息隊列通信類型:


    點對點:a 發消息給 b。廣播:a 發給所有其他人的消息組播:a 發給多個但不是所有其他人的消息。requester/response:類似訪問網頁的通信方式,客戶端發請求並等待,服務端回複該請求pub-sub:類似雜誌發行,出版雜誌的人並不知道誰在看這本雜誌,訂閱的人並不關心誰在發表這本雜誌。出版的人隻管將信息發布出去,訂閱的人也隻在需要的時候收到該信息。store-and-forward:存儲轉發模型類似信件投遞,寫信的人將消息寫給某人,但在將信件發出的時候,收信的人並不一定在家等待,也並不知道有消息給他。但這個消息不會丟失,會放在收信者的信箱中。這種模型允許信息的異步交換。其他通信模型。。。  </pre>


    publisher --->exchange ---> messagequeue --->consumer


    整個過程是異步的.publisher,consumer 相互不知道對方的存在,exchange 負責交換/路由,依靠 routing key,每個消息者有一個 routing key,每個 binding 將自已感興趣的 routingkey 告訴 exchange,以便 exchange 將相關的消息轉發給相應的 queue!  <h2>幾個概念</h2>


    幾個概念producer,routing key,exchange,binding,queue,consumer.producer: 消息的創建者,消息的發送者routing key:唯一用來映射消息該進入哪個隊列的標識exchange:負責消息的路由,交換binding:定義 queue 和 exchange 的映射關係queue:消息隊列consumer:消息的使用者exchange類型fan-out:類似於廣播方式,不管 routingkeydirect:根據 routingkey,進行關聯投寄topic:類似於 direct,但是支持多個 key 關聯,以組的方式投寄。      key以.來定義界限。類似於 usea.news,usea.weather.這兩個消息是一組的。  </pre>


    圖片 28.1 pic


    qpid


    qpid 是 apache 開發的一款麵向對象的消息中間件,它是一個 amqp 的實現,可以和其他符合 amqp 協議的係統進行通信。qpid 提供了 c++/python/java/c# 等主流編程語言的客戶端庫,安裝使用非常方便。相對於其他的 amqp 實現,qpid 社區十分活躍,有望成為標準 amqp 中間件產品。除了符合 amqp 基本要求之外,qpid 提供了很多額外的 ha 特性,非常適於集群環境下的消息通信!


    基本功能外提供以下特性:


    采用 corosync(?)來保證集群環境下的 fault-tolerant(?) 特性


    支持 xml 的 exchange,消息為 xml 時,彩用 xquery 過濾


    支持 plugin


    提供安全認證,可對 producer/consumer 提供身份認證


    qpidd --port --no-data-dir --auth


    port:端口


    --no-data-dir:不指定數據目錄


    --auth:不啟用安全身份認證


    啟動後自動創建一些 exchange,amp.topic,amp.direct,amp.fanout


    tools:


    qpid-config:維護 queue,exchange,內部配置qpid-route:配置 broker federation(聯盟?集群?)qpid-tool:監控


    咱們說完介紹了,這裏就趕緊測試下。


    服務器端的安裝:


    yum install qpid-cpp-serveryum install qpid-tools/etc/init.d/qpidd start  </pre>


    發布端的測試代碼:


    圖片 28.2 pic


    一些測試代碼轉自: ibm 的開發社區


    #!/usr/bin/env python #xiaorui #轉自 ibm 開發社區import optparse, timefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warndef nameval(st):  idx = st.find("=")  if idx >= 0:    name = st[0:idx]    value = st[idx+1:]  else:    name = st    value = none  return name, valueparser = optparse.optionparser(usage="usage: %prog [options] address [ content ... ]",       description="send messages to the supplied address.")parser.add_option("-b", "--broker", default="localhost",  help="connect to specified broker (default %default)")parser.add_option("-r", "--reconnect", action="store_true",  help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3,  help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int",  help="maximum number of reconnect attempts")parser.add_option("-c", "--count", type="int", default=1,  help="stop after count messages have been sent, zero disables (default %default)")parser.add_option("-t", "--timeout", type="float", default=none,  help="exit after the specified time")parser.add_option("-i", "--id", help="use the supplied id instead of generating one")parser.add_option("-s", "--subject", help="specify a subject")parser.add_option("-r", "--reply-to", help="specify reply-to address")parser.add_option("-p", "--property", dest="properties", action="append", default=,  metavar="name=value", help="specify message property")parser.add_option("-m", "--map", dest="entries", action="append", default=,  metavar="key=value",  help="specify map entry for message body")parser.add_option("-v", dest="verbose", action="store_true",  help="enable logging")opts, args = parser.parse_argsif opts.verbose:  enable("qpid", debug)else:  enable("qpid", warn)if opts.id is none:  spout_id = str(uuid4)else:  spout_id = opts.idif args:  addr = args.pop(0)else:  parser.error("address is required")content = noneif args:  text = " ".join(args)else:  text = noneif opts.entries:  content = {}  if text:    content["text"] = text  for e in opts.entries:    name, val = nameval(e)    content[name] = valelse:  content = textconn = connection(opts.broker,  reconnect=opts.reconnect,  reconnect_interval=opts.reconnect_interval,  reconnect_limit=opts.reconnect_limit)try:  conn.open  ssn = conn.session  snd = ssn.sender(addr)  count = 0  start = time.time  while (opts.count == 0 or count < opts.count) and (opts.timeout is none or time.time - start < opts.timeout):    msg = message(subject=opts.subject,  reply_to=opts.reply_to,  content=content)    msg.properties["spout-id"] = "%s:%s" % (spout_id, count)    for p in opts.properties:      name, val = nameval(p)      msg.properties[name] = val    snd.send(msg)    count += 1    print msgexcept senderror, e:  print eexcept keyboardinterrupt:  passconn.close </pre>


    客戶端的測試代碼:


    圖片 28.3 pic


    #!/usr/bin/env python #xiaorui ##轉自 ibm 開發社區import optparsefrom qpid.messaging import *from qpid.util import urlfrom qpid.log import enable, debug, warnparser = optparse.optionparser(usage="usage: %prog [options] address ...",       description="drain messages from the supplied address.")parser.add_option("-b", "--broker", default="localhost",  help="connect to specified broker (default %default)")parser.add_option("-c", "--count", type="int",  help="number of messages to drain")parser.add_option("-f", "--forever", action="store_true",  help="ignore timeout and wait forever")parser.add_option("-r", "--reconnect", action="store_true",  help="enable auto reconnect")parser.add_option("-i", "--reconnect-interval", type="float", default=3,  help="interval between reconnect attempts")parser.add_option("-l", "--reconnect-limit", type="int",  help="maximum number of reconnect attempts")parser.add_option("-t", "--timeout", type="float", default=0,  help="timeout in seconds to wait before exiting (default %default)")parser.add_option("-p", "--print", dest="format", default="%(m)s",  help="format string for printing messages (default %default)")parser.add_option("-v", dest="verbose", action="store_true",  help="enable logging")opts, args = parser.parse_argsif opts.verbose:  enable("qpid", debug)else:  enable("qpid", warn)if args:  addr = args.pop(0)else:  parser.error("address is required")if opts.forever:  timeout = noneelse:  timeout = opts.timeoutss formatter:  def __init__(self, message):    self.message = message    self.environ = {"m": self.message,    "p": self.message.properties,    "c": self.message.content}  def __getitem__(self, st):    return eval(st, self.environ)conn = connection(opts.broker,  reconnect=opts.reconnect,  reconnect_interval=opts.reconnect_interval,  reconnect_limit=opts.reconnect_limit)try:  conn.open  ssn = conn.session  rcv = ssn.receiver(addr)  count = 0  while not opts.count or count < opts.count:    try:      msg = rcv.fetch(timeout=timeout)      print opts.format % formatter(msg)      count += 1      ssn.acknowledge    except empty:      breakexcept receivererror, e:  print eexcept keyboardinterrupt:  passconn.close  </pre>


    browse 模式的意思是,瀏覽的意思,一個特殊的需求,我訪問了一次,別人也能訪問。


    consume 模式的意思是,我瀏覽了一次後,刪除這一條。別人就訪問不到啦。


    這個是瀏覽模式:


    圖片 28.4 pic


    sub-pub 通信的例子


    pub-sub 是另一種很有用的通信模型。恐怕它的名字就源於出版發行這種現實中的信息傳遞方式吧,publisher 就是出版商,subscriber 就是訂閱者。


    服務端qpid-config add exchange topic news-service./spout news-service/news xiaorui客戶端:./drain -t 120 news-service/#.news  </pre>


    pub 端,創建 topic 點!


    圖片 28.5 pic


    sub端,也就是接收端!


    圖片 28.6 pic  <h2>總結:</h2>


    qpid 挺好用的,比 rabbitmq 要輕型,比 zeromq 保險點! 各方麵的文檔也都很健全,值得一用。話說,這三個消息隊列我也都用過,最一開始用的是 redis 的 pubsub 做日誌收集和信息通知,後來在做集群相關的項目的時候,我自己搞了一套 zeromq 的分布式任務分發,和 saltstack 很像,當然了遠沒有萬人用的 salt 強大。  rabbitmq 的用法,更是看中他的安全和持久化,當然性能真的不咋地。


    關於 qpid 的性能我沒有親自做大量的測試,但是聽朋友說,加持久化可以到 7k,不加持久化可以到 1500 左右。


    本文出自 “峰雲,就她了。” 博客,謝絕轉載!

章節目錄

閱讀記錄

Python實戰-從菜鳥到大牛的進階之路所有內容均來自互聯網,uu小說網隻為原作者極客學院的小說進行宣傳。歡迎各位書友支持極客學院並收藏Python實戰-從菜鳥到大牛的進階之路最新章節