Tutoriale / Wstęp do routingu w RabbittMQ

59eb3ff6-5524-3e77-06b2-854e25397560

W dwóch pierwszych przykładach implementacji obsługi kolejek przedstawionych do tej pory (pierwszy przykład, drugi przykład) zastosowano uproszczony model architektury RabbitMQ.

W praktyce nadawca nie komunikuje się bezpośrednio z kolejką. Jeśli tak jest, to można zadać pytanie: w jaki sposób działały programy z dwóch pierwszych przykładów?

Otóż w RabbitMQ istnieje tzw. centrala komunikatów nieposiadająca nazwy (ang. nameless exchange), którą określamy w kodzie, używając pustego łańcucha: '', i która jest domyślna, jeśli nie podamy nazwy centrali.

W takim przypadku komunikaty zostaną przesłane do kolejki o określonej nazwie, o ile taka kolejka istnieje:

$channel->basic_publish($message, '', $routingKey);

Jeśli jednak zdefiniujemy nazwę centrali komunikatów, to potrzebujemy jeszcze określić relację pomiędzy centralą komunikatów a kolejką – którą nazywamy w protokole AMQP 0-9-1 angielskim słowem binding, co na język polski można przetłumaczyć jako „powiązanie”. Operacja związania elementu exchange i kolejki opiera się na tzw. kluczu powiązania (ang. routing key).

Klucz routing’u to coś w rodzaju adresu, który może zostać użyty przez centralę komunikatów, aby zadecydować, gdzie komunikat będzie przekazany dalej. Klucz powiązania jest czymś w rodzaju adresu URL dla protokołu HTTP, jednak jest czymś bardziej ogólnym (ang. generic), co można będzie zaraz zobaczyć na bazie różnych centrali komunikatów.

Centrale komunikatów

Wyróżniamy następujące cztery podstawowe typy „tablic routing'u RabbitMQ”:

  • fanout – centrala rozgłośni komunikatów – najprostszy rodzaj spośród czterech typów wymiany komunikatów; w tym przypadku wiadomość jest przesyłana do wszystkich aktywnych, tymczasowych kolejek stworzonych na potrzeby aplikacji odbiorców. Typ ten przypomina pod względem działania koncentrator sieciowy (ang. hub), który rozsyła te same pakiety danych (sygnały elektryczne) z jednego portu na wszystkie pozostałe. W praktyce dla tej centrali komunikatów do RabbitMQ wysyłany jest pusty klucz routing'u.
  • direct – centrala komunikatów bezpośrednich, gdzie komunikaty są przesyłane do kolejek na bazie pasującego klucza routing’u (ang. routing key). Nawiązując do przykładu z telekomunikacji – działanie tego rodzaju centrali komunikatów jest podobne do przełącznika sieciowego (ang. switch), który przesyła dane w odpowiednie miejsce na bazie adresu IP.
  • headers – centrala komunikatów z nagłówkami jest rozwinięciem typu direct, z tą różnicą, że nie używa się w niej klucza routing’u – jest on tutaj ignorowany; zamiast tego wykorzystywane są nagłówki, które mogą być liczbami, łańcuchami znaków lub wynikami działania funkcji skrótu (ang. hash). Przypisanie do kolejki następuje, gdy wartość konkretnego nagłówka jest równa zdefiniowanej wartości. Możliwe jest przypisanie na podstawie dowolnego nagłówka (argument x-match jest ustawiony na any) lub na podstawie zgodności wszystkich nagłówków (wartość all).
  • topic – centrala komunikatów tematycznych jest również rozwinięciem centrali typu direct, jednak w zupełnie inny sposób: bazuje na kluczu routingu, jednak jego format jest tutaj dokładnie określony;

Prezentując cztery podstawowe rodzaje konwencji budowy tablic routing'u, słowo podstawowe zostało celowo użyte. Protokół AMQP 0-9-1 przewiduje możliwość tworzenia własnych typów dyspozycji komunikatami.

Protokół ten wymusza także na systemach pośredniczących w wymianie komunikatów domyślne dostarczenie centrali komunikatów (ang. pre-declared exchanges) odpowiednio dla każdego ich typu:

  • fanout – amq.fanout,
  • direct – amq.direct,
  • headers – amq.headers,
  • topic – amq.topic.

Każda z tych centrali jest oddzielna dla odrębnych hostów RabbitMQ (vhost).

Aplikacje korzystające z RabbitMQ mogą polegać na tym, że centrale o powyższych nazwach będą zawsze znajdowały się w systemie, niejako „dostarczone w jednym pudełku” razem z pośrednikiem komunikatów.

Listowanie elementów RabbitMQ za pomocą linii komend

Listę wszystkich dostępnych centrali komunikatów możemy zobaczyć, wykonując polecenie:

$ rabbitmqctl list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
empik	fanout
...done.

Natomiast jeśli chcemy zobaczyć listę wszystkich aktywnych kolejek (w tym tymczasowych), należy wywołać polecenie:

$ rabbitmqctl list_queues
Listing queues ...
amq.gen-FkNW-c5z7dd7ZNJwWRYc7w	0
amq.gen-_UHxPcO7GnDvWgnYoImF0Q	0
task_queue	0

Możemy także wyświetlić listę powiązań (ang. bindings):

$ rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-SurIiFhxXsSbusqBVMaIFg	queue	[]
logs exchange amq.gen-uQPyufcdgwX8lIWWvMJwFA	queue	[]

Diagram przepływu komunikatów dla centrali typu fanout umieszczono w poprzednim tutorialu: wydawca komunikatów i subskrybenci.

Centrala komunikatów bezpośrednich

Wspomniano wcześniej, iż centrale komunikatów pozwalają na filtrowanie komunikatów według określonych kryteriów.

W tym przypadku odbywa się to za pomocą wzorca w postaci kluczy powiązania (ang. binding key).

Diagram przedstawiający centralę komunikatów bezpośrednich na przykładzie scentralizowanego systemu przetwarzania logów serwerów/aplikacji

Na schemacie zaprezentowanym powyżej pokazano dwie drogi wiadomości:

  • pierwszą służącą do raportów o błędach (error),
  • drugą zbierającą informacje na temat działania aplikacji (error, info).

Zasadę działania central komunikatów bezpośrednich można określić jako: odbieraj tylko te komunikaty, dla których klucz routing’u jest dokładnie taki sam, jak ten podany przez aplikację-subskrybenta. Dla jednej centrali można przypisać kilka kluczy, także wtedy subskrybent nasłuchuje kilka rodzajów komunikatów.

Centrala typu direct świetnie nadaje się dla:

  • wiadomości wysyłanych do najbliższego (teoretycznie najlepszego) odbiorcy (ang. unicast routing);
  • wiadomości dla grupy odbiorców (nadawca nie widzi konkretnego odbiorcy, tylko ich grupę, np. lista mailingowa), co w j. angielskim określa się mianem multicast routing i które jest identyczne pod względem działania z centralą typu fanout.

Program bezposredni-nadawca-logow.php

Program przesyłający logi do systemu RabbitMQ jest bardzo podobny do programu wydawca.php utworzonego w ramach poprzedniego tutoriala. Rozpoczynamy w nim z tego samego miejsca co poprzednio:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

Najpierw definiujemy nazwę centrali komunikatów bezpośrednich:

$exchangeName = 'direct_logs';

Następnie dodajemy kod odpowiedzialny za pobieranie rodzaju logów oraz treści loga przekazanego przez użytkownika:

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));

if (empty($data)) {
    $data = 'Witajcie!';
}

później deklarujemy centralę komunikatów o podanej wcześniej nazwie:

$channel->exchange_declare($exchangeName, 'direct', false, false, false);

i publikujemy wiadomość:

$channel->basic_publish($message, $exchangeName, $severity);

Informację o tym wyświetlamy użytkownikowi na terminalu:

echo ' [x] Wysłano ', $data, "\n";

Kompletny program bezposredni-nadawca-logow.php prezentuje się następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'direct_logs';

$severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

$data = implode(' ', array_slice($argv, 2));

if (empty($data)) {
    $data = 'Witajcie!';
}

$channel->exchange_declare($exchangeName, 'direct', false, false, false);

$message = new AMQPMessage($data);

$channel->basic_publish($message, $exchangeName, $severity);

echo ' [x] Wysłano ', $data, "\n";

register_shutdown_function('shutdown', $channel, $connection);

Program uruchamiamy następująco:

$ php bezposredni-nadawca-logow.php error "Błąd krytyczny"

Wywołanie programu bez parametrów prześle do RabbitMQ log typu info o treści "Witajcie!". Jednakże w przypadku, gdy żaden odbiorca nie oczekuje na logi przesyłany log zostanie utracony.

Program bezposredni-subskrybent-logow.php

Program-odbiorcę również tworzymy korzystając z kodu z wcześniejszych tutoriali.

Na początku jednak wymuszamy na użytkowniku podanie typu logów jakie go interesują (można podać więcej niż jeden typ):

$severities = array_slice($argv, 1);

if (empty($severities )) {
    file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

Uruchomienie programu bez dodatkowych parametrów wyświetli komunikat o następującej treści:

$ php bezposredni-subskrybent-logow.php 
Korzystanie: php bezposredni-subskrybent-logow.php [info] [warning] [error]

Prawidłowe korzystanie z programu wygląda jak przedstawiono poniżej.

Oczekiwanie tylko na logi typu error:

$ php bezposredni-subskrybent-logow.php error
 [*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
     Naciśnij CTRL+C aby zakończyć.

Oczekiwanie tylko na logi typu error oraz info:

$ php bezposredni-subskrybent-logow.php error info
 [*] Oczekiwanie na wiadomości w amq.gen-3nzM0A2syMnxOqL7ytnClg.
     Naciśnij CTRL+C aby zakończyć.

Dalsza część programu wygląda tak jak w poprzednich przykładach:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Tu także deklarujemy centralę komunikatów bezpośrednich:

$exchangeName = 'direct_logs';

$channel->exchange_declare($exchangeName, 'direct', false, false, false);

następnie przypisujemy do niej tymczasową kolejkę generowaną przez RabbitMQ:

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

Kolejnym krokiem jest przypisanie do tej kolejki jednego lub kilku kluczy powiązania (ang. binding key):

$severities = array_slice($argv, 1);

if (empty($severities )) {
    file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queueName, $exchangeName, $severity);
}

A na końcu skonfigurowanie konsumowania przychodzących komunikatów:

$channel->basic_consume($queueName, '', false, true, false, false, function($msg) {
    echo ' [x] ' . $msg->delivery_info['routing_key'] . ': ' . $msg->body . "\n";
});

Kompletny kod programu bezposredni-subskrybent-logow.js wygląda następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'direct_logs';

$channel->exchange_declare($exchangeName, 'direct', false, false, false);

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

$severities = array_slice($argv, 1);

if (empty($severities )) {
    file_put_contents('php://stderr', "Korzystanie: $argv[0] [info] [warning] [error]\n");
    exit(1);
}

foreach ($severities as $severity) {
    $channel->queue_bind($queueName, $exchangeName, $severity);
}

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume($queueName, '', false, true, false, false, function($msg) {
    echo ' [x] ' . $msg->delivery_info['routing_key'] . ': ' . $msg->body . "\n";
});

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Po jego uruchomieniu w drugim terminalu:

$ php bezposredni-subskrybent-logow.php info error
 [*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
     Naciśnij CTRL+C aby zakończyć.

i przesłaniu logów z poziomu pierwszego terminala:

$ php bezposredni-nadawca-logow.php 
 [x] Wysłano info: 'Witajcie!'

Aktywne połączenia zostały pomyślnie zakończone.
$ php bezposredni-nadawca-logow.php error "Błąd krytyczny"
 [x] Wysłano error: 'Błąd krytyczny'

Aktywne połączenia zostały pomyślnie zakończone.

powinniśmy zobaczyć odebrany log i jego typ:

$ php bezposredni-subskrybent-logow.php info error
 [*] Oczekiwanie na wiadomości w amq.gen-Ki_rNXGQK8hLQu2obnZ1Zw.
     Naciśnij CTRL+C aby zakończyć.
 [x] info: 'Witajcie!'
 [x] error: 'Błąd krytyczny'

Przetwarzanie obrazków

Drugim, praktycznym przykładem zastosowania centrali typu direct (być może bardziej przejrzystym dla czytelnika) są operacje na grafikach przesyłanych do serwera, co ilustruje diagram zamieszczony poniżej:

Diagram przedstawiający centralę komunikatów bezpośrednich na przykładzie systemu przetwarzającego przesyłane obrazki (skalowanie, przycinanie, archiwizacja)

Diagram przedstawia rozdzielenie zadań:

  • zmiany rozmiaru – skalowania,
  • przycinania obrazka,
  • kopiowania oryginalnego obrazka do archiwum lub np. do systemu typu CDN (skrót od Content delivery network).

Przykładowe zastosowania

Centrale komunikatów typu direct można stosować przy obsłudze:

  • bezpośredniego dostarczania wiadomości (prawie w czasie rzeczywistym) do konkretnych graczy w grach typu MMO (ang. Massively Multiplayer Online games),
  • dostarczania wiadomości do wybranych lokalizacji geograficznych, np. punktów sprzedaży,
  • dystrybucji zadań pomiędzy wieloma instancjami tej samej aplikacji mających na celu realizację tej samej funkcjonalności np. przetwarzania obrazu,
  • przesyłania danych w obrębie wykonywania określonych kroków w przepływie zadań, gdzie każdy z nich posiada swój własny identyfikator (warto rozważyć tutaj użycie centrali wiadomości z nagłówkami),
  • dostarczania komunikatów do określonych aplikacji w obrębie sieci.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.