rabbit.js 以 amqplib 為基礎,將原本複雜的設定又再簡化,讓一般性的使用者更方便,如果是使用一般常見的模式(pattern),只要幾行程式碼就可以運作了。
安裝
如果要在 Node.js 中使用 RabbitMQ,首先就是要將基本的 Node.js 與 RabbitMQ 服務先安裝好,安裝說明請參考:接著再使用 Node.js 的 npm 套件管理程式安裝 rabbot.js:
npm install rabbit.js
使用 rabbit.js
rabbit.js 在使用上比 RabbitMQ 官方所提供的教學範例還要簡略,只要建立 RabbitMQ 的連線與對應類型的 socket 即可,以下是使用的方式教學。建立連線
首先以 createContext() 指定 RabbitMQ 的 URL 位址,建立一個新的連線:var context = require('rabbit.js').createContext('amqp://localhost');在連線建立之後,context 會送出 'ready' 這個事件(event)。如果連線發生問題時,則會送出 'error' 事件,並且附帶一個 Error 物件,設計者可以過這些事件判斷連線是否正常。
建立 Socket
建立連線之後,還要再建立指定類型的 socket,而要使用哪一種類型就要看自己的需求而定。這裡以 Publish/Subscribe 這個模型為範例,在一個 JavaScript 程式中同時實作 publish 與 subscribe:var pub = context.socket('PUBLISH'); var sub = context.socket('SUBSCRIBE');這裡建立兩個 socket,一個作為 publish(pub),另一個作為 subscribe(sub)。
接下來,讓兩個 socket 連接到同一個 exchange:
pub.connect('alerts'); sub.connect('alerts');socket 實際上就是 Stream,所以當它的緩衝區有資料可以讀取時,你可以使用 read() 函數或是透過 'data' 事件來讀取,而若要寫入資料,則可使用 write() 函數。
如果傳送的資料都是字串,則可以使用 setEncoding() 來設定字串的編碼:
sub.setEncoding('utf8'); sub.on('data', function(note) { console.log("Alarum! %s", note); });或是在寫入資料時加上一個指定編碼的參數:
pub.write("Emergency. There's an emergency going on", 'utf8');
Stream 所提供的 pipe() 函數可以很方便的將輸出導向至其他的串流:
sub.pipe(process.stdout);
一個 socket 可以同時連接到多個 exchange,例如:
var sub2 = context.socket('SUBSCRIBE'); sub2.connect('system'); sub2.connect('notifications');這裡的 sub2 會同時接收來自於 system 與 notifications 這兩個 exchange 的所有訊息。
如果一個 socket 同時連接多個 exchange 時,無法辨識收到的訊息是來自於哪一個 exchange,如果需要區分訊息來源,就要改用多個 socket。
如果要關閉 socket,可以呼叫 close() 函數,它會清理配置給該 socket 的所有資源,並在完成時送出 'close' 事件。若 socket 的類型是屬於可以寫入資料的,則也可以使用 end([chunk [, encoding]]) 函數來寫入最後一筆資料,當資料寫入後,就會自動關閉 socket,如果呼叫 end() 不加上任何參數,它的效果就跟 close() 函數相同。
Socket 類型
在使用 Context.socket() 建立 socket 時,第一個參數的作用是指定 socket 的類型,以下是所有支援的類型:- PUBLISH / SUBSCRIBE(PUB / SUB)
- PUSH / PULL(使用範例請參考 net)
- REQUEST / REPLY(REQ / REP)(使用範例請參考 ordering)
- PUSH / WORKER
Topic
PUB 與 SUB 這兩個類型的 socket 可以依據指定的 topic 來傳送與接收訊息。PUB socket 可以使用 setsockopt('topic', string) 來設定 socket 的 topic,或是在發送訊息時,使用 publish(topic, message, [encoding]) 直接指定該訊息的 topic。
SUB socket 則可以在呼叫 connect() 函數時,在第二個參數上指定 topic。
指定完 topic 之後,還要設定 'routing' 的類型,才能讓他運作,可用的 'routing' 類型如下:
- 'fanout':這是預設的選項,不管 topic 是什麼,將所有的訊息配送至所有的 SUB socket。
- 'direct':只配送完全符合 topic 名稱的訊息。
- 'topic':依據 AMQP 的萬用字元規則來比對 topic 名稱,比對方法請參考 RabbitMQ 的 Topics。
Socket 設定
若要設定或更改 socket 的設定,可以透過 Socket.setsockopt() 函數,或是在呼叫 Context.socket() 時,將選項的設定值放在第二個參數上。以下是一些可用的 socket 選項:
- routing
- 適用於 PUB 與 SUB 這兩個類型的 socket,跟 topic 配合之後,可以決定如何配送訊息。routing 在 socket 建立的時候就要指定好,連接至相同位置的 sockets 其 routing 也必須吻合。
- topic
- 用於設定 PUB socket 所發送訊息的 topic。
- expiration
- 專門用於可寫入的 socket(如 PUB、PUSH、REQ、REP),設定訊息的有效期間,單位為千分之一秒,例如:
pub.setsockopt('expiration', 60 * 1000)
這樣由 pub 所發送的訊息如果在 60 秒內沒有被接收,那麼伺服器就會將此訊息丟棄。訊息有效期間的功能在 RabbitMQ 3.0.0 以後才有被支援。 - prefetch
- 適用於 WORKER 與 REP socket,設定 RabbitMQ 在訊息處理完成之前,只能配送多少筆訊息給 socket。例如:
var worker = ctx.socket('WORKER', {prefetch: 1});
這樣 RabbitMQ 就會一次只配送一個工作給 worker,等待工作處理完成並呼叫 ack() 之後,才會再繼續配送下一個工作。 如果 prefetch 設定為 0(預設的選項),RabbitMQ 就會不設定任何限制,將所有的訊息都配送給 socket。 - persistent
- 設定訊息在 RabbitMQ 伺服器重新啟動之後,是否還要保存,可用的值為 true 與 false。在 REQ / REP 的狀況下,系統只會保存 requests,而在 PUB / SUB 的狀況則沒有作用。
參考資料:GitHub
沒有留言:
張貼留言