Csilk 0.2.1
A lightweight, high-performance C HTTP web framework
Loading...
Searching...
No Matches
workflow.h File Reference

AI Workflow engine for the csilk framework. More...

#include "csilk/app/app.h"
#include "csilk/csilk.h"
Include dependency graph for workflow.h:
This graph shows which files directly or indirectly include this file:

Go to the source code of this file.

Data Structures

struct  csilk_data_t
 Generic data container for passing messages between workflow nodes. More...
 
struct  csilk_wf_trace_node_t
 Trace record for a single node execution. More...
 
struct  csilk_wf_trace_t
 Complete execution trace of a workflow. More...
 
struct  csilk_ai_config_t
 Configuration for built-in AI nodes. More...
 

Typedefs

typedef const char *(* csilk_wf_router_t) (csilk_data_t *input)
 Dynamic router function signature.
 
typedef csilk_data_t *(* csilk_wf_handler_t) (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data)
 Function signature for a workflow node handler.
 
typedef char *(* csilk_wf_tool_fn) (const char *args_json, void *user_data)
 Function signature for a workflow tool.
 

Enumerations

enum  csilk_wf_join_policy_t { CSILK_WF_JOIN_AND , CSILK_WF_JOIN_OR }
 Join policies for nodes with multiple incoming dependencies. More...
 

Functions

void csilk_wf_serve_ui (csilk_app_t *app, const char *path)
 Register a default route to serve the workflow dashboard.
 
csilk_data_t * csilk_wf_data_new (csilk_wf_ctx_t *ctx, const char *type, void *value)
 Allocate a new data container managed by the workflow arena.
 
char * csilk_wf_strdup (csilk_wf_ctx_t *ctx, const char *s)
 Duplicate a string using the workflow arena.
 
void * csilk_wf_alloc (csilk_wf_ctx_t *ctx, size_t size)
 Allocate memory from the workflow arena.
 
csilk_wf_t * csilk_wf_new (const char *name)
 Create a new AI workflow instance.
 
void csilk_wf_free (csilk_wf_t *wf)
 Deallocate a workflow and all its nodes.
 
csilk_wf_node_t * csilk_wf_add (csilk_wf_t *wf, const char *id, csilk_wf_handler_t handler, void *user_data)
 Add a node to the workflow.
 
csilk_wf_node_t * csilk_wf_add_ai (csilk_wf_t *wf, const char *id, const csilk_ai_config_t *config)
 Add a built-in AI node with template support.
 
void csilk_wf_register_tool (csilk_wf_t *wf, const char *name, const char *description, const char *parameters_json, csilk_wf_tool_fn fn, void *user_data)
 Register a tool that AI nodes can call.
 
csilk_wf_node_t * csilk_wf_get_node (csilk_wf_t *wf, const char *id)
 Get a node by ID.
 
void csilk_wf_node_set_entry (csilk_wf_node_t *node, int is_entry)
 Mark a node as an entry point for the workflow.
 
void csilk_wf_bind (csilk_wf_node_t *from, csilk_wf_node_t *to)
 Bind two nodes sequentially (default routing).
 
void csilk_wf_on (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to)
 Add a conditional route between nodes.
 
void csilk_wf_on_loop (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to)
 Add a loop-back / feedback route between nodes.
 
void csilk_wf_on_error (csilk_wf_node_t *from, csilk_wf_node_t *target)
 Add an error fallback route.
 
void csilk_wf_route (csilk_wf_node_t *node, csilk_wf_router_t router)
 Set a dynamic router for a node.
 
void csilk_wf_node_set_join (csilk_wf_node_t *node, csilk_wf_join_policy_t policy)
 Set the join policy for a node.
 
void csilk_wf_node_set_interactive (csilk_wf_node_t *node, int is_interactive)
 Mark a node as interactive (requires human signal to proceed).
 
void csilk_wf_node_set_schema (csilk_wf_node_t *node, const char *schema)
 Set an expected JSON Schema for a node's output.
 
void csilk_wf_signal_continue (csilk_wf_t *wf, const char *exec_id, csilk_data_t *input, void(*callback)(csilk_data_t *result))
 Signal a paused workflow to continue.
 
void csilk_wf_node_set_timeout (csilk_wf_node_t *node, int timeout_ms)
 Set a timeout for a specific node.
 
void csilk_wf_set_ttl (csilk_wf_t *wf, int ttl_sec)
 Set a global TTL for the workflow execution.
 
void csilk_wf_node_set_retry (csilk_wf_node_t *node, int max_retries, int retry_delay_ms)
 Set automatic retry policy for a specific node.
 
void csilk_wf_node_set_remote (csilk_wf_node_t *node, int is_remote)
 Mark a node for remote execution via MQ.
 
void csilk_wf_enable_distributed (csilk_wf_t *wf, csilk_mq_t *mq)
 Enable distributed execution by bridging workflow with an MQ.
 
void csilk_wf_set_persistence (csilk_wf_t *wf, const char *wal_dir)
 Enable WAL persistence for a workflow definition.
 
const char * csilk_wf_run (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result))
 Run the workflow asynchronously.
 
void csilk_wf_resume (csilk_wf_t *wf, const char *exec_id, void(*callback)(csilk_data_t *result))
 Resume an interrupted workflow execution from a WAL file.
 
void csilk_wf_run_traced (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result, csilk_wf_trace_t *trace))
 Run workflow and generate a trace.
 
char * csilk_wf_trace_to_json (const csilk_wf_trace_t *trace)
 Convert a trace object to a JSON string.
 
void csilk_wf_trace_free (csilk_wf_trace_t *trace)
 Free a trace object.
 
void csilk_wf_register_handler (const char *name, csilk_wf_handler_t handler)
 Register a global handler for use in declarative workflows.
 
csilk_wf_t * csilk_wf_load_yaml (const char *path)
 Load a workflow definition from a YAML file.
 
csilk_wf_t * csilk_wf_from_json (const char *json)
 Create a workflow from a JSON string.
 
char * csilk_wf_to_mermaid (csilk_wf_t *wf)
 Export the workflow graph as a Mermaid string.
 
void csilk_wf_register_monitor (csilk_wf_t *wf, csilk_ctx_t *c)
 Register a WebSocket connection to receive live workflow updates.
 
void csilk_wf_set_budget (csilk_wf_t *wf, int max_tokens)
 Set a maximum token budget for the workflow.
 

Detailed Description

AI Workflow engine for the csilk framework.

Provides a graph-based orchestration engine for AI pipelines and agents. Supports sequential execution, parallel fan-out (via multi-input join policies), conditional routing, agentic loops with feedback edges, and built-in AI node handlers with LLM tool calling.

Graph Model

The workflow is a directed graph of nodes; it is acyclic unless loop-back edges introduce cycles. Each node is a handler function (or a built-in AI node) that receives data from its predecessors and emits data to its successors. Edges can be:

  • Sequential (csilk_wf_bind): default routing, always fires.
  • Conditional (csilk_wf_on): fires only when output matches a string.
  • Loop-back (csilk_wf_on_loop): cycles but does NOT increment the target's incoming-edge counter (avoiding deadlock in AND-join logic).
  • Error fallback (csilk_wf_on_error): fires on handler failure.
  • Dynamic (csilk_wf_route): programmatic router callback decides the next node at runtime.

Execution

csilk_wf_run starts execution asynchronously. Entry nodes (nodes with 0 incoming edges or explicitly marked) fire immediately. Results propagate through the graph via the event loop. WAL persistence enables crash recovery via csilk_wf_resume.

Thread Safety

Workflow definitions are read-only during execution (all mutation must happen before csilk_wf_run). The runtime is single-threaded on the libuv event loop.


Data Structure Documentation

◆ csilk_wf_trace_node_t

struct csilk_wf_trace_node_t

Trace record for a single node execution.

Data Fields
int completion_tokens
uint64_t end_time

Microseconds

char * error

Error message if failed

char * input_dump

String representation of input

char * model

AI model used (if applicable)

char * node_id
char * output_dump

String representation of output

int prompt_tokens
uint64_t start_time

Microseconds (uv_hrtime)

◆ csilk_wf_trace_t

struct csilk_wf_trace_t

Complete execution trace of a workflow.

Collaboration diagram for csilk_wf_trace_t:
Data Fields
uint64_t end_time
char * exec_id

Unique execution ID

size_t node_count
csilk_wf_trace_node_t ** nodes
uint64_t start_time

◆ csilk_ai_config_t

struct csilk_ai_config_t

Configuration for built-in AI nodes.

Data Fields
int max_history_messages

Max messages to keep in history (0 = unlimited).

int max_tokens
const char * model

AI model identifier.

const char * prompt

User prompt (supports {{node.value}} templates).

int stream

Enable token streaming to monitors.

const char * system_msg

Optional system prompt.

double temperature

Typedef Documentation

◆ csilk_wf_handler_t

typedef csilk_data_t *(* csilk_wf_handler_t) (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data)

Function signature for a workflow node handler.

Parameters
ctx — Execution context (can be used for arena allocation).
input — Input data from previous node(s).
user_data — Opaque pointer passed during node creation.

◆ csilk_wf_router_t

typedef const char *(* csilk_wf_router_t) (csilk_data_t *input)

Dynamic router function signature.

Parameters
input — Data from the node that just finished.
Returns
ID of the next node to trigger, or NULL for default routing.

◆ csilk_wf_tool_fn

typedef char *(* csilk_wf_tool_fn) (const char *args_json, void *user_data)

Function signature for a workflow tool.

Parameters
args_json — JSON string of arguments provided by the LLM.
user_data — Opaque pointer passed during registration.
Returns
JSON string or text result (caller will free).

Enumeration Type Documentation

◆ csilk_wf_join_policy_t

Join policies for nodes with multiple incoming dependencies.

Enumerator
CSILK_WF_JOIN_AND 

Default: Trigger only when ALL inputs arrive.

CSILK_WF_JOIN_OR 

Trigger when ANY input arrives.

Function Documentation

◆ csilk_wf_add()

csilk_wf_node_t * csilk_wf_add ( csilk_wf_t *  wf,
const char *  id,
csilk_wf_handler_t  handler,
void *  user_data 
)

Add a node to the workflow.

Parameters
wf — Workflow handle.
id — Unique ID for the node.
handler — Function to execute.
user_data — Opaque pointer passed to the handler.
Returns
Handle to the created node, or NULL on failure.

◆ csilk_wf_add_ai()

csilk_wf_node_t * csilk_wf_add_ai ( csilk_wf_t *  wf,
const char *  id,
const csilk_ai_config_t *  config 
)

Add a built-in AI node with template support.

Parameters
wf — Workflow handle.
id — Unique ID.
config — AI configuration (copied internally).
Returns
Node handle.

◆ csilk_wf_alloc()

void * csilk_wf_alloc ( csilk_wf_ctx_t *  ctx,
size_t  size 
)

Allocate memory from the workflow arena.

Parameters
ctx — Execution context.
size — Number of bytes.
Returns
Pointer to allocated memory.

◆ csilk_wf_bind()

void csilk_wf_bind ( csilk_wf_node_t *  from,
csilk_wf_node_t *  to 
)

Bind two nodes sequentially (default routing).

Parameters
from — Source node.
to — Destination node.

◆ csilk_wf_data_new()

csilk_data_t * csilk_wf_data_new ( csilk_wf_ctx_t *  ctx,
const char *  type,
void *  value 
)

Allocate a new data container managed by the workflow arena.

Parameters
ctx — Execution context.
type — Data type identifier (copied).
value — Pointer to the data.
Returns
New data container.

◆ csilk_wf_enable_distributed()

void csilk_wf_enable_distributed ( csilk_wf_t *  wf,
csilk_mq_t *  mq 
)

Enable distributed execution by bridging workflow with an MQ.

Parameters
wf — Workflow handle.
mq — MQ handle for task distribution.

◆ csilk_wf_free()

void csilk_wf_free ( csilk_wf_t *  wf)

Deallocate a workflow and all its nodes.

Parameters
wf — Workflow handle.

◆ csilk_wf_from_json()

csilk_wf_t * csilk_wf_from_json ( const char *  json_str)

Create a workflow from a JSON string.

Create a workflow from a JSON string.

Parses the JSON and builds the workflow in three passes:

  • Pass 1: Create all nodes from the "steps" array. Each step has an "id", optional "type" ("ai" or "handler"), and optional "config" for AI nodes. AI nodes use csilk_wf_add_ai() with prompt/model config; handler nodes use csilk_wf_add() with a registered callback.
  • Pass 2: Create connections from the "connections" array. Each connection links a source ("from") to a target ("to"), optionally with a "condition" for conditional edges or "loop: true" for loops.
  • Pass 3: Set error targets from each step's "on_error" field.

Parameters
json_str — Null-terminated JSON string.
Returns
A new csilk_wf_t, or NULL on parse failure or empty workflow.
Note
The caller owns the returned workflow and must free it with csilk_wf_free(). Handler functions must be registered via csilk_wf_register_handler() before calling this function.

◆ csilk_wf_get_node()

csilk_wf_node_t * csilk_wf_get_node ( csilk_wf_t *  wf,
const char *  id 
)

Get a node by ID.

Parameters
wf — Workflow handle.
id — Node ID.
Returns
Node handle or NULL if not found.

Get a node by ID.

Parameters
wf — The workflow instance.
id — Node identifier (set in csilk_wf_add()).
Returns
Node pointer, or NULL if not found.
Note
Linear search of the node array — O(n).

◆ csilk_wf_load_yaml()

csilk_wf_t * csilk_wf_load_yaml ( const char *  path)

Load a workflow definition from a YAML file.

Parameters
path — Path to the .yaml or .yml file.
Returns
Workflow handle, or NULL on failure.

Load a workflow definition from a YAML file.

Parses the YAML into cJSON via parse_yaml_file(), then delegates to csilk_wf_from_json() for workflow construction. This two-step approach avoids duplicated parsing logic.

Parameters
path — Path to a .yaml or .yml file.
Returns
A new csilk_wf_t, or NULL if the file cannot be read or the YAML is invalid.
Note
The caller owns the returned workflow.

◆ csilk_wf_new()

csilk_wf_t * csilk_wf_new ( const char *  name)

Create a new AI workflow instance.

Parameters
name — Descriptive name for the workflow.
Returns
New workflow handle, or NULL on failure.

◆ csilk_wf_node_set_entry()

void csilk_wf_node_set_entry ( csilk_wf_node_t *  node,
int  is_entry 
)

Mark a node as an entry point for the workflow.

Parameters
node — Node handle.
is_entry — Non-zero to mark as entry, 0 to unmark.
Note
By default, nodes with 0 incoming edges are entries. Use this to explicitly start a node that is also part of a loop.

◆ csilk_wf_node_set_interactive()

void csilk_wf_node_set_interactive ( csilk_wf_node_t *  node,
int  is_interactive 
)

Mark a node as interactive (requires human signal to proceed).

Parameters
node — Node handle.
is_interactive — Non-zero to enable interactive mode.

◆ csilk_wf_node_set_join()

void csilk_wf_node_set_join ( csilk_wf_node_t *  node,
csilk_wf_join_policy_t  policy 
)

Set the join policy for a node.

Parameters
node — Node handle.
policy — AND (default) or OR.

◆ csilk_wf_node_set_remote()

void csilk_wf_node_set_remote ( csilk_wf_node_t *  node,
int  is_remote 
)

Mark a node for remote execution via MQ.

Parameters
node — Node handle.
is_remote — Non-zero to offload to remote worker.

◆ csilk_wf_node_set_retry()

void csilk_wf_node_set_retry ( csilk_wf_node_t *  node,
int  max_retries,
int  retry_delay_ms 
)

Set automatic retry policy for a specific node.

Parameters
node — Node handle.
max_retries — Maximum number of retry attempts.
retry_delay_ms — Delay before each retry.

◆ csilk_wf_node_set_schema()

void csilk_wf_node_set_schema ( csilk_wf_node_t *  node,
const char *  schema 
)

Set an expected JSON Schema for a node's output.

Parameters
node — Node handle.
schema — JSON Schema string (NULL to disable).

◆ csilk_wf_node_set_timeout()

void csilk_wf_node_set_timeout ( csilk_wf_node_t *  node,
int  timeout_ms 
)

Set a timeout for a specific node.

Parameters
node — Node handle.
timeout_ms — Timeout in milliseconds (0 for no timeout).

◆ csilk_wf_on()

void csilk_wf_on ( csilk_wf_node_t *  from,
const char *  condition,
csilk_wf_node_t *  to 
)

Add a conditional route between nodes.

Parameters
from — Source node.
condition — Result string that triggers this route (e.g., "fail").
to — Destination node.

◆ csilk_wf_on_error()

void csilk_wf_on_error ( csilk_wf_node_t *  from,
csilk_wf_node_t *  target 
)

Add an error fallback route.

Parameters
from — Source node.
target — Destination node triggered if the source handler fails.

◆ csilk_wf_on_loop()

void csilk_wf_on_loop ( csilk_wf_node_t *  from,
const char *  condition,
csilk_wf_node_t *  to 
)

Add a loop-back / feedback route between nodes.

Parameters
from — Source node.
condition — Result string that triggers this route.
to — Destination node (typically an earlier node).
Note
Unlike csilk_wf_on, this does NOT increment the 'to' node's incoming dependency count, preventing deadlocks in join logic.

◆ csilk_wf_register_handler()

void csilk_wf_register_handler ( const char *  name,
csilk_wf_handler_t  handler 
)

Register a global handler for use in declarative workflows.

Parameters
name — Unique name (matches 'handler' key in YAML/JSON).
handler — Function pointer.

◆ csilk_wf_register_monitor()

void csilk_wf_register_monitor ( csilk_wf_t *  wf,
csilk_ctx_t *  c 
)

Register a WebSocket connection to receive live workflow updates.

Parameters
wf — Workflow handle.
c — Request context (must be upgraded to WebSocket).

◆ csilk_wf_register_tool()

void csilk_wf_register_tool ( csilk_wf_t *  wf,
const char *  name,
const char *  description,
const char *  parameters_json,
csilk_wf_tool_fn  fn,
void *  user_data 
)

Register a tool that AI nodes can call.

Parameters
wf — Workflow handle.
name — Function name exposed to the LLM.
description — Description of what the tool does.
parameters_json — JSON Schema string for the arguments.
fn — C function to execute.
user_data — Context for the function.

◆ csilk_wf_resume()

void csilk_wf_resume ( csilk_wf_t *  wf,
const char *  exec_id,
void(*)(csilk_data_t *result)  callback 
)

Resume an interrupted workflow execution from a WAL file.

Parameters
wf — Workflow definition.
exec_id — The execution ID to resume.
callback — Callback for when the resumed workflow finishes.

◆ csilk_wf_route()

void csilk_wf_route ( csilk_wf_node_t *  node,
csilk_wf_router_t  router 
)

Set a dynamic router for a node.

Parameters
node — Node handle.
router — Function that determines the next node based on output.

◆ csilk_wf_run()

const char * csilk_wf_run ( csilk_wf_t *  wf,
csilk_data_t *  input,
void(*)(csilk_data_t *result)  callback 
)

Run the workflow asynchronously.

Parameters
wf — Workflow handle.
input — Initial input data.
callback — Callback invoked when the workflow completes or exits.
Returns
Unique Execution ID (string). Caller must not free.

◆ csilk_wf_run_traced()

void csilk_wf_run_traced ( csilk_wf_t *  wf,
csilk_data_t *  input,
void(*)(csilk_data_t *result, csilk_wf_trace_t *trace)  callback 
)

Run workflow and generate a trace.

Parameters
wf — Workflow handle.
input — Initial input.
callback — Callback receiving final result and the full trace.

◆ csilk_wf_serve_ui()

void csilk_wf_serve_ui ( csilk_app_t *  app,
const char *  path 
)

Register a default route to serve the workflow dashboard.

Parameters
app — Application handle.
path — URL path (e.g., "/admin/workflow").

◆ csilk_wf_set_budget()

void csilk_wf_set_budget ( csilk_wf_t *  wf,
int  max_tokens 
)

Set a maximum token budget for the workflow.

Parameters
wf — Workflow handle.
max_tokens — Maximum total tokens (prompt + completion) allowed.

◆ csilk_wf_set_persistence()

void csilk_wf_set_persistence ( csilk_wf_t *  wf,
const char *  wal_dir 
)

Enable WAL persistence for a workflow definition.

Parameters
wf — Workflow handle.
wal_dir — Directory to store execution logs.

◆ csilk_wf_set_ttl()

void csilk_wf_set_ttl ( csilk_wf_t *  wf,
int  ttl_sec 
)

Set a global TTL for the workflow execution.

Parameters
wf — Workflow handle.
ttl_sec — TTL in seconds (0 for no limit).

◆ csilk_wf_signal_continue()

void csilk_wf_signal_continue ( csilk_wf_t *  wf,
const char *  exec_id,
csilk_data_t *  input,
void(*)(csilk_data_t *result)  callback 
)

Signal a paused workflow to continue.

Parameters
wf — Workflow definition.
exec_id — Execution ID of the paused workflow.
input — Optional replacement input (e.g., human-edited prompt).
callback — Callback for when the resumed workflow finishes.

◆ csilk_wf_strdup()

char * csilk_wf_strdup ( csilk_wf_ctx_t *  ctx,
const char *  s 
)

Duplicate a string using the workflow arena.

Parameters
ctx — Execution context.
s — Source string.
Returns
Copied string.

◆ csilk_wf_to_mermaid()

char * csilk_wf_to_mermaid ( csilk_wf_t *  wf)

Export the workflow graph as a Mermaid string.

Parameters
wf — Workflow handle.
Returns
Heap-allocated Mermaid code (caller must free).

◆ csilk_wf_trace_free()

void csilk_wf_trace_free ( csilk_wf_trace_t *  trace)

Free a trace object.

◆ csilk_wf_trace_to_json()

char * csilk_wf_trace_to_json ( const csilk_wf_trace_t *  trace)

Convert a trace object to a JSON string.

Returns
JSON string (caller must free).