Tutoriale / Centrala komunikatów z nagłówkami

3cfd3daf-74a7-881f-94e9-36df4f417729

Ten rodzaj centrali wiadomości jest ulepszoną wersją centrali wiadomości bezpośrednich. Zasadniczą zmianą jest wykorzystanie nagłówków zamiast kluczy routing’u.

Każdy nagłówek ma dwie cechy:

  • nazwę,
  • wartość.

Z tego też powodu zamiast używać kluczy routing’u, które mogły być tylko łańcuchami znaków, można użyć np. liczb całkowitych lub funkcji skrótu (ang. hash) na bazie słownika.

Nagłówki

Lista nagłówków może wyglądać następująco:

Nazwa Wartość
x-match any
type error
app node.js
host 10.0.0.15

Wyróżniamy dwa rodzaje sposobów dopasowania wiadomości do określonych filtrów w centralach z nagłówkami ustawiane jako argument dla parametru x-match:

  • any – informujący centralę, że wystarczy, aby pasował przynajmniej jeden z filtrów, co można określić również przy pomocy słowa lub (ang. or);
  • all – informujący centralę, że muszą pasować wszystkie filtry, co można określić przy pomocy słowa oraz (ang. and); Ponieważ na stronie RabbitMQ w sekcji „Tutorials” zaprezentowano wszystkie rodzaje central z wyjątkiem central z nagłówkami, przyjrzymy się temu rodzajowi routing'u dokładniej na konkretnym przykładzie.

Oba rodzaje dopasowania ilustrują dwa diagramy:

  • any:
    Diagram przedstawiający filtrowanie wiadomości przez centralę komunikatów z nagłówkami z dopasowaniem any
  • all:
    Diagram przedstawiający filtrowanie wiadomości przez centralę komunikatów z nagłówkami z dopasowaniem all

Jako bazowy kod weźmy kod programu wydawca.php, który w uproszczeniu wyglądał następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

Zanim zaczniemy pisać dwa nowe programy (nadawcę i odbiorcę), potrzebujemy określić co one będą dla nas robić?

Przypuśćmy, że nadawca przesyła nam z portalu randkowego:

  • imię osoby,
  • płeć,
  • kolor włosów.

Natomiast odbiorca będzie miał ustawione „na sztywno” pożądane cechy drugiej połówki, tzn. w kodzie będzie zaszyte, jaką płeć i jaki kolor włosów swojej wybranki/wybranka preferuje.

Zacznijmy od aplikacji serwisu randkowego, który będzie wysyłał „profile” do systemu RabbitaMQ. Nazwijmy go randkoder.php

Program randkoder.php

Tuż po zaimportowaniu biblioteki amqplib definiujemy sposób korzystania z naszego nadawcy:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$numberOfAttributes = 3;

if (count(array_slice($argv, 1)) < $numberOfAttributes) {
    file_put_contents('php://stderr', "Korzystanie: php $argv[0] [imię] [płeć: k|m] [kolor włosów: blond|rude|siwe|czarne]\n");
    file_put_contents('php://stderr', "             php $argv[0] Karolina k blond\n");
    exit(1);
}

W zmiennej $numberOfAttributes ustaliliśmy, że będziemy podawać trzy cechy konkretnej osoby z portalu randkowego, czyli imię, płeć oraz kolor włosów.

Wywołanie programu z inną liczbą parametrów spowoduje wyświetlenie odpowiedniego komunikatu na konsoli.

Wartości podane do programu mogą być dowolne, jednak aby pomóc użytkownikowi w korzystaniu z programu, dostarczamy mu przykładowe dane, np. rudy kolor włosów. Nie definiujemy oddzielnego słownika, aby program był jak najprostszy.

W sekcji do uzupełnienia najpierw definiujemy nazwę centrali:

$exchangeName = 'randkomierz';

Następnie pobieramy cechy kandydatki/kandydata:

$arguments = array_slice($argv, 1);
$name = $arguments[0];
$attributes = array_slice($arguments, 1);

Potem definiujemy klucze dla naszych nagłówków odpowiadających przekazywanym do programu cechom:

$headersList = ['plec', 'kolor-wlosow'];

oraz wypełniamy nagłówki wartościami:

$headersValues = [];

for ($i=0; $i<count($headersList); $i++) {
    $headerName  = $headersList[$i];
    $headerValue = $attributes[$i];

    $headersValues[$headerName] = $headerValue;
}

Na koniec wysyłamy wiadomość do kuriera wiadomości:

$channel->exchange_declare($exchangeName, 'headers', false, false, false);

$message = new AMQPMessage($name);
$headers = new AMQPTable($headersValues);

$message->set('application_headers', $headers);

$channel->basic_publish($message, $exchangeName, '');

echo ' [x] [x] Wysłano profil ', $name, "\n";

Kompletny program randkoder.php wygląda następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

$numberOfAttributes = 3;

if (count(array_slice($argv, 1)) < $numberOfAttributes) {
    file_put_contents('php://stderr', "Korzystanie: php $argv[0] [imię] [płeć: k|m] [kolor włosów: blond|rude|siwe|czarne]\n");
    file_put_contents('php://stderr', "             php $argv[0] Karolina k blond\n");
    exit(1);
}

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'randkomierz';

$arguments = array_slice($argv, 1);
$name = $arguments[0];
$attributes = array_slice($arguments, 1);
$headersList = ['plec', 'kolor-wlosow'];
$headersValues = [];

for ($i=0; $i<count($headersList); $i++) {
    $headerName  = $headersList[$i];
    $headerValue = $attributes[$i];

    $headersValues[$headerName] = $headerValue;
}

$channel->exchange_declare($exchangeName, 'headers', false, false, false);

$message = new AMQPMessage($name);
$headers = new AMQPTable($headersValues);

$message->set('application_headers', $headers);

$channel->basic_publish($message, $exchangeName, '');

echo ' [x] [x] Wysłano profil ', $name, "\n";

register_shutdown_function('shutdown', $channel, $connection);

Pozostało nam już tylko użyć napisanego „randkodera”:

Terminal1 $ php randkoder.js Miłosława k czarne
 [x] Wysłano profil 'Miłosława'

Terminal1 $ ./randkoder.js Anna k rude
 [x] Wysłano w nagłówkach profil 'Anna'

Program swatka.php

Gdy mamy już program nadawcy, pora zabrać się za program swatka.php, który będzie odfiltrowywał nam profile, którymi nie jesteśmy zainteresowani, a wyłapywał tylko te najciekawsze w oparciu o zdefiniowane przez nas cechy.

Ponieważ program nie jest skomplikowany, możemy przejść do analizy od razu całego kodu:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'randkomierz';

$channel->exchange_declare($exchangeName, 'headers', false, false, false);

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

$channel->queue_bind($queueName, $exchangeName, '', false, new AMQPTable([
    'x-match' => 'all',
    'plec' => 'k',
    'kolor-wlosow' => 'czarne'
]));

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume($queueName, '', false, true, false, false, function($message) {
    echo ' [x] ' . $message->body . ' pasuje do Ciebie' . "\n";
});

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Definiujemy tutaj centralę wiadomości z nagłówkami o tej samej nazwie co w programie nadawcy – „randkoder”, a następnie podpinamy do niej kolejkę, wskazując interesujące nas cechy.

W centralach z nagłówkami nie używamy kluczy routing’u, zatem w ich miejsce podajemy pusty łańcuch.
W powyższym przykładzie zastosowano dopasowanie „all”, dzięki czemu możemy lepiej zobaczyć, jak program działa z konkretnymi danymi.

Teraz, gdy uruchomimy program swatka.js, będziemy mogli zobaczyć, jak działa nasze filtrowanie:

$ php swatka.php
[*] Oczekiwanie na wiadomości w
     amq.gen-z3KMAIo8WpAW72DaFdqc5w
     Naciśnij CTRL+C, aby zakończyć.
 [x] 'Miłosława' pasuje do Ciebie

Nic nie stoi na przeszkodzie, aby zmienić tę opcję na „any”:

$  php swatka.php
 [*] Oczekiwanie na wiadomości w
     amq.gen-z1KRAIo8WpAW72DaFdqd3z
     Naciśnij CTRL+C, aby zakończyć.
 [x] 'Miłosława' pasuje do Ciebie
 [x] 'Anna' pasuje do Ciebie

Przykładowe zastosowania

Centrale komunikatów typu headers można stosować przy obsłudze:

  • przekazywania wyników pracy w wieloetapowych krokach działania aplikacji pomiędzy jej podsystemami (wzorzec blankietu routing’u – ang. routing slip pattern),
  • systemów dystrybuowania skompilowanych aplikacji (ang. builds) / systemów dostarczania oprogramowania w sposób ciągły (ang. continuous delivery) / systemów aktualizacji oprogramowania w sposób ciągły (ang. continuous deployment) / systemów ciągłej integracji dbające między innymi o jakość wytwarzanego oprogramowania poprzez uruchamianie testów automatycznych i integracyjnych (ang. continuous integration) wykonujące swoje zadania na bazie wielu parametrów (np. systemu operacyjnego, rodzaju architektury procesora, dostępności określonego pakietu).

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.