前一個範例中,我們實作的工作佇列都是假設一個工作只會配送給一個 worker,現在我們要改變一下這個規則,讓一個訊息可以同時傳送給多個 consumers,而這樣的設計模式就稱為 publish/subscribe。
在這裡我們將實作一個記錄檔系統(logging system),整個系統中包含兩個程式,一個是負責產生記錄訊息,另外一個則是負責接收與處理這些訊息。
在這個記錄檔系統中,我們可以使用一個訊息接收程式(receiver)將接收到的訊息寫入硬碟中,另外同時再執行一個訊息接收程式將所接收到的訊息顯示在螢幕上。
基本上在這個記錄檔系統中,所有產生出來的訊息都可以以廣播的方式送給每一個接收程式,讓每一個接收者都收到一份相同的訊息。
Exchanges
在之前的教學範例中,我們都是很簡單的將訊息送進 RabbitMQ 的佇列中,在從佇列中取得訊息,但是整個過程其實不只是這樣,這裡我們將解釋 RabbitMQ 處理訊息的模型與流程。以下是前面教學範例的一些重點:
- producer:一個使用者的程式,負責發送訊息。
- queue:儲存訊息用的緩衝區。
- consumer:一個使用者的程式,負責發送訊息。
在 RabbitMQ 中訊息的配送都是靠 exchange 來處理的,exchange 專門負責從 producers 接收訊息,然後將訊息配送給正確的 queues,exchange 必須清楚每一則訊息的配送規則,例如哪一些訊息要送給哪一個 queue?哪一些訊息要鬼給好幾個 queues?或是哪一些訊息應該被丟棄?而這些規則都是經由 exchange 的類型(type)來定義的。
exchange 的類型分為:direct、topic、headers 與 fanout 這幾種,在這裡我們需要使用的類型是 fanout,首先建立一個 exchange,並命名為 logs:
channel.exchange_declare(exchange='logs', type='fanout')fanout 的 exchange 非常簡單,他就是很單純的把所有接收到的訊息配送給每一個 queues,而這個規則也就是我們在記錄檔系統中所需要的。
接著就可以將訊息發佈至這個 exchange 了:
channel.basic_publish(exchange='logs', routing_key='', body=message)這裡的 exchange 參數可以用來指定 exchange 的名稱。
列出系統的 Exchanges
如果要列出系統中所有的 exchanges,可以使用 rabbitmqctl 指令:sudo rabbitmqctl list_exchanges輸出為
輸出的 exchanges 中,有一些名稱為 amq.* 或是空字串的 exchanges,這些是 RabbitMQ 內建的,這些預設的 exchanges 在這裡並不會用到。Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic logs fanout ...done.
沒有名稱的 Exchange
在之前的教學範例中,我們都是直接使用名稱為空字串的 exchange:channel.basic_publish(exchange='', routing_key='hello', body=message)名稱為空字串的 exchange 是一個內建的 exchange,它會直接把收到的訊息依據 routing_key 所指定的 queue 名稱來配送。
暫時性的 Queues
在之前的範例中,因為我們需要讓 producers 與 consumers 可以共用一個佇列,將大量的工作分散處理,所以我們藉著建立具名佇列的方式,來讓 producers 與 consumers 都可以明確指定要使用的佇列(如之前的 hello 與 task_queue)。但是在這裡的情況跟之前的例子不同,現在我們要讓每一個訊息接收者都可以收到所有的訊息,而且我們只需要接收到最新的訊息即可,過時的訊息對我們來說其實沒有用,所以我們做了兩項改變。
第一個部分就是讓每一個訊息接收者連接至 RabbitMQ 伺服器時,建立一個該訊息接收者專屬的新佇列,因為這個佇列是專門給這個接收者使用,所以也不需要特別指定名稱。
result = channel.queue_declare()如果在呼叫 queue_declare() 時,沒有以 queue 參數指定佇列名稱,則 RabbitMQ 會自行以隨機的方式為這個佇列指定一個名稱,儲存至 result.method.queue。
第二個部分就是在訊息接收者中斷與 RabbitMQ 伺服器的連現時,讓剛剛建立的專屬佇列也可以自動刪除,這個動作可以使用 exclusive 參數來處理:
result = channel.queue_declare(exclusive=True)這樣一來,每一個訊息接收者在連接上 RabbitMQ 伺服器之後,就都會有一個自己的佇列,如此一來不同的接收者就不會互相干擾。
Bindings
建立好新的佇列之後,還要讓之前建立的 fanout exchange 可以將訊息配送至新建立的佇列中,建立 exchange 與佇列之間的聯結動作就稱作 binding。channel.queue_bind(exchange='logs',
queue=result.method.queue)
這樣 logs 這個 exchange 就會將訊息配送至 result.method.queue 這個剛建立好的佇列中。如果要查看系統中所有 binding 的狀態,可以使用
rabbitmqctl list_bindings
完整的記錄檔系統
在這個例子中,producer 的程式碼跟之前的例子都差不多,只是改用 logs 這個 exchange 而已,我們將 producer 的指令稿命名為 emit_log.py:
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print " [x] Sent %r" % (message,) connection.close()這裡在使用 basic_publish() 指定使用 logs 這個 exchange 時,routing_key 雖然還是要指定,不過這個參數對於 fanout exchange 是沒有作用的。
另外 producer 在實作上也要記得宣告 exchange,以避免使用到還沒建立好的 exchange 造成錯誤。
在這個例子中,如果 producer 發送訊息給 exchange 的時候,還沒有任何佇列聯結上該 exchange,那麼該訊息就會被丟棄,這個狀況對於這個範例而言剛好沒有影響,因為我們只希望接收最新的訊息,舊的不用保留,所以沒有關係。
以下是訊息接收者的程式碼,我們將其命名為 receive_logs.py:
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue channel.queue_bind(exchange='logs', queue=queue_name) print ' [*] Waiting for logs. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] %r" % (body,) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()這樣就完成了,如果想要將記錄檔寫入檔案,可以執行:
python receive_logs.py > logs_from_rabbit.log如果想要從螢幕上看到及時的記錄檔訊息,可以再開一的終端機,執行:
python receive_logs.py最後產生記錄訊息:
python emit_log.py
你可以使用 rabbitmqctl 指令來確認 binding 的狀態是否正確:
sudo rabbitmqctl list_bindings如果有兩個 receive_logs.py 在執行的情況下,輸出應該會類似這樣:
Listing bindings ...關於 rabbitmqctl list_bindings 的輸出欄位說明,可以參考 rabbitmqctl 指令的 man 線上手冊(man rabbitmqctl)。
exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
logs exchange amq.gen-FdsvRZh16fRiV60q0MdzVg queue amq.gen-FdsvRZh16fRiV60q0MdzVg []
logs exchange amq.gen-fo3VHtgITPC4pe4l_PcSYQ queue amq.gen-fo3VHtgITPC4pe4l_PcSYQ []
...done.
沒有留言:
張貼留言