W dwóch pierwszych przykładach implementacji obsługi kolejek przedstawionych do tej pory (pierwszy przykład, drugi przykład) zastosowano uproszczony model architektury RabbitMQ.
W praktyce nadawca nie komunikuje się bezpośrednio z kolejką. Jeśli tak jest, to można zadać pytanie: w jaki sposób działały programy z dwóch pierwszych przykładów?
Otóż w RabbitMQ istnieje tzw. centrala komunikatów nieposiadająca nazwy (ang. nameless exchange), którą określamy w kodzie, używając pustego łańcucha: '', i która jest domyślna, jeśli nie podamy nazwy centrali.
W takim przypadku komunikaty zostaną przesłane do kolejki o określonej nazwie, o ile taka kolejka istnieje:
ch.sendToQueue('hello', new Buffer('Witamy!'));
Jeśli jednak zdefiniujemy nazwę centrali komunikatów, to potrzebujemy jeszcze określić relację pomiędzy centralą komunikatów a kolejką – którą nazywamy w protokole AMQP 0-9-1 angielskim słowem binding, co na język polski można przetłumaczyć jako „powiązanie”. Operacja związania elementu exchange i kolejki opiera się na tzw. kluczu powiązania (ang. routing key).
Klucz routing’u to coś w rodzaju adresu, który może zostać użyty przez centralę komunikatów, aby zadecydować, gdzie komunikat będzie przekazany dalej. Klucz powiązania jest czymś w rodzaju adresu URL dla protokołu HTTP, jednak jest czymś bardziej ogólnym (ang. generic), co można będzie zaraz zobaczyć na bazie różnych centrali komunikatów.
Wyróżniamy następujące cztery podstawowe typy „tablic routing'u RabbitMQ”:
Prezentując cztery podstawowe rodzaje konwencji budowy tablic routing'u, słowo podstawowe zostało celowo użyte. Protokół AMQP 0-9-1 przewiduje możliwość tworzenia własnych typów dyspozycji komunikatami.
Protokół ten wymusza także na systemach pośredniczących w wymianie komunikatów domyślne dostarczenie centrali komunikatów (ang. pre-declared exchanges) odpowiednio dla każdego ich typu:
Każda z tych centrali jest oddzielna dla odrębnych hostów RabbitMQ (vhost).
Aplikacje korzystające z RabbitMQ mogą polegać na tym, że centrale o powyższych nazwach będą zawsze znajdowały się w systemie, niejako „dostarczone w jednym pudełku” razem z pośrednikiem komunikatów.
Listę wszystkich dostępnych centrali komunikatów możemy zobaczyć, wykonując polecenie:
$ rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
empik fanout
...done.
Natomiast jeśli chcemy zobaczyć listę wszystkich aktywnych kolejek (w tym tymczasowych), należy wywołać polecenie:
$ rabbitmqctl list_queues
Listing queues ...
amq.gen-FkNW-c5z7dd7ZNJwWRYc7w 0
amq.gen-_UHxPcO7GnDvWgnYoImF0Q 0
task_queue 0
Możemy także wyświetlić listę powiązań (ang. bindings):
$ rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-SurIiFhxXsSbusqBVMaIFg queue []
logs exchange amq.gen-uQPyufcdgwX8lIWWvMJwFA queue []
Diagram przepływu komunikatów dla centrali typu fanout umieszczono w poprzednim tutorialu: wydawca komunikatów i subskrybenci.
Wspomniano wcześniej, iż centrale komunikatów pozwalają na filtrowanie komunikatów według określonych kryteriów.
W tym przypadku odbywa się to za pomocą wzorca w postaci kluczy powiązania (ang. binding key).
Na schemacie zaprezentowanym powyżej pokazano dwie drogi wiadomości:
Zasadę działania central komunikatów bezpośrednich można określić jako: odbieraj tylko te komunikaty, dla których klucz routing’u jest dokładnie taki sam, jak ten podany przez aplikację-subskrybenta. Dla jednej centrali można przypisać kilka kluczy, także wtedy subskrybent nasłuchuje kilka rodzajów komunikatów.
Centrala typu direct świetnie nadaje się dla:
Program przesyłający logi do systemu RabbitMQ jest bardzo podobny do programu wydawca.js utworzonego w ramach poprzedniego tutoriala. Rozpoczynamy w nim z tego samego miejsca co poprzednio:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
// nowe instrukcje
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
Najpierw definiujemy nazwę centrali komunikatów bezpośrednich:
var ex = 'direct_logs';
Następnie dodajemy kod odpowiedzialny za pobieranie rodzaju logów oraz treści loga przekazanego przez użytkownika:
var args = process.argv.slice(2);
var msg = args.slice(1).join(' ') || 'Witajcie!';
var severity = (args.length > 0) ? args[0] : 'info';
później deklarujemy centralę komunikatów o podanej wcześniej nazwie:
ch.assertExchange(ex, 'direct', {durable: false});
i publikujemy wiadomość:
ch.publish(ex, severity, new Buffer(msg));
Informację o tym wyświetlamy użytkownikowi na terminalu:
console.log(" [x] Wysłano %s: '%s'", severity, msg);
Kompletny program bezposredni-nadawca-logow.js prezentuje się następująco:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'direct_logs';
var args = process.argv.slice(2);
var msg = args.slice(1).join(' ') || 'Witajcie!';
var severity = (args.length > 0) ? args[0] : 'info';
ch.assertExchange(ex, 'direct', {durable: false});
ch.publish(ex, severity, new Buffer(msg));
console.log(" [x] Wysłano %s: '%s'", severity, msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
Program uruchamiamy następująco:
$ ./bezposredni-nadawca-logow.js error "Błąd krytyczny"
Wywołanie programu bez parametrów prześle do RabbitMQ log typu info o treści "Witajcie!". Jednakże w przypadku, gdy żaden odbiorca nie oczekuje na logi przesyłany log zostanie utracony.
Program-odbiorcę również tworzymy korzystając z kodu z wcześniejszych tutoriali.
Na początku jednak wymuszamy na użytkowniku podanie typu logów jakie go interesują (można podać więcej niż jeden typ):
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Korzystanie: bezposredni-subskrybent-logow.js [info] [warning] [error]");
process.exit(1);
}
Uruchomienie programu bez dodatkowych parametrów wyświetli komunikat o następującej treści:
$ ./bezposredni-subskrybent-logow.js
Korzystanie: bezposredni-subskrybent-logow.js [info] [warning] [error]
Prawidłowe korzystanie z programu wygląda jak przedstawiono poniżej.
Oczekiwanie tylko na logi typu error:
$ ./bezposredni-subskrybent-logow.js error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
Oczekiwanie tylko na logi typu error oraz info:
$ ./bezposredni-subskrybent-logow.js error info
[*] Oczekiwanie na wiadomości w amq.gen-3nzM0A2syMnxOqL7ytnClg.
Naciśnij CTRL+C aby zakończyć.
Dalsza część programu wygląda tak jak w poprzednich przykładach:
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
// nowe instrukcje
});
});
Tu także deklarujemy centralę komunikatów bezpośrednich:
var ex = 'direct_logs';
ch.assertExchange(ex, 'direct', {durable: false});
następnie przypisujemy do niej tymczasową kolejkę generowaną przez RabbitMQ:
ch.assertQueue('', {exclusive: true}, function(err, q) {};
Kolejnym krokiem jest przypisanie do tej kolejki jednego lub kilku kluczy powiązania (ang. binding key):
args.forEach(function(severity) {
ch.bindQueue(q.queue, ex, severity);
});
A na końcu skonfigurowanie konsumowania przychodzących komunikatów:
ch.consume(q.queue, function(msg) {
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
}, {noAck: true});
Kompletny kod programu bezposredni-subskrybent-logow.js wygląda następująco:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Korzystanie: bezposredni-subskrybent-logow.js [info] [warning] [error]");
process.exit(1);
}
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'direct_logs';
ch.assertExchange(ex, 'direct', {durable: false});
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(" [*] Oczekiwanie na wiadomości w %s.", q.queue);
console.log(" Naciśnij CTRL+C aby zakończyć.");
args.forEach(function(severity) {
ch.bindQueue(q.queue, ex, severity);
});
ch.consume(q.queue, function(msg) {
console.log(" [x] %s: '%s'", msg.fields.routingKey, msg.content.toString());
}, {noAck: true});
});
});
});
Po jego uruchomieniu w drugim terminalu:
$ ./bezposredni-subskrybent-logow.js info error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
i przesłaniu logów z poziomu pierwszego terminala:
$ ./bezposredni-nadawca-logow.js
[x] Wysłano info: 'Witajcie!'
$ ./bezposredni-nadawca-logow.js error "Błąd krytyczny"
[x] Wysłano error: 'Błąd krytyczny'
powinniśmy zobaczyć odebrany log i jego typ:
$ ./bezposredni-subskrybent-logow.js info error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
[x] info: 'Witajcie!'
[x] error: 'Błąd krytyczny'
Drugim, praktycznym przykładem zastosowania centrali typu direct (być może bardziej przejrzystym dla czytelnika) są operacje na grafikach przesyłanych do serwera, co ilustruje diagram zamieszczony poniżej:
Diagram przedstawia rozdzielenie zadań:
Centrale komunikatów typu direct można stosować przy obsłudze:
Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-node.js
Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.