Ten rodzaj filtrowania wiadomości jest alternatywą dla omówionej w poprzednim tutorialu centrali komunikatów z nagłówkami, jednak nie może on posiadać dowolnie dobranego klucza routing’u, tak jak to miało miejsce w przypadku centrali komunikatów bezpośrednich.
Tutaj klucz powiązania musi składać się ze słów oddzielonych kropkami np. „gpw.notowania.spolki.kghm”, co można opisać następującym wyrażeniem regularnym:
^[A-Za-z0-9\-_*]+(\.[A-Za-z0-9\-_*]+)*$.
Do konkretnej kolejki zostaną dostarczone wszystkie wiadomości, dla których klucz będzie pasował.
Do jednej kolejki można przypisać kilka takich kluczy, tak jak miało to miejsce w przypadku centrali typu direct. W takim przypadku wystarczy, aby choć jeden klucz pasował, co jest podobne do filtrowania typu „any” w centralach typu headers.
Klucz routing'u składa się ze słów oddzielonych kropkami.
Pojedyncze słowa mogą zostać zastąpione przez:
Część klucza routingu, który chcemy zastąpić płotkiem np. „gpw.#”, nie może zaczynać się od kropki, co możemy zapisać następującym wyrażeniem regularnym:
^([^.][A-Za-z0-9\-_*]+)((\.[A-Za-z0-9\-_*]+)*)$
Centrale tematyczne mają wiele zastosowań.
Gdy zadanie stawiane przed centralą filtrującą komunikaty przewiduje wiele aplikacji subskrybujących je selektywnie, ten rodzaj routing'u jest jednym z najlepszych wyborów.
Świetnym przykładem zastosowania centrali tematycznych jest analiza notowań giełdowych pokazana poniżej:
Na diagramie możemy zobaczyć:
Warto tutaj zwrócić uwagę na przemyślaną hierarchię struktury klucza routing’u, tak aby dodanie kolejnych funkcjonalności nie wiązało się z całkowitym jej przebudowaniem, a w przypadku dużych systemów może się to okazać bardzo trudne lub nawet niewykonalne.
Podobnie jak w przypadku pozostałych przykładów programów-wydawców tak i tutaj bazujemy na przykładzie programu fabryka.php:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// nowe instrukcje
register_shutdown_function('shutdown', $channel, $connection);
Na początku sprawdzamy czy program wywołano z parameterami. Jeśli tego nie zrobiono wyświetlamy informację o tym jak korzystać z programu:
if (count(array_slice($argv, 1)) < 2) {
file_put_contents(
'php://stderr',
"Korzystanie: php $argv[0] gpw.notowania.spolki.kghm \"Raport skonsolidowany za IV kwartał 2015 roku!\"\n"
);
exit(1);
}
później deklarujemy nazwę centrali komunikatów tematycznych:
$exchangeName = 'notowania-gpw';
a następnie odczytujemy dodatkowe parametry przesłane z linii poleceń:
$args = array_slice($argv, 1);
$key = isset($args[0]) ? $args[0] : 'gpw.info.anonymous';
if (count($args) > 0) {
$msg = implode(' ', array_slice($args, 1));
} else {
$msg = 'Raport skonsolidowany za IV kwartał 2015 roku!';
}
W przypadku ich braku do centrali komunikatów przesłana zostanie wiadomość „Raport skonsolidowany za IV kwartał 2015 roku!” z kluczem powiązania „gpw.info.anonymous”.
Kolejną rzeczą jest określenie typu centrali komunikatów i jej charakteru:
$channel->exchange_declare($exchangeName, 'topic', false, false, false);
Na koniec wysyłamy wiadomość do kuriera wiadomości:
$message = new AMQPMessage($msg);
$channel->basic_publish($message, $exchangeName, $key);
echo ' [x] Wysłano ', $msg, "\n";
Kompletny program notowania-gpw-nadawca.php wygląda następująco:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
if (count(array_slice($argv, 1)) < 2) {
file_put_contents(
'php://stderr',
"Korzystanie: php $argv[0] gpw.notowania.spolki.kghm \"Raport skonsolidowany za IV kwartał 2015 roku!\"\n"
);
exit(1);
}
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'notowania-gpw';
$args = array_slice($argv, 1);
$key = isset($args[0]) ? $args[0] : 'gpw.info.anonymous';
if (count($args) > 0) {
$msg = implode(' ', array_slice($args, 1));
} else {
$msg = 'Raport skonsolidowany za IV kwartał 2015 roku!';
}
$channel->exchange_declare($exchangeName, 'topic', false, false, false);
$message = new AMQPMessage($msg);
$channel->basic_publish($message, $exchangeName, $key);
echo ' [x] Wysłano ', $msg, "\n";
register_shutdown_function('shutdown', $channel, $connection);
Program subskrybenta wygląda podobnie do programu subskrybenta logów dla centrali komunikatów typu direct.
Na samym początku sprawdzamy, czy użytkownik – uruchamiając program z terminala – podał wymagany co najmniej jeden klucz powiązania, a jeśli nie – wyświetlamy mu odpowiedni komunikat:
$routingKeys = array_slice($argv, 1);
if (empty($routingKeys)) {
file_put_contents('php://stderr', "Korzystanie: $argv[0] gpw.notowania.spolki.kghm\n");
file_put_contents('php://stderr', " $argv[0] gpw.new-connect.*\n");
file_put_contents('php://stderr', " $argv[0] gpw.#\n");
file_put_contents('php://stderr', " $argv[0] gpw.notowania.spolki.kghm gpw.notowania.spolki.orlen\n");
exit(1);
}
Następnie definiujemy nazwę centrali komunikatów i określamy jej typ:
$exchangeName = 'notowania-gpw';
$channel->exchange_declare($exchangeName, 'topic', false, false, false);
Na końcu przypinamy do centrali tymczasową kolejkę, która będzie powiązana z określonymi przez użytkownika kluczami routing'u:
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
foreach ($routingKeys as $routingKey) {
$channel->queue_bind($queueName, $exchangeName, $routingKey);
}
echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo ' Naciśnij CTRL+C aby zakończyć.', "\n";
$channel->basic_consume($queueName, '', false, true, false, false, function($message) {
echo ' [x] ' . $message->delivery_info['routing_key'] . ': ' . $message->body . "\n";
});
Kompletny program notowania-gpw-nadawca.js wygląda następująco:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;
$routingKeys = array_slice($argv, 1);
if (empty($routingKeys)) {
file_put_contents('php://stderr', "Korzystanie: $argv[0] gpw.notowania.spolki.kghm\n");
file_put_contents('php://stderr', " $argv[0] gpw.new-connect.*\n");
file_put_contents('php://stderr', " $argv[0] gpw.#\n");
file_put_contents('php://stderr', " $argv[0] gpw.notowania.spolki.kghm gpw.notowania.spolki.orlen\n");
exit(1);
}
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'notowania-gpw';
$channel->exchange_declare($exchangeName, 'topic', false, false, false);
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
foreach ($routingKeys as $routingKey) {
$channel->queue_bind($queueName, $exchangeName, $routingKey);
}
echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo ' Naciśnij CTRL+C aby zakończyć.', "\n";
$channel->basic_consume($queueName, '', false, true, false, false, function($message) {
echo ' [x] ' . $message->delivery_info['routing_key'] . ': ' . $message->body . "\n";
});
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
Centrale komunikatów typu topic można stosować przy:
Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php
Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.