|
Csilk 0.2.1
A lightweight, high-performance C HTTP web framework
|
AI Workflow engine for the csilk framework. More...


Go to the source code of this file.
Data Structures | |
| struct | csilk_data_t |
| Generic data container for passing messages between workflow nodes. More... | |
| struct | csilk_wf_trace_node_t |
| Trace record for a single node execution. More... | |
| struct | csilk_wf_trace_t |
| Complete execution trace of a workflow. More... | |
| struct | csilk_ai_config_t |
| Configuration for built-in AI nodes. More... | |
Typedefs | |
| typedef const char *(* | csilk_wf_router_t) (csilk_data_t *input) |
| Dynamic router function signature. | |
| typedef csilk_data_t *(* | csilk_wf_handler_t) (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data) |
| Function signature for a workflow node handler. | |
| typedef char *(* | csilk_wf_tool_fn) (const char *args_json, void *user_data) |
| Function signature for a workflow tool. | |
Enumerations | |
| enum | csilk_wf_join_policy_t { CSILK_WF_JOIN_AND , CSILK_WF_JOIN_OR } |
| Join policies for nodes with multiple incoming dependencies. More... | |
Functions | |
| void | csilk_wf_serve_ui (csilk_app_t *app, const char *path) |
| Register a default route to serve the workflow dashboard. | |
| csilk_data_t * | csilk_wf_data_new (csilk_wf_ctx_t *ctx, const char *type, void *value) |
| Allocate a new data container managed by the workflow arena. | |
| char * | csilk_wf_strdup (csilk_wf_ctx_t *ctx, const char *s) |
| Duplicate a string using the workflow arena. | |
| void * | csilk_wf_alloc (csilk_wf_ctx_t *ctx, size_t size) |
| Allocate memory from the workflow arena. | |
| csilk_wf_t * | csilk_wf_new (const char *name) |
| Create a new AI workflow instance. | |
| void | csilk_wf_free (csilk_wf_t *wf) |
| Deallocate a workflow and all its nodes. | |
| csilk_wf_node_t * | csilk_wf_add (csilk_wf_t *wf, const char *id, csilk_wf_handler_t handler, void *user_data) |
| Add a node to the workflow. | |
| csilk_wf_node_t * | csilk_wf_add_ai (csilk_wf_t *wf, const char *id, const csilk_ai_config_t *config) |
| Add a built-in AI node with template support. | |
| void | csilk_wf_register_tool (csilk_wf_t *wf, const char *name, const char *description, const char *parameters_json, csilk_wf_tool_fn fn, void *user_data) |
| Register a tool that AI nodes can call. | |
| csilk_wf_node_t * | csilk_wf_get_node (csilk_wf_t *wf, const char *id) |
| Get a node by ID. | |
| void | csilk_wf_node_set_entry (csilk_wf_node_t *node, int is_entry) |
| Mark a node as an entry point for the workflow. | |
| void | csilk_wf_bind (csilk_wf_node_t *from, csilk_wf_node_t *to) |
| Bind two nodes sequentially (default routing). | |
| void | csilk_wf_on (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to) |
| Add a conditional route between nodes. | |
| void | csilk_wf_on_loop (csilk_wf_node_t *from, const char *condition, csilk_wf_node_t *to) |
| Add a loop-back / feedback route between nodes. | |
| void | csilk_wf_on_error (csilk_wf_node_t *from, csilk_wf_node_t *target) |
| Add an error fallback route. | |
| void | csilk_wf_route (csilk_wf_node_t *node, csilk_wf_router_t router) |
| Set a dynamic router for a node. | |
| void | csilk_wf_node_set_join (csilk_wf_node_t *node, csilk_wf_join_policy_t policy) |
| Set the join policy for a node. | |
| void | csilk_wf_node_set_interactive (csilk_wf_node_t *node, int is_interactive) |
| Mark a node as interactive (requires human signal to proceed). | |
| void | csilk_wf_node_set_schema (csilk_wf_node_t *node, const char *schema) |
| Set an expected JSON Schema for a node's output. | |
| void | csilk_wf_signal_continue (csilk_wf_t *wf, const char *exec_id, csilk_data_t *input, void(*callback)(csilk_data_t *result)) |
| Signal a paused workflow to continue. | |
| void | csilk_wf_node_set_timeout (csilk_wf_node_t *node, int timeout_ms) |
| Set a timeout for a specific node. | |
| void | csilk_wf_set_ttl (csilk_wf_t *wf, int ttl_sec) |
| Set a global TTL for the workflow execution. | |
| void | csilk_wf_node_set_retry (csilk_wf_node_t *node, int max_retries, int retry_delay_ms) |
| Set automatic retry policy for a specific node. | |
| void | csilk_wf_node_set_remote (csilk_wf_node_t *node, int is_remote) |
| Mark a node for remote execution via MQ. | |
| void | csilk_wf_enable_distributed (csilk_wf_t *wf, csilk_mq_t *mq) |
| Enable distributed execution by bridging workflow with an MQ. | |
| void | csilk_wf_set_persistence (csilk_wf_t *wf, const char *wal_dir) |
| Enable WAL persistence for a workflow definition. | |
| const char * | csilk_wf_run (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result)) |
| Run the workflow asynchronously. | |
| void | csilk_wf_resume (csilk_wf_t *wf, const char *exec_id, void(*callback)(csilk_data_t *result)) |
| Resume an interrupted workflow execution from a WAL file. | |
| void | csilk_wf_run_traced (csilk_wf_t *wf, csilk_data_t *input, void(*callback)(csilk_data_t *result, csilk_wf_trace_t *trace)) |
| Run workflow and generate a trace. | |
| char * | csilk_wf_trace_to_json (const csilk_wf_trace_t *trace) |
| Convert a trace object to a JSON string. | |
| void | csilk_wf_trace_free (csilk_wf_trace_t *trace) |
| Free a trace object. | |
| void | csilk_wf_register_handler (const char *name, csilk_wf_handler_t handler) |
| Register a global handler for use in declarative workflows. | |
| csilk_wf_t * | csilk_wf_load_yaml (const char *path) |
| Load a workflow definition from a YAML file. | |
| csilk_wf_t * | csilk_wf_from_json (const char *json) |
| Create a workflow from a JSON string. | |
| char * | csilk_wf_to_mermaid (csilk_wf_t *wf) |
| Export the workflow graph as a Mermaid string. | |
| void | csilk_wf_register_monitor (csilk_wf_t *wf, csilk_ctx_t *c) |
| Register a WebSocket connection to receive live workflow updates. | |
| void | csilk_wf_set_budget (csilk_wf_t *wf, int max_tokens) |
| Set a maximum token budget for the workflow. | |
AI Workflow engine for the csilk framework.
Provides a graph-based orchestration engine for AI pipelines and agents. Supports sequential execution, parallel fan-out (via multi-input join policies), conditional routing, agentic loops with feedback edges, and built-in AI node handlers with LLM tool calling.
The workflow is a directed acyclic (or cyclic) graph of nodes. Each node is a handler function (or a built-in AI node) that receives data from its predecessors and emits data to its successors. Edges can be:
csilk_wf_run starts execution asynchronously. Entry nodes (nodes with 0 incoming edges or explicitly marked) fire immediately. Results propagate through the graph via the event loop. WAL persistence enables crash recovery via csilk_wf_resume.
Workflow definitions are read-only during execution (all mutation must happen before csilk_wf_run). The runtime is single-threaded on the libuv event loop.
| struct csilk_wf_trace_node_t |
Trace record for a single node execution.
| struct csilk_wf_trace_t |
Complete execution trace of a workflow.

| Data Fields | ||
|---|---|---|
| uint64_t | end_time | |
| char * | exec_id |
Unique execution ID |
| size_t | node_count | |
| csilk_wf_trace_node_t ** | nodes | |
| uint64_t | start_time | |
| struct csilk_ai_config_t |
Configuration for built-in AI nodes.
| typedef csilk_data_t *(* csilk_wf_handler_t) (csilk_wf_ctx_t *ctx, csilk_data_t *input, void *user_data) |
Function signature for a workflow node handler.
| ctx | Execution context (can be used for arena allocation). |
| input | Input data from previous node(s). |
| user_data | Opaque pointer passed during node creation. |
| typedef const char *(* csilk_wf_router_t) (csilk_data_t *input) |
Dynamic router function signature.
| input | Data from the node that just finished. |
| typedef char *(* csilk_wf_tool_fn) (const char *args_json, void *user_data) |
Function signature for a workflow tool.
| args_json | JSON string of arguments provided by the LLM. |
| user_data | Opaque pointer passed during registration. |
| csilk_wf_node_t * csilk_wf_add | ( | csilk_wf_t * | wf, |
| const char * | id, | ||
| csilk_wf_handler_t | handler, | ||
| void * | user_data | ||
| ) |
Add a node to the workflow.
| wf | Workflow handle. |
| id | Unique ID for the node. |
| handler | Function to execute. |
| user_data | Opaque pointer passed to the handler. |
| csilk_wf_node_t * csilk_wf_add_ai | ( | csilk_wf_t * | wf, |
| const char * | id, | ||
| const csilk_ai_config_t * | config | ||
| ) |
Add a built-in AI node with template support.
| wf | Workflow handle. |
| id | Unique ID. |
| config | AI configuration (copied internally). |
| void * csilk_wf_alloc | ( | csilk_wf_ctx_t * | ctx, |
| size_t | size | ||
| ) |
Allocate memory from the workflow arena.
| ctx | Execution context. |
| size | Number of bytes. |
| void csilk_wf_bind | ( | csilk_wf_node_t * | from, |
| csilk_wf_node_t * | to | ||
| ) |
Bind two nodes sequentially (default routing).
| from | Source node. |
| to | Destination node. |
| csilk_data_t * csilk_wf_data_new | ( | csilk_wf_ctx_t * | ctx, |
| const char * | type, | ||
| void * | value | ||
| ) |
Allocate a new data container managed by the workflow arena.
| ctx | Execution context. |
| type | Data type identifier (copied). |
| value | Pointer to the data. |
| void csilk_wf_enable_distributed | ( | csilk_wf_t * | wf, |
| csilk_mq_t * | mq | ||
| ) |
Enable distributed execution by bridging workflow with an MQ.
| wf | Workflow handle. |
| mq | MQ handle for task distribution. |
| void csilk_wf_free | ( | csilk_wf_t * | wf | ) |
Deallocate a workflow and all its nodes.
| wf | Workflow handle. |
| csilk_wf_t * csilk_wf_from_json | ( | const char * | json_str | ) |
Create a workflow from a JSON string.
Create a workflow from a JSON string.
Parses the JSON into a three-pass construction: Pass 1: Create all nodes from the "steps" array. Each step has an "id", optional "type" ("ai" or "handler"), and optional "config" for AI nodes. AI nodes use csilk_wf_add_ai() with prompt/model config; handler nodes use csilk_wf_add() with a registered callback. Pass 2: Create connections from the "connections" array. Each connection links a source ("from") to a target ("to"), optionally with a "condition" for conditional edges or "loop: true" for loops. Pass 3: Set error targets from each step's "on_error" field.
| json_str | Null-terminated JSON string. |
| csilk_wf_node_t * csilk_wf_get_node | ( | csilk_wf_t * | wf, |
| const char * | id | ||
| ) |
Get a node by ID.
| wf | Workflow handle. |
| id | Node ID. |
Get a node by ID.
| wf | The workflow instance. |
| id | Node identifier (set in csilk_wf_add()). |
| csilk_wf_t * csilk_wf_load_yaml | ( | const char * | path | ) |
Load a workflow definition from a YAML file.
| path | Path to the .yaml or .yml file. |
Load a workflow definition from a YAML file.
Parses the YAML into cJSON via parse_yaml_file(), then delegates to csilk_wf_from_json() for workflow construction. This two-step approach avoids duplicated parsing logic.
| path | Path to a .yaml or .yml file. |
| csilk_wf_t * csilk_wf_new | ( | const char * | name | ) |
Create a new AI workflow instance.
| name | Descriptive name for the workflow. |
| void csilk_wf_node_set_entry | ( | csilk_wf_node_t * | node, |
| int | is_entry | ||
| ) |
Mark a node as an entry point for the workflow.
| node | Node handle. |
| is_entry | Non-zero to mark as entry, 0 to unmark. |
| void csilk_wf_node_set_interactive | ( | csilk_wf_node_t * | node, |
| int | is_interactive | ||
| ) |
Mark a node as interactive (requires human signal to proceed).
| node | Node handle. |
| is_interactive | Non-zero to enable interactive mode. |
| void csilk_wf_node_set_join | ( | csilk_wf_node_t * | node, |
| csilk_wf_join_policy_t | policy | ||
| ) |
Set the join policy for a node.
| node | Node handle. |
| policy | AND (default) or OR. |
| void csilk_wf_node_set_remote | ( | csilk_wf_node_t * | node, |
| int | is_remote | ||
| ) |
Mark a node for remote execution via MQ.
| node | Node handle. |
| is_remote | Non-zero to offload to remote worker. |
| void csilk_wf_node_set_retry | ( | csilk_wf_node_t * | node, |
| int | max_retries, | ||
| int | retry_delay_ms | ||
| ) |
Set automatic retry policy for a specific node.
| node | Node handle. |
| max_retries | Maximum number of retry attempts. |
| retry_delay_ms | Delay before each retry. |
| void csilk_wf_node_set_schema | ( | csilk_wf_node_t * | node, |
| const char * | schema | ||
| ) |
Set an expected JSON Schema for a node's output.
| node | Node handle. |
| schema | JSON Schema string (NULL to disable). |
| void csilk_wf_node_set_timeout | ( | csilk_wf_node_t * | node, |
| int | timeout_ms | ||
| ) |
Set a timeout for a specific node.
| node | Node handle. |
| timeout_ms | Timeout in milliseconds (0 for no timeout). |
| void csilk_wf_on | ( | csilk_wf_node_t * | from, |
| const char * | condition, | ||
| csilk_wf_node_t * | to | ||
| ) |
Add a conditional route between nodes.
| from | Source node. |
| condition | Result string that triggers this route (e.g., "fail"). |
| to | Destination node. |
| void csilk_wf_on_error | ( | csilk_wf_node_t * | from, |
| csilk_wf_node_t * | target | ||
| ) |
Add an error fallback route.
| from | Source node. |
| target | Destination node triggered if the source handler fails. |
| void csilk_wf_on_loop | ( | csilk_wf_node_t * | from, |
| const char * | condition, | ||
| csilk_wf_node_t * | to | ||
| ) |
Add a loop-back / feedback route between nodes.
| from | Source node. |
| condition | Result string that triggers this route. |
| to | Destination node (typically an earlier node). |
| void csilk_wf_register_handler | ( | const char * | name, |
| csilk_wf_handler_t | handler | ||
| ) |
Register a global handler for use in declarative workflows.
| name | Unique name (matches 'handler' key in YAML/JSON). |
| handler | Function pointer. |
| void csilk_wf_register_monitor | ( | csilk_wf_t * | wf, |
| csilk_ctx_t * | c | ||
| ) |
Register a WebSocket connection to receive live workflow updates.
| wf | Workflow handle. |
| c | Request context (must be upgraded to WebSocket). |
| void csilk_wf_register_tool | ( | csilk_wf_t * | wf, |
| const char * | name, | ||
| const char * | description, | ||
| const char * | parameters_json, | ||
| csilk_wf_tool_fn | fn, | ||
| void * | user_data | ||
| ) |
Register a tool that AI nodes can call.
| wf | Workflow handle. |
| name | Function name exposed to the LLM. |
| description | Description of what the tool does. |
| parameters_json | JSON Schema string for the arguments. |
| fn | C function to execute. |
| user_data | Context for the function. |
| void csilk_wf_resume | ( | csilk_wf_t * | wf, |
| const char * | exec_id, | ||
| void(*)(csilk_data_t *result) | callback | ||
| ) |
Resume an interrupted workflow execution from a WAL file.
| wf | Workflow definition. |
| exec_id | The execution ID to resume. |
| callback | Callback for when the resumed workflow finishes. |
| void csilk_wf_route | ( | csilk_wf_node_t * | node, |
| csilk_wf_router_t | router | ||
| ) |
Set a dynamic router for a node.
| node | Node handle. |
| router | Function that determines the next node based on output. |
| const char * csilk_wf_run | ( | csilk_wf_t * | wf, |
| csilk_data_t * | input, | ||
| void(*)(csilk_data_t *result) | callback | ||
| ) |
Run the workflow asynchronously.
| wf | Workflow handle. |
| input | Initial input data. |
| callback | Callback invoked when the workflow completes or exits. |
| void csilk_wf_run_traced | ( | csilk_wf_t * | wf, |
| csilk_data_t * | input, | ||
| void(*)(csilk_data_t *result, csilk_wf_trace_t *trace) | callback | ||
| ) |
Run workflow and generate a trace.
| wf | Workflow handle. |
| input | Initial input. |
| callback | Callback receiving final result and the full trace. |
| void csilk_wf_serve_ui | ( | csilk_app_t * | app, |
| const char * | path | ||
| ) |
Register a default route to serve the workflow dashboard.
| app | Application handle. |
| path | URL path (e.g., "/admin/workflow"). |
| void csilk_wf_set_budget | ( | csilk_wf_t * | wf, |
| int | max_tokens | ||
| ) |
Set a maximum token budget for the workflow.
| wf | Workflow handle. |
| max_tokens | Maximum total tokens (prompt + completion) allowed. |
| void csilk_wf_set_persistence | ( | csilk_wf_t * | wf, |
| const char * | wal_dir | ||
| ) |
Enable WAL persistence for a workflow definition.
| wf | Workflow handle. |
| wal_dir | Directory to store execution logs. |
| void csilk_wf_set_ttl | ( | csilk_wf_t * | wf, |
| int | ttl_sec | ||
| ) |
Set a global TTL for the workflow execution.
| wf | Workflow handle. |
| ttl_sec | TTL in seconds (0 for no limit). |
| void csilk_wf_signal_continue | ( | csilk_wf_t * | wf, |
| const char * | exec_id, | ||
| csilk_data_t * | input, | ||
| void(*)(csilk_data_t *result) | callback | ||
| ) |
Signal a paused workflow to continue.
| wf | Workflow definition. |
| exec_id | Execution ID of the paused workflow. |
| input | Optional replacement input (e.g., human-edited prompt). |
| callback | Callback for when the resumed workflow finishes. |
| char * csilk_wf_strdup | ( | csilk_wf_ctx_t * | ctx, |
| const char * | s | ||
| ) |
Duplicate a string using the workflow arena.
| ctx | Execution context. |
| s | Source string. |
| char * csilk_wf_to_mermaid | ( | csilk_wf_t * | wf | ) |
Export the workflow graph as a Mermaid string.
| wf | Workflow handle. |
| void csilk_wf_trace_free | ( | csilk_wf_trace_t * | trace | ) |
Free a trace object.
| char * csilk_wf_trace_to_json | ( | const csilk_wf_trace_t * | trace | ) |
Convert a trace object to a JSON string.