Management of messages PsqlMessages


The managing system of the messages PsqlMessage is essentially based on the mechanism of messaging of PostgreSQL (LISTEN/NOTIFY).

To standardize the interface and to ensure safety it's available a library of functions PostgreSQL which is oriented to the asynchronous communication between multiple processes point-point (RPC) and point-multipoint (Topic) .

Performance

The average recorded performances using a virtual PC (4 x 5900 Bobomips) are as follows:

packet
size
N° of packets
every sec
1 byte 3.500
1 Kbyte 3.400
10 Kbyte 1.900
50 Kbyte 800
100 Kbyte 600

Characteristics

  • configuration of nodes and predetermined queues ( table )
  • listening nodes can be recorded even before the transmitting nodes
  • RPC functionality between nodes point-point and point-multipoint
  • reception of events in asynchronously mode
  • usage of shared memory /run/shm/ for the transmission of messages larger than 8000 characters (limit PosgreSQL payload transmission with NOTIFY)

Safety

  • each node receives events with a unique private address whose name is provided only to the process that uses it
  • node configuration, queues and relative access levels dependent from roles PostgreSQL

Implementation

The library is developed in Pl/PerlU language, built into the server PostgreSQL.
The interface functions, used by postgresql client, are defined with the SQL language.

Client nodes connected to the database PostgreSQL can enable the functionalities of the library by calling the function init indicating the name given to the node , which must be unique.

For more details please consult the source code.

Authorizations table

In the messages.auth table there are the following fields

Field Description
id Primary unique key
role Authorized user postgres
ident Name of client node
type Type of message: Rpc or Topic
dest Message destination .
It can Be in the name of a node (ident) or the name of a queue (topic)
mode W:transmission R:reception
timeout Timeout (sec) of conversation of message in shared memory
(only for messages larger than 8000 bytes)
Here are some examples:

id role ident type dest mode timeoutSorted ascending description
4 user1 NODE1 Rpc NODE2 W 5 Authorizes node NODE1 which is connected to user user1 to send messages to NODE2. 5 sec of timeout.
5 user2 NODE2 Rpc NODE1 W 5 Authorizes node NODE2 which is connected to user user2 to send messages to node NODE1. 5 sec timeout
1 user1 NODE1 Topic MYTOPIC W 10 Authorizes node NODE1 which is connected to user user1 to send messages of topic MYTOPIC. 10 sec of timeout.
2 user2 NODE2 Topic MYTOPIC RW 10 Authorizes node NODE2 which is connected to user user2 to receive and transmit messages of topic MYTOPIC. 10 sec of timeout.
3 user3 NODE3 Topic MYTOPIC R 10 Authorizes node NODE3 which is connected to user user3 to receive messages of topic MYTOPIC. 10 sec of timeout.
If the transmitting node indicates even one token clients are authorized to send a reply message to the sender.

Asynchronous messages

The client node receives the notifications from the connection PostgreSQL always by the same channel whose name is returned by the function messages.init.

The payload of messages PostgreSQL consists of a block of data in JSON format followed by the message sent by the client node (BODY), separated by the character \n (Carriage Return).

If the overall message exceeds the size allowed (8000 bytes) will be present only the data block JSON. The message of the client node (BODY) in that case is retrieved by calling the function messages.get_data.

The data Block JSON is constituted by a Hash containing the following fields :

Field Description
Type Type of message: Rpc Topic Return Sys
From Message's sender node
To Destination (node or topic)
Size Size in bytes of message
Dkey If present, it indicates the reference key of the data block for recovering the message (BODY)
Token If present and the message type is not Return , it indicates that sender requires an answer
Err Error Code (see table below)
Errstr Error description
Some examples

{"Type":"Topic","Size":15,"To":"TOPIC","From":"SENDER"}
Messaggio corto

{"Type":"Rpc","Size":9,"To":"RECEIVER","Token":"577298d7 5 TOKEN_RISPOSTA","From":"SENDER"}
Parametro

{"Type":"Return","Size":12364,"Dkey":"TOPIC.10.10731zoslhltsftmmoiupkfcsghdcsvxertvh", "To":"SENDER","Token":"TOKEN_RISPOSTA","From":"RECEIVER"}

{"Type":"Topic","Size":15,"To":"TOPIC","Token":"84b8f516 10 TOKEN_TOPIC2","From":"SENDER"}
Altro parametro

{"Type":"Sys", "Evt":"topic_listen", "Ident":"NODE3", "Topic":"MYTOPIC"}

Functions

All functions belong to the scheme messages

All functions return a JSON hash. In the hash is always present the parameter Err .

Function Json returned by function example Description
init('NODE') {"Channel":"node2_8705nppfemgbculsjkyr",
"Already":1,"Err":"0"}
Registration of client node "NODE".
The fuction should be called first.It returns the name of the channel to which the PostgreSQL messages will be sent. Returns Already:1 if the client was already registered.
deinit('NODE') {"Err":"0"} Deregistration of client node "NODE".
The fuction should be called last. Returns Already:1 if the client was not registered.
topic_auth('TOPIC') {"Err":"0","Notifiers":["NODE2", "NODE3"],
"Listeners":["RECEIVER"]}
Returns the list of authorized client that can listening and trasmitting.
topic_status('TOPIC') {"Err":"0","Notifiers":["NODE2"],
"Listeners":["RECEIVER"]}
Returns the list of listening and trasmitting clients.
topic_notify('TOPIC') {"Err":"0","Already":1} The client to transmitting messages of topic TOPIC is registered . Returns Already:1 if the client was already registered.
topic_unnotify('TOPIC') {"Err":"0"} Disables the client from transmiting messages of topic TOPIC. Returns Already:1 if client was not registered.
topic_listen('TOPIC') {"Err":"0","Already":1} The client to reiceive messages of topic TOPIC is registered . Returns Already:1 if the client was already registered.
topic_unlisten('TOPIC') {"Err":"0"} Disables the client from receiving messages of topic TOPIC. Returns Already:1 if client was not registered.
topic_send('TOPIC', 'MESSAGE', 'TOKEN') {"Err":"0"} Transmission of a message; the Token is optional; if indicated, clients can return a reply message.
get_data('JSON') {"Err":"0","Type":"Topic","Size":8021,"To":"TOPIC","From":"SENDER"}
DATADATA....DATADATA
Returns the full message of the BODY , JSON is the message Json received from the client , and contained in the "payload" of the message PostgreSQL.
rpc('NODE', 'TOKEN', 'PARAM') {"Err":"0"} Transmit PARAM to the node NODE.
return('NODE', 'TOKEN', 'PARAM') {"Err":"0"} Returns the parameter PARAM with token TOKEN to node NODE

Errors

The functions return the exceptions including in the answer of format Hash JSON two fields Err and Errstr

Err Errstr Functions Description
0   * No error. The Err="0" parameter is always indicated in the response error-free.
1 destination is missing * The indicated client is not defined in the configuration
2 cannot use $type to $dest * You are not allowed to use the destination $dest in order to send or receive messages of type $type.
3 cannot auth $mode with $type $dest * You are not allowed to use the destination $dest for transmittning ($mode==W) or receiving ($mode=R) messages of type $type.
4 cannot init without identity init Calling of function messages.init with empty identity.
5 there are no permissions for ident $ident init There are no permissions associated with the node named $ident.
6 Client $ident exists with same pid init Warning: client $ident already connected.
7 Client $ident already exists with pid $pid and user1 $role init Warns that there is already a client connected with a different user.
8 Client $ident already exists with pid $pid init Warns that a client is already connected.
9 Topic $topic is not existent topic_* The topic $topic has not been defined.
10 cannot listeners for Topic $topic topic_send There are no actual client nodes listening.
11 You are not listening from topic $topic get_data Withdrawal attempt of a data block from a client that is not listening.
12 Message $dkey expired get_data The message in shared memory has expired.
13 Message $dkey not exists get_data The message is no more present.
14 Token empty rpc return Token field empty.
15 Ident empty rpc *_send return Identity field of recipient.
16 Client $ident don't exist rpc *_send return Recipient's identity unknown.
17 Client $ident is not connected rpc *_send return The client is not connected.
18 token_tm_crypt malformed return Token field from client is badly formed.
19 Client $ident don't exist return The sender client does not exist.
20 $ident cannot return to $dest with token $token return The client $ident is not authorized to send the reply to the sender $dest.
21 $ident cannot send to $topic prior to call topic_notify topic_send The client $ident must start transmission to topic $topic (messages.topic_notify function) before sending a message

System messages

The connected client nodes receive some signaling messages from PsqlMessages in the same manner in which they receive messages from other client nodes.
Every connected client must receive system events related to client and topic on which the client is authorized to communicate.

The system evaluates the change of state that determine the sending of events at every call of the functions used by the client.
For reasons of efficiency, in the case of calls to the functions rpc, return, topic_send and get_data, the evaluation is done only if at least one second has elapsed since the previous evaluation.

The system messages have a field Type = Sys and the field From absent. The following additional fields are shown:

Fied Description
Evt Event identification
Ident Client node identity
Topic Name of topic

The sent system messages to the client nodes can contain the following reports.

Event identification Example of trasmitted message Description
connect {"Type":"Sys", "Evt":"connect", "Ident":"CLIENT"} The client CLIENT is connected
disconnect {"Type":"Sys", "Evt":"disconnect", "Ident":"CLIENT"} The client CLIENT is disconnected
crash {"Type":"Sys", "Evt":"crash", "Ident":"CLIENT"} Client CLIENT disconnected abnormally
topic_notify {"Type":"Sys", "Evt":"topic_notify", "Ident":"CLIENT", "Topic":"TOPIC"} The client CLIENT got in transmission to the topic TOPIC
topic_unnotify {"Type":"Sys", "Evt":"topic_unnotify", "Ident":"CLIENT", "Topic":"TOPIC"} The client CLIENT has stopped delivering to the topic TOPIC
topic_none_notify {"Type":"Sys", "Evt":"topic_none_notify", "Topic":"TOPIC"} No client transmitting to the topic TOPIC.
It is fired after
topic_listen {"Type":"Sys", "Evt":"topic_listen", "Ident":"CLIENT", "Topic":"TOPIC"} The client CLIENT is put in listening to the topic TOPIC
topic_unlisten {"Type":"Sys", "Evt":"topic_unlisten", "Ident":"CLIENT", "Topic":"TOPIC"} The client CLIENT has stopped listening to the topic TOPIC
topic_none_listen {"Type":"Sys", "Evt":"topic_none_listen", "Topic":"TOPIC"} No client listens to the topic TOPIC.
It is fired after

Debug

To enable the logs, change the client LOG level with the SQL statement.
To enable the logs on the commands:
set client_min_messages = DEBUG1;
To activate the log also on system events:
-- DEBUG2 level or higher
set client_min_messages = DEBUG2;

Fields attempts

As follows, some conversation examples between two nodes, using psql

  • Configure nodes and destinations using one administrator user

insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user1', 'NODE1', 'MYTOPIC', 'Topic', 'W', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user1', 'NODE1', 'NODE2', 'Rpc', 'W', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user2', 'NODE2', 'MYTOPIC', 'Topic', 'R', 60);
insert into messages.auth (role, ident, dest, type, mode, timeout) values ('user2', 'NODE2', 'NODE1', 'Rpc', 'W', 60);
P.S.: The timeout is set to 60 seconds in order to provide time to the user, to retrieve long messages before they expire.

  • Connecting to the database, user !"user1" with two separate terminals(same PC or different ones):

user1=# select messages.init('NODE1');
                   init                    
-------------------------------------------

{"Err":"0","Channel":"user1_17662fnpfvpnkyhxatjfu"}
(1 row)
user2=# select messages.init('NODE2');
                    init                    
--------------------------------------------

{"Err":"0","Channel":"user2_17673gyjexfwdzgcpuncg"}
(1 row)

  • Enable NODE2 to receive the events from Topic MYTOPIC
user2=# select messages.topic_listen('MYTOPIC');
 topic_listen
--------------

{"Err":"0"}
(1 row)

  • Check who is listening at Topic from NODE1:
user1=# select messages.topic_status('MYTOPIC');
                   topic_status                   
------------------------------------------------

{"Err":"0","Notifiers":["NODE2","NODE1"],"Listeners":["NODE2"]}
(1 row)

  • Enable NODE1 to observe the queue
user1=# select messages.topic_listen('MYTOPIC');
ATTENZIONE:  WebHome Messages err:3 cannot auth R with Topic MYTOPIC

CONTESTO: funzione PL/Perl "topic_listen"
                        topic_listen                        
------------------------------------------------------------

{"Errstr":"cannot auth R with Topic MYTOPIC","Err":"3"}
(1 row)

  • Transmitting small message from NODE1:
user1=# select messages.topic_send('MYTOPIC','{"MyEvent":"Esempio di messaggio nel formato JSON"}');
  topic_send  
--------------

{"Err":"0"}
(1 row)

  • Check if the notification has reached the destination in NODE2:
user2=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Topic","Size":51,"To":"MYTOPIC","From":"NODE1"}
{"MyEvent":"Esempio di messaggio nel formato JSON"}" received from server process with PID 17300.

  • Transmittng long message from NODE1:
user1=# select messages.topic_send('MYTOPIC', repeat(E'Messaggio lungo di 8000 caratteri ASCII\n', 200));
  topic_send  
--------------

{"Err":"0"}
(1 row)

  • And checking:
user2=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Topic","Size":8000,"To":"MYTOPIC","Dkey":"60.17662ifehivuyjlvxkmbjkzivxfzihctjlsgz","From":"NODE1"}" received from server process with PID 17662.

  • Recover the BODY of the message providing as parameter the payload of the received notification:
user2=# select messages.get_data('{"Type":"Topic","Size":8000,"To":"MYTOPIC","Dkey":"60.17662ifehivuyjlvxkmbjkzivxfzihctjlsgz","From":"NODE1"}');
                           get_data                            
---------------------------------------------------------------

{"Err":"0","Type":"Topic","Size":8000,"To":"MYTOPIC","From":"NODE1"}+
 Messaggio lungo di 8000 caratteri ASCII                      +
=== altre 188 righe ===
 Messaggio lungo di 8000 caratteri ASCII                      +

(1 row)

  • Sending a RPC call from NODE2:
user2=# select messages.rpc('NODE1', 'MyToken', '{"Param1":1,"Param2":2, "Param3":"tre"}');
     rpc      
--------------

{"Err":"0"}
(1 row)

  • Check for the notification in NODE1:
user1=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user1_17662fnpfvpnkyhxatjfu" with payload "{"Type":"Rpc","Size":39,"To":"NODE1","Token":"ad29dde3 60 MyToken ","From":"NODE2"}
{"Param1":1,"Param2":2, "Param3":"tre"}" received from server process with PID 17673.

  • Forwarding the aswer:
user1=# select messages.return('NODE2', 'ad29dde3 60 MyToken ', '{"MyResult":"Il mio risultato"}');
    return    
--------------

{"Err":"0"}
(1 row)

  • In NODE2 the answer has already arrived ... :
psqlm=# select true;
 bool
------

t
(1 row)

Asynchronous notification "user2_17673gyjexfwdzgcpuncg" with payload "{"Type":"Return","Size":31,"To":"NODE2","Token":"MyToken","From":"NODE1"}
{"MyResult":"Il mio risultato"}" received from server process with PID 17662.

  • Interrupting NODE1 conection and trying to send a RPC call from NODE2:
user2=# select messages.rpc('NODE1', 'MyToken', 'Funzionerà?');
ATTENZIONE:  WebHome Messages err:17 Client NODE1 is not connected

CONTESTO: funzione PL/Perl "rpc"
                          rpc                          
-------------------------------------------------------

{"Errstr":"Client NODE1 is not connected","Err":"17"}
(1 row)

Affected files on the server

  • /opt/psqlmessages/deploy/psqlmessages.sql - Initialization script
  • /opt/psqlmessages/lib/perl/PsqlMessages/FileShared.pm - Class that handles shared memory access
  • /opt/psqlmessages/lib/perl/PsqlMessages/Messages.pm - Class that handles messages
  • /run/shm/PsqlMessages.Clients - Shared memory with informations regarding connected clients
  • /run/shm/PsqlMessages.Topic.* - Shared memory with informations regarding Topics
  • /run/shm/PsqlMessages.Mess.MYTOPIC.60.17662ajalhrhvsdgcwjywnizkflqcxswhhjsc - Long message example of Topic "MYTOPIC" with a 60 seconds timeout

Releases files

The library is available under two licenses:
  • Affero G.P.L. rel.1
  • Commercial licence (please call to know conditions and prices)

The code can be downloaded from the SVN server:

Deploy

sudo apt-get install libjson-xs-perl
sudo mkdir -p /opt/psqlmessages
cd ~
svn checkout --username guest https://www.leader.it/svn/psqlmessages/psqlmessages_v01
cd psqlmessages_v01
sudo mv ./lib /opt/psqlmessages/
sudo chown -R root.root /opt/psqlmessages
echo 'CREATE SCHEMA messages;' | su -c psql\ YOURDATABASE postgres
cat ./deploy/psqlmessages.sql | su -c psql\ YOURDATABASE postgres
cd ~
rm -r psqlmessages_v01

Client code example

Here's an example of code to transmit and send messages, with which to assess performance.

# Example of node client to sending
# Parameters:
#    hostname server postgresql
#    database name
#    user name
#    password
#    node client name  (ident)
#    topic name
#    number of messages to send
#    message dimension in byte
#    interval between one message and the next in microseconds 
./sender.pl 127.0.0.1 my_database myuser mypassword SENDER TOPIC 1000 100 100

# Example of node client to listening
# Parameters:
#    hostname server postgresql
#    database name
#    user name
#    password
#    node client name
#    topic name 
./receiver.pl 127.0.0.1 my_database myuser mypassword RECEIVER TOPIC

Alternatives

  • PgQ Queueing solution from Skytools.
    • does not use the LISTEN/NOTIFY (unresponsive)
    • does not implement RPC
    • suitable for batch processing

  • pg-message-queue
    • limited management permissions
    • name of shared channels between clients (poor security)
    • does not implement RPC
    • messages dimension < 8000 byte

PsqlMessages Web Utilities

Topic revision: r42 - 14 Mar 2019, GuidoBrugnara
This site is powered by FoswikiCopyright (©) Leader.IT - Italy P.I. IT01434390223 Privacy policy & use of cookies