Tutoriale / Centrala komunikatów tematycznych

6fccb180-5fc0-4283-1ba2-060796a66187

Ten rodzaj filtrowania wiadomości jest alternatywą dla omówionej w poprzednim tutorialu centrali komunikatów z nagłówkami, jednak nie może on posiadać dowolnie dobranego klucza routing’u, tak jak to miało miejsce w przypadku centrali komunikatów bezpośrednich.

Ściśle określony klucz powiązania

Tutaj klucz powiązania musi składać się ze słów oddzielonych kropkami np. „gpw.notowania.spolki.kghm”, co można opisać następującym wyrażeniem regularnym:

^[A-Za-z0-9\-_*]+(\.[A-Za-z0-9\-_*]+)*$.

Do konkretnej kolejki zostaną dostarczone wszystkie wiadomości, dla których klucz będzie pasował.

Do jednej kolejki można przypisać kilka takich kluczy, tak jak miało to miejsce w przypadku centrali typu direct. W takim przypadku wystarczy, aby choć jeden klucz pasował, co jest podobne do filtrowania typu „any” w centralach typu headers.

Klucz routing'u składa się ze słów oddzielonych kropkami.

Pojedyncze słowa mogą zostać zastąpione przez:

  • gwiazdki „*” (ang. asterix) zastępujące dokładnie jedno słowo
  • oraz tzw. płotka lub krzyżyka „#” (ang. hash) zastępującego dowolną liczbę słów (zero lub więcej), w tym oddzielających je kropek.

Część klucza routingu, który chcemy zastąpić płotkiem np. „gpw.#”, nie może zaczynać się od kropki, co możemy zapisać następującym wyrażeniem regularnym:

^([^.][A-Za-z0-9\-_*]+)((\.[A-Za-z0-9\-_*]+)*)$

Centrale tematyczne mają wiele zastosowań.

Gdy zadanie stawiane przed centralą filtrującą komunikaty przewiduje wiele aplikacji subskrybujących je selektywnie, ten rodzaj routing'u jest jednym z najlepszych wyborów.

Świetnym przykładem zastosowania centrali tematycznych jest analiza notowań giełdowych pokazana poniżej:

Diagram przedstawiający centralę komunikatów tematycznych na przykładzie przetwarzania informacji giełdowych

Na diagramie możemy zobaczyć:

  • przykład filtrowania składającego się z 4 słów: gpw.notowania.spolki.kghm
  • przykład użycia „gwiazdki”, co pozwala na śledzenie wszystkich informacji związanych z notowaniami spółek na rynku New Connect warszawskiej GPW: gpw.new-connect.*
  • przykład użycia „płotka”, co pozwala śledzić wszystkie dane związane z warszawską Giełdą Papierów Wartościowych: gpw.#

Warto tutaj zwrócić uwagę na przemyślaną hierarchię struktury klucza routing’u, tak aby dodanie kolejnych funkcjonalności nie wiązało się z całkowitym jej przebudowaniem, a w przypadku dużych systemów może się to okazać bardzo trudne lub nawet niewykonalne.

Program notowania-gpw-nadawca.php

Podobnie jak w przypadku pozostałych przykładów programów-wydawców tak i tutaj bazujemy na przykładzie programu fabryka.php:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// nowe instrukcje

register_shutdown_function('shutdown', $channel, $connection);

Na początku sprawdzamy czy program wywołano z parameterami. Jeśli tego nie zrobiono wyświetlamy informację o tym jak korzystać z programu:

if (count(array_slice($argv, 1)) < 2) {
    file_put_contents(
        'php://stderr',
        "Korzystanie: php $argv[0] gpw.notowania.spolki.kghm \"Raport skonsolidowany za IV kwartał 2015 roku!\"\n"
    );
    exit(1);
}

później deklarujemy nazwę centrali komunikatów tematycznych:

$exchangeName = 'notowania-gpw';

a następnie odczytujemy dodatkowe parametry przesłane z linii poleceń:

$args = array_slice($argv, 1);
$key  = isset($args[0]) ? $args[0] : 'gpw.info.anonymous';

if (count($args) > 0) {
    $msg  = implode(' ', array_slice($args, 1));
} else {
    $msg  = 'Raport skonsolidowany za IV kwartał 2015 roku!';
}

W przypadku ich braku do centrali komunikatów przesłana zostanie wiadomość „Raport skonsolidowany za IV kwartał 2015 roku!” z kluczem powiązania „gpw.info.anonymous”.

Kolejną rzeczą jest określenie typu centrali komunikatów i jej charakteru:

$channel->exchange_declare($exchangeName, 'topic', false, false, false);

Na koniec wysyłamy wiadomość do kuriera wiadomości:

$message = new AMQPMessage($msg);

$channel->basic_publish($message, $exchangeName, $key);

echo ' [x] Wysłano ', $msg, "\n";

Kompletny program notowania-gpw-nadawca.php wygląda następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

if (count(array_slice($argv, 1)) < 2) {
    file_put_contents(
        'php://stderr',
        "Korzystanie: php $argv[0] gpw.notowania.spolki.kghm \"Raport skonsolidowany za IV kwartał 2015 roku!\"\n"
    );
    exit(1);
}

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'notowania-gpw';

$args = array_slice($argv, 1);
$key  = isset($args[0]) ? $args[0] : 'gpw.info.anonymous';

if (count($args) > 0) {
    $msg  = implode(' ', array_slice($args, 1));
} else {
    $msg  = 'Raport skonsolidowany za IV kwartał 2015 roku!';
}

$channel->exchange_declare($exchangeName, 'topic', false, false, false);

$message = new AMQPMessage($msg);

$channel->basic_publish($message, $exchangeName, $key);

echo ' [x] Wysłano ', $msg, "\n";

register_shutdown_function('shutdown', $channel, $connection);

Program notowania-gpw-subskrybent.php

Program subskrybenta wygląda podobnie do programu subskrybenta logów dla centrali komunikatów typu direct.

Na samym początku sprawdzamy, czy użytkownik – uruchamiając program z terminala – podał wymagany co najmniej jeden klucz powiązania, a jeśli nie – wyświetlamy mu odpowiedni komunikat:

$routingKeys = array_slice($argv, 1);

if (empty($routingKeys)) {
    file_put_contents('php://stderr', "Korzystanie: $argv[0] gpw.notowania.spolki.kghm\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.new-connect.*\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.#\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.notowania.spolki.kghm gpw.notowania.spolki.orlen\n");

    exit(1);
}

Następnie definiujemy nazwę centrali komunikatów i określamy jej typ:

$exchangeName = 'notowania-gpw';

$channel->exchange_declare($exchangeName, 'topic', false, false, false);

Na końcu przypinamy do centrali tymczasową kolejkę, która będzie powiązana z określonymi przez użytkownika kluczami routing'u:

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

foreach ($routingKeys as $routingKey) {
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
}

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume($queueName, '', false, true, false, false, function($message) {
    echo ' [x] ' . $message->delivery_info['routing_key'] . ': ' . $message->body . "\n";
});

Kompletny program notowania-gpw-nadawca.js wygląda następująco:

require_once __DIR__ . '/vendor/autoload.php';
require_once __DIR__ . '/shutdown.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Wire\AMQPTable;

$routingKeys = array_slice($argv, 1);

if (empty($routingKeys)) {
    file_put_contents('php://stderr', "Korzystanie: $argv[0] gpw.notowania.spolki.kghm\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.new-connect.*\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.#\n");
    file_put_contents('php://stderr', "             $argv[0] gpw.notowania.spolki.kghm gpw.notowania.spolki.orlen\n");

    exit(1);
}

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');

$channel = $connection->channel();

$exchangeName = 'notowania-gpw';

$channel->exchange_declare($exchangeName, 'topic', false, false, false);

list($queueName, ,) = $channel->queue_declare('', false, false, true, false);

foreach ($routingKeys as $routingKey) {
    $channel->queue_bind($queueName, $exchangeName, $routingKey);
}

echo sprintf(' [*] Oczekiwanie na wiadomości w %s.', $queueName), "\n";
echo '     Naciśnij CTRL+C aby zakończyć.', "\n";

$channel->basic_consume($queueName, '', false, true, false, false, function($message) {
    echo ' [x] ' . $message->delivery_info['routing_key'] . ': ' . $message->body . "\n";
});

register_shutdown_function('shutdown', $channel, $connection);

while(count($channel->callbacks)) {
    $channel->wait();
}

Przykładowe zastosowania

Centrale komunikatów typu topic można stosować przy:

  • dostarczaniu wiadomości do wybranych lokalizacji geograficznych, np. punktów sprzedaży;
  • przetwarzaniu procesów w tle z wykorzystaniem wielu aplikacji-robotników odpowiedzialnych za konkretne zestawy zadań;
  • śledzeniu notowań giełdowych (w tym różnych operacji finansowych, np. wzrost/spadek ceny akcji);
  • obsłudze aktualności, włączając ich kategoryzację i tagowanie (poprzez skupienie się np. na informacjach sportowych – piłkarskiej lidze w Hiszpanii: Primera División, czy wyników konkretnego zespołu, np. AS Roma);
  • zarządzaniu orkiestrowe (ang. computing orchestration) złożonymi systemami informatycznymi poprzez automatyczne przydzielanie zasobów, koordynację działania, kontrolę działania, pośredniczenie w wymianie wiadomości/informacji (w chmurze – ang. cloud);
  • systemach typu ciągłego dostarczania oprogramowania (ang. continuous deployment) tworzące tzw. buildy aplikacji w oparciu o różne parametry, gdzie pojedyncza aplikacja tzw. buildera jest zdolna do pracy tylko z jedną, konkretną konfiguracją:
    • różne architektury: np. x86/i586, AMD64/IA-64, ARM, MIPS, PowerPC, SPARC;
    • systemy operacyjne: np. Windows, Linuks, MacOS
    • wersje poszczególnych systemów operacyjnych: np. Windows 7, Windows 10, Debian 7, Ubuntu 14.04;
    • czy np. pod poszczególne dystrybucje Linuksa: np. Debian, RedHat, OpenSUSE, Ubuntu.

Repozytorium z kodem źródłowym

Kod programów omawianych w tym tutorialu znajduje się pod adresem:
https://github.com/RattiQue/tutorials-pl-php

Tomasz Kuter

Web Developer z ponad 8-letnim, komercyjnym doświadczeniem w tworzeniu stron i aplikacji internetowych oraz paneli administracyjnych w PHP, JavaScript, HTML i CSS.
Aktualnie zainteresowany architekturą mikroserwisów, które umożliwiają budowanie skalowalnych aplikacji internetowych.