Csilk 0.2.1
A lightweight, high-performance C HTTP web framework
Loading...
Searching...
No Matches
mq.c File Reference

Internal Event Bus (Message Queue) implementation. More...

#include <fcntl.h>
#include <fnmatch.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "csilk/core/internal.h"
#include "csilk/csilk.h"
#include <time.h>
#include "cJSON.h"
Include dependency graph for mq.c:

Functions

void csilk_mq_next (csilk_mq_ctx_t *ctx)
 Advance to the next middleware or subscriber in the MQ handler chain.
 
void csilk_mq_abort (csilk_mq_ctx_t *ctx)
 Abort the current MQ middleware chain immediately.
 
const char * csilk_mq_get_topic (csilk_mq_ctx_t *ctx)
 Get the topic name of the current message in the MQ context.
 
const void * csilk_mq_get_payload (csilk_mq_ctx_t *ctx, size_t *len)
 Get the payload data and length of the current message.
 
static void worker_cb (uv_work_t *req)
 libuv work callback — runs the offloaded handler on a thread pool thread.
 
static void worker_after_cb (uv_work_t *req, int status)
 libuv after-work callback — runs on the main loop thread after worker_cb completes.
 
void csilk_mq_offload (csilk_mq_ctx_t *ctx, csilk_mq_worker_t worker)
 Offload message processing to a libuv thread pool worker.
 
static void on_mq_async (uv_async_t *handle)
 libuv async callback for processing queued MQ messages.
 
static int _mq_enqueue (csilk_mq_t *mq, const char *topic, const void *payload, size_t len)
 Internal: Enqueue a message into the in-memory linked list.
 
static int _mq_recovery (csilk_mq_t *mq)
 Internal: Recover messages from the Write-Ahead Log on startup.
 
static void _mq_broadcast (csilk_mq_t *mq, const char *event, const char *topic, size_t len)
 Internal: Create a new Message Queue instance.
 
void csilk_mq_get_stats (csilk_mq_t *mq, csilk_mq_stats_t *stats)
 Get current MQ statistics.
 
char * csilk_mq_stats_to_json (const csilk_mq_stats_t *stats)
 Convert MQ statistics to a JSON string.
 
void csilk_mq_register_monitor (csilk_mq_t *mq, csilk_ctx_t *c)
 Register a WebSocket monitor for real-time MQ events.
 
csilk_mq_t * _csilk_mq_new (uv_loop_t *loop)
 Internal: Create a new MQ instance bound to a libuv loop.
 
int csilk_mq_set_persistence (csilk_mq_t *mq, const char *wal_path)
 Enable persistent message delivery using a Write-Ahead Log (WAL).
 
static csilk_mq_topic_tget_or_create_topic (csilk_mq_t *mq, const char *name)
 Find or create a topic structure.
 
void csilk_mq_use (csilk_mq_t *mq, const char *topic, csilk_mq_handler_t middleware)
 Register a middleware handler for a specific topic (or globally).
 
void csilk_mq_subscribe (csilk_mq_t *mq, const char *topic, csilk_mq_handler_t subscriber)
 Register a subscriber handler for a topic.
 
static int _mq_append_wal (csilk_mq_t *mq, const char *topic, const void *payload, size_t len)
 Internal: append a message frame to the Write-Ahead Log file.
 
int csilk_mq_publish (csilk_mq_t *mq, const char *topic, const void *payload, size_t len)
 Publish a message to a topic (thread-safe, optionally persistent).
 
static void on_mq_close (uv_handle_t *handle)
 libuv close callback — final cleanup when the MQ async handle is closed.
 
void _csilk_mq_free (csilk_mq_t *mq)
 Internal: initiate asynchronous shutdown and free of a Message Queue.
 

Detailed Description

Internal Event Bus (Message Queue) implementation.

Architecture

The MQ is a publish-subscribe event bus built on libuv async handles. It supports topic-based routing with glob patterns (via fnmatch), middleware chains, thread-safe publishing from any thread, optional persistence via Write-Ahead Log (WAL), and background thread offloading.

Dispatch model

Messages flow through a handler chain assembled dynamically at dispatch time (see on_mq_async). The chain order is:

  1. Global middleware (registered with topic=NULL) — runs for ALL topics.
  2. Topic-specific handlers — matched by fnmatch(topic_pattern, msg_topic).
  3. Subscribers (just handlers registered via csilk_mq_subscribe).

Each handler calls csilk_mq_next() to advance the chain, or csilk_mq_abort() to short-circuit.

Thread safety

Publishing (csilk_mq_publish) is thread-safe and lock-free on the fast path: the message is copied, appended to a mutex-guarded linked list, and an async signal (uv_async_send) wakes the main loop thread to drain the queue.

Persistence (WAL)

When enabled via csilk_mq_set_persistence(), every published message is first written to a binary WAL file before being enqueued in memory. On next startup, _mq_recovery() replays the WAL to restore undelivered messages. The WAL format is: [topic_len:4][topic:N][payload_len:4][payload:M][xor_checksum:4].

Function Documentation

◆ _csilk_mq_free()

void _csilk_mq_free ( csilk_mq_t *  mq)

Internal: initiate asynchronous shutdown and free of a Message Queue.

Internal: Destroy an MQ instance and release all resources.

Triggers uv_close() on the async handle if it is not already closing. The actual cleanup (mutexes, WAL, queue, topics) happens in on_mq_close() when the close callback fires.

Parameters
mqThe MQ instance to free (may be NULL).
Note
This is an async operation — the MQ is not freed immediately. Safe to call with NULL.

◆ _csilk_mq_new()

csilk_mq_t * _csilk_mq_new ( uv_loop_t *  loop)

Internal: Create a new MQ instance bound to a libuv loop.

Parameters
loopThe libuv event loop.
Returns
A new MQ instance (heap-allocated), or NULL on failure.

◆ _mq_append_wal()

static int _mq_append_wal ( csilk_mq_t *  mq,
const char *  topic,
const void *  payload,
size_t  len 
)
static

Internal: append a message frame to the Write-Ahead Log file.

WAL frame format (total frame size = 4 + N + 4 + M + 4)

[topic_len : uint32_t, 4 bytes] — byte length of topic string
[topic_data : uint8_t[], N bytes] — topic UTF-8 bytes (no NUL)
[payload_len : uint32_t, 4 bytes] — byte length of payload
[payload_data : uint8_t[], M bytes] — raw payload bytes
[checksum : uint32_t, 4 bytes] — XOR over topic + payload

After writing all 5 parts via a single uv_fs_write() scatter-gather I/O (5 uv_buf_t entries), the file is fsynced for crash durability.

Parameters
mqThe MQ instance (must have wal_fd >= 0).
topicMessage topic string.
payloadMessage payload data (may be NULL if len == 0).
lenPayload length in bytes.
Returns
0 on success, -1 on write failure.
Note
This is a no-op if the MQ has no WAL file (wal_fd < 0).
The caller should hold wal_mutex, though this function acquires it internally as well for safety.

◆ _mq_broadcast()

static void _mq_broadcast ( csilk_mq_t *  mq,
const char *  event,
const char *  topic,
size_t  len 
)
static

Internal: Create a new Message Queue instance.

Initialization

  1. calloc the MQ struct (zero-initialized — all counts/capacities = 0).
  2. Initialize the queue mutex (protects the in-memory message linked list).
  3. Initialize the async handle bound to the given loop. The async handle's callback (on_mq_async) is triggered by uv_async_send() whenever a new message is enqueued from any thread.
  4. Mark WAL as closed (wal_fd = -1), init the WAL mutex.

The MQ starts with no topics, no handlers, and no WAL — everything is populated lazily.

Parameters
looplibuv event loop to associate the async handle with.
Returns
A new csilk_mq_t instance, or NULL on allocation failure.
Note
The MQ must be freed via _csilk_mq_free(). The MQ is initially non-persistent — call csilk_mq_set_persistence() to enable WAL.

◆ _mq_enqueue()

static int _mq_enqueue ( csilk_mq_t *  mq,
const char *  topic,
const void *  payload,
size_t  len 
)
static

Internal: Enqueue a message into the in-memory linked list.

Internal: enqueue a message in the in-memory linked list.

Copies the topic and payload, appends to the queue's tail, and sends an async signal to wake the event loop for dispatch.

Parameters
mqThe MQ instance.
topicTopic string.
payloadOpaque payload data.
lenPayload length in bytes.
Returns
0 on success, -1 on allocation failure.

Enqueue algorithm

  1. Allocate and populate a new csilk_mq_msg_t (deep-copy topic + payload).
  2. Lock queue_mutex.
  3. Append to the tail of the singly-linked list:
    • If queue_tail != NULL: queue_tail->next = msg
    • Else: queue_head = msg Update queue_tail to msg.
  4. Unlock, call uv_async_send() to wake the event loop.

uv_async_send() is signal-safe and thread-safe — it can be called from any thread. If the loop is already awake, it's a no-op (libuv coalesces).

Parameters
mqThe MQ instance.
topicMessage topic.
payloadMessage payload (may be NULL).
lenPayload length.
Returns
0 on success, -1 on allocation failure.
Note
Thread-safe. The async signal ensures on_mq_async() processes the message on the main loop thread.

◆ _mq_recovery()

static int _mq_recovery ( csilk_mq_t *  mq)
static

Internal: Recover messages from the Write-Ahead Log on startup.

Internal: recover messages from the Write-Ahead Log on startup.

Reads the WAL file sequentially and enqueues each persisted message into memory. Called once during MQ initialization.

Parameters
mqThe MQ instance.
Returns
0 on success, -1 on I/O or replay failure.

Recovery algorithm (WAL replay)

Reads the WAL file sequentially from offset 0 using positional reads (uv_fs_read with offset parameter). For each frame:

  1. Read 4 bytes → topic_len. If < 4 bytes: EOF or corruption → stop.
  2. Read topic_len bytes → topic name. If short read → stop.
  3. Read 4 bytes → payload_len. If < 4 bytes → stop.
  4. Read payload_len bytes → payload. If short read → stop.
  5. Read 4 bytes → stored checksum. If < 4 bytes → stop.
  6. Compute XOR checksum over topic + payload.
  7. If checksum matches: enqueue the message in memory (_mq_enqueue), WITHOUT re-appending to the WAL.
  8. If checksum mismatches: free, stop (treat as corruption boundary).

Why stop at corruption?

The WAL is append-only with no frame-length prefix. If a frame is corrupt, the next frame boundary is unknowable — we stop to avoid misinterpreting garbage as valid data.

Parameters
mqThe MQ instance (must have wal_fd >= 0).
Returns
0 on success (or if no WAL), -1 on allocation failure.
Note
The WAL is NOT truncated after recovery — new messages append after existing ones. A future compaction step could truncate processed entries.

◆ csilk_mq_abort()

void csilk_mq_abort ( csilk_mq_ctx_t *  ctx)

Abort the current MQ middleware chain immediately.

Abort the MQ middleware/subscriber chain.

Sets the aborted flag on the context. Subsequent calls to csilk_mq_next() will be ignored.

Parameters
ctxMessage queue context (may be NULL).

◆ csilk_mq_get_payload()

const void * csilk_mq_get_payload ( csilk_mq_ctx_t *  ctx,
size_t *  len 
)

Get the payload data and length of the current message.

Get the payload of the current message.

Parameters
ctxMessage queue context.
len[out] If non-NULL, receives the payload length in bytes.
Returns
Pointer to the raw payload data, or NULL if the context or message is NULL.
Note
The returned pointer is valid only for the duration of the handler callback. If the data is needed later, the handler must copy it.

◆ csilk_mq_get_stats()

void csilk_mq_get_stats ( csilk_mq_t *  mq,
csilk_mq_stats_t stats 
)

Get current MQ statistics.

Parameters
mqThe MQ instance.
stats[out] Pointer to stats struct to populate.

◆ csilk_mq_get_topic()

const char * csilk_mq_get_topic ( csilk_mq_ctx_t *  ctx)

Get the topic name of the current message in the MQ context.

Get the topic of the current message.

Parameters
ctxMessage queue context.
Returns
The topic string (e.g., "user.created"), or NULL if the context or message is NULL.

◆ csilk_mq_next()

void csilk_mq_next ( csilk_mq_ctx_t *  ctx)

Advance to the next middleware or subscriber in the MQ handler chain.

Pass control to the next middleware or subscriber in the MQ chain.

Execution model

Handlers form a linear chain: [global_mw..., topic_mw..., subscriber...]. Each handler calls csilk_mq_next() to yield control to the next one. This is a non-recursive, non-reentrant manual trampoline — the chain is driven by the handlers themselves, not by a central loop.

If ctx->aborted is set (by a previous csilk_mq_abort() call), this is a no-op. Out-of-bounds handler_index is also silently ignored (end of chain).

Parameters
ctxMessage queue context.
Note
Typically called by middleware to pass control to the next handler or subscriber.

◆ csilk_mq_offload()

void csilk_mq_offload ( csilk_mq_ctx_t *  ctx,
csilk_mq_worker_t  worker 
)

Offload message processing to a libuv thread pool worker.

Offload message processing to a background thread.

Mechanism

  1. Deep-copy topic (strdup) and payload (malloc+memcpy) into a work ctx.
  2. Queue the work via uv_queue_work() — runs worker_cb on a libuv thread pool thread.
  3. worker_after_cb fires on the main loop thread: frees the work ctx.
  4. Call csilk_mq_next() immediately on the main thread to continue the handler chain, without waiting for the background worker.

The deep copy avoids shared mutable state between the background thread and the event loop. The caller's handler chain continues in parallel with the offloaded work — there is no result channel.

Parameters
ctxMessage queue context.
workerWorker function that will receive topic, payload, and length on a background thread.
Note
The payload is deep-copied so the background thread can safely process it without worrying about mutex locking.

◆ csilk_mq_publish()

int csilk_mq_publish ( csilk_mq_t *  mq,
const char *  topic,
const void *  payload,
size_t  len 
)

Publish a message to a topic (thread-safe, optionally persistent).

Publish a message to a topic.

Write-ahead: WAL then memory

The two operations are NOT atomic (WAL can succeed, memory enqueue can fail). This is a deliberate trade-off:

  • If WAL succeeds but enqueue fails: message is safe on disk but lost in memory — it will be recovered on next restart.
  • If WAL fails: the message is dropped entirely (return -1).

Processing is asynchronous — the caller receives no delivery confirmation. The message is deep-copied at both stages.

Parameters
mqThe MQ instance.
topicTarget topic name (cannot be NULL).
payloadMessage payload data (may be NULL).
lenPayload length in bytes.
Returns
0 on success, -1 if the topic is NULL or WAL append fails.
Note
Thread-safe. The caller may free or reuse payload immediately after this call returns — the data is copied internally.

◆ csilk_mq_register_monitor()

void csilk_mq_register_monitor ( csilk_mq_t *  mq,
csilk_ctx_t *  c 
)

Register a WebSocket monitor for real-time MQ events.

Parameters
mqThe MQ instance.
cFramework context (WebSocket connection).

◆ csilk_mq_set_persistence()

int csilk_mq_set_persistence ( csilk_mq_t *  mq,
const char *  wal_path 
)

Enable persistent message delivery using a Write-Ahead Log (WAL).

Enable Write-Ahead Log (WAL) persistence for the MQ.

WAL handshake

  1. Lock wal_mutex (held for the entire setup + recovery).
  2. Close any previously-opened WAL file + free old wal_path.
  3. Open (or create) the WAL file at wal_path with O_CREAT | O_RDWR | O_APPEND.
  4. Store fd and path in the MQ struct.
  5. Call _mq_recovery() to replay any existing messages from the WAL file into the in-memory queue. This ensures messages survive process restarts.

After this call, every csilk_mq_publish() appends to the WAL before enqueuing in memory.

Parameters
mqThe MQ instance.
wal_pathFile path for the WAL. The file is created if it does not exist.
Returns
0 on success, -1 if parameters are NULL or the file cannot be opened.
Note
The WAL uses a simple binary format: [topic_len][topic][payload_len] [payload][checksum] entries. Checksum is a simple XOR for integrity.

◆ csilk_mq_stats_to_json()

char * csilk_mq_stats_to_json ( const csilk_mq_stats_t stats)

Convert MQ statistics to a JSON string.

Parameters
statsPointer to stats struct.
Returns
Heap-allocated JSON string (must be freed).

◆ csilk_mq_subscribe()

void csilk_mq_subscribe ( csilk_mq_t *  mq,
const char *  topic,
csilk_mq_handler_t  subscriber 
)

Register a subscriber handler for a topic.

Register a subscriber for a topic.

Subscribers are treated as handlers appended to the end of the chain (after global and topic middlewares). This is a convenience wrapper around csilk_mq_use().

Parameters
mqThe MQ instance.
topicTopic name to subscribe to.
subscriberHandler function invoked when a message matches.

◆ csilk_mq_use()

void csilk_mq_use ( csilk_mq_t *  mq,
const char *  topic,
csilk_mq_handler_t  middleware 
)

Register a middleware handler for a specific topic (or globally).

Register MQ middleware for a topic.

Global vs topic-specific

  • topic == NULL: handler is appended to mq->global_middlewares[]. These run for EVERY message, regardless of topic, and execute first.
  • topic != NULL: handler is appended to that topic's handlers[] array. The topic is created lazily via get_or_create_topic(). These run only when fnmatch(topic_name, msg_topic) matches.

Both arrays grow by doubling (initial cap = 4) when full.

Parameters
mqThe MQ instance.
topicTopic name (e.g., "user.created"), or NULL for global.
middlewareHandler function to invoke during message processing.
Note
The handler arrays grow dynamically (doubling capacity) as needed. Topic matching supports glob patterns via fnmatch().

◆ get_or_create_topic()

static csilk_mq_topic_t * get_or_create_topic ( csilk_mq_t *  mq,
const char *  name 
)
static

Find or create a topic structure.

Parameters
mqThe MQ instance.
nameTopic name.
Returns
Pointer to topic structure, or NULL on failure.

◆ on_mq_async()

static void on_mq_async ( uv_async_t *  handle)
static

libuv async callback for processing queued MQ messages.

libuv async callback — dequeue and process all pending messages.

Parameters
handlelibuv async handle.

Dispatch algorithm (per message)

This is the core of the MQ — it runs on the main event loop thread.

  1. Atomically swap the queue head to NULL under mutex (drain all).
  2. Walk the linked list of messages.
  3. For each message: a. Count total handlers: global_mw_count + sum of handler_count for every topic where fnmatch(topic->name, msg->topic) == 0. b. Allocate a contiguous handler chain array of that size. c. Bulk-copy global middlewares into the chain. d. For each matching topic: bulk-copy its handlers. e. Create a csilk_mq_ctx_t with {mq, msg, chain, count, -1, 0}. f. Kick off the chain with csilk_mq_next(&ctx) — handler at index 0 runs, which typically calls csilk_mq_next() again. g. Free the chain array (the context is stack-allocated).
  4. Free the message (topic, payload, struct).

Topic matching via fnmatch

fnmatch(3) implements POSIX shell glob patterns: '*' matches any sequence, '?' matches any single char, '[abc]' matches character sets. This allows patterns like "user.*" or "system.event.?".

Parameters
handlelibuv async handle (data points to csilk_mq_t).

◆ on_mq_close()

static void on_mq_close ( uv_handle_t *  handle)
static

libuv close callback — final cleanup when the MQ async handle is closed.

Teardown order

  1. Destroy mutexes (queue_mutex, wal_mutex) — no more concurrent access.
  2. Close WAL file (uv_fs_close), free wal_path.
  3. Drain and free the in-memory message linked list.
  4. Free all topic structures: topic name, handler arrays, structs.
  5. Free global middlewares array.
  6. Free the MQ struct itself.

This is guaranteed to run on the libuv event loop thread (uv_close callback), so it is safe to destroy mutexes here.

Parameters
handlelibuv handle being closed (data points to csilk_mq_t).

◆ worker_after_cb()

static void worker_after_cb ( uv_work_t *  req,
int  status 
)
static

libuv after-work callback — runs on the main loop thread after worker_cb completes.

Frees the work context (topic, payload, and struct). The handler results (if any) should have been communicated back before this point since no result channel is provided.

Parameters
reqlibuv work request (freed by this callback).
statuslibuv status (ignored).

◆ worker_cb()

static void worker_cb ( uv_work_t *  req)
static

libuv work callback — runs the offloaded handler on a thread pool thread.

Extracts the work context from the request, then calls the user's handler with the topic, payload, and length.

Parameters
reqlibuv work request (contains csilk_mq_work_ctx_t in data).