|
Csilk 0.2.1
A lightweight, high-performance C HTTP web framework
|
Internal Event Bus (Message Queue) implementation. More...
#include <fcntl.h>#include <fnmatch.h>#include <stdio.h>#include <stdlib.h>#include <string.h>#include "csilk/core/internal.h"#include "csilk/csilk.h"#include <time.h>#include "cJSON.h"
Functions | |
| void | csilk_mq_next (csilk_mq_ctx_t *ctx) |
| Advance to the next middleware or subscriber in the MQ handler chain. | |
| void | csilk_mq_abort (csilk_mq_ctx_t *ctx) |
| Abort the current MQ middleware chain immediately. | |
| const char * | csilk_mq_get_topic (csilk_mq_ctx_t *ctx) |
| Get the topic name of the current message in the MQ context. | |
| const void * | csilk_mq_get_payload (csilk_mq_ctx_t *ctx, size_t *len) |
| Get the payload data and length of the current message. | |
| static void | worker_cb (uv_work_t *req) |
| libuv work callback — runs the offloaded handler on a thread pool thread. | |
| static void | worker_after_cb (uv_work_t *req, int status) |
| libuv after-work callback — runs on the main loop thread after worker_cb completes. | |
| void | csilk_mq_offload (csilk_mq_ctx_t *ctx, csilk_mq_worker_t worker) |
| Offload message processing to a libuv thread pool worker. | |
| static void | on_mq_async (uv_async_t *handle) |
| libuv async callback for processing queued MQ messages. | |
| static int | _mq_enqueue (csilk_mq_t *mq, const char *topic, const void *payload, size_t len) |
| Internal: Enqueue a message into the in-memory linked list. | |
| static int | _mq_recovery (csilk_mq_t *mq) |
| Internal: Recover messages from the Write-Ahead Log on startup. | |
| static void | _mq_broadcast (csilk_mq_t *mq, const char *event, const char *topic, size_t len) |
| Internal: Create a new Message Queue instance. | |
| void | csilk_mq_get_stats (csilk_mq_t *mq, csilk_mq_stats_t *stats) |
| Get current MQ statistics. | |
| char * | csilk_mq_stats_to_json (const csilk_mq_stats_t *stats) |
| Convert MQ statistics to a JSON string. | |
| void | csilk_mq_register_monitor (csilk_mq_t *mq, csilk_ctx_t *c) |
| Register a WebSocket monitor for real-time MQ events. | |
| csilk_mq_t * | _csilk_mq_new (uv_loop_t *loop) |
| Internal: Create a new MQ instance bound to a libuv loop. | |
| int | csilk_mq_set_persistence (csilk_mq_t *mq, const char *wal_path) |
| Enable persistent message delivery using a Write-Ahead Log (WAL). | |
| static csilk_mq_topic_t * | get_or_create_topic (csilk_mq_t *mq, const char *name) |
| Find or create a topic structure. | |
| void | csilk_mq_use (csilk_mq_t *mq, const char *topic, csilk_mq_handler_t middleware) |
| Register a middleware handler for a specific topic (or globally). | |
| void | csilk_mq_subscribe (csilk_mq_t *mq, const char *topic, csilk_mq_handler_t subscriber) |
| Register a subscriber handler for a topic. | |
| static int | _mq_append_wal (csilk_mq_t *mq, const char *topic, const void *payload, size_t len) |
| Internal: append a message frame to the Write-Ahead Log file. | |
| int | csilk_mq_publish (csilk_mq_t *mq, const char *topic, const void *payload, size_t len) |
| Publish a message to a topic (thread-safe, optionally persistent). | |
| static void | on_mq_close (uv_handle_t *handle) |
| libuv close callback — final cleanup when the MQ async handle is closed. | |
| void | _csilk_mq_free (csilk_mq_t *mq) |
| Internal: initiate asynchronous shutdown and free of a Message Queue. | |
Internal Event Bus (Message Queue) implementation.
The MQ is a publish-subscribe event bus built on libuv async handles. It supports topic-based routing with glob patterns (via fnmatch), middleware chains, thread-safe publishing from any thread, optional persistence via Write-Ahead Log (WAL), and background thread offloading.
Messages flow through a handler chain assembled dynamically at dispatch time (see on_mq_async). The chain order is:
Each handler calls csilk_mq_next() to advance the chain, or csilk_mq_abort() to short-circuit.
Publishing (csilk_mq_publish) is thread-safe and lock-free on the fast path: the message is copied, appended to a mutex-guarded linked list, and an async signal (uv_async_send) wakes the main loop thread to drain the queue.
When enabled via csilk_mq_set_persistence(), every published message is first written to a binary WAL file before being enqueued in memory. On next startup, _mq_recovery() replays the WAL to restore undelivered messages. The WAL format is: [topic_len:4][topic:N][payload_len:4][payload:M][xor_checksum:4].
| void _csilk_mq_free | ( | csilk_mq_t * | mq | ) |
Internal: initiate asynchronous shutdown and free of a Message Queue.
Internal: Destroy an MQ instance and release all resources.
Triggers uv_close() on the async handle if it is not already closing. The actual cleanup (mutexes, WAL, queue, topics) happens in on_mq_close() when the close callback fires.
| mq | The MQ instance to free (may be NULL). |
| csilk_mq_t * _csilk_mq_new | ( | uv_loop_t * | loop | ) |
Internal: Create a new MQ instance bound to a libuv loop.
| loop | The libuv event loop. |
|
static |
Internal: append a message frame to the Write-Ahead Log file.
After writing all 5 parts via a single uv_fs_write() scatter-gather I/O (5 uv_buf_t entries), the file is fsynced for crash durability.
| mq | The MQ instance (must have wal_fd >= 0). |
| topic | Message topic string. |
| payload | Message payload data (may be NULL if len == 0). |
| len | Payload length in bytes. |
|
static |
Internal: Create a new Message Queue instance.
The MQ starts with no topics, no handlers, and no WAL — everything is populated lazily.
| loop | libuv event loop to associate the async handle with. |
|
static |
Internal: Enqueue a message into the in-memory linked list.
Internal: enqueue a message in the in-memory linked list.
Copies the topic and payload, appends to the queue's tail, and sends an async signal to wake the event loop for dispatch.
| mq | The MQ instance. |
| topic | Topic string. |
| payload | Opaque payload data. |
| len | Payload length in bytes. |
uv_async_send() is signal-safe and thread-safe — it can be called from any thread. If the loop is already awake, it's a no-op (libuv coalesces).
| mq | The MQ instance. |
| topic | Message topic. |
| payload | Message payload (may be NULL). |
| len | Payload length. |
|
static |
Internal: Recover messages from the Write-Ahead Log on startup.
Internal: recover messages from the Write-Ahead Log on startup.
Reads the WAL file sequentially and enqueues each persisted message into memory. Called once during MQ initialization.
| mq | The MQ instance. |
Reads the WAL file sequentially from offset 0 using positional reads (uv_fs_read with offset parameter). For each frame:
The WAL is append-only with no frame-length prefix. If a frame is corrupt, the next frame boundary is unknowable — we stop to avoid misinterpreting garbage as valid data.
| mq | The MQ instance (must have wal_fd >= 0). |
| void csilk_mq_abort | ( | csilk_mq_ctx_t * | ctx | ) |
Abort the current MQ middleware chain immediately.
Abort the MQ middleware/subscriber chain.
Sets the aborted flag on the context. Subsequent calls to csilk_mq_next() will be ignored.
| ctx | Message queue context (may be NULL). |
| const void * csilk_mq_get_payload | ( | csilk_mq_ctx_t * | ctx, |
| size_t * | len | ||
| ) |
Get the payload data and length of the current message.
Get the payload of the current message.
| ctx | Message queue context. |
| len | [out] If non-NULL, receives the payload length in bytes. |
| void csilk_mq_get_stats | ( | csilk_mq_t * | mq, |
| csilk_mq_stats_t * | stats | ||
| ) |
Get current MQ statistics.
| mq | The MQ instance. |
| stats | [out] Pointer to stats struct to populate. |
| const char * csilk_mq_get_topic | ( | csilk_mq_ctx_t * | ctx | ) |
Get the topic name of the current message in the MQ context.
Get the topic of the current message.
| ctx | Message queue context. |
| void csilk_mq_next | ( | csilk_mq_ctx_t * | ctx | ) |
Advance to the next middleware or subscriber in the MQ handler chain.
Pass control to the next middleware or subscriber in the MQ chain.
Handlers form a linear chain: [global_mw..., topic_mw..., subscriber...]. Each handler calls csilk_mq_next() to yield control to the next one. This is a non-recursive, non-reentrant manual trampoline — the chain is driven by the handlers themselves, not by a central loop.
If ctx->aborted is set (by a previous csilk_mq_abort() call), this is a no-op. Out-of-bounds handler_index is also silently ignored (end of chain).
| ctx | Message queue context. |
| void csilk_mq_offload | ( | csilk_mq_ctx_t * | ctx, |
| csilk_mq_worker_t | worker | ||
| ) |
Offload message processing to a libuv thread pool worker.
Offload message processing to a background thread.
The deep copy avoids shared mutable state between the background thread and the event loop. The caller's handler chain continues in parallel with the offloaded work — there is no result channel.
| ctx | Message queue context. |
| worker | Worker function that will receive topic, payload, and length on a background thread. |
| int csilk_mq_publish | ( | csilk_mq_t * | mq, |
| const char * | topic, | ||
| const void * | payload, | ||
| size_t | len | ||
| ) |
Publish a message to a topic (thread-safe, optionally persistent).
Publish a message to a topic.
The two operations are NOT atomic (WAL can succeed, memory enqueue can fail). This is a deliberate trade-off:
Processing is asynchronous — the caller receives no delivery confirmation. The message is deep-copied at both stages.
| mq | The MQ instance. |
| topic | Target topic name (cannot be NULL). |
| payload | Message payload data (may be NULL). |
| len | Payload length in bytes. |
payload immediately after this call returns — the data is copied internally. | void csilk_mq_register_monitor | ( | csilk_mq_t * | mq, |
| csilk_ctx_t * | c | ||
| ) |
Register a WebSocket monitor for real-time MQ events.
| mq | The MQ instance. |
| c | Framework context (WebSocket connection). |
| int csilk_mq_set_persistence | ( | csilk_mq_t * | mq, |
| const char * | wal_path | ||
| ) |
Enable persistent message delivery using a Write-Ahead Log (WAL).
Enable Write-Ahead Log (WAL) persistence for the MQ.
After this call, every csilk_mq_publish() appends to the WAL before enqueuing in memory.
| mq | The MQ instance. |
| wal_path | File path for the WAL. The file is created if it does not exist. |
| char * csilk_mq_stats_to_json | ( | const csilk_mq_stats_t * | stats | ) |
Convert MQ statistics to a JSON string.
| stats | Pointer to stats struct. |
| void csilk_mq_subscribe | ( | csilk_mq_t * | mq, |
| const char * | topic, | ||
| csilk_mq_handler_t | subscriber | ||
| ) |
Register a subscriber handler for a topic.
Register a subscriber for a topic.
Subscribers are treated as handlers appended to the end of the chain (after global and topic middlewares). This is a convenience wrapper around csilk_mq_use().
| mq | The MQ instance. |
| topic | Topic name to subscribe to. |
| subscriber | Handler function invoked when a message matches. |
| void csilk_mq_use | ( | csilk_mq_t * | mq, |
| const char * | topic, | ||
| csilk_mq_handler_t | middleware | ||
| ) |
Register a middleware handler for a specific topic (or globally).
Register MQ middleware for a topic.
Both arrays grow by doubling (initial cap = 4) when full.
| mq | The MQ instance. |
| topic | Topic name (e.g., "user.created"), or NULL for global. |
| middleware | Handler function to invoke during message processing. |
|
static |
Find or create a topic structure.
| mq | The MQ instance. |
| name | Topic name. |
|
static |
libuv async callback for processing queued MQ messages.
libuv async callback — dequeue and process all pending messages.
| handle | libuv async handle. |
This is the core of the MQ — it runs on the main event loop thread.
fnmatch(3) implements POSIX shell glob patterns: '*' matches any sequence, '?' matches any single char, '[abc]' matches character sets. This allows patterns like "user.*" or "system.event.?".
| handle | libuv async handle (data points to csilk_mq_t). |
|
static |
libuv close callback — final cleanup when the MQ async handle is closed.
This is guaranteed to run on the libuv event loop thread (uv_close callback), so it is safe to destroy mutexes here.
| handle | libuv handle being closed (data points to csilk_mq_t). |
|
static |
libuv after-work callback — runs on the main loop thread after worker_cb completes.
Frees the work context (topic, payload, and struct). The handler results (if any) should have been communicated back before this point since no result channel is provided.
| req | libuv work request (freed by this callback). |
| status | libuv status (ignored). |
|
static |
libuv work callback — runs the offloaded handler on a thread pool thread.
Extracts the work context from the request, then calls the user's handler with the topic, payload, and length.
| req | libuv work request (contains csilk_mq_work_ctx_t in data). |