Sturk 1.1.0
Publish-subscribe C implementation.
Loading...
Searching...
No Matches
Broker

The core of Sturk's publish-subscribe implementation is the message broker which is responsible for constructing and routing messages. It aggregates instances of two data types:

  1. channels - in a dictionary (key-value data structure),
  2. subscribers.

The key used in a channels dictionary is a character array (string) and it is called topic or named channel. Messages are published to channels and received with subscribers. In order for the subscriber to receive a message sent through a channel it must subscribe to a topic that corresponds to the channel.

The size of the message, its structure and the parsing of arguments passed to the publish procedure are controlled by the message API. See the example.

broker-hierarchy
Broker hierarchy
Note
Typical publish-subscribe implementations introduce also a publisher data type. Sturk implementation does not have publishers. Publishing is done directly to a message channel.

Broker example

In this example the message will be a sequence of pipe characters - |. The sequence length will be equal to the integer passed to the publish procedure.

broker-example-message
Message for a call "publish(5)"

Define an API for messages

Define the size of the message

static size_t getsize(void)
{
return sizeof(char*);
}

Define a message constructor

For n pipe characters allocate n + 1 bytes (add one to include the string terminating character - \0) and fill the string with n number of |.

static void init(void* msg, va_list vlist)
{
int n = va_arg(vlist, int) + 1;
char* str = NULL;
if (n > 0) {
str = malloc((size_t)n);
str[--n] = '\0';
while (n--)
str[n] = '|';
}
*(char**)msg = str;
}

Define a message destructor

static void deinit(void* msg)
{
free(*(char**)msg);
}

Define a vtable

const struct StMessageVt SAMPLE_MESSAGE_API[] = {
{.size_cb = getsize, .ctor = init, .dtor = deinit}};
Vtable for message construction.
Definition broker.h:52
size_t(* size_cb)(void)
Callback for obtaining the size of the message.
Definition broker.h:60

Complete API

static size_t getsize(void)
{
return sizeof(char*);
}
static void init(void* msg, va_list vlist)
{
int n = va_arg(vlist, int) + 1;
char* str = NULL;
if (n > 0) {
str = malloc((size_t)n);
str[--n] = '\0';
while (n--)
str[n] = '|';
}
*(char**)msg = str;
}
static void deinit(void* msg)
{
free(*(char**)msg);
}
const struct StMessageVt SAMPLE_MESSAGE_API[] = {
{.size_cb = getsize, .ctor = init, .dtor = deinit}};

Publish-subscribe

Initialize a broker

StBroker* broker = broker_create(SAMPLE_MESSAGE_API);
struct StBroker StBroker
The message broker.
Definition broker.h:97
#define broker_create
Definition broker.h:50
Note
The API passed here is the previously defined vtable.

Create a subscriber

struct StSubscriber StSubscriber
The subscriber.
Definition broker.h:104
#define subscriber_create
Definition broker.h:62

Subscribe to a topic

subscribe(sber, "test");
#define subscribe
Definition broker.h:47

Publish to a topic

publish(broker_search(broker, "test"), 5);
#define broker_search
Definition broker.h:56
#define publish
Definition broker.h:44
Note
The behaviour of the publish procedure is controlled with the message constructor.

Receive a message

msg = subscriber_await(sber);
#define subscriber_await
Definition broker.h:68

Complete example

TEST(subscriber, should_receive_enqueued_message)
{
StBroker* broker = broker_create(SAMPLE_MESSAGE_API);
char** msg = NULL;
subscribe(sber, "test");
publish(broker_search(broker, "test"), 5);
msg = subscriber_await(sber);
TEST_ASSERT_EQUAL_STRING("|||||", *msg);
broker_destroy(broker);
}
#define broker_destroy
Definition broker.h:53