Csilk 0.2.1
A lightweight, high-performance C HTTP web framework
Loading...
Searching...
No Matches
workflow.c File Reference

AI Workflow engine implementation with WAL persistence, Tracing, Tools, Monitoring, and Budgeting. More...

#include "csilk/app/workflow.h"
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <uv.h>
#include "cJSON.h"
#include "csilk/app/workflow_wal.h"
#include "csilk/core/internal.h"
#include <sys/stat.h>
#include <unistd.h>
Include dependency graph for workflow.c:

Data Structures

struct  csilk_ai_meta_t
 AI metadata attached to workflow node outputs for token tracking and budget enforcement. More...
 
struct  csilk_wf_tool_entry_t
 A registered workflow tool (function-calling capability exposed to AI nodes). Tools are invoked in parallel via the libuv thread pool during AI node execution. More...
 
struct  csilk_wf_edge_t
 
struct  csilk_wf_node_t
 A single node in a workflow DAG. Each node wraps a handler function with optional edges, error handling, dynamic routing, and timeout support. More...
 
struct  csilk_wf_t
 Workflow definition: a DAG of processing nodes connected by conditional or unconditional edges. Each node runs a handler function on the libuv thread pool. Supports persistence via WAL, real-time monitoring via WebSocket, tool registration for AI nodes, and budget (token/TTL) limits. More...
 
struct  csilk_wf_ctx_t
 Workflow execution context — per-run state tracking all node progress, scheduling, memory, and budget enforcement. Created by csilk_wf_run_ext_internal() and freed by cleanup_ctx(). More...
 
struct  node_work_t
 Per-node-execution state passed through libuv work requests. Allocated in execute_node(), freed in after_worker_cb(). More...
 
struct  sub_tool_work_t
 Per-tool-call context for parallel tool execution within an AI node. Each tool call runs on its own libuv thread-pool worker. More...
 
struct  stream_ctx_t
 

Macros

#define MAX_WORKFLOW_STEPS   1000
 

Functions

static void execute_node (csilk_wf_ctx_t *ctx, csilk_wf_node_t *node, csilk_data_t *input)
 Internal: enqueue a workflow node for execution on the libuv thread pool.
 
static void cleanup_ctx (csilk_wf_ctx_t *ctx)
 Internal: free a workflow execution context and all resources.
 
static void after_worker_cb (uv_work_t *req, int status)
 
static void register_active_ctx (csilk_wf_t *wf, csilk_wf_ctx_t *ctx)
 
static void unregister_active_ctx (csilk_wf_t *wf, csilk_wf_ctx_t *ctx)
 
static csilk_wf_ctx_t * find_active_ctx (csilk_wf_t *wf, const char *exec_id)
 
static const char * csilk_wf_run_ext_internal (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *), void(*trace_cb)(csilk_data_t *, csilk_wf_trace_t *))
 
static void serve_ui_handler (csilk_ctx_t *c)
 
void csilk_wf_serve_ui (csilk_app_t *app, const char *path)
 Register a default route to serve the workflow dashboard.
 
csilk_wf_t * csilk_wf_new (const char *name)
 Create a new AI workflow instance.
 
static void ai_config_free (void *ptr)
 
static void node_free (csilk_wf_node_t *node)
 
void csilk_wf_free (csilk_wf_t *wf)
 Deallocate a workflow and all its nodes.
 
csilk_wf_node_t * csilk_wf_add (csilk_wf_t *wf, const char *id, csilk_wf_handler_t handler, void *user_data)
 Add a node to the workflow.
 
void csilk_wf_node_set_entry (csilk_wf_node_t *node, int is_entry)
 Mark a node as an entry point for the workflow.
 
static void node_add_edge (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to, int is_loop)
 
void csilk_wf_bind (csilk_wf_node_t *from, csilk_wf_node_t *to)
 Bind two nodes sequentially (default routing).
 
void csilk_wf_on (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to)
 Add a conditional route between nodes.
 
void csilk_wf_on_loop (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to)
 Add a loop-back / feedback route between nodes.
 
void csilk_wf_on_error (csilk_wf_node_t *from, csilk_wf_node_t *target)
 Add an error fallback route.
 
void csilk_wf_route (csilk_wf_node_t *node, csilk_wf_router_t router)
 Set a dynamic router for a node.
 
void csilk_wf_node_set_join (csilk_wf_node_t *node, csilk_wf_join_policy_t policy)
 Set the join policy for a node.
 
void csilk_wf_set_persistence (csilk_wf_t *wf, const char *wal_dir)
 Enable WAL persistence for a workflow definition.
 
void csilk_wf_register_monitor (csilk_wf_t *wf, csilk_ctx_t *c)
 Register a WebSocket connection to receive live workflow updates.
 
static void _wf_broadcast (csilk_wf_t *wf, const char *event, const char *node_id, const char *payload)
 
void csilk_wf_set_budget (csilk_wf_t *wf, int max_tokens)
 Set a maximum token budget for the workflow.
 
void csilk_wf_node_set_timeout (csilk_wf_node_t *node, int timeout_ms)
 Set a timeout for a specific node.
 
void csilk_wf_set_ttl (csilk_wf_t *wf, int ttl_sec)
 Set a global TTL for the workflow execution.
 
void csilk_wf_node_set_interactive (csilk_wf_node_t *node, int is_interactive)
 Mark a node as interactive (requires human signal to proceed).
 
void csilk_wf_node_set_schema (csilk_wf_node_t *node, const char *schema)
 Set an expected JSON Schema for a node's output.
 
void csilk_wf_node_set_retry (csilk_wf_node_t *node, int max_retries, int retry_delay_ms)
 Set automatic retry policy for a specific node.
 
static csilk_data_t * remote_pass_handler (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data)
 
void csilk_wf_node_set_remote (csilk_wf_node_t *node, int is_remote)
 Mark a node for remote execution via MQ.
 
static void on_remote_result (csilk_mq_ctx_t *m_ctx)
 
void csilk_wf_enable_distributed (csilk_wf_t *wf, csilk_mq_t *mq)
 Enable distributed execution by bridging workflow with an MQ.
 
void * csilk_wf_alloc (csilk_wf_ctx_t *ctx, size_t size)
 Allocate memory from the workflow arena.
 
char * csilk_wf_strdup (csilk_wf_ctx_t *ctx, const char *s)
 Duplicate a string using the workflow arena.
 
csilk_data_t * csilk_wf_data_new (csilk_wf_ctx_t *ctx, const char *type, void *value)
 Allocate a new data container managed by the workflow arena.
 
static char * _csilk_json_get_path (csilk_wf_ctx_t *ctx, cJSON *root, const char *path)
 Internal: traverse a cJSON tree following a dot-separated path.
 
static char * apply_filter (csilk_wf_ctx_t *ctx, const char *filter, char *val)
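 Internal: resolve template expressions in a prompt string. (Review note: this description appears to belong to resolve_templates(); confirm the comment placement in workflow.c.)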
 
static char * resolve_templates (csilk_wf_ctx_t *ctx, const char *template)
 
static void sub_worker_cb (uv_work_t *req)
 libuv thread-pool work callback for tool execution. Looks up the tool by name in the workflow's tool registry and calls its function with the arguments from the AI tool call.
 
static void after_sub_worker_cb (uv_work_t *req, int status)
 libuv after-work callback for tool execution. Decrements the shared pending counter and signals the condition variable to wake the main AI node handler thread.
 
static void on_ai_stream (const char *chunk, void *user_data)
 
static csilk_data_t * ai_node_handler (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data)
 Built-in handler for AI workflow nodes.
 
csilk_wf_node_t * csilk_wf_add_ai (csilk_wf_t *wf, const char *id, const csilk_ai_config_t *config)
 Add a built-in AI node with template support.
 
void csilk_wf_register_tool (csilk_wf_t *wf, const char *name, const char *description, const char *parameters_json, csilk_wf_tool_fn fn, void *user_data)
 Register a tool that AI nodes can call.
 
csilk_wf_node_t * csilk_wf_get_node (csilk_wf_t *wf, const char *id)
 Look up a workflow node by its string ID.
 
char * csilk_wf_to_mermaid (csilk_wf_t *wf)
 Export the workflow graph as a Mermaid string.
 
char * csilk_wf_trace_to_json (const csilk_wf_trace_t *trace)
 Convert a trace object to a JSON string.
 
void csilk_wf_trace_free (csilk_wf_trace_t *trace)
 Free a trace object.
 
static void cleanup_ctx_now (csilk_wf_ctx_t *ctx)
 
static void on_ttl_timer_close (uv_handle_t *handle)
 
static void wal_log_event (csilk_wf_ctx_t *ctx, csilk_wf_event_type_t type, const char *node_id, csilk_data_t *data)
 Internal: persist a workflow event to the Write-Ahead Log.
 
static void worker_cb (uv_work_t *req)
 libuv thread-pool work callback — executes a workflow node's handler on a background thread.
 
static void on_retry_timer (uv_timer_t *handle)
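 libuv after-work callback — processes node completion on the main loop thread. (Review note: this description appears to belong to after_worker_cb(); confirm the comment placement in workflow.c.)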
 
static void on_work_timer_close (uv_handle_t *handle)
 
static void free_work (node_work_t *work)
 
static void on_node_timeout (uv_timer_t *handle)
 libuv timer callback — marks a node as timed out. Sets the is_timed_out flag on the node_work_t, which causes after_worker_cb to treat the output as NULL even if the handler eventually completes.
 
const char * csilk_wf_run (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result))
 Run the workflow asynchronously.
 
void csilk_wf_run_traced (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result, csilk_wf_trace_t *trace))
 Run workflow and generate a trace.
 
static void on_workflow_ttl (uv_timer_t *handle)
 
void csilk_wf_resume (csilk_wf_t *wf, const char *exec_id, void(*callback)(csilk_data_t *result))
 Resume an interrupted workflow execution from a WAL file.
 
void csilk_wf_signal_continue (csilk_wf_t *wf, const char *exec_id, csilk_data_t *input, void(*callback)(csilk_data_t *result))
 Signal a paused workflow to continue.
 

Variables

static csilk_wf_t * g_distributed_wfs [32]
 
static size_t g_distributed_wf_count = 0
 

Detailed Description

AI Workflow engine implementation with WAL persistence, Tracing, Tools, Monitoring, and Budgeting.


Data Structure Documentation

◆ csilk_ai_meta_t

struct csilk_ai_meta_t

AI metadata attached to workflow node outputs for token tracking and budget enforcement.

Data Fields
int completion_tokens

Tokens consumed by the completion.

char * model

Model name (e.g., "gpt-3.5-turbo").

int prompt_tokens

Tokens consumed by the prompt.

◆ csilk_wf_tool_entry_t

struct csilk_wf_tool_entry_t

A registered workflow tool (function-calling capability exposed to AI nodes). Tools are invoked in parallel via the libuv thread pool during AI node execution.

Data Fields
char * description

Description for the AI model's tool schema.

csilk_wf_tool_fn fn

Tool implementation callback.

char * name

Tool name (e.g., "get_weather").

char * parameters_json

JSON schema string for tool parameters.

void * user_data

Opaque context for the callback.

◆ csilk_wf_edge_t

struct csilk_wf_edge_t
Data Fields
char * condition

NULL for default/bind.

csilk_wf_node_t * target

Destination node.

◆ csilk_wf_s

struct csilk_wf_s

Workflow definition: a DAG of processing nodes connected by conditional or unconditional edges. Each node runs a handler function on the libuv thread pool. Supports persistence via WAL, real-time monitoring via WebSocket, tool registration for AI nodes, and budget (token/TTL) limits.

Opaque handle for a workflow instance.

Collaboration diagram for csilk_wf_t:
Data Fields
size_t active_context_capacity
size_t active_context_count
csilk_wf_ctx_t ** active_contexts
uv_mutex_t ctx_mutex
uv_loop_t * loop

libuv event loop for thread-pool scheduling.

int max_tokens

Maximum total tokens across all AI calls (0 = unlimited).

size_t monitor_capacity

Allocated monitor array capacity.

size_t monitor_count

Number of active monitors.

uv_mutex_t monitor_mutex

Protects the monitor array.

csilk_ctx_t ** monitors

WebSocket monitoring connections.

csilk_mq_t * mq

Optional MQ for distributed execution.

char * name

Human-readable workflow name.

size_t node_capacity

Allocated node array capacity.

size_t node_count

Number of registered nodes.

csilk_wf_node_t ** nodes

Array of node pointers.

size_t tool_capacity

Allocated tool array capacity.

size_t tool_count

Number of registered tools.

csilk_wf_tool_entry_t * tools

Registered tool definitions.

int ttl_sec

Workflow Time-To-Live in seconds (0 = no limit).

char * wal_dir

WAL directory path (NULL = no persistence).

◆ node_work_t

struct node_work_t

Per-node-execution state passed through libuv work requests. Allocated in execute_node(), freed in after_worker_cb().

Collaboration diagram for node_work_t:
Data Fields
csilk_wf_ctx_t * ctx

Workflow execution context.

csilk_data_t * input

Input data to the node's handler.

int is_timed_out

Flag set by timer if node exceeds timeout_ms.

csilk_wf_node_t * node

The node being executed.

uv_timer_t node_timer

Per-node timeout or retry delay timer.

csilk_data_t * output

Output data from the handler (set by worker_cb).

uv_work_t req

libuv work request (must be first for cast).

int retry_count

Current retry attempt.

int timer_closing

Non-zero once uv_close is called on node_timer.

csilk_wf_trace_node_t * trace_node

Trace record for this node (NULL if not tracing).

◆ sub_tool_work_t

struct sub_tool_work_t

Per-tool-call context for parallel tool execution within an AI node. Each tool call runs on its own libuv thread-pool worker.

Collaboration diagram for sub_tool_work_t:
Data Fields
uv_cond_t * cond

Shared condition variable for completion.

csilk_wf_ctx_t * ctx

Workflow context (for tool registry lookup).

uv_mutex_t * mutex

Shared mutex for the pending counter.

int * pending

Shared atomic-like pending count.

char * result

Tool output string (allocated by tool fn).

csilk_ai_tool_call_t * tc

Tool call arguments from the AI response.

◆ stream_ctx_t

struct stream_ctx_t
Data Fields
csilk_wf_ctx_t * ctx
const char * node_id

Macro Definition Documentation

◆ MAX_WORKFLOW_STEPS

#define MAX_WORKFLOW_STEPS   1000

Function Documentation

◆ _csilk_json_get_path()

static char * _csilk_json_get_path ( csilk_wf_ctx_t *  ctx,
cJSON *  root,
const char *  path 
)
static

Internal: traverse a cJSON tree following a dot-separated path.

Algorithm:

  1. Split the path on "." using strtok_r.
  2. For each token, if the current cJSON node is an array, index into it using atoi(); otherwise, use cJSON_GetObjectItemCaseSensitive().
  3. If the final value is a string/number/bool, return it as a string allocated from the workflow arena. For objects/arrays, return a stringified JSON representation.
Parameters
ctx — Workflow context (for arena allocation).
root — Root cJSON node to start traversal from.
path — Dot-separated path (e.g., "user.address.city").
Returns
A string allocated in ctx->arena, or NULL if the path does not exist. The returned string is valid for the workflow's lifetime.

◆ _wf_broadcast()

static void _wf_broadcast ( csilk_wf_t *  wf,
const char *  event,
const char *  node_id,
const char *  payload 
)
static

◆ after_sub_worker_cb()

static void after_sub_worker_cb ( uv_work_t *  req,
int  status 
)
static

libuv after-work callback for tool execution. Decrements the shared pending counter and signals the condition variable to wake the main AI node handler thread.

◆ after_worker_cb()

static void after_worker_cb ( uv_work_t *  req,
int  status 
)
static

◆ ai_config_free()

static void ai_config_free ( void *  ptr)
static

◆ ai_node_handler()

static csilk_data_t * ai_node_handler ( csilk_wf_ctx_t *  ctx,
csilk_data_t *  input,
void *  user_data 
)
static

Built-in handler for AI workflow nodes.

Algorithm:

  1. Resolve template expressions in the prompt string.
  2. Create an AI engine instance (OpenAI driver by default) using AGENT_API_KEY and AGENT_API_BASE environment variables.
  3. Build tool definitions from the workflow's registered tools.
  4. Construct a message array (optional system message + user prompt).
  5. Enter a request loop (max 10 iterations):
     a. Send a chat completion request with the current message array.
     b. If the response contains tool calls, execute each tool in parallel on the libuv thread pool, wait for all to complete via a condition variable, and append the tool results as new messages.
     c. If the response is a direct text completion, extract the content and AI metadata (model, token counts) and return them as csilk_data_t.
  6. Clean up all temporary allocations (messages, tools, AI handle).
Parameters
ctx — Workflow execution context.
input — Ignored (AI prompts come from config templates).
user_data — Pointer to csilk_ai_config_t with model, prompt, etc.
Returns
Output data with type "text/plain" and AI metadata, or NULL on failure (missing API key, driver init failure, all retries exhausted).

◆ apply_filter()

static char * apply_filter ( csilk_wf_ctx_t *  ctx,
const char *  filter,
char *  val 
)
static

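Internal: resolve template expressions in a prompt string.

Review note: the description below documents a "template" parameter, which matches the signature of resolve_templates() rather than apply_filter(ctx, filter, val). The comment appears to be attached to the wrong function in workflow.c; confirm against the source.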

Template syntax:
  {{node_id.value}} -> raw value output from a node
  {{node_id.value.path.to}} -> JSONPath into a node's JSON output
  {{input.value}} -> the workflow's initial input
  {{input.value.path.to}} -> JSONPath into the initial input

Algorithm:

  1. Iterate over all workflow nodes, searching for {{node_id.value}} patterns in the template string.
  2. For each match, look up the node's output. If followed by ".path", parse the node output as JSON and traverse with _csilk_json_get_path(). Otherwise, use the raw output value.
  3. Replace the {{...}} placeholder with the resolved value using arena memory.
  4. Repeat for {{input.value}} patterns using the workflow's initial input.
Parameters
ctx — Workflow execution context.
template — Template string with {{...}} placeholders.
Returns
Resolved string allocated in ctx->arena.
Note
Unresolvable patterns (missing node output, bad path) are replaced with "(null)".

◆ cleanup_ctx()

static void cleanup_ctx ( csilk_wf_ctx_t *  ctx)
static

Internal: free a workflow execution context and all resources.

Stops and closes the TTL timer if active, destroys all mutexes, frees the memory arena, node tracking arrays, WAL path, and the context struct itself.

Parameters
ctx — The execution context to clean up (may be NULL).

◆ cleanup_ctx_now()

static void cleanup_ctx_now ( csilk_wf_ctx_t *  ctx)
static

◆ csilk_wf_add()

csilk_wf_node_t * csilk_wf_add ( csilk_wf_t *  wf,
const char *  id,
csilk_wf_handler_t  handler,
void *  user_data 
)

Add a node to the workflow.

Parameters
wf — Workflow handle.
id — Unique ID for the node.
handler — Function to execute.
user_data — Opaque pointer passed to the handler.
Returns
Handle to the created node, or NULL on failure.

◆ csilk_wf_add_ai()

csilk_wf_node_t * csilk_wf_add_ai ( csilk_wf_t *  wf,
const char *  id,
const csilk_ai_config_t *  config 
)

Add a built-in AI node with template support.

Parameters
wf — Workflow handle.
id — Unique ID.
config — AI configuration (copied internally).
Returns
Node handle.

◆ csilk_wf_alloc()

void * csilk_wf_alloc ( csilk_wf_ctx_t *  ctx,
size_t  size 
)

Allocate memory from the workflow arena.

Parameters
ctx — Execution context.
size — Number of bytes.
Returns
Pointer to allocated memory.

◆ csilk_wf_bind()

void csilk_wf_bind ( csilk_wf_node_t *  from,
csilk_wf_node_t *  to 
)

Bind two nodes sequentially (default routing).

Parameters
from — Source node.
to — Destination node.

◆ csilk_wf_data_new()

csilk_data_t * csilk_wf_data_new ( csilk_wf_ctx_t *  ctx,
const char *  type,
void *  value 
)

Allocate a new data container managed by the workflow arena.

Parameters
ctx — Execution context.
type — Data type identifier (copied).
value — Pointer to the data.
Returns
New data container.

◆ csilk_wf_enable_distributed()

void csilk_wf_enable_distributed ( csilk_wf_t *  wf,
csilk_mq_t *  mq 
)

Enable distributed execution by bridging workflow with an MQ.

Parameters
wf — Workflow handle.
mq — MQ handle for task distribution.

◆ csilk_wf_free()

void csilk_wf_free ( csilk_wf_t *  wf)

Deallocate a workflow and all its nodes.

Parameters
wf — Workflow handle.

◆ csilk_wf_get_node()

csilk_wf_node_t * csilk_wf_get_node ( csilk_wf_t *  wf,
const char *  id 
)

Look up a workflow node by its string ID.

Get a node by ID.

Parameters
wf — The workflow instance.
id — Node identifier (set in csilk_wf_add()).
Returns
Node pointer, or NULL if not found.
Note
Linear search of the node array — O(n).

◆ csilk_wf_new()

csilk_wf_t * csilk_wf_new ( const char *  name)

Create a new AI workflow instance.

Parameters
name — Descriptive name for the workflow.
Returns
New workflow handle, or NULL on failure.

◆ csilk_wf_node_set_entry()

void csilk_wf_node_set_entry ( csilk_wf_node_t *  node,
int  is_entry 
)

Mark a node as an entry point for the workflow.

Parameters
node — Node handle.
is_entry — Non-zero to mark as entry, 0 to unmark.
Note
By default, nodes with 0 incoming edges are entries. Use this to explicitly start a node that is also part of a loop.

◆ csilk_wf_node_set_interactive()

void csilk_wf_node_set_interactive ( csilk_wf_node_t *  node,
int  is_interactive 
)

Mark a node as interactive (requires human signal to proceed).

Parameters
node — Node handle.
is_interactive — Non-zero to enable interactive mode.

◆ csilk_wf_node_set_join()

void csilk_wf_node_set_join ( csilk_wf_node_t *  node,
csilk_wf_join_policy_t  policy 
)

Set the join policy for a node.

Parameters
node — Node handle.
policy — AND (default) or OR.

◆ csilk_wf_node_set_remote()

void csilk_wf_node_set_remote ( csilk_wf_node_t *  node,
int  is_remote 
)

Mark a node for remote execution via MQ.

Parameters
node — Node handle.
is_remote — Non-zero to offload to remote worker.

◆ csilk_wf_node_set_retry()

void csilk_wf_node_set_retry ( csilk_wf_node_t *  node,
int  max_retries,
int  retry_delay_ms 
)

Set automatic retry policy for a specific node.

Parameters
node — Node handle.
max_retries — Maximum number of retry attempts.
retry_delay_ms — Delay before each retry.

◆ csilk_wf_node_set_schema()

void csilk_wf_node_set_schema ( csilk_wf_node_t *  node,
const char *  schema 
)

Set an expected JSON Schema for a node's output.

Parameters
node — Node handle.
schema — JSON Schema string (NULL to disable).

◆ csilk_wf_node_set_timeout()

void csilk_wf_node_set_timeout ( csilk_wf_node_t *  node,
int  timeout_ms 
)

Set a timeout for a specific node.

Parameters
node — Node handle.
timeout_ms — Timeout in milliseconds (0 for no timeout).

◆ csilk_wf_on()

void csilk_wf_on ( csilk_wf_node_t *  from,
const char *  condition,
csilk_wf_node_t *  to 
)

Add a conditional route between nodes.

Parameters
from — Source node.
condition — Result string that triggers this route (e.g., "fail").
to — Destination node.

◆ csilk_wf_on_error()

void csilk_wf_on_error ( csilk_wf_node_t *  from,
csilk_wf_node_t *  target 
)

Add an error fallback route.

Parameters
from — Source node.
target — Destination node triggered if the source handler fails.

◆ csilk_wf_on_loop()

void csilk_wf_on_loop ( csilk_wf_node_t *  from,
const char *  condition,
csilk_wf_node_t *  to 
)

Add a loop-back / feedback route between nodes.

Parameters
from — Source node.
condition — Result string that triggers this route.
to — Destination node (typically an earlier node).
Note
Unlike csilk_wf_on, this does NOT increment the 'to' node's incoming dependency count, preventing deadlocks in join logic.

◆ csilk_wf_register_monitor()

void csilk_wf_register_monitor ( csilk_wf_t *  wf,
csilk_ctx_t *  c 
)

Register a WebSocket connection to receive live workflow updates.

Parameters
wf — Workflow handle.
c — Request context (must be upgraded to WebSocket).

◆ csilk_wf_register_tool()

void csilk_wf_register_tool ( csilk_wf_t *  wf,
const char *  name,
const char *  description,
const char *  parameters_json,
csilk_wf_tool_fn  fn,
void *  user_data 
)

Register a tool that AI nodes can call.

Parameters
wf — Workflow handle.
name — Function name exposed to the LLM.
description — Description of what the tool does.
parameters_json — JSON Schema string for the arguments.
fn — C function to execute.
user_data — Context for the function.

◆ csilk_wf_resume()

void csilk_wf_resume ( csilk_wf_t *  wf,
const char *  exec_id,
void(*)(csilk_data_t *result)  callback 
)

Resume an interrupted workflow execution from a WAL file.

Parameters
wf — Workflow definition.
exec_id — The execution ID to resume.
callback — Callback for when the resumed workflow finishes.

◆ csilk_wf_route()

void csilk_wf_route ( csilk_wf_node_t *  node,
csilk_wf_router_t  router 
)

Set a dynamic router for a node.

Parameters
node — Node handle.
router — Function that determines the next node based on output.

◆ csilk_wf_run()

const char * csilk_wf_run ( csilk_wf_t *  wf,
csilk_data_t *  input,
void(*)(csilk_data_t *result)  callback 
)

Run the workflow asynchronously.

Parameters
wf — Workflow handle.
input — Initial input data.
callback — Callback invoked when the workflow completes or exits.
Returns
Unique Execution ID (string). Caller must not free.

◆ csilk_wf_run_ext_internal()

static const char * csilk_wf_run_ext_internal ( csilk_wf_t *  wf,
csilk_data_t *  input,
void(*)(csilk_data_t *)  callback,
void(*)(csilk_data_t *, csilk_wf_trace_t *)  trace_cb 
)
static

◆ csilk_wf_run_traced()

void csilk_wf_run_traced ( csilk_wf_t *  wf,
csilk_data_t *  input,
void(*)(csilk_data_t *result, csilk_wf_trace_t *trace)  callback 
)

Run workflow and generate a trace.

Parameters
wf — Workflow handle.
input — Initial input.
callback — Callback receiving final result and the full trace.

◆ csilk_wf_serve_ui()

void csilk_wf_serve_ui ( csilk_app_t *  app,
const char *  path 
)

Register a default route to serve the workflow dashboard.

Parameters
app — Application handle.
path — URL path (e.g., "/admin/workflow").

◆ csilk_wf_set_budget()

void csilk_wf_set_budget ( csilk_wf_t *  wf,
int  max_tokens 
)

Set a maximum token budget for the workflow.

Parameters
wf — Workflow handle.
max_tokens — Maximum total tokens (prompt + completion) allowed.

◆ csilk_wf_set_persistence()

void csilk_wf_set_persistence ( csilk_wf_t *  wf,
const char *  wal_dir 
)

Enable WAL persistence for a workflow definition.

Parameters
wf — Workflow handle.
wal_dir — Directory to store execution logs.

◆ csilk_wf_set_ttl()

void csilk_wf_set_ttl ( csilk_wf_t *  wf,
int  ttl_sec 
)

Set a global TTL for the workflow execution.

Parameters
wf — Workflow handle.
ttl_sec — TTL in seconds (0 for no limit).

◆ csilk_wf_signal_continue()

void csilk_wf_signal_continue ( csilk_wf_t *  wf,
const char *  exec_id,
csilk_data_t *  input,
void(*)(csilk_data_t *result)  callback 
)

Signal a paused workflow to continue.

Parameters
wf — Workflow definition.
exec_id — Execution ID of the paused workflow.
input — Optional replacement input (e.g., human-edited prompt).
callback — Callback for when the resumed workflow finishes.

◆ csilk_wf_strdup()

char * csilk_wf_strdup ( csilk_wf_ctx_t *  ctx,
const char *  s 
)

Duplicate a string using the workflow arena.

Parameters
ctx — Execution context.
s — Source string.
Returns
Copied string.

◆ csilk_wf_to_mermaid()

char * csilk_wf_to_mermaid ( csilk_wf_t *  wf)

Export the workflow graph as a Mermaid string.

Parameters
wf — Workflow handle.
Returns
Heap-allocated Mermaid code (caller must free).

◆ csilk_wf_trace_free()

void csilk_wf_trace_free ( csilk_wf_trace_t *  trace)

Free a trace object.

◆ csilk_wf_trace_to_json()

char * csilk_wf_trace_to_json ( const csilk_wf_trace_t *  trace)

Convert a trace object to a JSON string.

Returns
JSON string (caller must free).

◆ execute_node()

static void execute_node ( csilk_wf_ctx_t *  ctx,
csilk_wf_node_t *  node,
csilk_data_t *  input 
)
static

Internal: enqueue a workflow node for execution on the libuv thread pool.

Algorithm:

  1. Check termination flag (budget exceeded, TTL expired).
  2. Handle Interactive Nodes: if a node is marked as interactive and has not yet been approved in this context, set is_paused flag, log WF_EV_PAUSE to WAL, broadcast to monitors, and return without executing.
  3. Log WF_EV_NODE_START to WAL and broadcast "node_queued" to monitors.
  4. Allocate a node_work_t struct. If tracing is active, create a trace node with start time and input dump.
  5. Increment total_executions and nodes_active counters.
  6. If the node has a per-node timeout, initialize and arm a uv_timer.
  7. Queue the work via uv_queue_work(). The node handler runs on a background thread; after_worker_cb processes the result.
Parameters
ctx — Workflow execution context.
node — The node to execute.
input — Input data to pass to the node's handler.

◆ find_active_ctx()

static csilk_wf_ctx_t * find_active_ctx ( csilk_wf_t *  wf,
const char *  exec_id 
)
static

◆ free_work()

static void free_work ( node_work_t *  work)
static

◆ node_add_edge()

static void node_add_edge ( csilk_wf_node_t *  from,
const char *  condition,
csilk_wf_node_t *  to,
int  is_loop 
)
static

◆ node_free()

static void node_free ( csilk_wf_node_t *  node)
static

◆ on_ai_stream()

static void on_ai_stream ( const char *  chunk,
void *  user_data 
)
static

◆ on_node_timeout()

static void on_node_timeout ( uv_timer_t *  handle)
static

libuv timer callback — marks a node as timed out. Sets the is_timed_out flag on the node_work_t, which causes after_worker_cb to treat the output as NULL even if the handler eventually completes.

◆ on_remote_result()

static void on_remote_result ( csilk_mq_ctx_t *  m_ctx)
static

◆ on_retry_timer()

static void on_retry_timer ( uv_timer_t *  handle)
static

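libuv after-work callback — processes node completion on the main loop thread.

Review note: the algorithm below describes after-work result processing, which other entries on this page attribute to after_worker_cb(). The comment appears to be attached to on_retry_timer() by mistake; confirm against workflow.c.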

Algorithm (the central scheduler dispatch in the workflow engine):

  1. Stop the per-node timeout timer if active.
  2. Store the output in ctx->node_outputs and accumulate token usage from AI metadata.
  3. Log to WAL (if enabled) and broadcast "node_finish" to monitors.
  4. Record trace data (start/end time, input/output dump, model info).
  5. Check budget (max_tokens): if exceeded, set is_terminated flag and terminate the workflow on next idle check.
  6. If output is NULL and error_target is set, route to error node.
  7. If the node has a dynamic router function, call it to determine the next node; otherwise, evaluate each outgoing edge:
    • Unconditional edges (condition == NULL) always match.
    • Conditional edges match if output type equals condition string.
  8. For matching edges, check the target's join policy: AND join requires all incoming edges to fire before the target is ready; OR join fires on any single edge.
  9. If no edges are triggered and no nodes are active, the workflow is complete: log WF_EV_END, deliver the final output via callback, and clean up the context.

◆ on_ttl_timer_close()

static void on_ttl_timer_close ( uv_handle_t *  handle)
static

◆ on_work_timer_close()

static void on_work_timer_close ( uv_handle_t *  handle)
static

◆ on_workflow_ttl()

static void on_workflow_ttl ( uv_timer_t *  handle)
static

◆ register_active_ctx()

static void register_active_ctx ( csilk_wf_t *  wf,
csilk_wf_ctx_t *  ctx 
)
static

◆ remote_pass_handler()

static csilk_data_t * remote_pass_handler ( csilk_wf_ctx_t *  ctx,
csilk_data_t *  input,
void *  user_data 
)
static

◆ resolve_templates()

static char * resolve_templates ( csilk_wf_ctx_t *  ctx,
const char *  template 
)
static

◆ serve_ui_handler()

static void serve_ui_handler ( csilk_ctx_t *  c)
static

◆ sub_worker_cb()

static void sub_worker_cb ( uv_work_t *  req)
static

libuv thread-pool work callback for tool execution. Looks up the tool by name in the workflow's tool registry and calls its function with the arguments from the AI tool call.

◆ unregister_active_ctx()

static void unregister_active_ctx ( csilk_wf_t *  wf,
csilk_wf_ctx_t *  ctx 
)
static

◆ wal_log_event()

static void wal_log_event ( csilk_wf_ctx_t *  ctx,
csilk_wf_event_type_t  type,
const char *  node_id,
csilk_data_t *  data 
)
static

Internal: persist a workflow event to the Write-Ahead Log.

Packs node_id, data type, and data value into a flat payload buffer and delegates to _wf_wal_append(). The payload format is:
  [node_id\0][data_type\0][data_value\0]
Fields are NUL-terminated strings for simple parsing during recovery.

Parameters
ctx — Workflow execution context (must have wal_path set).
type — Event type (WF_EV_START, WF_EV_NODE_START, etc.).
node_id — Originating node ID, or NULL for workflow-level events.
data — Associated data (may be NULL for simple events).
Note
This is a no-op if ctx has no WAL path configured.

◆ worker_cb()

static void worker_cb ( uv_work_t *  req)
static

libuv thread-pool work callback — executes a workflow node's handler on a background thread.

Broadcasts a "node_start" event to monitors, then calls the node's handler function. The output is stored in the work request struct for retrieval by after_worker_cb on the main loop thread.

Variable Documentation

◆ g_distributed_wf_count

size_t g_distributed_wf_count = 0
static

◆ g_distributed_wfs

csilk_wf_t* g_distributed_wfs[32]
static