Gestione dei messaggi PsqlMessages


Il sistema di gestione dei messaggi PsqlMessages si basa fondamentalmente sul meccanismo di messaggi di PostgreSQL (LISTEN/NOTYFY).

Per uniformare l'interfaccia e per garantire sicurezza è disponibile una libreria di funzioni PostgreSQL orientata alla comunicazione asincrona tra più processi punto-punt (RPC) e punto-multipunto (Topic).

Performance

Le performance medie registrate utilizzando un PC virtuale (4 x 5900 Bobomips) sono le seguenti:

dimensione
pacchetto
N° pacchetti
ogni secondo
1 byte 3.500
1 Kbyte 3.400
10 Kbyte 1.900
50 Kbyte 800
100 Kbyte 600

Caratteristiche

  • configurazione dei nodi e delle code predeterminata (tabella)
  • i nodi in ascolto si possono registrare anche prima dei nodi che trasmettono
  • funzionalità RPC tra nodi sia punto-punto che punto-multipunto
  • ricezione degli eventi in modalità asincrona
  • utilizzo della memoria condivisa /run/shm/ per la trasmissione di messaggi di dimensione superiore a 8000 caratteri (limite in PosgreSQL del payload nella trasmissione con NOTIFY)

Sicurezza

  • ogni nodo riceve gli eventi con un unico indirizzo riservato il cui nome è comunicato solo al processo che lo utilizza
  • configurazione dei nodi, delle code e relativi livelli di accesso dipendenti dagli ruoli PostgreSQL

Implementazione

La libreria è sviluppata nel linguaggio Pl/PerlU integrato nel server PostgreSQL.
Le funzioni di interfaccia, utilizzabili dai client postgresql, sono definite nel linguaggio SQL.

I nodi client connessi al database PostgreSQL possono abilitare le funzionalità della libreria richiamando la funzione init indicando il nome attribuito al nodo, che deve essere univoco.

Per ulteriori dettagli si rimanda alla consultazione del codice sergente.

Tabella delle autorizzazioni

Nella tabella messages.auth sono presenti i seguenti campi:

Campo Descrizione
id Chiave primaria univoca
role Utente postgres autorizzato
ident Nome del nodo client
type Tipo di messaggio: Rpc o Topic
dest Destinazione del messaggio.
Può essere in nome di un nodo (ident) o il nome di una coda (topic)
mode W:trasmissione R:ricezione
timeout Timeout (sec) di conservazione del messaggio nella memoria condivisa
(solo per i messaggi più grandi di 8000 byte)
Di seguito alcuni esempi:

id role ident type dest mode timeout descrizione
1 user1 NODE1 Topic MYTOPIC W 10 Autorizza il nodo NODE1 connesso con utente user1 ad inviare messaggi del topic MYTOPIC. 10 sec di timeout.
2 user2 NODE2 Topic MYTOPIC RW 10 Autorizza il nodo NODE2 connesso con utente user2 a ricevere e trasmettere i messaggi del topic MYTOPIC. 10 sec di timeout.
3 user3 NODE3 Topic MYTOPIC R 10 Autorizza il nodo NODE3 connesso con utente user3 ad ricevere i messaggi del topic coda MYTOPIC. 10 sec di timeout.
4 user1 NODE1 Rpc NODE2 W 5 Autorizza il nodo NODE1 connesso con utente user1 ad inviare messaggi al nodo NODE2. 5 sec di timeout.
5 user2 NODE2 Rpc NODE1 W 5 Autorizza il nodo NODE2 connesso con utente user2 ad inviare messaggi al nodo NODE1. 5 sec di timeout.
Se il nodo trasmittente indica anche un token i client sono autorizzati ad inviare un messaggio di risposta al mittente.

Messaggi asincroni

Il nodo client riceve le notifiche dalla connessione PostgreSQL sempre dallo stesso canale il cui nome è restituito dalla funzione messages.init.

Il payload dei messaggi PostgreSQL è costituito da un blocco dati nel formato JSON seguito dal messaggio inviato dal nodo client (BODY), separato dal carattere \n (Carriage Return).

Se il messaggio complessivo supera la dimensione consentita (8000 byte) sarà presente solo il blocco dati JSON. Il messaggio del nodo client (BODY) in tal caso viene recuperato richiamando la funzione messages.get_data.

Il blocco dati JSON è costituito da una Hash contenente i seguenti campi:

Campo Descrizione
Type Tipo di messaggio: Rpc Topic Return Sys
From Nodo mittente del messaggio
To Destinazione (nodo o topic)
Size Dimensione in byte del messaggio
Dkey Se presente indica la chiave di riferimento del blocco dati per il recupero del messaggio (BODY)
Token Se presente e il tipo messaggio non é Return indica che il mittente richiede l'invio di una risposta
Err Codice errore (vedi tabella più sotto)
Errstr Descrizione dell'errore
Alcuni esempi:

{"Type":"Topic","Size":15,"To":"TOPIC","From":"SENDER"}
Messaggio corto

{"Type":"Rpc","Size":9,"To":"RECEIVER","Token":"577298d7 5 TOKEN_RISPOSTA","From":"SENDER"}
Parametro

{"Type":"Return","Size":12364,"Dkey":"TOPIC.10.10731zoslhltsftmmoiupkfcsghdcsvxertvh", "To":"SENDER","Token":"TOKEN_RISPOSTA","From":"RECEIVER"}

{"Type":"Topic","Size":15,"To":"TOPIC","Token":"84b8f516 10 TOKEN_TOPIC2","From":"SENDER"}
Altro parametro

{"Type":"Sys", "Evt":"topic_listen", "Ident":"NODE3", "Topic":"MYTOPIC"}

Funzioni

Le funzioni sono tutte appartenenti allo schema messages

Tutte le funzioni ritornano una hash JSON. Nella hash è sempre presente il parametro Err .

Funzione Esempio Json restituito dalla funzione Descrizione
init('NODE') {"Channel":"node2_8705nppfemgbculsjkyr",
"Already":1,"Err":"0"}
Registrazione del nodo client "NODE"
La funzione va chiamata per prima. Restituisce il nome del canale al quale verranno inviati i messaggi PostgreSQL. Ritorna Already:1 se il client era già registrato.
deinit('NODE') {"Err":"0"} Deregistrazione del nodo client "NODE"
La funzione va chiamata per ultima. Ritorna Already:1 se il client non è registrato.
topic_auth('TOPIC') {"Err":"0","Notifiers":["NODE2", "NODE3"],
"Listeners":["RECEIVER"]}
Restituisce l'elenco dei client autorizzati all'ascolto e alla trasmissione
topic_status('TOPIC') {"Err":"0","Notifiers":["NODE2"],
"Listeners":["RECEIVER"]}
Restituisce l'elenco dei client attualmente in ascolto e in trasmissione
topic_notify('TOPIC') {"Err":"0","Already":1} Registra il client a trasmettere i messaggi del topic TOPIC. Ritorna Already:1 se il client era già registrato.
topic_unnotify('TOPIC') {"Err":"0"} Disabilita il client alla trasmissione dei messaggi del topic TOPIC. Ritorna Already:1 se il client non era registrato.
topic_listen('TOPIC') {"Err":"0","Already":1} Registra il client a ricevere i messaggi del topic TOPIC. Ritorna Already:1 se il client era già registrato.
topic_unlisten('TOPIC') {"Err":"0"} Disabilita il client alla ricezione dei messaggi del topic TOPIC. Ritorna Already:1 se il client non era registrato.
topic_send('TOPIC', 'MESSAGE', 'TOKEN') {"Err":"0"} Trasmissione di un messaggio; il Token è facoltativo; se indicato i client possono restituire un messaggio di risposta
get_data('JSON') {"Err":"0","Type":"Topic","Size":8021,"To":"TOPIC","From":"SENDER"}
DATADATA....DATADATA
Restituisce il messaggio completo del BODY, JSON è il messaggio Json ricevuto dal client e contenuto nel "payload" del messaggio PostgreSQL
rpc('NODE', 'TOKEN', 'PARAM') {"Err":"0"} Trasmette PARAM al nodo NODE.
return('NODE', 'TOKEN', 'PARAM') {"Err":"0"} Restituisce al nodo NODE il parametro PARAM con token TOKEN

Errori

Le funzioni restituiscono le eccezioni aggiungendo nella risposta in formato Hash JSON due campi Err e Errstr:

Err Errstr Funzioni Descrizione
0   * Nessun errore. Il parametro Err="0" è sempre indicato nella risposta priva di errori.
1 destination is missing * Il client indicato non è definito nella configurazione
2 cannot use $type to $dest * non si è autorizzati ad usare la destinazione $dest per inviare o ricevere messaggi di tipo $type
3 cannot auth $mode with $type $dest * non si è autorizzati ad usare la destinazione $dest per trasmettere ($mode==W) o ricevere ($mode=R) messaggi di tipo $type
4 cannot init without identity init chiamata della funzione messages.init con identità vuota
5 there are no permissions for ident $ident init non sono presenti permessi associati al nodo di nome $ident
6 Client $ident exists with same pid init Warning: client $ident già connesso
7 Client $ident already exists with pid $pid and user1 $role init Avverte che c'è già un client connesso con diverso utente $role
8 Client $ident already exists with pid $pid init Avverte che c'è già un client connesso
9 Topic $topic is not existent topic_* Il topic $topic non è stato definito
10 cannot listeners for Topic $topic topic_send Non ci sono nodi client attualmente in ascolto
11 You are not listening from topic $topic get_data Tentativo di prelievo di un blocco dati da parte di un client che non è in ascolto
12 Message $dkey expired get_data Il messaggio nella memoria condivisa è scaduto
13 Message $dkey not exists get_data Il messaggio non è più presente
14 Token empty rpc return Campo Token vuoto
15 Ident empty rpc *_send return Campo identità del destinatario vuoto
16 Client $ident don't exist rpc *_send return Identità del destinatario sconosciuta
17 Client $ident is not connected rpc *_send return Il client non è attualmente connesso
18 token_tm_crypt malformed return Il campo Token restituito dal client è mal formato
19 Client $ident don't exist return Il client mittente non esiste
20 $ident cannot return to $dest with token $token return Il client $ident non è autorizzato a inviare la risposta al mittente $dest
21 $ident cannot send to $topic prior to call topic_notify topic_send Il client $ident deve attivare la trasmissione al topic $topic (funzione messages.topic_notify) prima di inviare un messaggio

Messaggi di sistema

I nodi client collegati ricevono dei messaggi di segnalazione da PsqlMessages con le stesse modalità con le quali ricevono i messaggi dagli altri nodi client.
Ogni client connesso deve ricevere gli eventi di sistema relativi ai client e topic su cui quel cliente è autorizzato a comunicare.

Il sistema valuta i cambiamento di stato che determinano l'invio degli eventi ad ogni chiamata delle funzioni utilizzabili dai client.
Per motivi di efficenza, nel caso di chiamate alle funzioni rpc, return, topic_send e get_data, la valutazione viene fatta soltanto se è trascorso almeno un secondo dalla precedente valutazione.

I messaggi di sistema hanno il campo Type = Sys e il campo From assente. Sono riportati i seguenti campi aggiuntivi:

Campo Descrizione
Evt Identificativo dell'evento
Ident Identità nodo client
Topic Nome del topic

I messaggi inviati ai nodi client dal sistema possono contenere le seguenti segnalazioni:

Identificativo eventoSorted descending Esempio di messaggio trasmesso Descrizione
topic_unnotify {"Type":"Sys", "Evt":"topic_unnotify", "Ident":"CLIENT", "Topic":"TOPIC"} Il client CLIENT ha interrotto la trasmissione al topic TOPIC
topic_unlisten {"Type":"Sys", "Evt":"topic_unlisten", "Ident":"CLIENT", "Topic":"TOPIC"} Il client CLIENT ha interrotto l'ascolto del topic TOPIC
topic_notify {"Type":"Sys", "Evt":"topic_notify", "Ident":"CLIENT", "Topic":"TOPIC"} Il client CLIENT si è messo in trasmissione al topic TOPIC
topic_none_notify {"Type":"Sys", "Evt":"topic_none_notify", "Topic":"TOPIC"} Nessun client in trasmissione al topic TOPIC.
Trasmesso dopo l'evento topic_unnotify dell'ultimo client che abbandona il topic TOPIC.
topic_none_listen {"Type":"Sys", "Evt":"topic_none_listen", "Topic":"TOPIC"} Nessun client in ascolto del topic TOPIC.
Trasmesso dopo l'evento topic_unlisten dell'ultimo client che abbandona il topic TOPIC.
topic_listen {"Type":"Sys", "Evt":"topic_listen", "Ident":"CLIENT", "Topic":"TOPIC"} Il client CLIENT si è messo in ascolto del topic TOPIC
disconnect {"Type":"Sys", "Evt":"disconnect", "Ident":"CLIENT"} Il client CLIENT si è disconnesso
crash {"Type":"Sys", "Evt":"crash", "Ident":"CLIENT"} Client CLIENT disconnesso in modo anomalo
connect {"Type":"Sys", "Evt":"connect", "Ident":"CLIENT"} Il client CLIENT si è connesso

Debug

Per attivare i log cambiare il livello dei LOG del client con l'istruzione SQL.
Per attivare i log sui comandi:
set client_min_messages = DEBUG1;
Per attivare i log anche sugli eventi di sistema:
-- Livello DEBUG2 o superiore
set client_min_messages = DEBUG2;

Prove sul campo

Di seguito alcuni esempi di conversazioni tra due nodi, utilizzando psql

  • Configurare nodi e destinazioni usando un utente amministratore

insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user1', 'NODE1', 'MYTOPIC', 'Topic', 'W', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user1', 'NODE1', 'NODE2', 'Rpc', 'W', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user2', 'NODE2', 'MYTOPIC', 'Topic', 'R', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user2', 'NODE2', 'NODE1', 'Rpc', 'W', 60);
Nota: Il timeout è impostato a 60 secondi per dar tempo all'utente di recuperare i messaggi lunghi prima della loro scadenza.

  • Connettersi al database, utente !"user1" con due distinti terminali (stesso PC o da PC differenti)

user1=# select messages.init('NODE1');
                   init                    
-------------------------------------------

{"Err":"0","Channel":"user1_17662fnpfvpnkyhxatjfu"}
(1 row)
user2=# select messages.init('NODE2');
                    init                    
--------------------------------------------

{"Err":"0","Channel":"user2_17673gyjexfwdzgcpuncg"}
(1 row)

  • Registro NODE2 a ricevere gli eventi dal Topic MYTOPIC
user2=# select messages.topic_listen('MYTOPIC');
 topic_listen
--------------

{"Err":"0"}
(1 row)

  • Verifico da NODE1 chi è in ascolto nel Topic MYTOPIC
user1=# select messages.topic_status('MYTOPIC');
                   topic_status                   
------------------------------------------------

{"Err":"0","Notifiers":["NODE2","NODE1"],"Listeners":["NODE2"]}
(1 row)

  • Registro NODE1 ad osservare la coda MYTOPIC
user1=# select messages.topic_listen('MYTOPIC');
ATTENZIONE:  WebHome Messages err:3 cannot auth R with Topic MYTOPIC

CONTESTO: funzione PL/Perl "topic_listen"
                        topic_listen                        
------------------------------------------------------------

{"Errstr":"cannot auth R with Topic MYTOPIC","Err":"3"}
(1 row)

  • Da NODE1 trasmetto un piccolo messaggio
user1=# select messages.topic_send('MYTOPIC','{"MyEvent":"Esempio di messaggio nel formato JSON"}');
  topic_send  
--------------

{"Err":"0"}
(1 row)

  • Verifico in NODE2 se la notifica è arrivata
user2=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Topic","Size":51,"To":"MYTOPIC","From":"NODE1"}
{"MyEvent":"Esempio di messaggio nel formato JSON"}" received from server process with PID 17300.

  • Da NODE1 trasmetto un lungo messaggio
user1=# select messages.topic_send('MYTOPIC', repeat(E'Messaggio lungo di 8000 caratteri ASCII\n', 200));
  topic_send  
--------------

{"Err":"0"}
(1 row)

  • E verifico
user2=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Topic","Size":8000,"To":"MYTOPIC","Dkey":"60.17662ifehivuyjlvxkmbjkzivxfzihctjlsgz","From":"NODE1"}" received from server process with PID 17662.

  • Recupero il BODY del messaggio fornendo come parametro il payload della notifica ricevuta
user2=# select messages.get_data('{"Type":"Topic","Size":8000,"To":"MYTOPIC","Dkey":"60.17662ifehivuyjlvxkmbjkzivxfzihctjlsgz","From":"NODE1"}');
                           get_data                            
---------------------------------------------------------------

{"Err":"0","Type":"Topic","Size":8000,"To":"MYTOPIC","From":"NODE1"}+
 Messaggio lungo di 8000 caratteri ASCII                      +
=== altre 188 righe ===
 Messaggio lungo di 8000 caratteri ASCII                      +

(1 row)

  • Da NODE2 invio una chiamata RPC
user2=# select messages.rpc('NODE1', 'MyToken', '{"Param1":1,"Param2":2, "Param3":"tre"}');
     rpc      
--------------

{"Err":"0"}
(1 row)

  • Controllo su NODE1 l'arrivo della notifica
user1=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user1_17662fnpfvpnkyhxatjfu" with payload "{"Type":"Rpc","Size":39,"To":"NODE1","Token":"ad29dde3 60 MyToken ","From":"NODE2"}
{"Param1":1,"Param2":2, "Param3":"tre"}" received from server process with PID 17673.

  • Inoltro la risposta
user1=# select messages.return('NODE2', 'ad29dde3 60 MyToken ', '{"MyResult":"Il mio risultato"}');
    return    
--------------

{"Err":"0"}
(1 row)

  • In NODE2 la risposta è arrivata ...
psqlm=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Return","Size":31,"To":"NODE2","Token":"MyToken","From":"NODE1"}
{"MyResult":"Il mio risultato"}" received from server process with PID 17662.

  • Interrompo la connessione del nodo NODE1 e provo ad inviare una chiamata RPC da NODE2
user2=# select messages.rpc('NODE1', 'MyToken', 'Funzionerà?');
ATTENZIONE:  WebHome Messages err:17 Client NODE1 is not connected

CONTESTO: funzione PL/Perl "rpc"
                          rpc                          
-------------------------------------------------------

{"Errstr":"Client NODE1 is not connected","Err":"17"}
(1 row)

File interessati nel server

  • /opt/psqlmessages/deploy/psqlmessages.sql - Script di inizializzazione
  • /opt/psqlmessages/lib/perl/PsqlMessages/FileShared.pm - Classe per gestire gli accesi alla memoria condivisa
  • /opt/psqlmessages/lib/perl/PsqlMessages/Messages.pm - Classe per gestire i messaggi
  • /run/shm/PsqlMessages.Clients - Memoria condivisa con le informazioni riguardanti i client collegati
  • /run/shm/PsqlMessages.Topic.* - Memoria condivisa con le informazioni riguardanti i Topic
  • /run/shm/PsqlMessages.Mess.MYTOPIC.60.17662ajalhrhvsdgcwjywnizkflqcxswhhjsc - Esempio di messaggio lungo del Topic "MYTOPIC" con timeout 60 secondi

File rilasciati

La libreria è disponibile con due licenze:
  • Affero G.P.L. rel.1
  • Licenza commerciale (Contattateci per conoscere condizioni e prezzi)

Il codice può essere scaricato dal server SVN:

Installazione

sudo apt-get install libjson-xs-perl
sudo mkdir -p /opt/psqlmessages
cd ~
svn checkout --username guest https://www.leader.it/svn/psqlmessages/psqlmessages_v01
cd psqlmessages_v01
sudo mv ./lib /opt/psqlmessages/
sudo chown -R root.root /opt/psqlmessages
echo 'CREATE SCHEMA messages;' | su -c psql\ YOURDATABASE postgres
cat ./deploy/psqlmessages.sql | su -c psql\ YOURDATABASE postgres
cd ~
rm -r psqlmessages_v01

Codice client di esempio

Di seguito un esempio di codice per trasmettere ed inviare messaggi, con il quale valutare le performance.

# Esempio di nodo client in trasmissione
# Parametri:
#    nome o indirizzo server postgresql
#    nome del database
#    nome utente
#    password
#    nome client (ident)
#    nome del topic
#    numero di messaggi da trasmettere
#    dimensione in byte del messaggio
#    intervallo in microsecondi tra un messaggio e il successivo
./sender.pl 127.0.0.1 my_database myuser mypassword SENDER TOPIC 1000 100 100

# Esempio di nodo client in ascolto
# Parametri:
#    nome o indirizzo server postgresql
#    nome del database
#    nome utente
#    password
#    nome client (ident)
#    nome del topic
./receiver.pl 127.0.0.1 my_database myuser mypassword RECEIVER TOPIC

Alternative

  • PgQ Gestore code di Skytools
    • non utilizza LISTEN/NOTIFY (poco reattiva)
    • non implementa RPC
    • adatta per la elaborazione in lotti

  • pg-message-queue
    • gestione permessi limitato
    • nome dei canali condivisi tra i client (scarsa sicurezza)
    • non implementa RPC
    • dimensione messaggi < 8000 byte

PsqlMessages Web Utilities

Versione pagina: r42 - 14 Mar 2019, GuidoBrugnara
Questo sito utilizza FoswikiCopyright (©) Leader.IT - Italy P.I. IT01434390223 Informativa privacy & uso dei cookies