Do tej pory (jeśli podążasz za kolejnymi tutorialami) mieliśmy do czynienia tylko z kolejkami. Główną zasadą ich działania było to, że pojedynczy komunikat był przekazywany wyłącznie do jednego odbiorcy, co można porównać do przesyłania listów od nadawcy do odbiorcy.
W ten sposób można było lepiej rozłożyć przetwarzanie zadań w przypadku, gdy dysponowaliśmy większą liczbą robotników.
Teraz zajmiemy się czymś odrobinę bardziej złożonym, a mianowicie dostarczaniem tego samego komunikatu do wielu subskrybentów, czyli czymś w rodzaju dystrybucji prasy czy książek.
Producenta komunikatów będziemy nazywać wydawcą (ang. publisher), zaś konsumentów (odbiorców) komunikatów nazywać będziemy subskrybentami (ang. subscribers).
Komunikat jest przekazywany przez wydawcę:
do centrali komunikatów (ang. exchange):
Ta dodatkowa warstwa zwana jest w terminologii RabbitMQ pod angielskim słowem exchange, a w wolnym tłumaczeniu tablicą routing'u o określonej nazwie lub centralą komunikatów, nawiązującej pełnioną funkcją do centrali telefonicznych (ang. telephone exchange).
Podstawową cechą centrali komunikatów jest filtrowanie i/lub dyspozycja komunikatów na bazie określonych reguł.
Można to zrobić na kilka sposobów co zostanie pokazane w kolejnych częściach tutoriali.
Poniższy diagram ilustruje pełny przepływ komunikatów przetwarzanych przez RabbitMQ w centrali rozgłośni komunikatów (typu fanout):
Główną różnicą w porównaniu do schematu omawianego wcześniej jest fakt, iż komunikat może zostać utracony, gdy nie ma żadnego subskrybenta, zatem komunikat nie jest odkładany w żadnej kolejce dla nieistniejącego odbiorcy.
Dzieje się tak, gdyż dla każdego odbiorcy tworzone są tymczasowe kolejki.
W ramach tego tutoriala napiszemy dwa programy:
W programie tym ponownie będziemy bazować na kodzie jaki stworzyliśmy w ramach poprzednich tutoriali, czyli rozszerzymy następujący program o nowe instrukcje:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// nowe instrukcje
register_shutdown_function('shutdown', $channel, $connection);
Najpierw definiujemy nazwę centrali komunikatów:
$exchangeName = 'empik';
i deklarujemy jej parametry (w tym wypadku typ fanout):
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
następnie tworzymy nowe wydanie jednego magazynu/kilku magazynów:
$date = new DateTime();
$issue = ((int) $date->format('n') + 1) . '/' . $date->format('Y');
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = 'aktualne wydanie ' . $issue;
}
a na końcu publikujemy komunikat:
$channel->basic_publish($message, $exchangeName);
Cały program wydawca.php wygląda więc następująco:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'empik';
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
$date = new DateTime();
$issue = ((int) $date->format('n') + 1) . '/' . $date->format('Y');
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = 'aktualne wydanie ' . $issue;
}
$message = new AMQPMessage($data);
$channel->basic_publish($message, $exchangeName);
echo " [x] Wysłano ", $data, "\n";
register_shutdown_function('shutdown', $channel, $connection);
Jeśli teraz spróbujemy wydać jakiś magazyn np. aktualne wydanie
Terminal1 $ php wydawca.php
[x] Wysłano: aktualne wydanie 7/2018
lub jedno z poprzednich:
Terminal1 $ php wydawca.php 2/2016
[x] Wysłano: 2/2016
to rezultatem tego będzie utrata wysłanych komunikatów, gdyż – zgodnie z tym co powiedziano wcześniej – brak aktywnych subskrybentów komunikatów oznacza utracenie przesyłanych danych.
Napiszmy więc teraz program czytelnik.php
W nim ponownie bazujemy na kodzie źródłowym poprzednich programów-odbiorców:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// nowe instrukcje
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
Znowu definiujemy nazwę centrali komunikatów:
$exchangeName = 'empik';
i jej typ:
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
Następnie tworzymy tymczasową kolejkę, z której korzystać będzie wyłącznie pojedyncza instancja naszego program czytelinik.php:
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queueName, $exchangeName);
echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo ' Naciśnij CTRL+C aby zakończyć.', "\n";
$channel->basic_consume(
$queueName,
'',
false,
true,
false,
false,
function($message) {
echo " [x] Odebrano ", $message->body, "\n";
}
);
Pełny kod źródłowy programu będzie wyglądał jak poniżej:
require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$exchangeName = 'empik';
$channel->exchange_declare($exchangeName, 'fanout', false, false, false);
list($queueName, ,) = $channel->queue_declare('', false, false, true, false);
$channel->queue_bind($queueName, $exchangeName);
echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo ' Naciśnij CTRL+C aby zakończyć.', "\n";
$channel->basic_consume(
$queueName,
'',
false,
true,
false,
false,
function($message) {
echo " [x] Odebrano ", $message->body, "\n";
}
);
register_shutdown_function('shutdown', $channel, $connection);
while(count($channel->callbacks)) {
$channel->wait();
}
Gdy teraz uruchomimy nasz program czytelnik.php zobaczymy, iż korzysta on z tymczasowej kolejki o wygenerowanej przez RabbitMQ nazwie np.
Terminal2 $ php czytelnik.php
[*] Oczekiwanie na wydanie w amq.gen-RHJP1OPXQifo0ctvaDMFww.
Naciśnij CTRL+C aby zakończyć.
Pora teraz ponownie uruchomić program wydawca.js tak jak poprzednim razem:
Terminal1 $ php wydawca.php
[x] Wysłano: aktualne wydanie 3/2018
Terminal1 $ php wydawca.php 2/2016
[x] Wysłano: 2/2016
Terminal1 $ php wydawca.php 5/2015 6/2015 9/2015
[x] Wysłano: 5/2015 6/2015 9/2015
W drugim terminalu zobaczymy działanie naszego czytelnika:
Terminal2 $ php czytelnik.php
[*] Oczekiwanie na wydanie w amq.gen-RHJP1OPXQifo0ctvaDMFww.
Naciśnij CTRL+C aby zakończyć.
[x] Odebrano aktualne wydanie 3/2018
[x] Odebrano 2/2016
[x] Odebrano 5/2015 6/2015 9/2015
Jeśli teraz w trzeciej konsoli ponownie uruchomimy program czytelnik.php – zobaczymy – tak jak wcześniej wspomniano, iż system RabbitMQ przydzielił mu inną kolejkę niż programowi w drugim terminalu:
Terminal3 $ php czytelnik.php
[*] Oczekiwanie na wydanie w amq.gen-95Qaryoo-GWBXUQeZP01OQ.
Naciśnij CTRL+C aby zakończyć.
Tak więc z punktu nadawcy-wydawcy – nic się nie zmienia – choć nie wie on ilu odbiorców otrzymało przesłany komunikat lub czy ktokolwiek ją otrzymał. Inaczej wygląda to od strony subskrybenta.
Sytuację tą można zobrazować jeszcze w następujący sposób:
Gdybyśmy mieli przetrzymywać komunikaty dla początkowo wyłączonego komputera, a następnie je z nim synchronizować, to rezultatem takiego działania byłoby nieustanne zwiększanie ilości danych do przechowania i przesłania odbiorcy.
Takie podejście ma swoje plusy i minusy. Wszystko zależy od rodzaju komunikatów i tego co chcemy z nimi zrobić.
Jeśli byłyby to np. wiadomości przesyłane w ramach czata – nie moglibyśmy sobie pozwolić na ich utratę. Natomiast np. w przypadku logów aplikacji, czy usługi – możemy skorzystać z takiego rozwiązania zakładając, że zawsze będzie obecna co najmniej jedna aplikacja subskrybentów.
Przykładem takiego rozwiązania może być pobieranie logów i zapisywanie ich na dysku lub w bazie danych przez pierwszą aplikację-odbiorcę oraz drukowanie ich w konsoli / terminalu przez drugi program – gdy potrzebujemy zweryfikować działanie naszej aplikacji w różnych warunkach.
Centrale komunikatów typu fanout można stosować przy obsłudze:
Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php
Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.