The core of Sturk's publish-subscribe implementation is the message broker, which is responsible for constructing and routing messages. It aggregates instances of two data types:
- channels - in a dictionary (key-value data structure),
- subscribers.
The key used in the channels dictionary is a character array (string) and is called a topic or a named channel. Messages are published to channels and received by subscribers. For a subscriber to receive a message sent through a channel, it must subscribe to the topic that corresponds to that channel.
The size of the message, its structure and the parsing of arguments passed to the publish procedure are controlled by the message API. See the example.
Figure: Broker hierarchy
- Note
- Typical publish-subscribe implementations also introduce a publisher data type. Sturk does not have publishers; publishing is done directly to a message channel.
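To make the relationship between the broker, its channels and its subscribers more concrete, here is a minimal sketch of a toy broker. It is not Sturk's implementation: every name in it (toy_broker, toy_channel, toy_subscriber, toy_search, toy_subscribe, toy_publish) is hypothetical, the dictionary is replaced by a linear scan over a small array, and creating a channel on first lookup is an assumption made for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_CHANNELS 8
#define TOY_MAX_SUBSCRIBERS 8

/* Hypothetical types for illustration only; not Sturk's definitions. */
struct toy_subscriber {
    int received; /* number of messages delivered so far */
};

struct toy_channel {
    const char* topic; /* dictionary key; the pointer is stored, not copied */
    struct toy_subscriber* subs[TOY_MAX_SUBSCRIBERS];
    size_t n_subs;
};

struct toy_broker {
    struct toy_channel channels[TOY_MAX_CHANNELS];
    size_t n_channels;
};

/* Find the channel for a topic, creating it on first use (an assumption).
 * Returns NULL when the channel table is full. */
static struct toy_channel* toy_search(struct toy_broker* b, const char* topic)
{
    size_t i;

    assert(b != NULL && topic != NULL);
    for (i = 0; i < b->n_channels; i++)
        if (strcmp(b->channels[i].topic, topic) == 0)
            return &b->channels[i];
    if (b->n_channels == TOY_MAX_CHANNELS)
        return NULL;
    b->channels[b->n_channels].topic = topic;
    b->channels[b->n_channels].n_subs = 0;
    return &b->channels[b->n_channels++];
}

/* Attach a subscriber to a topic. Returns 0 on success, -1 on failure. */
static int toy_subscribe(struct toy_broker* b, struct toy_subscriber* s,
                         const char* topic)
{
    struct toy_channel* ch = toy_search(b, topic);

    if (ch == NULL || ch->n_subs == TOY_MAX_SUBSCRIBERS)
        return -1;
    ch->subs[ch->n_subs++] = s;
    return 0;
}

/* Publish directly to a channel: every subscriber of it gets one message. */
static void toy_publish(struct toy_channel* ch)
{
    size_t i;

    assert(ch != NULL);
    for (i = 0; i < ch->n_subs; i++)
        ch->subs[i]->received++;
}
```

With a zero-initialized toy_broker, a subscriber attached to "news" is reached by toy_publish(toy_search(&b, "news")), while a subscriber attached only to another topic is not.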
Broker example
In this example the message will be a sequence of pipe characters - |. The sequence length will be equal to the integer passed to the publish procedure.
Figure: Message for a call "publish(5)"
Define an API for messages
Define the size of the message
static size_t getsize(void)
{
    return sizeof(char*);
}
Define a message constructor
For n pipe characters, allocate n + 1 bytes (the extra byte holds the string terminating character - \0) and fill the string with n pipe characters.
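static void init(void* msg, va_list vlist)
{
    /* Requested number of pipes plus one byte for the terminator. */
    int n = va_arg(vlist, int) + 1;
    char* str = NULL;

    if (n > 0) {
        str = malloc((size_t)n);
        if (str != NULL) { /* leave the message NULL if allocation fails */
            str[--n] = '\0';
            while (n--)
                str[n] = '|';
        }
    }
    *(char**)msg = str;
}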
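The constructor receives its arguments as a va_list, so it cannot be called with plain arguments directly. The sketch below wraps it in a small variadic front end to show how a call such as publish(5) could end up in init. The wrapper name construct is hypothetical and is not part of Sturk; the constructor is the one shown above, with a NULL check on the allocation added.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdlib.h>

/* Message constructor from the example above, plus a malloc NULL check. */
static void init(void* msg, va_list vlist)
{
    int n = va_arg(vlist, int) + 1; /* pipes plus the terminating '\0' */
    char* str = NULL;

    if (n > 0) {
        str = malloc((size_t)n);
        if (str != NULL) {
            str[--n] = '\0';
            while (n--)
                str[n] = '|';
        }
    }
    *(char**)msg = str;
}

/* Hypothetical variadic front end: packs the "..." arguments into a
 * va_list and forwards them to the constructor. */
static void construct(void* msg, ...)
{
    va_list vlist;

    assert(msg != NULL);
    va_start(vlist, msg);
    init(msg, vlist);
    va_end(vlist);
}
```

Given char* s, the call construct(&s, 5) stores a newly allocated five-pipe string in s, construct(&s, 0) stores an empty string, and construct(&s, -1) stores NULL. The caller releases the string with free, which is what the destructor in the next step does.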
Define a message destructor
static void deinit(void* msg)
{
    free(*(char**)msg);
}
Define a vtable
{.size_cb = getsize, .ctor = init, .dtor = deinit}};
- vtable for message construction (broker.h:52),
- size_cb, of type size_t(*)(void): callback for obtaining the size of the message (broker.h:60).
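The three callbacks only make sense together: size_cb tells the caller how much storage a message needs, ctor fills that storage from the publish arguments, and dtor releases whatever ctor acquired. The following sketch shows one way a caller could drive such a vtable. The struct name toy_message_vt, the helpers toy_message_new and toy_message_delete, and the int-based message API are all hypothetical; only the field names size_cb, ctor and dtor come from the initializer above, and their exact types in Sturk are an assumption.

```c
#include <assert.h>
#include <stdarg.h>
#include <stdlib.h>

/* Hypothetical vtable layout; the field names follow the initializer
 * above, the struct name and field types are assumptions. */
struct toy_message_vt {
    size_t (*size_cb)(void);
    void (*ctor)(void* msg, va_list vlist);
    void (*dtor)(void* msg);
};

/* A trivial message API: the message is one int copied from the argument. */
static size_t int_size(void) { return sizeof(int); }
static void int_ctor(void* msg, va_list vlist) { *(int*)msg = va_arg(vlist, int); }
static void int_dtor(void* msg) { (void)msg; /* nothing to release */ }

static const struct toy_message_vt INT_API = {
    .size_cb = int_size, .ctor = int_ctor, .dtor = int_dtor};

/* Allocate size_cb() bytes and let ctor fill them from the arguments.
 * Returns NULL if the allocation fails. */
static void* toy_message_new(const struct toy_message_vt* vt, ...)
{
    va_list vlist;
    void* msg;

    assert(vt != NULL);
    msg = malloc(vt->size_cb());
    if (msg == NULL)
        return NULL;
    va_start(vlist, vt);
    vt->ctor(msg, vlist);
    va_end(vlist);
    return msg;
}

/* Let dtor release what ctor acquired, then free the message storage. */
static void toy_message_delete(const struct toy_message_vt* vt, void* msg)
{
    assert(vt != NULL);
    if (msg == NULL)
        return;
    vt->dtor(msg);
    free(msg);
}
```

In the pipe example the message storage is a single char* (hence sizeof(char*) in getsize), and the string it points to is owned by the message until deinit frees it.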
Complete API
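#include <stdarg.h>
#include <stddef.h>
#include <stdlib.h>

static size_t getsize(void)
{
    return sizeof(char*);
}

static void init(void* msg, va_list vlist)
{
    /* Requested number of pipes plus one byte for the terminator. */
    int n = va_arg(vlist, int) + 1;
    char* str = NULL;

    if (n > 0) {
        str = malloc((size_t)n);
        if (str != NULL) { /* leave the message NULL if allocation fails */
            str[--n] = '\0';
            while (n--)
                str[n] = '|';
        }
    }
    *(char**)msg = str;
}

static void deinit(void* msg)
{
    free(*(char**)msg);
}

{.size_cb = getsize, .ctor = init, .dtor = deinit}};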
Publish-subscribe
Initialize a broker
- StBroker (struct StBroker): the message broker (broker.h:97),
- broker_create (broker.h:50).
- Note
- The API passed here is the previously defined vtable.
Create a subscriber
- StSubscriber (struct StSubscriber): the subscriber (broker.h:104),
- subscriber_create (broker.h:62).
Subscribe to a topic
- subscribe (broker.h:47).
Publish to a topic
- broker_search (broker.h:56),
- publish (broker.h:44).
- Note
- The behaviour of the publish procedure is controlled with the message constructor.
Receive a message
- subscriber_await (broker.h:68).
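On the receiving side, a subscriber needs somewhere to hold messages between the moment they are published and the moment they are awaited. The sketch below models that with a small FIFO. It is only an illustration: toy_inbox, toy_enqueue and toy_await are hypothetical names, the messages are plain ints, and toy_await returns NULL on an empty queue instead of waiting. Whether the real subscriber_await blocks is not stated in this section.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_QUEUE_CAP 4

/* Hypothetical subscriber inbox: a fixed-size FIFO of int messages. */
struct toy_inbox {
    int items[TOY_QUEUE_CAP];
    size_t head;  /* index of the oldest queued message */
    size_t count; /* number of queued messages */
};

/* Enqueue a message. Returns 0 on success, -1 when the inbox is full. */
static int toy_enqueue(struct toy_inbox* q, int value)
{
    assert(q != NULL);
    if (q->count == TOY_QUEUE_CAP)
        return -1;
    q->items[(q->head + q->count) % TOY_QUEUE_CAP] = value;
    q->count++;
    return 0;
}

/* Non-blocking stand-in for await: returns a pointer to the oldest
 * message, or NULL when the inbox is empty. The pointer stays valid
 * until the slot is reused by a later toy_enqueue. */
static int* toy_await(struct toy_inbox* q)
{
    int* msg;

    assert(q != NULL);
    if (q->count == 0)
        return NULL;
    msg = &q->items[q->head];
    q->head = (q->head + 1) % TOY_QUEUE_CAP;
    q->count--;
    return msg;
}
```

As in the example on this page, the receiver gets a pointer to the message and dereferences it to read the payload; messages come out in the order in which they were enqueued.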
Complete example
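TEST(subscriber, should_receive_enqueued_message)
{
    char** msg = NULL;

    /* Elided here: the steps listed above (broker_create,
     * subscriber_create, subscribe, publish through broker_search,
     * subscriber_await), after which msg points at the received message. */
    TEST_ASSERT_EQUAL_STRING("|||||", *msg);
}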
- broker_destroy (broker.h:53).