Ten rodzaj filtrowania wiadomości jest alternatywą dla omówionej w poprzednim tutorialu centrali komunikatów z nagłówkami, jednak nie może on posiadać dowolnie dobranego klucza routing’u, tak jak to miało miejsce w przypadku centrali komunikatów bezpośrednich.
Tutaj klucz powiązania musi składać się ze słów oddzielonych kropkami np. „gpw.notowania.spolki.kghm”, co można opisać następującym wyrażeniem regularnym:
^[A-Za-z0-9\-_*]+(\.[A-Za-z0-9\-_*]+)*$.
Do konkretnej kolejki zostaną dostarczone wszystkie wiadomości, dla których klucz będzie pasował.
Do jednej kolejki można przypisać kilka takich kluczy, tak jak miało to miejsce w przypadku centrali typu direct. W takim przypadku wystarczy, aby choć jeden klucz pasował, co jest podobne do filtrowania typu „any” w centralach typu headers.
Klucz routing'u składa się ze słów oddzielonych kropkami.
Pojedyncze słowa mogą zostać zastąpione przez:
Część klucza routingu, który chcemy zastąpić płotkiem np. „gpw.#”, nie może zaczynać się od kropki, co możemy zapisać następującym wyrażeniem regularnym:
^([^.][A-Za-z0-9\-_*]+)((\.[A-Za-z0-9\-_*]+)*)$
Centrale tematyczne mają wiele zastosowań.
Gdy zadanie stawiane przed centralą filtrującą komunikaty przewiduje wiele aplikacji subskrybujących je selektywnie, ten rodzaj routing'u jest jednym z najlepszych wyborów.
Świetnym przykładem zastosowania centrali tematycznych jest analiza notowań giełdowych pokazana poniżej:
Na diagramie możemy zobaczyć:
Warto tutaj zwrócić uwagę na przemyślaną hierarchię struktury klucza routing’u, tak aby dodanie kolejnych funkcjonalności nie wiązało się z całkowitym jej przebudowaniem, a w przypadku dużych systemów może się to okazać bardzo trudne lub nawet niewykonalne.
Podobnie jak w przypadku pozostałych przykładów programów-wydawców tak i tutaj bazujemy na przykładzie programu fabryka.js:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
// nowe instrukcje
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
Deklarujemy nazwę centrali komunikatów tematycznych:
var ex = 'notowania-gpw';
a następnie odczytujemy dodatkowe parametry przesłane z linii poleceń:
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'gpw.info.anonymous';
var msg = args.slice(1).join(' ') || 'Raport skonsolidowany za IV kwartał 2015 roku!';
W przypadku ich braku do centrali komunikatów przesłana zostanie wiadomość „Raport skonsolidowany za IV kwartał 2015 roku!” z kluczem powiązania „gpw.info.anonymous”.
Kolejną rzeczą jest określenie typu centrali komunikatów i jej charakteru:
ch.assertExchange(ex, 'topic', {durable: false});
Na koniec wysyłamy wiadomość do kuriera wiadomości:
ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Wysłano %s: '%s'", key, msg);
Kompletny program notowania-gpw-nadawca.js wygląda następująco:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'notowania-gpw';
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'gpw.info.anonymous';
var msg = args.slice(1).join(' ') || 'Raport skonsolidowany za IV kwartał 2015 roku!';
ch.assertExchange(ex, 'topic', {durable: false});
ch.publish(ex, key, new Buffer(msg));
console.log(" [x] Wysłano %s: '%s'", key, msg);
});
setTimeout(function() { conn.close(); process.exit(0) }, 500);
});
Program subskrybenta wygląda podobnie do programu subskrybenta logów dla centrali komunikatów typu direct.
Na samym początku sprawdzamy, czy użytkownik – uruchamiając program z terminala – podał wymagany co najmniej jeden klucz powiązania, a jeśli nie – wyświetlamy mu odpowiedni komunikat:
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Korzystanie: notowania-gpw-subskrybent.js ");
console.log(" ./notowania-gpw-subskrybent.js gpw.notowania.spolki.kghm");
console.log(" ./notowania-gpw-subskrybent.js gpw.new-connect.*");
console.log(" ./notowania-gpw-subskrybent.js gpw.#");
process.exit(1);
}
Następnie definiujemy nazwę centrali komunikatów i określamy jej typ:
var ex = 'notowania-gpw';
ch.assertExchange(ex, 'topic', {durable: false});
Na końcu przypinamy do centrali tymczasową kolejkę, która będzie powiązana z określonymi przez użytkownika kluczami routing'u:
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(" [*] Oczekiwanie na wiadomości.");
console.log(" Naciśnij CTRL+C aby zakończyć.");
args.forEach(function(key) {
ch.bindQueue(q.queue, ex, key);
});
ch.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {noAck: true});
});
Kompletny program notowania-gpw-nadawca.js wygląda następująco:
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Korzystanie: notowania-gpw-subskrybent.js ");
console.log(" ./notowania-gpw-subskrybent.js gpw.notowania.spolki.kghm");
console.log(" ./notowania-gpw-subskrybent.js gpw.new-connect.*");
console.log(" ./notowania-gpw-subskrybent.js gpw.#");
process.exit(1);
}
amqp.connect('amqp://localhost', function(err, conn) {
conn.createChannel(function(err, ch) {
var ex = 'notowania-gpw';
ch.assertExchange(ex, 'topic', {durable: false});
ch.assertQueue('', {exclusive: true}, function(err, q) {
console.log(" [*] Oczekiwanie na wiadomości.");
console.log(" Naciśnij CTRL+C aby zakończyć.");
args.forEach(function(key) {
ch.bindQueue(q.queue, ex, key);
});
ch.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {noAck: true});
});
});
});
Centrale komunikatów typu topic można stosować przy:
Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-node.js
Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.