W dwóch pierwszych przykładach implementacji obsługi kolejek przedstawionych do tej pory (pierwszy przykład, drugi przykład) zastosowano uproszczony model architektury RabbitMQ.
W praktyce nadawca nie komunikuje się bezpośrednio z kolejką. Jeśli tak jest, to można zadać pytanie: w jaki sposób działały programy z dwóch pierwszych przykładów?
Otóż w RabbitMQ istnieje tzw. centrala komunikatów nieposiadająca nazwy (ang. nameless exchange), którą określamy w kodzie, używając pustego łańcucha: '', i która jest domyślna, jeśli nie podamy nazwy centrali.
W takim przypadku komunikaty zostaną przesłane do kolejki o określonej nazwie, o ile taka kolejka istnieje:
$channel->basic_publish($message, '', $routingKey);
Jeśli jednak zdefiniujemy nazwę centrali komunikatów, to potrzebujemy jeszcze określić relację pomiędzy centralą komunikatów a kolejką – którą nazywamy w protokole AMQP 0-9-1 angielskim słowem binding, co na język polski można przetłumaczyć jako „powiązanie”. Operacja związania elementu exchange i kolejki opiera się na tzw. kluczu powiązania (ang. routing key).
Klucz routing’u to coś w rodzaju adresu, który może zostać użyty przez centralę komunikatów, aby zadecydować, gdzie komunikat będzie przekazany dalej. Klucz powiązania jest czymś w rodzaju adresu URL dla protokołu HTTP, jednak jest czymś bardziej ogólnym (ang. generic), co można będzie zaraz zobaczyć na bazie różnych centrali komunikatów.
Wyróżniamy następujące cztery podstawowe typy „tablic routing'u RabbitMQ”:
Prezentując cztery podstawowe rodzaje konwencji budowy tablic routing'u, słowo podstawowe zostało celowo użyte. Protokół AMQP 0-9-1 przewiduje możliwość tworzenia własnych typów dyspozycji komunikatami.
Protokół ten wymusza także na systemach pośredniczących w wymianie komunikatów domyślne dostarczenie centrali komunikatów (ang. pre-declared exchanges) odpowiednio dla każdego ich typu:
Każda z tych centrali jest oddzielna dla odrębnych hostów RabbitMQ (vhost).
Aplikacje korzystające z RabbitMQ mogą polegać na tym, że centrale o powyższych nazwach będą zawsze znajdowały się w systemie, niejako „dostarczone w jednym pudełku” razem z pośrednikiem komunikatów.
Listę wszystkich dostępnych centrali komunikatów możemy zobaczyć, wykonując polecenie:
$ rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
empik fanout
...done.
Natomiast jeśli chcemy zobaczyć listę wszystkich aktywnych kolejek (w tym tymczasowych), należy wywołać polecenie:
$ rabbitmqctl list_queues
Listing queues ...
amq.gen-FkNW-c5z7dd7ZNJwWRYc7w 0
amq.gen-_UHxPcO7GnDvWgnYoImF0Q 0
task_queue 0
Możemy także wyświetlić listę powiązań (ang. bindings):
$ rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-SurIiFhxXsSbusqBVMaIFg queue []
logs exchange amq.gen-uQPyufcdgwX8lIWWvMJwFA queue []
Diagram przepływu komunikatów dla centrali typu fanout umieszczono w poprzednim tutorialu: wydawca komunikatów i subskrybenci.
Wspomniano wcześniej, iż centrale komunikatów pozwalają na filtrowanie komunikatów według określonych kryteriów.
W tym przypadku odbywa się to za pomocą wzorca w postaci kluczy powiązania (ang. binding key).
Na schemacie zaprezentowanym powyżej pokazano dwie drogi wiadomości:
Zasadę działania central komunikatów bezpośrednich można określić jako: odbieraj tylko te komunikaty, dla których klucz routing’u jest dokładnie taki sam, jak ten podany przez aplikację-subskrybenta. Dla jednej centrali można przypisać kilka kluczy, także wtedy subskrybent nasłuchuje kilka rodzajów komunikatów.
Centrala typu direct świetnie nadaje się dla:
Program przesyłający logi do systemu RabbitMQ jest bardzo podobny do programu wydawca.php utworzonego w ramach poprzedniego tutoriala. Rozpoczynamy w nim z tego samego miejsca co poprzednio:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// nowe instrukcje
register_shutdown_function('shutdown', $channel, $connection);
Najpierw definiujemy nazwę centrali komunikatów bezpośrednich:
$exchangeName = 'direct_logs';
Następnie dodajemy kod odpowiedzialny za pobieranie rodzaju logów oraz treści loga przekazanego przez użytkownika:
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = 'Witajcie!';
}
później deklarujemy centralę komunikatów o podanej wcześniej nazwie:
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
i publikujemy wiadomość:
$channel->basic_publish($message, $exchangeName, $severity);
Informację o tym wyświetlamy użytkownikowi na terminalu:
echo ' [x] Wysłano ', $data, "\n";
Kompletny program bezposredni-nadawca-logow.php prezentuje się następująco:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'direct_logs';
$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
$data = implode(' ', array_slice($argv, 2));
if (empty($data)) {
$data = 'Witajcie!';
}
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
$message = new AMQPMessage($data);
$channel->basic_publish($message, $exchangeName, $severity);
echo ' [x] Wysłano ', $data, "\n";
register_shutdown_function('shutdown', $channel, $connection);
Program uruchamiamy następująco:
$ php bezposredni-nadawca-logow.php error "Błąd krytyczny"
Wywołanie programu bez parametrów prześle do RabbitMQ log typu info o treści "Witajcie!". Jednakże w przypadku, gdy żaden odbiorca nie oczekuje na logi przesyłany log zostanie utracony.
Program-odbiorcę również tworzymy korzystając z kodu z wcześniejszych tutoriali.
Na początku jednak wymuszamy na użytkowniku podanie typu logów jakie go interesują (można podać więcej niż jeden typ):
$severities = array_slice($argv, 1);
if (empty($severities )) {
file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
exit(1);
}
Uruchomienie programu bez dodatkowych parametrów wyświetli komunikat o następującej treści:
$ php bezposredni-subskrybent-logow.php
Korzystanie: php bezposredni-subskrybent-logow.php [info] [warning] [error]
Prawidłowe korzystanie z programu wygląda jak przedstawiono poniżej.
Oczekiwanie tylko na logi typu error:
$ php bezposredni-subskrybent-logow.php error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
Oczekiwanie tylko na logi typu error oraz info:
$ php bezposredni-subskrybent-logow.php error info
[*] Oczekiwanie na wiadomości w amq.gen-3nzM0A2syMnxOqL7ytnClg.
Naciśnij CTRL+C aby zakończyć.
Dalsza część programu wygląda tak jak w poprzednich przykładach:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// nowe instrukcje
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
Tu także deklarujemy centralę komunikatów bezpośrednich:
$exchangeName = 'direct_logs';
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
następnie przypisujemy do niej tymczasową kolejkę generowaną przez RabbitMQ:
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
Kolejnym krokiem jest przypisanie do tej kolejki jednego lub kilku kluczy powiązania (ang. binding key):
$severities = array_slice($argv, 1);
if (empty($severities )) {
file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queueName, $exchangeName, $severity);
}
A na końcu skonfigurowanie konsumowania przychodzących komunikatów:
$channel->basic_consume($queueName, '', false, true, false, false, function($msg) {
echo ' [x] ' . $msg->delivery_info['routing_key'] . ': ' . $msg->body . "\n";
});
Kompletny kod programu bezposredni-subskrybent-logow.js wygląda następująco:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'direct_logs';
$channel->exchange_declare($exchangeName, 'direct', false, false, false);
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
$severities = array_slice($argv, 1);
if (empty($severities )) {
file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
exit(1);
}
foreach ($severities as $severity) {
$channel->queue_bind($queueName, $exchangeName, $severity);
}
echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo ' Naciśnij CTRL+C aby zakończyć.', "\n";
$channel->basic_consume($queueName, '', false, true, false, false, function($msg) {
echo ' [x] ' . $msg->delivery_info['routing_key'] . ': ' . $msg->body . "\n";
});
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
Po jego uruchomieniu w drugim terminalu:
$ php bezposredni-subskrybent-logow.php info error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
i przesłaniu logów z poziomu pierwszego terminala:
$ php bezposredni-nadawca-logow.php
[x] Wysłano info: 'Witajcie!'
Aktywne połączenia zostały pomyślnie zakończone.
$ php bezposredni-nadawca-logow.php error "Błąd krytyczny"
[x] Wysłano error: 'Błąd krytyczny'
Aktywne połączenia zostały pomyślnie zakończone.
powinniśmy zobaczyć odebrany log i jego typ:
$ php bezposredni-subskrybent-logow.php info error
[*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
Naciśnij CTRL+C aby zakończyć.
[x] info: 'Witajcie!'
[x] error: 'Błąd krytyczny'
Drugim, praktycznym przykładem zastosowania centrali typu direct (być może bardziej przejrzystym dla czytelnika) są operacje na grafikach przesyłanych do serwera, co ilustruje diagram zamieszczony poniżej:
Diagram przedstawia rozdzielenie zadań:
Centrale komunikatów typu direct można stosować przy obsłudze:
Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php
Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.