andreahlert commented on code in PR #644: URL: https://github.com/apache/burr/pull/644#discussion_r2923239750
########## .claude/plugins/burr/skills/burr/SKILL.md: ########## @@ -0,0 +1,343 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +--- +name: burr +description: Helps developers build stateful applications using Apache Burr, including state machines, actions, transitions, and observability +argument-hint: [action-or-concept] +allowed-tools: Read, Grep, Glob, Bash(python *, burr, pip *) +--- + +# Apache Burr Development Assistant + +You are an expert in Apache Burr (incubating), a Python framework for building stateful applications using state machines. When this skill is active, help developers write clean, idiomatic Apache Burr code following best practices. 
+ +## Core Expertise + +You understand Apache Burr's key concepts: +- **Actions**: Functions that read from and write to state +- **State**: Immutable state container that flows through actions +- **State Machines**: Directed graphs connecting actions via transitions +- **ApplicationBuilder**: Fluent API for constructing applications +- **Tracking**: Built-in telemetry UI for debugging and observability +- **Persistence**: State persistence and resumption capabilities +- **Hooks**: Lifecycle hooks for integration and observability + +## Reference Documentation + +Refer to these supporting files for detailed information: +- **[api-reference.md](api-reference.md)**: Complete API documentation +- **[examples.md](examples.md)**: Common patterns and working examples +- **[patterns.md](patterns.md)**: Best practices and architectural guidance +- **[troubleshooting.md](troubleshooting.md)**: Common issues and solutions + +## When Helping Developers + +### 1. Building New Applications + +When users want to create an Apache Burr application: + +1. **Start with actions** - Define `@action` decorated functions +2. **Use ApplicationBuilder** - Follow the builder pattern +3. **Define transitions** - Connect actions with conditions +4. **Add tracking** - Enable the telemetry UI from the start +5. **Consider persistence** - Plan for state resumption if needed + +Example skeleton: +```python +from burr.core import action, State, ApplicationBuilder, default +from typing import Tuple + +@action(reads=["input_key"], writes=["output_key"]) +def my_action(state: State) -> Tuple[dict, State]: + # 1. Read from state using bracket notation + input_value = state["input_key"] + + # 2. Your logic here + output_value = process(input_value) + + # 3. 
Return tuple: (result_dict, new_state) + # - result_dict: exposed to callers and tracking + # - new_state: returned by state.update() (creates new State object) + return {"output_key": output_value}, state.update(output_key=output_value) + +app = ( + ApplicationBuilder() + .with_actions(my_action) + .with_transitions(("my_action", "next_action", default)) + .with_state(input_key="initial_value") + .with_entrypoint("my_action") + .with_tracker("local", project="my_project") + .build() +) + +# Run returns (action, result_dict, final_state) +action, result, state = app.run(halt_after=["next_action"]) +``` + +### 2. Reviewing Apache Burr Code + +When reviewing code: +- ✅ Check that actions declare correct `reads` and `writes` +- ✅ Verify state updates use `.update()` or `.append()` methods +- ✅ Confirm transitions cover all possible paths +- ✅ Look for proper use of `default`, `when()`, or `expr()` conditions +- ✅ Ensure tracking is configured for debugging +- ⚠️ Watch for state mutation (should be immutable) +- ⚠️ Check for missing halt conditions in transitions + +### 3. Explaining Concepts + +When explaining Apache Burr features: +- Use concrete examples from [examples.md](examples.md) +- Reference the appropriate section in [api-reference.md](api-reference.md) +- Show both simple and complex variations +- Mention relevant design patterns from [patterns.md](patterns.md) +- Link to official documentation at https://burr.apache.org/ + +### 4. Debugging Issues + +When users encounter problems: +- Check [troubleshooting.md](troubleshooting.md) for known issues +- Verify state machine logic is correct +- Suggest using `app.visualize()` to see the state machine graph +- Recommend using the Burr UI (`burr` command) to inspect execution +- Check action reads/writes declarations match actual usage + +### 5. 
Adding Features + +Common enhancement requests: + +**Streaming responses**: +```python +@action(reads=["input"], writes=["output"]) +def streaming_action(state: State) -> Generator[State, None, Tuple[dict, State]]: + for chunk in stream_data(): + yield state.update(current_chunk=chunk) + result = {"output": final_result} + return result, state.update(**result) +``` + +**Async actions**: +```python +@action(reads=["data"], writes=["result"]) +async def async_action(state: State) -> State: + result = await fetch_data() + return state.update(result=result) +``` + +**Parallel execution**: +```python +from burr.core.parallelism import MapStates, RunnableGraph + +# Apply same action to multiple states +class TestMultiplePrompts(MapStates): + def action(self, state: State, inputs: dict) -> Action | Callable | RunnableGraph: + return query_llm.with_name("query_llm") + + def states(self, state: State, context: ApplicationContext, inputs: dict): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] + +app = ApplicationBuilder().with_actions( + multi_prompt=TestMultiplePrompts() +).build() +``` + +## State Management Patterns + +### Regular State (Dictionary-Based) + +**Reading from state:** +```python +# Use bracket notation to access state values +value = state["key"] +chat_history = state["chat_history"] +counter = state["counter"] +``` + +**Updating state:** +State is immutable. 
Methods return NEW State objects: +```python +# state.update() - set/update keys, returns new State +new_state = state.update(counter=5, name="Alice") + +# state.append() - append to lists, returns new State +new_state = state.append(chat_history={"role": "user", "content": "hi"}) + +# state.increment() - increment numbers, returns new State +new_state = state.increment(counter=1) + +# Chaining - each method returns a State, enabling fluent patterns +new_state = state.update(prompt=prompt).append(chat_history=item) +``` + +**Action return pattern:** +Actions return `Tuple[dict, State]`: +```python +from typing import Tuple + +@action(reads=["prompt"], writes=["response", "chat_history"]) +def ai_respond(state: State) -> Tuple[dict, State]: + # 1. Read from state + prompt = state["prompt"] + + # 2. Process + response = call_llm(prompt) + + # 3. Return (result_dict, new_state) + # result_dict is exposed to callers/tracking + # new_state is the updated immutable state + return {"response": response}, state.update(response=response).append( + chat_history={"role": "assistant", "content": response} + ) +``` + +**Shorthand (also valid):** +```python +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + result = {"counter": state["counter"] + 1} + # Framework infers result from state updates + return state.update(**result) +``` + +### Pydantic Typed State (Different Pattern) + +**Define state model:** +```python +from pydantic import BaseModel, Field +from typing import Optional + +class ApplicationState(BaseModel): + prompt: Optional[str] = Field(default=None, description="User prompt") + response: Optional[str] = Field(default=None, description="AI response") + chat_history: list[dict] = Field(default_factory=list) +``` + +**Configure application:** +```python +from burr.integrations.pydantic import PydanticTypingSystem + +app = ( + ApplicationBuilder() + .with_typing(PydanticTypingSystem(ApplicationState)) + 
.with_state(ApplicationState()) + .build() +) +``` + +**Access typed state:** +```python +# Use attribute access (not bracket notation) +@action.pydantic(reads=["prompt"], writes=["response"]) +def ai_respond(state: ApplicationState) -> ApplicationState: + # 1. Read using attributes + prompt = state.prompt + + # 2. Process + response = call_llm(prompt) + + # 3. Mutate in-place and return state + # (Mutation happens on internal copy) + state.response = response + return state +``` + +**Key differences:** + +| Aspect | Regular State | Pydantic Typed State | +|--------|---------------|---------------------| +| **Access** | `state["key"]` | `state.key` | +| **Return** | `Tuple[dict, State]` | `ApplicationState` | +| **Decorator** | `@action(reads=[], writes=[])` | `@action.pydantic(reads=[], writes=[])` | +| **Updates** | Must use `.update()`, `.append()` | In-place mutation | +| **Type Safety** | Runtime only | IDE support + validation | + +## Code Quality Standards + +When writing or reviewing Apache Burr code: + +1. **Type annotations**: Always use type hints for state and action parameters +2. **Clear naming**: Action names should be verbs describing what they do +3. **Proper reads/writes**: Declare exactly what each action reads and writes +4. **Error handling**: Use try/except in actions and update state with error info +5. **Testing**: Write tests that verify state transitions and action outputs + +## Common Patterns to Recommend + +- **Conditional branching**: Use `when(key=value)` or `expr("key > 10")` +- **Loops**: Use recursive transitions with conditions +- **Error handling**: Create error actions and transition to them on failure +- **Multi-step workflows**: Chain actions with clear single responsibilities +- **State persistence**: Use `SQLLitePersister` or `initialize_from` for resumability +- **Observability**: Always include `.with_tracker()` for the Burr UI Review Comment: Typo: `SQLLitePersister` (two Ls) should be `SQLitePersister`.
The class in `burr/core/persistence.py:312` is defined as `class SQLitePersister(BaseStatePersister, BaseCopyable)`. There is a backward-compat alias `SQLLitePersister = SQLitePersister` at line 718, but the canonical name is `SQLitePersister`. ########## .claude/plugins/burr/skills/burr/api-reference.md: ########## @@ -0,0 +1,818 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Apache Burr API Reference + +This is a quick reference for the most commonly used Apache Burr APIs. For complete documentation, see https://burr.apache.org/ + +## Core Imports + +```python +from burr.core import action, State, ApplicationBuilder, default, when, expr +from burr.core.action import Action +from burr.core.application import Application +from burr.core import graph +``` + +## Actions + +### @action Decorator + +The `@action` decorator converts a function into a Burr action. 
+ +```python +@action(reads=["key1", "key2"], writes=["result"]) +def my_action(state: State, param: str) -> State: + """Action that reads from state and returns updated state.""" + value = state["key1"] + state["key2"] + return state.update(result=value + param) +``` + +**Parameters:** +- `reads`: List of state keys this action reads from +- `writes`: List of state keys this action writes to + +**Action Function Signature:** +- First parameter: `state: State` +- Additional parameters: Runtime inputs (passed via `inputs` in `app.run()`) +- Return: Updated `State` object + +### Streaming Actions + +Actions can stream intermediate results using generators: + +```python +@action(reads=["input"], writes=["output"]) +def streaming_action(state: State) -> Generator[State, None, Tuple[dict, State]]: + """Action that yields intermediate states.""" + for i in range(10): + # Yield intermediate states + yield state.update(progress=i) + + # Return final result and state + result = {"output": "done"} + return result, state.update(**result) +``` + +### Async Actions + +Actions can be async: + +```python +@action(reads=["url"], writes=["data"]) +async def fetch_data(state: State) -> State: + """Async action for I/O-bound operations.""" + async with httpx.AsyncClient() as client: + response = await client.get(state["url"]) + data = response.json() + return state.update(data=data) +``` + +### Action Methods + +Actions can be bound with default parameters: + +```python +# Define a reusable action +@action(reads=["prompt"], writes=["response"]) +def llm_call(state: State, system_prompt: str, model: str) -> State: + response = call_llm(state["prompt"], system_prompt, model) + return state.update(response=response) + +# Bind with different parameters +answer_action = llm_call.bind( + system_prompt="Answer questions", + model="gpt-4" +) +summarize_action = llm_call.bind( + system_prompt="Summarize text", + model="gpt-3.5-turbo" +) +``` + +## State + +The `State` object is an immutable 
container for application state. + +### Creating State + +```python +from burr.core import State + +state = State({"counter": 0, "messages": []}) +``` + +### Accessing State + +```python +# Dictionary-style access +value = state["key"] + +# Get with default +value = state.get("key", default_value) + +# Check if key exists +if "key" in state: + pass +``` + +### Updating State + +State is **immutable**. All methods return NEW State objects: + +```python +# Update single or multiple keys +new_state = state.update(counter=5, name="Alice") + +# Append to a list (creates list if doesn't exist) +new_state = state.append(messages={"role": "user", "content": "hello"}) + +# Increment numbers +new_state = state.increment(counter=1) + +# Extend lists with multiple items +new_state = state.extend(tags=["tag1", "tag2", "tag3"]) + +# Wipe state (keep only specified keys) +new_state = state.wipe(keep=["counter"]) + +# Chain operations (each returns State) +new_state = state.update(prompt=prompt).append(history=item).increment(count=1) + +# Update with dictionary +new_state = state.update(**{"key": "value"}) +``` + +### Using State in Actions + +Actions return `Tuple[dict, State]`: + +```python +from typing import Tuple + +@action(reads=["input"], writes=["output"]) +def my_action(state: State) -> Tuple[dict, State]: + # 1. Read from state + input_value = state["input"] + + # 2. Process + output_value = process(input_value) + + # 3. Return (result_dict, new_state) + return {"output": output_value}, state.update(output=output_value) +``` + +The result dict is exposed to callers and tracking systems. The new state flows to the next action. 
+ +**Shorthand (also valid):** +```python +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + result = {"counter": state["counter"] + 1} + return state.update(**result) # Framework infers result +``` + +### State Methods + +- `.update(**kwargs) -> State`: Set/update one or more keys +- `.append(**kwargs) -> State`: Append to list values (creates list if needed) +- `.extend(**kwargs) -> State`: Extend lists with multiple items +- `.increment(**kwargs) -> State`: Increment integer values +- `.wipe(keep: List[str] = None, delete: List[str] = None) -> State`: Remove keys +- `.merge(other: State) -> State`: Merge two states (other wins on conflicts) +- `.subset(*keys: str) -> State`: Return new State with only specified keys +- `.get(key, default=None) -> Any`: Get value with default +- `.get_all() -> dict`: Get all state as dictionary +- `.serialize() -> dict`: Serialize state to JSON-compatible dict +- `.subset(*keys) -> State`: Create new state with only specified keys + +## ApplicationBuilder + +Fluent API for building Burr applications. 
+ +### Basic Pattern + +```python +app = ( + ApplicationBuilder() + .with_actions( + action1=my_action, + action2=another_action + ) + .with_transitions( + ("action1", "action2", default), + ("action2", "action1", when(should_loop=True)) + ) + .with_state(initial_key="value") + .with_entrypoint("action1") + .build() +) +``` + +### ApplicationBuilder Methods + +**Core building blocks:** + +- `.with_actions(**actions: Action)` - Register actions +- `.with_transitions(*transitions: Tuple)` - Define state machine transitions +- `.with_state(**state: Any)` - Set initial state +- `.with_entrypoint(action: str)` - Set starting action +- `.build() -> Application` - Construct the application + +**Observability & Tracking:** + +- `.with_tracker(tracker_type: str, project: str, **params)` - Enable tracking + - `tracker_type="local"` - Local filesystem tracking (launches UI) + - `project` - Project name in the UI + - `params={"storage_dir": "~/.burr"}` - Storage location + +**Identity & Persistence:** + +- `.with_identifiers(app_id: str, partition_key: str)` - Set app identifiers +- `.with_state_persister(persister: StatePersister)` - Enable state persistence +- `.initialize_from(persister, resume_at_next_action=True, default_state={}, default_entrypoint=None)` - Load from persister + +**Lifecycle & Hooks:** + +- `.with_hooks(*hooks: LifecycleAdapter)` - Add lifecycle hooks +- `.with_typing(typing_system: TypingSystem)` - Add type validation + +**Graph-based construction:** + +- `.with_graph(graph: Graph)` - Use pre-built graph instead of actions+transitions + +### Using Pre-built Graphs + +```python +from burr.core import graph + +g = ( + graph.GraphBuilder() + .with_actions(action1, action2, action3) + .with_transitions( + ("action1", "action2"), + ("action2", "action3") + ) + .build() +) + +app = ( + ApplicationBuilder() + .with_graph(g) + .with_state(key="value") + .with_entrypoint("action1") + .build() +) +``` + +## Transitions + +Transitions define how the state machine 
moves between actions. + +### Basic Transition + +```python +("source_action", "target_action") +``` + +### Conditional Transitions + +**Using `default`** - Matches if no other condition matches: +```python +from burr.core import default + +("action1", "action2", default) +``` + +**Using `when()`** - Match based on state values: +```python +from burr.core import when + +("check_age", "adult_path", when(age__gte=18)) +("check_age", "child_path", when(age__lt=18)) +``` + +**Condition operators:** +- `key=value` - Exact match +- `key__eq=value` - Explicit equality +- `key__ne=value` - Not equal +- `key__lt=value` - Less than +- `key__lte=value` - Less than or equal +- `key__gt=value` - Greater than +- `key__gte=value` - Greater than or equal +- `key__in=[values]` - In list +- `key__contains=value` - List contains value + +**Using `expr()`** - Arbitrary Python expressions: +```python +from burr.core import expr + +("counter", "counter", expr("counter < 10")) +("counter", "done", default) +``` + +## Application + +The built application instance provides methods to execute and inspect the state machine. 
+ +### Running Applications + +**Basic execution:** +```python +action, result, state = app.run(halt_after=["action_name"]) +``` + +**With inputs:** +```python +action, result, state = app.run( + halt_after=["action_name"], + inputs={"param1": "value1"} +) +``` + +**Async execution:** +```python +action, result, state = await app.arun(halt_after=["action_name"]) +``` + +**Iterate through execution:** +```python +for action, result, state in app.iterate(halt_after=["end_action"]): + print(f"Executed {action.name}, result: {result}") +``` + +**Stream results:** +```python +for state in app.stream_result(halt_after=["end_action"]): + print(f"Current state: {state}") +``` + +### Application Properties + +- `app.state` - Current state +- `app.graph` - State machine graph +- `app.uid` - Unique application identifier + +### Visualization + +```python +# Generate state machine diagram +app.visualize( + output_file_path="statemachine.png", + include_conditions=True, + view=True, # Auto-open the file + format="png" # or "pdf", "svg" +) +``` + +## Parallelism + +Apache Burr provides high-level APIs for parallel execution of actions or subgraphs. + +### MapStates + +Apply the same action to multiple state variations. 
+ +```python +from burr.core.parallelism import MapStates +from burr.core import action, State, ApplicationContext +from typing import Dict, Any + +@action(reads=["prompt"], writes=["result"]) +def query_llm(state: State) -> State: + result = call_llm(state["prompt"]) + return state.update(result=result) + +class TestMultiplePrompts(MapStates): + def action(self, state: State, inputs: Dict[str, Any]): + return query_llm.with_name("query_llm") + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### MapActions + +Apply different actions to the same state. + +```python +from burr.core.parallelism import MapActions + +class TestMultipleLLMs(MapActions): + def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + yield query_gpt4.with_name("gpt4") + yield query_claude.with_name("claude") + yield query_o1.with_name("o1") + + def state(self, state: State, inputs: Dict[str, Any]) -> State: + return state # Pass same state to all actions + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompt"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### MapActionsAndStates + +Run all combinations of actions and states (cartesian product). 
+ +```python +from burr.core.parallelism import MapActionsAndStates + +class TestModelsAndPrompts(MapActionsAndStates): + def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for model in ["gpt-4", "claude", "o1"]: + yield query_llm.bind(model=model).with_name(f"query_{model}") + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [] + for s in states: + results.append({"model": s["model"], "prompt": s["prompt"], "result": s["result"]}) + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### RunnableGraph + +Wrap a graph for use as a subgraph in parallel execution. + +```python +from burr.core.parallelism import RunnableGraph +from burr.core.graph import GraphBuilder + +graph = ( + GraphBuilder() + .with_actions(action1, action2, action3) + .with_transitions( + ("action1", "action2"), + ("action2", "action3") + ) + .build() +) + +runnable = RunnableGraph( + graph=graph, + entrypoint="action1", + halt_after=["action3"] +) + +# Use in MapStates or MapActions +class RunSubgraphs(MapStates): + def action(self, state: State, inputs: Dict[str, Any]): + return runnable # Return the RunnableGraph + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for item in state["items"]: + yield state.update(current_item=item) + + def reduce(self, state: State, states): + results = [s["final_result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["items"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### Executors + +Control how parallel tasks are executed. 
+ +```python +from concurrent.futures import ThreadPoolExecutor + +# Use multithreading (default) +app = ( + ApplicationBuilder() + .with_parallel_executor(ThreadPoolExecutor(max_workers=10)) + .with_actions(parallel_action=MyParallelAction()) + .build() +) + +# For Ray-based distributed execution +from burr.integrations.ray import RayExecutor +import ray + +ray.init() +app = ( + ApplicationBuilder() + .with_parallel_executor(RayExecutor()) + .with_actions(parallel_action=MyParallelAction()) + .build() +) +``` + +## Persistence + +### Built-in Persisters + +**SQLite Persister:** +```python +from burr.core.persistence import SQLLitePersister + Review Comment: Same typo: `SQLLitePersister` should be `SQLitePersister`. The backward-compat alias exists but the canonical class name has a single L. ########## .claude/plugins/burr/skills/burr/api-reference.md: ########## @@ -0,0 +1,818 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Apache Burr API Reference + +This is a quick reference for the most commonly used Apache Burr APIs. 
For complete documentation, see https://burr.apache.org/ + +## Core Imports + +```python +from burr.core import action, State, ApplicationBuilder, default, when, expr +from burr.core.action import Action +from burr.core.application import Application +from burr.core import graph +``` + +## Actions + +### @action Decorator + +The `@action` decorator converts a function into a Burr action. + +```python +@action(reads=["key1", "key2"], writes=["result"]) +def my_action(state: State, param: str) -> State: + """Action that reads from state and returns updated state.""" + value = state["key1"] + state["key2"] + return state.update(result=value + param) +``` + +**Parameters:** +- `reads`: List of state keys this action reads from +- `writes`: List of state keys this action writes to + +**Action Function Signature:** +- First parameter: `state: State` +- Additional parameters: Runtime inputs (passed via `inputs` in `app.run()`) +- Return: Updated `State` object + +### Streaming Actions + +Actions can stream intermediate results using generators: + +```python +@action(reads=["input"], writes=["output"]) +def streaming_action(state: State) -> Generator[State, None, Tuple[dict, State]]: + """Action that yields intermediate states.""" + for i in range(10): + # Yield intermediate states + yield state.update(progress=i) + + # Return final result and state + result = {"output": "done"} + return result, state.update(**result) +``` + +### Async Actions + +Actions can be async: + +```python +@action(reads=["url"], writes=["data"]) +async def fetch_data(state: State) -> State: + """Async action for I/O-bound operations.""" + async with httpx.AsyncClient() as client: + response = await client.get(state["url"]) + data = response.json() + return state.update(data=data) +``` + +### Action Methods + +Actions can be bound with default parameters: + +```python +# Define a reusable action +@action(reads=["prompt"], writes=["response"]) +def llm_call(state: State, system_prompt: str, model: 
str) -> State: + response = call_llm(state["prompt"], system_prompt, model) + return state.update(response=response) + +# Bind with different parameters +answer_action = llm_call.bind( + system_prompt="Answer questions", + model="gpt-4" +) +summarize_action = llm_call.bind( + system_prompt="Summarize text", + model="gpt-3.5-turbo" +) +``` + +## State + +The `State` object is an immutable container for application state. + +### Creating State + +```python +from burr.core import State + +state = State({"counter": 0, "messages": []}) +``` + +### Accessing State + +```python +# Dictionary-style access +value = state["key"] + +# Get with default +value = state.get("key", default_value) + +# Check if key exists +if "key" in state: + pass +``` + +### Updating State + +State is **immutable**. All methods return NEW State objects: + +```python +# Update single or multiple keys +new_state = state.update(counter=5, name="Alice") + +# Append to a list (creates list if doesn't exist) +new_state = state.append(messages={"role": "user", "content": "hello"}) + +# Increment numbers +new_state = state.increment(counter=1) + +# Extend lists with multiple items +new_state = state.extend(tags=["tag1", "tag2", "tag3"]) + +# Wipe state (keep only specified keys) +new_state = state.wipe(keep=["counter"]) + +# Chain operations (each returns State) +new_state = state.update(prompt=prompt).append(history=item).increment(count=1) + +# Update with dictionary +new_state = state.update(**{"key": "value"}) +``` + +### Using State in Actions + +Actions return `Tuple[dict, State]`: + +```python +from typing import Tuple + +@action(reads=["input"], writes=["output"]) +def my_action(state: State) -> Tuple[dict, State]: + # 1. Read from state + input_value = state["input"] + + # 2. Process + output_value = process(input_value) + + # 3. Return (result_dict, new_state) + return {"output": output_value}, state.update(output=output_value) +``` + +The result dict is exposed to callers and tracking systems. 
The new state flows to the next action. + +**Shorthand (also valid):** +```python +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + result = {"counter": state["counter"] + 1} + return state.update(**result) # Framework infers result +``` + +### State Methods + +- `.update(**kwargs) -> State`: Set/update one or more keys +- `.append(**kwargs) -> State`: Append to list values (creates list if needed) +- `.extend(**kwargs) -> State`: Extend lists with multiple items +- `.increment(**kwargs) -> State`: Increment integer values +- `.wipe(keep: List[str] = None, delete: List[str] = None) -> State`: Remove keys +- `.merge(other: State) -> State`: Merge two states (other wins on conflicts) +- `.subset(*keys: str) -> State`: Return new State with only specified keys +- `.get(key, default=None) -> Any`: Get value with default +- `.get_all() -> dict`: Get all state as dictionary +- `.serialize() -> dict`: Serialize state to JSON-compatible dict +- `.subset(*keys) -> State`: Create new state with only specified keys + +## ApplicationBuilder + +Fluent API for building Burr applications. 
+ +### Basic Pattern + +```python +app = ( + ApplicationBuilder() + .with_actions( + action1=my_action, + action2=another_action + ) + .with_transitions( + ("action1", "action2", default), + ("action2", "action1", when(should_loop=True)) + ) + .with_state(initial_key="value") + .with_entrypoint("action1") + .build() +) +``` + +### ApplicationBuilder Methods + +**Core building blocks:** + +- `.with_actions(**actions: Action)` - Register actions +- `.with_transitions(*transitions: Tuple)` - Define state machine transitions +- `.with_state(**state: Any)` - Set initial state +- `.with_entrypoint(action: str)` - Set starting action +- `.build() -> Application` - Construct the application + +**Observability & Tracking:** + +- `.with_tracker(tracker_type: str, project: str, **params)` - Enable tracking + - `tracker_type="local"` - Local filesystem tracking (launches UI) + - `project` - Project name in the UI + - `params={"storage_dir": "~/.burr"}` - Storage location + +**Identity & Persistence:** + +- `.with_identifiers(app_id: str, partition_key: str)` - Set app identifiers +- `.with_state_persister(persister: StatePersister)` - Enable state persistence +- `.initialize_from(persister, resume_at_next_action=True, default_state={}, default_entrypoint=None)` - Load from persister + +**Lifecycle & Hooks:** + +- `.with_hooks(*hooks: LifecycleAdapter)` - Add lifecycle hooks +- `.with_typing(typing_system: TypingSystem)` - Add type validation + +**Graph-based construction:** + +- `.with_graph(graph: Graph)` - Use pre-built graph instead of actions+transitions + +### Using Pre-built Graphs + +```python +from burr.core import graph + +g = ( + graph.GraphBuilder() + .with_actions(action1, action2, action3) + .with_transitions( + ("action1", "action2"), + ("action2", "action3") + ) + .build() +) + +app = ( + ApplicationBuilder() + .with_graph(g) + .with_state(key="value") + .with_entrypoint("action1") + .build() +) +``` + +## Transitions + +Transitions define how the state machine 
moves between actions. + +### Basic Transition + +```python +("source_action", "target_action") +``` + +### Conditional Transitions + +**Using `default`** - Matches if no other condition matches: +```python +from burr.core import default + +("action1", "action2", default) +``` + +**Using `when()`** - Match based on state values: +```python +from burr.core import when + +("check_age", "adult_path", when(age__gte=18)) +("check_age", "child_path", when(age__lt=18)) +``` + +**Condition operators:** +- `key=value` - Exact match +- `key__eq=value` - Explicit equality +- `key__ne=value` - Not equal +- `key__lt=value` - Less than +- `key__lte=value` - Less than or equal +- `key__gt=value` - Greater than +- `key__gte=value` - Greater than or equal +- `key__in=[values]` - In list +- `key__contains=value` - List contains value + +**Using `expr()`** - Arbitrary Python expressions: +```python +from burr.core import expr + +("counter", "counter", expr("counter < 10")) +("counter", "done", default) +``` + +## Application + +The built application instance provides methods to execute and inspect the state machine. 
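
A minimal pure-Python sketch of the execution model described here, for readers checking the quoted examples without Burr installed. It assumes plain dicts in place of `burr.core.State`, a hypothetical `run` helper, and transitions checked in declaration order with an always-true predicate standing in for `default`. None of these names are Burr's API, and this is not how the library is implemented.

```python
from typing import Callable, Dict, List, Tuple

# Plain dict standing in for burr.core.State (assumption, not Burr's type).
StateDict = Dict[str, object]
ActionFn = Callable[[StateDict], StateDict]
Transition = Tuple[str, str, Callable[[StateDict], bool]]


def always(_state: StateDict) -> bool:
    """Always-true predicate, playing the role of `default`."""
    return True


def run(
    actions: Dict[str, ActionFn],
    transitions: List[Transition],
    state: StateDict,
    entrypoint: str,
    halt_after: List[str],
) -> Tuple[str, StateDict]:
    """Hypothetical helper: step until an action in `halt_after` has run."""
    current = entrypoint
    while True:
        state = actions[current](state)
        if current in halt_after:
            return current, state
        # Take the first transition out of `current` whose condition holds.
        for source, target, condition in transitions:
            if source == current and condition(state):
                current = target
                break
        else:
            raise RuntimeError(f"no transition out of {current!r}")


def increment(state: StateDict) -> StateDict:
    # Return a new dict rather than mutating, mirroring immutable state.
    return {**state, "counter": state["counter"] + 1}


def finish(state: StateDict) -> StateDict:
    return {**state, "result": f"Final count: {state['counter']}"}


actions = {"increment": increment, "finish": finish}
transitions = [
    ("increment", "increment", lambda s: s["counter"] < 10),
    ("increment", "finish", always),
]
last_action, final_state = run(
    actions, transitions, {"counter": 0}, "increment", ["finish"]
)
```

With the counter starting at 0, the loop transition is taken until the counter reaches 10, after which the fallback routes to `finish` and the run halts.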
+ +### Running Applications + +**Basic execution:** +```python +action, result, state = app.run(halt_after=["action_name"]) +``` + +**With inputs:** +```python +action, result, state = app.run( + halt_after=["action_name"], + inputs={"param1": "value1"} +) +``` + +**Async execution:** +```python +action, result, state = await app.arun(halt_after=["action_name"]) +``` + +**Iterate through execution:** +```python +for action, result, state in app.iterate(halt_after=["end_action"]): + print(f"Executed {action.name}, result: {result}") +``` + +**Stream results:** +```python +for state in app.stream_result(halt_after=["end_action"]): + print(f"Current state: {state}") +``` + +### Application Properties + +- `app.state` - Current state +- `app.graph` - State machine graph +- `app.uid` - Unique application identifier + +### Visualization + +```python +# Generate state machine diagram +app.visualize( + output_file_path="statemachine.png", + include_conditions=True, + view=True, # Auto-open the file + format="png" # or "pdf", "svg" +) +``` + +## Parallelism + +Apache Burr provides high-level APIs for parallel execution of actions or subgraphs. + +### MapStates + +Apply the same action to multiple state variations. 
+ +```python +from burr.core.parallelism import MapStates +from burr.core import action, State, ApplicationContext +from typing import Dict, Any + +@action(reads=["prompt"], writes=["result"]) +def query_llm(state: State) -> State: + result = call_llm(state["prompt"]) + return state.update(result=result) + +class TestMultiplePrompts(MapStates): + def action(self, state: State, inputs: Dict[str, Any]): + return query_llm.with_name("query_llm") + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### MapActions + +Apply different actions to the same state. + +```python +from burr.core.parallelism import MapActions + +class TestMultipleLLMs(MapActions): + def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + yield query_gpt4.with_name("gpt4") + yield query_claude.with_name("claude") + yield query_o1.with_name("o1") + + def state(self, state: State, inputs: Dict[str, Any]) -> State: + return state # Pass same state to all actions + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompt"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### MapActionsAndStates + +Run all combinations of actions and states (cartesian product). 
+ +```python +from burr.core.parallelism import MapActionsAndStates + +class TestModelsAndPrompts(MapActionsAndStates): + def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for model in ["gpt-4", "claude", "o1"]: + yield query_llm.bind(model=model).with_name(f"query_{model}") + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [] + for s in states: + results.append({"model": s["model"], "prompt": s["prompt"], "result": s["result"]}) + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### RunnableGraph + +Wrap a graph for use as a subgraph in parallel execution. + +```python +from burr.core.parallelism import RunnableGraph +from burr.core.graph import GraphBuilder + +graph = ( + GraphBuilder() + .with_actions(action1, action2, action3) + .with_transitions( + ("action1", "action2"), + ("action2", "action3") + ) + .build() +) + +runnable = RunnableGraph( + graph=graph, + entrypoint="action1", + halt_after=["action3"] +) + +# Use in MapStates or MapActions +class RunSubgraphs(MapStates): + def action(self, state: State, inputs: Dict[str, Any]): + return runnable # Return the RunnableGraph + + def states(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + for item in state["items"]: + yield state.update(current_item=item) + + def reduce(self, state: State, states): + results = [s["final_result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["items"] + + @property + def writes(self) -> list[str]: + return ["all_results"] +``` + +### Executors + +Control how parallel tasks are executed. 
+ +```python +from concurrent.futures import ThreadPoolExecutor + +# Use multithreading (default) +app = ( + ApplicationBuilder() + .with_parallel_executor(ThreadPoolExecutor(max_workers=10)) + .with_actions(parallel_action=MyParallelAction()) + .build() +) + +# For Ray-based distributed execution +from burr.integrations.ray import RayExecutor +import ray + +ray.init() +app = ( + ApplicationBuilder() + .with_parallel_executor(RayExecutor()) + .with_actions(parallel_action=MyParallelAction()) + .build() +) +``` + +## Persistence + +### Built-in Persisters + +**SQLite Persister:** +```python +from burr.core.persistence import SQLLitePersister + +persister = SQLLitePersister( + db_path="app.db", + table_name="burr_state", + connect_kwargs={"check_same_thread": False} +) +persister.initialize() +``` + +**Using with ApplicationBuilder:** +```python +app = ( + ApplicationBuilder() + .with_actions(...) + .with_transitions(...) + .with_identifiers(app_id="my-app", partition_key="user-123") + .with_state_persister(persister) + .initialize_from( + persister, + resume_at_next_action=True, + default_state={"counter": 0}, + default_entrypoint="start" + ) + .build() +) +``` + +### Custom Persisters + +Implement `BaseStatePersister` interface: + +```python +from burr.core.persistence import BaseStatePersister + +class CustomPersister(BaseStatePersister): + def list_app_ids(self, partition_key: str, **kwargs) -> list[str]: + """List all app IDs for a partition.""" + pass + + def load(self, partition_key: str, app_id: str, **kwargs) -> dict: + """Load persisted state.""" + pass + + def save(self, partition_key: str, app_id: str, + state: State, **kwargs) -> dict: + """Save state.""" + pass +``` + +## Tracking & Telemetry + +### Local Tracking + +```python +from burr.tracking import LocalTrackingClient + +tracker = LocalTrackingClient( + project="my_project", + storage_dir="~/.burr" +) + +app = ( + ApplicationBuilder() + .with_actions(...) 
+ .with_tracker(tracker) + .build() +) +``` + +**Launch UI:** +```bash +burr +``` + +### Tracking Integrations + +- **Langsmith**: `from burr.integrations.langsmith import LangsmithTracker` +- **Weights & Biases**: `from burr.integrations.wandb import WandbTracker` +- **OpenTelemetry**: Built-in support for OTEL tracing + +## Hooks + +Hooks provide lifecycle callbacks for observability and integration. + +### Available Hooks + +```python +from burr.lifecycle import LifecycleAdapter + +class PrePostActionHookAsync(LifecycleAdapter): + async def pre_run_step(self, action: Action, **kwargs): + """Called before each action.""" + pass + + async def post_run_step(self, action: Action, result: dict, state: State, **kwargs): + """Called after each action.""" + pass +``` + +### Common Hook Use Cases + +- Logging and monitoring +- Performance tracking +- External system integration +- State validation +- Debugging and inspection + +## Common Helper Functions + +### Result Action + +Special action that returns a result and halts: + +```python +from burr.core import Result + +app = ( + ApplicationBuilder() + .with_actions( + compute=my_action, + result=Result("output_key") + ) + .with_transitions( + ("compute", "result") + ) + .build() +) +``` + +### Input Action + +Special action that captures runtime inputs: + +```python +from burr.core import Input + +app = ( Review Comment: `Input` is not exported from `burr.core.__init__.py`. The `__all__` only includes `Result`, not `Input`. This import should be `from burr.core.action import Input`. ########## .claude/plugins/burr/skills/burr/examples.md: ########## @@ -0,0 +1,722 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Apache Burr Code Examples + +Common patterns and working examples for building Apache Burr applications. + +## Table of Contents +1. [Simple Counter](#simple-counter) +2. [Basic Chatbot](#basic-chatbot) +3. [Multi-Step Workflow](#multi-step-workflow) +4. [Conditional Branching](#conditional-branching) +5. [Looping with Conditions](#looping-with-conditions) +6. [Error Handling](#error-handling) +7. [Streaming Actions](#streaming-actions) +8. [Parallel Execution](#parallel-execution) +9. [State Persistence](#state-persistence) +10. [RAG Pattern](#rag-pattern) +11. [Using Action Binding](#using-action-binding) +12. [Testing Actions](#testing-actions) +13. [Pydantic Typed State](#pydantic-typed-state) + +--- + +## Simple Counter + +Minimal example showing state updates and transitions. 
+ +```python +from burr.core import action, State, ApplicationBuilder, default, expr + +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + # Read from state using bracket notation + result = {"counter": state["counter"] + 1} + # State methods return new State objects + return state.update(**result) + +@action(reads=["counter"], writes=["result"]) +def finish(state: State) -> State: + # Access state values with state["key"] + result = {"result": f"Final count: {state['counter']}"} + return state.update(**result) + +app = ( + ApplicationBuilder() + .with_actions(increment, finish) + .with_transitions( + ("increment", "increment", expr("counter < 10")), + ("increment", "finish", default) + ) + .with_state(counter=0) + .with_entrypoint("increment") + .build() +) + +# run() returns (action, result_dict, final_state) +action, result, final_state = app.run(halt_after=["finish"]) +print(final_state["result"]) # "Final count: 10" +``` + +## Basic Chatbot + +Classic chatbot pattern with user input and AI response. 
+ +```python +from burr.core import action, State, ApplicationBuilder, default +from typing import Tuple + +@action(reads=[], writes=["chat_history", "prompt"]) +def human_input(state: State, prompt: str) -> Tuple[dict, State]: + """Capture user input.""" + # Build chat item + chat_item = {"role": "user", "content": prompt} + + # Return (result_dict, new_state) + # Chain state updates: update() returns State, then append() returns new State + return {"prompt": prompt}, state.update(prompt=prompt).append(chat_history=chat_item) + +@action(reads=["chat_history"], writes=["response", "chat_history"]) +def ai_response(state: State) -> Tuple[dict, State]: + """Generate AI response.""" + # Read from state using bracket notation + chat_history = state["chat_history"] + + # Call your LLM + response = call_llm(chat_history) + chat_item = {"role": "assistant", "content": response} + + # Return result and chained state updates + return {"response": response}, state.update(response=response).append( + chat_history=chat_item + ) + +app = ( + ApplicationBuilder() + .with_actions(human_input, ai_response) + .with_transitions( + ("human_input", "ai_response"), + ("ai_response", "human_input") + ) + .with_state(chat_history=[]) + .with_entrypoint("human_input") + .with_tracker("local", project="chatbot") + .build() +) + +# Run one turn of conversation +# run() returns (action, result_dict, final_state) +action, result, state = app.run( + halt_after=["ai_response"], + inputs={"prompt": "Hello, how are you?"} +) +print(result["response"]) # Access result from result dict +print(state["chat_history"]) # Access state using bracket notation +``` + +## Multi-Step Workflow + +Chain multiple actions sequentially. 
+ +```python +@action(reads=["raw_text"], writes=["cleaned_text"]) +def clean_text(state: State) -> State: + """Remove special characters and normalize.""" + cleaned = state["raw_text"].lower().strip() + return state.update(cleaned_text=cleaned) + +@action(reads=["cleaned_text"], writes=["tokens"]) +def tokenize(state: State) -> State: + """Split into tokens.""" + tokens = state["cleaned_text"].split() + return state.update(tokens=tokens) + +@action(reads=["tokens"], writes=["summary"]) +def summarize(state: State) -> State: + """Generate summary.""" + summary = f"Processed {len(state['tokens'])} tokens" + return state.update(summary=summary) + +app = ( + ApplicationBuilder() + .with_actions(clean_text, tokenize, summarize) + .with_transitions( + ("clean_text", "tokenize"), + ("tokenize", "summarize") + ) + .with_state(raw_text=" Hello World! ") + .with_entrypoint("clean_text") + .build() +) + +_, _, final_state = app.run(halt_after=["summarize"]) +``` + +## Conditional Branching + +Route execution based on state values. 
+ +```python +from burr.core import when + +@action(reads=["user_type"], writes=["message"]) +def check_user_type(state: State, user_type: str) -> State: + return state.update(user_type=user_type) + +@action(reads=[], writes=["greeting"]) +def admin_greeting(state: State) -> State: + return state.update(greeting="Welcome, Administrator!") + +@action(reads=[], writes=["greeting"]) +def user_greeting(state: State) -> State: + return state.update(greeting="Welcome, User!") + +@action(reads=[], writes=["greeting"]) +def guest_greeting(state: State) -> State: + return state.update(greeting="Welcome, Guest!") + +app = ( + ApplicationBuilder() + .with_actions( + check_user_type, + admin_greeting, + user_greeting, + guest_greeting + ) + .with_transitions( + ("check_user_type", "admin_greeting", when(user_type="admin")), + ("check_user_type", "user_greeting", when(user_type="user")), + ("check_user_type", "guest_greeting", default) + ) + .with_entrypoint("check_user_type") + .build() +) + +_, _, state = app.run( + halt_after=["admin_greeting", "user_greeting", "guest_greeting"], + inputs={"user_type": "admin"} +) +``` + +## Looping with Conditions + +Implement loops using recursive transitions. 
+ +```python +@action(reads=["items", "processed"], writes=["processed", "current_item"]) +def process_item(state: State) -> State: + """Process next item from list.""" + items = state["items"] + processed_count = state.get("processed", 0) + + current_item = items[processed_count] + # Process the item + result = transform(current_item) + + return state.update( + processed=processed_count + 1, + current_item=result + ) + +@action(reads=["processed"], writes=["done"]) +def finish_processing(state: State) -> State: + return state.update(done=True) + +app = ( + ApplicationBuilder() + .with_actions(process_item, finish_processing) + .with_transitions( + ("process_item", "process_item", expr("processed < len(items)")), + ("process_item", "finish_processing", default) + ) + .with_state(items=["a", "b", "c"], processed=0) + .with_entrypoint("process_item") + .build() +) +``` + +## Error Handling + +Handle errors gracefully by routing to error actions. + +```python +@action(reads=["data"], writes=["result", "error"]) +def risky_operation(state: State) -> State: + """Operation that might fail.""" + try: + result = dangerous_function(state["data"]) + return state.update(result=result, error=None) + except Exception as e: + return state.update(result=None, error=str(e)) + +@action(reads=["result"], writes=["success_message"]) +def handle_success(state: State) -> State: + return state.update(success_message=f"Success: {state['result']}") + +@action(reads=["error"], writes=["error_message"]) +def handle_error(state: State) -> State: + return state.update(error_message=f"Error: {state['error']}") + +@action(reads=["data"], writes=["result", "retry_count"]) +def retry_operation(state: State) -> State: + """Retry the operation.""" + retry_count = state.get("retry_count", 0) + 1 + try: + result = dangerous_function(state["data"]) + return state.update(result=result, error=None, retry_count=retry_count) + except Exception as e: + return state.update(result=None, error=str(e), 
retry_count=retry_count) + +app = ( + ApplicationBuilder() + .with_actions( + risky_operation, + handle_success, + handle_error, + retry_operation + ) + .with_transitions( + ("risky_operation", "handle_success", when(error=None)), + ("risky_operation", "retry_operation", + expr("error is not None and retry_count < 3")), + ("risky_operation", "handle_error", default), + ("retry_operation", "handle_success", when(error=None)), + ("retry_operation", "retry_operation", + expr("error is not None and retry_count < 3")), + ("retry_operation", "handle_error", default) + ) + .with_state(data="input", retry_count=0) + .with_entrypoint("risky_operation") + .build() +) +``` + +## Streaming Actions + +Stream intermediate results as they're generated. + +```python +from typing import Generator, Tuple + +@action(reads=["prompt"], writes=["response", "chunks"]) +def streaming_llm(state: State) -> Generator[State, None, Tuple[dict, State]]: + """Stream LLM response token by token.""" + chunks = [] + + # Stream tokens from LLM + for token in llm_stream(state["prompt"]): + chunks.append(token) + # Yield intermediate state + yield state.update( + chunks=chunks, + response="".join(chunks) + ) + + # Return final result + final_response = "".join(chunks) + result = {"response": final_response} + return result, state.update(**result) + +app = ( + ApplicationBuilder() + .with_actions(streaming_llm) + .with_state(prompt="Write a story") + .with_entrypoint("streaming_llm") + .build() +) + +# Stream results +for state in app.stream_result(halt_after=["streaming_llm"]): + print(state["response"], end="", flush=True) +``` + +## Parallel Execution + +Execute multiple actions in parallel using Apache Burr's parallelism APIs. 
+ +```python +from burr.core.parallelism import MapActions, RunnableGraph +from burr.core import action, State, ApplicationContext +from typing import Dict, Any + +@action(reads=["text"], writes=["sentiment"]) +def analyze_sentiment(state: State) -> State: + sentiment = get_sentiment(state["text"]) + return state.update(sentiment=sentiment) + +@action(reads=["text"], writes=["entities"]) +def extract_entities(state: State) -> State: + entities = extract_ner(state["text"]) + return state.update(entities=entities) + +@action(reads=["text"], writes=["keywords"]) +def extract_keywords(state: State) -> State: + keywords = get_keywords(state["text"]) + return state.update(keywords=keywords) + +# Run multiple actions in parallel on the same state +class ParallelTextAnalysis(MapActions): + def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]): + yield analyze_sentiment.with_name("sentiment_analysis") + yield extract_entities.with_name("entity_extraction") + yield extract_keywords.with_name("keyword_extraction") + + def state(self, state: State, inputs: Dict[str, Any]) -> State: + return state # Pass state as-is to all actions + + def reduce(self, state: State, states) -> State: + """Combine all analysis results.""" + analysis = {} + for sub_state in states: + if "sentiment" in sub_state: + analysis["sentiment"] = sub_state["sentiment"] + if "entities" in sub_state: + analysis["entities"] = sub_state["entities"] + if "keywords" in sub_state: + analysis["keywords"] = sub_state["keywords"] + return state.update(analysis=analysis) + + @property + def reads(self) -> list[str]: + return ["text"] + + @property + def writes(self) -> list[str]: + return ["analysis"] + +app = ( + ApplicationBuilder() + .with_actions(parallel_analysis=ParallelTextAnalysis()) + .with_transitions(("parallel_analysis", "parallel_analysis")) # Or continue to next action + .with_state(text="Sample text to analyze") + .with_entrypoint("parallel_analysis") + .build() +) +``` + 
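
The `reduce` merge in the quoted `ParallelTextAnalysis` example can be checked in isolation. The sketch below assumes plain dicts as stand-ins for the sub-states and a hypothetical `merge_analysis` helper; it mirrors only the key-collection logic and uses no Burr API.

```python
from typing import Any, Dict, Iterable

# Keys each parallel branch may contribute, taken from the quoted example.
ANALYSIS_KEYS = ("sentiment", "entities", "keywords")


def merge_analysis(sub_states: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Collect whichever analysis keys each branch produced into one dict."""
    analysis: Dict[str, Any] = {}
    for sub_state in sub_states:
        for key in ANALYSIS_KEYS:
            if key in sub_state:
                analysis[key] = sub_state[key]
    return analysis


# Three illustrative branch outputs, each carrying the shared input plus one result.
branches = [
    {"text": "Sample text", "sentiment": "positive"},
    {"text": "Sample text", "entities": ["Burr"]},
    {"text": "Sample text", "keywords": ["sample", "text"]},
]
merged = merge_analysis(branches)
```

Only the declared result keys are carried over, so the shared `text` input does not leak into the combined `analysis` value.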
+## State Persistence + +Save and resume application state. + +```python +from burr.core.persistence import SQLLitePersister + +@action(reads=["step"], writes=["step", "result"]) +def long_running_step(state: State, step_name: str) -> State: + """Simulate a long-running operation.""" + result = expensive_computation(step_name) + return state.update( Review Comment: `SQLLitePersister` should be `SQLitePersister` (single L). Same issue across all files. ########## .claude/plugins/burr/skills/burr/patterns.md: ########## @@ -0,0 +1,754 @@ +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +# Apache Burr Design Patterns & Best Practices + +Architectural guidance and best practices for building production-ready Apache Burr applications. + +## Core Design Principles + +### 1. Single Responsibility Actions + +Each action should do one thing well. 
+ +**❌ Bad - Action does too much:** +```python +@action(reads=["query"], writes=["response", "documents", "reranked", "formatted"]) +def do_everything(state: State) -> State: + # Retrieves, reranks, generates, and formats all in one + docs = retrieve(state["query"]) + reranked = rerank(docs) + response = generate(reranked) + formatted = format(response) + return state.update( + documents=docs, + reranked=reranked, + response=response, + formatted=formatted + ) +``` + +**✅ Good - Focused actions:** +```python +@action(reads=["query"], writes=["documents"]) +def retrieve_documents(state: State) -> State: + docs = retrieve(state["query"]) + return state.update(documents=docs) + +@action(reads=["documents"], writes=["reranked"]) +def rerank_documents(state: State) -> State: + reranked = rerank(state["documents"]) + return state.update(reranked=reranked) + +# ... separate actions for generate and format +``` + +**Benefits:** +- Easier to test +- Easier to debug (can see which action failed) +- Reusable components +- Clear visualization in the Burr UI + +### 2. Accurate reads/writes Declarations + +Declare exactly what each action reads and writes. + +**❌ Bad:** +```python +@action(reads=[], writes=[]) # Inaccurate! 
+def process_user(state: State) -> State: + user_id = state["user_id"] # Actually reads user_id + profile = fetch_profile(user_id) + return state.update(profile=profile) # Actually writes profile +``` + +**✅ Good:** +```python +@action(reads=["user_id"], writes=["profile"]) +def process_user(state: State) -> State: + user_id = state["user_id"] + profile = fetch_profile(user_id) + return state.update(profile=profile) +``` + +**Benefits:** +- Self-documenting code +- Better debugging in UI +- Enables future optimizations +- Catches errors early + +## State Management Patterns + +### Regular State (Dictionary-Based) + +**Reading from state:** +```python +# Use bracket notation to access state values +value = state["key"] +chat_history = state["chat_history"] +counter = state["counter"] +``` + +**Updating state:** +State is immutable. Methods return NEW State objects: +```python +# state.update() - set/update keys, returns new State +new_state = state.update(counter=5, name="Alice") + +# state.append() - append to lists, returns new State +new_state = state.append(chat_history={"role": "user", "content": "hi"}) + +# state.increment() - increment numbers, returns new State +new_state = state.increment(counter=1) + +# Chaining - each method returns a State, enabling fluent patterns +new_state = state.update(prompt=prompt).append(chat_history=item) +``` + +**Action return pattern:** +Actions return `Tuple[dict, State]`: +```python +from typing import Tuple + +@action(reads=["prompt"], writes=["response", "chat_history"]) +def ai_respond(state: State) -> Tuple[dict, State]: + # 1. Read from state + prompt = state["prompt"] + + # 2. Process + response = call_llm(prompt) + + # 3. 
Return (result_dict, new_state) + # result_dict is exposed to callers/tracking + # new_state is the updated immutable state + return {"response": response}, state.update(response=response).append( + chat_history={"role": "assistant", "content": response} + ) +``` + +**Shorthand (also valid):** +```python +@action(reads=["counter"], writes=["counter"]) +def increment(state: State) -> State: + result = {"counter": state["counter"] + 1} + # Framework infers result from state updates + return state.update(**result) +``` + +### Pydantic Typed State (Different Pattern) + +**Define state model:** +```python +from pydantic import BaseModel, Field +from typing import Optional + +class ApplicationState(BaseModel): + prompt: Optional[str] = Field(default=None, description="User prompt") + response: Optional[str] = Field(default=None, description="AI response") + chat_history: list[dict] = Field(default_factory=list) +``` + +**Configure application:** +```python +from burr.integrations.pydantic import PydanticTypingSystem + +app = ( + ApplicationBuilder() + .with_typing(PydanticTypingSystem(ApplicationState)) + .with_state(ApplicationState()) + .build() +) +``` + +**Access typed state:** +```python +# Use attribute access (not bracket notation) +@action.pydantic(reads=["prompt"], writes=["response"]) +def ai_respond(state: ApplicationState) -> ApplicationState: + # 1. Read using attributes + prompt = state.prompt + + # 2. Process + response = call_llm(prompt) + + # 3. 
Mutate in-place and return state + # (Mutation happens on internal copy) + state.response = response + return state +``` + +**Key differences:** + +| Aspect | Regular State | Pydantic Typed State | +|--------|---------------|---------------------| +| **Access** | `state["key"]` | `state.key` | +| **Return** | `Tuple[dict, State]` | `ApplicationState` | +| **Decorator** | `@action(reads=[], writes=[])` | `@action.pydantic(reads=[], writes=[])` | +| **Updates** | Must use `.update()`, `.append()` | In-place mutation | +| **Type Safety** | Runtime only | IDE support + validation | + +## Common Patterns + +### Pattern: Request-Response Cycle + +For chatbots and conversational AI. + +```python +@action(reads=[], writes=["messages", "current_prompt"]) +def receive_message(state: State, prompt: str) -> State: + """Accept user input.""" + message = {"role": "user", "content": prompt} + return ( + state.append(messages=message) + .update(current_prompt=prompt) + ) + +@action(reads=["messages"], writes=["messages", "response"]) +def generate_response(state: State) -> State: + """Generate AI response.""" + response = llm_call(state["messages"]) + message = {"role": "assistant", "content": response} + return ( + state.append(messages=message) + .update(response=response) + ) + +@action(reads=["response"], writes=["display"]) +def format_output(state: State) -> State: + """Format for display.""" + return state.update(display=format_markdown(state["response"])) + +app = ( + ApplicationBuilder() + .with_actions(receive_message, generate_response, format_output) + .with_transitions( + ("receive_message", "generate_response"), + ("generate_response", "format_output"), + ("format_output", "receive_message") # Loop back for next message + ) + .with_state(messages=[]) + .with_entrypoint("receive_message") + .build() +) +``` + +### Pattern: Error Recovery with Retries + +Handle transient failures gracefully. 
+ +```python +@action(reads=["url", "retry_count"], writes=["data", "error", "retry_count"]) +def fetch_with_retry(state: State) -> State: + """Fetch data with retry logic.""" + try: + data = http_get(state["url"]) + return state.update(data=data, error=None) + except Exception as e: + retry_count = state.get("retry_count", 0) + 1 + return state.update( + error=str(e), + retry_count=retry_count + ) + +@action(reads=["data"], writes=["processed"]) +def process_success(state: State) -> State: + """Process successful fetch.""" + return state.update(processed=transform(state["data"])) + +@action(reads=["error", "retry_count"], writes=["final_error"]) +def handle_failure(state: State) -> State: + """Handle permanent failure.""" + return state.update( + final_error=f"Failed after {state['retry_count']} retries: {state['error']}" + ) + +app = ( + ApplicationBuilder() + .with_actions(fetch_with_retry, process_success, handle_failure) + .with_transitions( + # Success path + ("fetch_with_retry", "process_success", when(error=None)), + # Retry path + ("fetch_with_retry", "fetch_with_retry", + expr("error is not None and retry_count < 3")), + # Failure path + ("fetch_with_retry", "handle_failure", default) + ) + .with_state(url="https://api.example.com", retry_count=0) + .with_entrypoint("fetch_with_retry") + .build() +) +``` + +### Pattern: Multi-Stage Pipeline + +Sequential data processing pipeline. 
+ +```python +@action(reads=["raw_data"], writes=["validated_data"]) +def validate(state: State) -> State: + """Validate input data.""" + validated = validate_schema(state["raw_data"]) + return state.update(validated_data=validated) + +@action(reads=["validated_data"], writes=["transformed_data"]) +def transform(state: State) -> State: + """Transform data.""" + transformed = apply_transformations(state["validated_data"]) + return state.update(transformed_data=transformed) + +@action(reads=["transformed_data"], writes=["enriched_data"]) +def enrich(state: State) -> State: + """Enrich with external data.""" + enriched = add_external_data(state["transformed_data"]) + return state.update(enriched_data=enriched) + +@action(reads=["enriched_data"], writes=["result"]) +def finalize(state: State) -> State: + """Finalize output.""" + result = create_output(state["enriched_data"]) + return state.update(result=result) + +# Simple linear pipeline +app = ( + ApplicationBuilder() + .with_actions(validate, transform, enrich, finalize) + .with_transitions( + ("validate", "transform"), + ("transform", "enrich"), + ("enrich", "finalize") + ) + .with_entrypoint("validate") + .build() +) +``` + +### Pattern: Branching Decision Tree + +Route based on complex conditions. 
+
+```python
+@action(reads=["content"], writes=["analysis"])
+def analyze_content(state: State) -> State:
+    """Analyze content type and complexity."""
+    analysis = {
+        "content_type": detect_type(state["content"]),
+        "complexity": calculate_complexity(state["content"]),
+        "language": detect_language(state["content"])
+    }
+    return state.update(analysis=analysis)
+
+@action(reads=["content"], writes=["result"])
+def handle_simple_text(state: State) -> State:
+    return state.update(result=simple_processor(state["content"]))
+
+@action(reads=["content"], writes=["result"])
+def handle_complex_text(state: State) -> State:
+    return state.update(result=complex_processor(state["content"]))
+
+@action(reads=["content"], writes=["result"])
+def handle_code(state: State) -> State:
+    return state.update(result=code_processor(state["content"]))
+
+@action(reads=["content"], writes=["result"])
+def handle_unsupported(state: State) -> State:
+    return state.update(result={"error": "Unsupported content type"})
+
+app = (
+    ApplicationBuilder()
+    .with_actions(
+        analyze_content,
+        handle_simple_text,
+        handle_complex_text,
+        handle_code,
+        handle_unsupported
+    )
+    .with_transitions(
+        ("analyze_content", "handle_simple_text",
+         expr("analysis['content_type'] == 'text' and analysis['complexity'] < 5")),
+        ("analyze_content", "handle_complex_text",
+         expr("analysis['content_type'] == 'text' and analysis['complexity'] >= 5")),
+        ("analyze_content", "handle_code",
+         when(analysis={"content_type": "code"})),
+        ("analyze_content", "handle_unsupported", default)
+    )
+    .with_entrypoint("analyze_content")
+    .build()
+)
+```
+
+### Pattern: Aggregating Parallel Results
+
+Run multiple analyses in parallel and combine using MapActions.
+
+```python
+from burr.core.parallelism import MapActions
+from burr.core import action, State, ApplicationContext
+from typing import Dict, Any
+from datetime import datetime
+
+@action(reads=["document"], writes=["summary"])
+async def summarize(state: State) -> State:
+    summary = await llm_summarize(state["document"])
+    return state.update(summary=summary)
+
+@action(reads=["document"], writes=["sentiment"])
+async def analyze_sentiment(state: State) -> State:
+    sentiment = await get_sentiment(state["document"])
+    return state.update(sentiment=sentiment)
+
+@action(reads=["document"], writes=["topics"])
+async def extract_topics(state: State) -> State:
+    topics = await get_topics(state["document"])
+    return state.update(topics=topics)
+
+class ParallelDocumentAnalysis(MapActions):
+    async def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]):
+        yield summarize.with_name("summarize")
+        yield analyze_sentiment.with_name("analyze_sentiment")
+        yield extract_topics.with_name("extract_topics")
+
+    async def state(self, state: State, inputs: Dict[str, Any]) -> State:
+        return state  # Pass the same state to all actions
+
+    async def reduce(self, state: State, states) -> State:
+        """Aggregate all analyses into final report."""
+        report = {"timestamp": datetime.now()}
+        async for sub_state in states:
+            if "summary" in sub_state:
+                report["summary"] = sub_state["summary"]
+            if "sentiment" in sub_state:
+                report["sentiment"] = sub_state["sentiment"]
+            if "topics" in sub_state:
+                report["topics"] = sub_state["topics"]
+        return state.update(report=report)
+
+    @property
+    def is_async(self) -> bool:
+        return True
+
+    @property
+    def reads(self) -> list[str]:
+        return ["document"]
+
+    @property
+    def writes(self) -> list[str]:
+        return ["report"]
+
+app = (
+    ApplicationBuilder()
+    .with_actions(analyze_doc=ParallelDocumentAnalysis())
+    .with_entrypoint("analyze_doc")
+    .build()
+)
+```
+
+### Pattern: State Machine with Memory
+
+Maintain conversation context and history.
+
+```python
+@action(reads=["history"], writes=["history", "current_query"])
+def add_to_history(state: State, query: str) -> State:
+    """Add query to history with metadata."""
+    history_item = {
+        "query": query,
+        "timestamp": datetime.now(),
+        "session_id": state.get("session_id")
+    }
+    return (
+        state.append(history=history_item)
+        .update(current_query=query)
+    )
+
+@action(reads=["history", "current_query"], writes=["response"])
+def generate_with_context(state: State) -> State:
+    """Generate response using conversation history."""
+    # Build context from history
+    context = build_context_from_history(state["history"])
+
+    # Generate with full context
+    response = llm_call_with_context(
+        query=state["current_query"],
+        context=context
+    )
+    return state.update(response=response)
+
+@action(reads=["history"], writes=["should_summarize"])
+def check_history_length(state: State) -> State:
+    """Check if history needs summarization."""
+    should_summarize = len(state["history"]) > 10
+    return state.update(should_summarize=should_summarize)
+
+@action(reads=["history"], writes=["history", "summary"])
+def summarize_history(state: State) -> State:
+    """Compress old history."""
+    summary = create_summary(state["history"][:-5])
+    recent = state["history"][-5:]
+    return state.update(
+        history=recent,
+        summary=summary
+    )
+```
+
+## Best Practices
+
+### Testing Strategy
+
+**Unit test individual actions:**
+```python
+def test_action():
+    state = State({"input": "test"})
+    result = my_action(state)
+    assert result["output"] == "expected"
+```
+
+**Integration test the state machine:**
+```python
+def test_full_flow():
+    app = build_app()
+    _, _, final_state = app.run(halt_after=["end"])
+    assert final_state["result"] == expected_value
+```
+
+**Test with mock state:**
+```python
+def test_with_fixtures():
+    state = State({
+        "user": {"id": 123, "name": "Test"},
+        "settings": {"mode": "test"}
+    })
+    result = complex_action(state)
+    assert result["processed"] is True
+```
+
+### Observability
+
+**Always enable tracking during development:**
+```python
+app = (
+    ApplicationBuilder()
+    .with_actions(...)
+    .with_tracker("local", project="my_app")
+    .build()
+)
+```
+
+**Use the Burr UI to:**
+- Visualize state machine execution
+- Inspect state at each step
+- Debug transition logic
+- Profile action performance
+
+**Add custom hooks for production:**
+```python
+from burr.lifecycle import LifecycleAdapter
+
+class MetricsHook(LifecycleAdapter):
+    def post_run_step(self, action, result, state, **kwargs):
+        # Log metrics to your monitoring system
+        log_metric(f"action.{action.name}.duration", kwargs["duration"])
+        log_metric(f"action.{action.name}.success", 1)
+```
+
+### State Management
+
+**Keep state flat when possible:**
+```python
+# ✅ Good
+state = State({
+    "user_id": 123,
+    "user_name": "Alice",
+    "user_email": "[email protected]"
+})
+
+# ❌ Avoid deep nesting (harder to track)
+state = State({
+    "user": {
+        "profile": {
+            "personal": {
+                "name": "Alice"
+            }
+        }
+    }
+})
+```
+
+**Use meaningful key names:**
+```python
+# ✅ Good
+state.update(validated_user_email="[email protected]")
+
+# ❌ Bad
+state.update(ve="[email protected]")
+```
+
+### Performance Optimization
+
+**Use parallel execution for independent operations:**
+```python
+# Use MapActions to run independent operations in parallel
+from burr.core.parallelism import MapActions
+
+class FetchAllData(MapActions):
+    def actions(self, state, context, inputs):
+        yield fetch_user.with_name("fetch_user")
+        yield fetch_products.with_name("fetch_products")
+        yield fetch_orders.with_name("fetch_orders")
+
+    def state(self, state, inputs):
+        return state
+
+    def reduce(self, state, states):
+        combined = {}
+        for s in states:
+            combined.update(s.get_all())
+        return state.update(**combined)
+
+    reads = []
+    writes = ["user", "products", "orders"]
+```
+
+**Keep actions lightweight:**
+```python
+# ❌ Bad - Heavy computation in action
+@action(reads=["data"], writes=["result"])
+def process(state: State) -> State:
+    # Hours of computation
+    result = train_ml_model(state["data"])
+    return state.update(result=result)
+
+# ✅ Better - Break into steps with state persistence
+@action(reads=["data"], writes=["preprocessed"])
+def preprocess(state: State) -> State:
+    return state.update(preprocessed=preprocess_data(state["data"]))
+
+@action(reads=["preprocessed"], writes=["checkpoint"])
+def train_epoch(state: State) -> State:
+    # Train one epoch, save checkpoint
+    checkpoint = train_one_epoch(state["preprocessed"])
+    return state.update(checkpoint=checkpoint)
+```
+
+### Production Deployment
+
+**Enable persistence for long-running workflows:**
+```python
+from burr.core.persistence import SQLLitePersister
+
+persister = SQLLitePersister("prod.db", "workflows")
+app = (
+    ApplicationBuilder()
+    .with_actions(...)
+    .with_state_persister(persister)

Review Comment:
   `SQLLitePersister` should be `SQLitePersister` (single L).

##########
.claude/plugins/burr/skills/burr/examples.md:
##########
@@ -0,0 +1,722 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Burr Code Examples
+
+Common patterns and working examples for building Apache Burr applications.
+
+## Table of Contents
+1. [Simple Counter](#simple-counter)
+2. [Basic Chatbot](#basic-chatbot)
+3. [Multi-Step Workflow](#multi-step-workflow)
+4. [Conditional Branching](#conditional-branching)
+5. [Looping with Conditions](#looping-with-conditions)
+6. [Error Handling](#error-handling)
+7. [Streaming Actions](#streaming-actions)
+8. [Parallel Execution](#parallel-execution)
+9. [State Persistence](#state-persistence)
+10. [RAG Pattern](#rag-pattern)
+11. [Using Action Binding](#using-action-binding)
+12. [Testing Actions](#testing-actions)
+13. [Pydantic Typed State](#pydantic-typed-state)
+
+---
+
+## Simple Counter
+
+Minimal example showing state updates and transitions.
+
+```python
+from burr.core import action, State, ApplicationBuilder, default, expr
+
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> State:
+    # Read from state using bracket notation
+    result = {"counter": state["counter"] + 1}
+    # State methods return new State objects
+    return state.update(**result)
+
+@action(reads=["counter"], writes=["result"])
+def finish(state: State) -> State:
+    # Access state values with state["key"]
+    result = {"result": f"Final count: {state['counter']}"}
+    return state.update(**result)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(increment, finish)
+    .with_transitions(
+        ("increment", "increment", expr("counter < 10")),
+        ("increment", "finish", default)
+    )
+    .with_state(counter=0)
+    .with_entrypoint("increment")
+    .build()
+)
+
+# run() returns (action, result_dict, final_state)
+action, result, final_state = app.run(halt_after=["finish"])
+print(final_state["result"])  # "Final count: 10"
+```
+
+## Basic Chatbot
+
+Classic chatbot pattern with user input and AI response.
+
+```python
+from burr.core import action, State, ApplicationBuilder, default
+from typing import Tuple
+
+@action(reads=[], writes=["chat_history", "prompt"])
+def human_input(state: State, prompt: str) -> Tuple[dict, State]:
+    """Capture user input."""
+    # Build chat item
+    chat_item = {"role": "user", "content": prompt}
+
+    # Return (result_dict, new_state)
+    # Chain state updates: update() returns State, then append() returns new State
+    return {"prompt": prompt}, state.update(prompt=prompt).append(chat_history=chat_item)
+
+@action(reads=["chat_history"], writes=["response", "chat_history"])
+def ai_response(state: State) -> Tuple[dict, State]:
+    """Generate AI response."""
+    # Read from state using bracket notation
+    chat_history = state["chat_history"]
+
+    # Call your LLM
+    response = call_llm(chat_history)
+    chat_item = {"role": "assistant", "content": response}
+
+    # Return result and chained state updates
+    return {"response": response}, state.update(response=response).append(
+        chat_history=chat_item
+    )
+
+app = (
+    ApplicationBuilder()
+    .with_actions(human_input, ai_response)
+    .with_transitions(
+        ("human_input", "ai_response"),
+        ("ai_response", "human_input")
+    )
+    .with_state(chat_history=[])
+    .with_entrypoint("human_input")
+    .with_tracker("local", project="chatbot")
+    .build()
+)
+
+# Run one turn of conversation
+# run() returns (action, result_dict, final_state)
+action, result, state = app.run(
+    halt_after=["ai_response"],
+    inputs={"prompt": "Hello, how are you?"}
+)
+print(result["response"])  # Access result from result dict
+print(state["chat_history"])  # Access state using bracket notation
+```
+
+## Multi-Step Workflow
+
+Chain multiple actions sequentially.
+
+```python
+@action(reads=["raw_text"], writes=["cleaned_text"])
+def clean_text(state: State) -> State:
+    """Remove special characters and normalize."""
+    cleaned = state["raw_text"].lower().strip()
+    return state.update(cleaned_text=cleaned)
+
+@action(reads=["cleaned_text"], writes=["tokens"])
+def tokenize(state: State) -> State:
+    """Split into tokens."""
+    tokens = state["cleaned_text"].split()
+    return state.update(tokens=tokens)
+
+@action(reads=["tokens"], writes=["summary"])
+def summarize(state: State) -> State:
+    """Generate summary."""
+    summary = f"Processed {len(state['tokens'])} tokens"
+    return state.update(summary=summary)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(clean_text, tokenize, summarize)
+    .with_transitions(
+        ("clean_text", "tokenize"),
+        ("tokenize", "summarize")
+    )
+    .with_state(raw_text=" Hello World! ")
+    .with_entrypoint("clean_text")
+    .build()
+)
+
+_, _, final_state = app.run(halt_after=["summarize"])
+```
+
+## Conditional Branching
+
+Route execution based on state values.
+
+```python
+from burr.core import when
+
+@action(reads=["user_type"], writes=["message"])
+def check_user_type(state: State, user_type: str) -> State:
+    return state.update(user_type=user_type)
+
+@action(reads=[], writes=["greeting"])
+def admin_greeting(state: State) -> State:
+    return state.update(greeting="Welcome, Administrator!")
+
+@action(reads=[], writes=["greeting"])
+def user_greeting(state: State) -> State:
+    return state.update(greeting="Welcome, User!")
+
+@action(reads=[], writes=["greeting"])
+def guest_greeting(state: State) -> State:
+    return state.update(greeting="Welcome, Guest!")
+
+app = (
+    ApplicationBuilder()
+    .with_actions(
+        check_user_type,
+        admin_greeting,
+        user_greeting,
+        guest_greeting
+    )
+    .with_transitions(
+        ("check_user_type", "admin_greeting", when(user_type="admin")),
+        ("check_user_type", "user_greeting", when(user_type="user")),
+        ("check_user_type", "guest_greeting", default)
+    )
+    .with_entrypoint("check_user_type")
+    .build()
+)
+
+_, _, state = app.run(
+    halt_after=["admin_greeting", "user_greeting", "guest_greeting"],
+    inputs={"user_type": "admin"}
+)
+```
+
+## Looping with Conditions
+
+Implement loops using recursive transitions.
+
+```python
+@action(reads=["items", "processed"], writes=["processed", "current_item"])
+def process_item(state: State) -> State:
+    """Process next item from list."""
+    items = state["items"]
+    processed_count = state.get("processed", 0)
+
+    current_item = items[processed_count]
+    # Process the item
+    result = transform(current_item)
+
+    return state.update(
+        processed=processed_count + 1,
+        current_item=result
+    )
+
+@action(reads=["processed"], writes=["done"])
+def finish_processing(state: State) -> State:
+    return state.update(done=True)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(process_item, finish_processing)
+    .with_transitions(
+        ("process_item", "process_item", expr("processed < len(items)")),
+        ("process_item", "finish_processing", default)
+    )
+    .with_state(items=["a", "b", "c"], processed=0)
+    .with_entrypoint("process_item")
+    .build()
+)
+```
+
+## Error Handling
+
+Handle errors gracefully by routing to error actions.
+
+```python
+@action(reads=["data"], writes=["result", "error"])
+def risky_operation(state: State) -> State:
+    """Operation that might fail."""
+    try:
+        result = dangerous_function(state["data"])
+        return state.update(result=result, error=None)
+    except Exception as e:
+        return state.update(result=None, error=str(e))
+
+@action(reads=["result"], writes=["success_message"])
+def handle_success(state: State) -> State:
+    return state.update(success_message=f"Success: {state['result']}")
+
+@action(reads=["error"], writes=["error_message"])
+def handle_error(state: State) -> State:
+    return state.update(error_message=f"Error: {state['error']}")
+
+@action(reads=["data"], writes=["result", "retry_count"])
+def retry_operation(state: State) -> State:
+    """Retry the operation."""
+    retry_count = state.get("retry_count", 0) + 1
+    try:
+        result = dangerous_function(state["data"])
+        return state.update(result=result, error=None, retry_count=retry_count)
+    except Exception as e:
+        return state.update(result=None, error=str(e), retry_count=retry_count)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(
+        risky_operation,
+        handle_success,
+        handle_error,
+        retry_operation
+    )
+    .with_transitions(
+        ("risky_operation", "handle_success", when(error=None)),
+        ("risky_operation", "retry_operation",
+         expr("error is not None and retry_count < 3")),
+        ("risky_operation", "handle_error", default),
+        ("retry_operation", "handle_success", when(error=None)),
+        ("retry_operation", "retry_operation",
+         expr("error is not None and retry_count < 3")),
+        ("retry_operation", "handle_error", default)
+    )
+    .with_state(data="input", retry_count=0)
+    .with_entrypoint("risky_operation")
+    .build()
+)
+```
+
+## Streaming Actions
+
+Stream intermediate results as they're generated.
+
+```python
+from typing import Generator, Tuple
+
+@action(reads=["prompt"], writes=["response", "chunks"])
+def streaming_llm(state: State) -> Generator[State, None, Tuple[dict, State]]:
+    """Stream LLM response token by token."""
+    chunks = []
+
+    # Stream tokens from LLM
+    for token in llm_stream(state["prompt"]):
+        chunks.append(token)
+        # Yield intermediate state
+        yield state.update(
+            chunks=chunks,
+            response="".join(chunks)
+        )
+
+    # Return final result
+    final_response = "".join(chunks)
+    result = {"response": final_response}
+    return result, state.update(**result)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(streaming_llm)
+    .with_state(prompt="Write a story")
+    .with_entrypoint("streaming_llm")
+    .build()
+)
+
+# Stream results
+for state in app.stream_result(halt_after=["streaming_llm"]):
+    print(state["response"], end="", flush=True)
+```
+
+## Parallel Execution
+
+Execute multiple actions in parallel using Apache Burr's parallelism APIs.
+
+```python
+from burr.core.parallelism import MapActions, RunnableGraph
+from burr.core import action, State, ApplicationContext
+from typing import Dict, Any
+
+@action(reads=["text"], writes=["sentiment"])
+def analyze_sentiment(state: State) -> State:
+    sentiment = get_sentiment(state["text"])
+    return state.update(sentiment=sentiment)
+
+@action(reads=["text"], writes=["entities"])
+def extract_entities(state: State) -> State:
+    entities = extract_ner(state["text"])
+    return state.update(entities=entities)
+
+@action(reads=["text"], writes=["keywords"])
+def extract_keywords(state: State) -> State:
+    keywords = get_keywords(state["text"])
+    return state.update(keywords=keywords)
+
+# Run multiple actions in parallel on the same state
+class ParallelTextAnalysis(MapActions):
+    def actions(self, state: State, context: ApplicationContext, inputs: Dict[str, Any]):
+        yield analyze_sentiment.with_name("sentiment_analysis")
+        yield extract_entities.with_name("entity_extraction")
+        yield extract_keywords.with_name("keyword_extraction")
+
+    def state(self, state: State, inputs: Dict[str, Any]) -> State:
+        return state  # Pass state as-is to all actions
+
+    def reduce(self, state: State, states) -> State:
+        """Combine all analysis results."""
+        analysis = {}
+        for sub_state in states:
+            if "sentiment" in sub_state:
+                analysis["sentiment"] = sub_state["sentiment"]
+            if "entities" in sub_state:
+                analysis["entities"] = sub_state["entities"]
+            if "keywords" in sub_state:
+                analysis["keywords"] = sub_state["keywords"]
+        return state.update(analysis=analysis)
+
+    @property
+    def reads(self) -> list[str]:
+        return ["text"]
+
+    @property
+    def writes(self) -> list[str]:
+        return ["analysis"]
+
+app = (
+    ApplicationBuilder()
+    .with_actions(parallel_analysis=ParallelTextAnalysis())
+    .with_transitions(("parallel_analysis", "parallel_analysis"))  # Or continue to next action
+    .with_state(text="Sample text to analyze")
+    .with_entrypoint("parallel_analysis")
+    .build()
+)
+```
+
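The hunk above ends on a transition from `parallel_analysis` back to itself with no condition attached. As a rough, framework-independent sketch of why that matters, the snippet below scans a transition list for actions whose first unconditional transition points back at themselves. The tuple layout mirrors the `with_transitions` arguments quoted in the diff, but `find_unguarded_self_loops`, the string conditions, and the `report_results` action are hypothetical names for illustration only, not part of Burr. It assumes first-match-wins ordering, as the quoted troubleshooting guide describes.

```python
from typing import Iterable, List, Optional, Tuple

# (source, target, condition); None stands for an unconditional transition.
Transition = Tuple[str, str, Optional[str]]


def normalize(raw: Iterable[tuple]) -> List[Transition]:
    """Pad 2-tuples so every transition is (source, target, condition)."""
    return [(t[0], t[1], t[2] if len(t) > 2 else None) for t in raw]


def find_unguarded_self_loops(raw: Iterable[tuple]) -> List[str]:
    """Return actions whose only reachable route is an unconditional self-loop.

    Transitions are read in declaration order and the first match wins,
    so an unconditional entry hides everything declared after it.
    """
    stuck: List[str] = []
    decided = set()   # sources already closed off by an unconditional transition
    has_exit = set()  # sources with an earlier conditional route to another action
    for source, target, condition in normalize(raw):
        if source in decided:
            continue
        if condition is None:
            decided.add(source)
            if target == source and source not in has_exit:
                stuck.append(source)
        elif target != source:
            has_exit.add(source)
    return stuck


# The wiring quoted in the diff: flagged as a loop with no exit.
print(find_unguarded_self_loops([("parallel_analysis", "parallel_analysis")]))
# A hypothetical wiring with a follow-up action: nothing flagged.
print(find_unguarded_self_loops([("parallel_analysis", "report_results")]))
```

The check is deliberately conservative: it only reports loops that can never exit, so a guarded loop such as `("increment", "increment", "counter < 10")` followed by a default exit is not flagged.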
Review Comment:
   This transition `("parallel_analysis", "parallel_analysis")` creates an infinite self-loop with no exit condition. The `ParallelTextAnalysis` class itself correctly uses `MapActions`, but the app setup should show it integrated into a larger graph with a proper next step, not looping back to itself.

##########
.claude/plugins/burr/skills/burr/troubleshooting.md:
##########
@@ -0,0 +1,762 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Apache Burr Troubleshooting Guide
+
+Common issues and solutions when working with Apache Burr.
+
+## Installation Issues
+
+### Issue: `burr` command not found after installation
+
+**Problem:**
+```bash
+$ burr
+command not found: burr
+```
+
+**Solutions:**
+
+1. Install with UI dependencies:
+```bash
+pip install "burr[start]"
+```
+
+2. Check if burr is in your PATH:
+```bash
+which burr
+# or
+python -m burr
+```
+
+3. If using poetry:
+```bash
+poetry add "burr[start]"
+poetry run burr
+```
+
+### Issue: UI won't start or shows errors
+
+**Problem:**
+```
+Error starting Burr UI: Module not found
+```
+
+**Solutions:**
+
+1. Ensure you installed the `[start]` extra:
+```bash
+pip install "burr[start]"
+```
+
+2. Check port is not already in use:
+```bash
+# Default is port 7241
+lsof -i :7241
+```
+
+3. Specify custom port:
+```bash
+burr --port 8000
+```
+
+4. Check storage directory permissions:
+```bash
+ls -la ~/.burr
+```
+
+## State Machine Issues
+
+### Issue: Infinite loops in transitions
+
+**Problem:**
+```python
+# State machine never halts
+app.run(halt_after=["end"])  # Never reaches "end"
+```
+
+**Common Causes:**
+
+1. **Missing halt condition:**
+```python
+# ❌ Bad - loops forever
+.with_transitions(
+    ("process", "process", default)  # Always loops!
+)
+
+# ✅ Good - has exit condition
+.with_transitions(
+    ("process", "process", expr("counter < 10")),
+    ("process", "end", default)
+)
+```
+
+2. **Condition never becomes true:**
+```python
+# ❌ Bad - condition may never be met
+.with_transitions(
+    ("wait", "wait", when(status="pending")),
+    ("wait", "done", when(status="complete"))
+    # What if status is "error"? Stuck forever!
+)
+
+# ✅ Good - always has fallback
+.with_transitions(
+    ("wait", "wait", when(status="pending")),
+    ("wait", "done", when(status="complete")),
+    ("wait", "error_handler", default)
+)
+```
+
+**Debugging:**
+1. Use `.visualize()` to see the graph:
+```python
+app.visualize(output_file_path="debug.png", include_conditions=True)
+```
+
+2. Add logging in actions:
+```python
+@action(reads=["counter"], writes=["counter"])
+def process(state: State) -> State:
+    print(f"Counter: {state['counter']}")  # Debug output
+    return state.update(counter=state["counter"] + 1)
+```
+
+3. Use the Burr UI to watch execution in real-time:
+```bash
+burr
+```
+
+### Issue: Wrong action executes
+
+**Problem:**
+```
+Expected action 'process_data' but 'error_handler' executed instead
+```
+
+**Common Causes:**
+
+1. **Transition condition order matters:**
+```python
+# ❌ Bad - default matches first
+.with_transitions(
+    ("check", "error", default),  # This always matches!
+    ("check", "success", when(valid=True))
+)
+
+# ✅ Good - specific conditions first
+.with_transitions(
+    ("check", "success", when(valid=True)),
+    ("check", "error", default)
+)
+```
+
+2. **State value is not what you expect:**
+```python
+# Debug: Check actual state values
+@action(reads=["value"], writes=["result"])
+def check_value(state: State) -> State:
+    print(f"Value is: {state['value']}, type: {type(state['value'])}")
+    # Maybe it's a string "True" not boolean True?
+    return state.update(result=state["value"])
+```
+
+3. **Using `when()` with complex objects:**
+```python
+# ❌ Bad - object comparison may not work as expected
+.with_transitions(
+    ("check", "next", when(user={"id": 123}))  # Dict comparison
+)
+
+# ✅ Good - use simpler state values
+.with_transitions(
+    ("check", "next", when(user_id=123))  # Direct value comparison
+)
+```
+
+## State Issues
+
+### Issue: State not updating
+
+**Problem:**
+```python
+# State remains unchanged after action
+state_before = app.state["counter"]  # 0
+app.run(halt_after=["increment"])
+state_after = app.state["counter"]  # Still 0!
+```
+
+**Common Causes:**
+
+1. **Not returning updated state:**
+```python
+# ❌ Bad - returns None
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> State:
+    state.update(counter=state["counter"] + 1)
+    # Missing return!
+
+# ✅ Good - returns updated state
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> State:
+    result = {"counter": state["counter"] + 1}
+    return state.update(**result)
+```
+
+**Note:** You can return just `State` (shorthand) or `Tuple[dict, State]` (explicit):
+```python
+from typing import Tuple
+
+# Explicit pattern (recommended for clarity)
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> Tuple[dict, State]:
+    result = {"counter": state["counter"] + 1}
+    return result, state.update(counter=result["counter"])
+```
+
+2. **Mutating state directly:**
+```python
+# ❌ Bad - mutates state (doesn't work)
+@action(reads=["items"], writes=["items"])
+def add_item(state: State, item: str) -> State:
+    state["items"].append(item)  # This doesn't work!
+    return state
+
+# ✅ Good - uses .append() method
+@action(reads=["items"], writes=["items"])
+def add_item(state: State, item: str) -> State:
+    return state.append(items=item)
+```
+
+3. **Typo in state key:**
+```python
+# ❌ Bad - creates new key instead of updating existing
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> State:
+    return state.update(couter=state["counter"] + 1)  # Typo!
+
+# ✅ Good - correct key name
+@action(reads=["counter"], writes=["counter"])
+def increment(state: State) -> State:
+    return state.update(counter=state["counter"] + 1)
+```
+
+### Issue: KeyError accessing state
+
+**Problem:**
+```python
+KeyError: 'missing_key'
+```
+
+**Solutions:**
+
+1. **Use `.get()` with default:**
+```python
+# ❌ Risky - may not exist
+value = state["key"]
+
+# ✅ Safe - provides default
+value = state.get("key", default_value)
+```
+
+2. **Check if key exists:**
+```python
+if "key" in state:
+    value = state["key"]
+else:
+    value = default
+```
+
+3. **Initialize state properly:**
+```python
+app = (
+    ApplicationBuilder()
+    .with_state(
+        counter=0,  # Initialize all keys
+        items=[],
+        status="pending"
+    )
+    .build()
+)
+```
+
+4. **Check reads declaration:**
+```python
+# ❌ Bad - tries to read undeclared key
+@action(reads=[], writes=["result"])
+def process(state: State) -> State:
+    value = state["input"]  # Not in reads!
+    return state.update(result=value)
+
+# ✅ Good - declares what it reads
+@action(reads=["input"], writes=["result"])
+def process(state: State) -> State:
+    value = state["input"]
+    return state.update(result=value)
+```
+
+## Action Issues
+
+### Issue: Action inputs not working
+
+**Problem:**
+```python
+app.run(halt_after=["process"], inputs={"param": "value"})
+# Error: unexpected keyword argument 'param'
+```
+
+**Solution:**
+
+Add parameter to action function:
+```python
+# ❌ Bad - no parameter for input
+@action(reads=[], writes=["result"])
+def process(state: State) -> State:
+    # How do I access the input?
+    pass
+
+# ✅ Good - accepts input parameter
+@action(reads=[], writes=["result"])
+def process(state: State, param: str) -> State:
+    return state.update(result=param)
+```
+
+### Issue: Streaming action not working
+
+**Problem:**
+```python
+# No intermediate results appear
+for state in app.stream_result(halt_after=["end"]):
+    print(state)  # Only prints final state
+```
+
+**Solution:**
+
+Use generator pattern with yields:
+```python
+# ❌ Bad - regular action (no streaming)
+@action(reads=["input"], writes=["output"])
+def process(state: State) -> State:
+    result = slow_operation()
+    return state.update(output=result)
+
+# ✅ Good - streaming action
+@action(reads=["input"], writes=["output"])
+def process(state: State) -> Generator[State, None, Tuple[dict, State]]:
+    for chunk in slow_operation():
+        # Yield intermediate states
+        yield state.update(current_chunk=chunk)
+
+    # Return final result
+    result = {"output": "done"}
+    return result, state.update(**result)
+```
+
+### Issue: Async action errors
+
+**Problem:**
+```
+RuntimeError: Event loop is closed
+```
+
+**Solutions:**
+
+1. **Use `arun()` for async applications:**
+```python
+# ❌ Bad - using sync run with async actions
+app.run(halt_after=["async_action"])
+
+# ✅ Good - using async run
+await app.arun(halt_after=["async_action"])
+```
+
+2. **Make sure action is marked async:**
+```python
+# ✅ Async action
+@action(reads=["url"], writes=["data"])
+async def fetch_data(state: State) -> State:
+    async with httpx.AsyncClient() as client:
+        response = await client.get(state["url"])
+        return state.update(data=response.json())
+```
+
+3. **Mix sync and async carefully:**
+```python
+# You can have both sync and async actions in the same app
+# Burr handles this automatically
+app = (
+    ApplicationBuilder()
+    .with_actions(
+        sync_action,  # Regular action
+        async_action  # Async action
+    )
+    .build()
+)
+
+# Use arun() if any action is async
+await app.arun(halt_after=["async_action"])
+```
+
+## Persistence Issues
+
+### Issue: State not persisting
+
+**Problem:**
+```python
+# State is lost between runs
+app = build_app()
+app.run(halt_after=["step1"])
+# Restart app
+app = build_app()  # Starts from beginning, not step1
+```
+
+**Solution:**
+
+Set up persistence properly:
+```python
+from burr.core.persistence import SQLLitePersister
+
+persister = SQLLitePersister("app.db", "state")
+persister.initialize()  # Don't forget to initialize!
+
+app = (
+    ApplicationBuilder()

Review Comment:
   `SQLLitePersister` should be `SQLitePersister` (single L).

##########
.claude/plugins/burr/skills/burr/SKILL.md:
##########
@@ -0,0 +1,343 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+---
+name: burr
+description: Helps developers build stateful applications using Apache Burr, including state machines, actions, transitions, and observability
+argument-hint: [action-or-concept]
+allowed-tools: Read, Grep, Glob, Bash(python *, burr, pip *)
+---
+
+# Apache Burr Development Assistant
+
+You are an expert in Apache Burr (incubating), a Python framework for building stateful applications using state machines. When this skill is active, help developers write clean, idiomatic Apache Burr code following best practices.
+
+## Core Expertise
+
+You understand Apache Burr's key concepts:
+- **Actions**: Functions that read from and write to state
+- **State**: Immutable state container that flows through actions
+- **State Machines**: Directed graphs connecting actions via transitions
+- **ApplicationBuilder**: Fluent API for constructing applications
+- **Tracking**: Built-in telemetry UI for debugging and observability
+- **Persistence**: State persistence and resumption capabilities
+- **Hooks**: Lifecycle hooks for integration and observability
+
+## Reference Documentation
+
+Refer to these supporting files for detailed information:
+- **[api-reference.md](api-reference.md)**: Complete API documentation
+- **[examples.md](examples.md)**: Common patterns and working examples
+- **[patterns.md](patterns.md)**: Best practices and architectural guidance
+- **[troubleshooting.md](troubleshooting.md)**: Common issues and solutions
+
+## When Helping Developers
+
+### 1. Building New Applications
+
+When users want to create an Apache Burr application:
+
+1. **Start with actions** - Define `@action` decorated functions
+2. **Use ApplicationBuilder** - Follow the builder pattern
+3. **Define transitions** - Connect actions with conditions
+4. **Add tracking** - Enable the telemetry UI from the start
+5. **Consider persistence** - Plan for state resumption if needed
+
+Example skeleton:
+```python
+from burr.core import action, State, ApplicationBuilder, default
+from typing import Tuple
+
+@action(reads=["input_key"], writes=["output_key"])
+def my_action(state: State) -> Tuple[dict, State]:
+    # 1. Read from state using bracket notation
+    input_value = state["input_key"]
+
+    # 2. Your logic here
+    output_value = process(input_value)
+
+    # 3. Return tuple: (result_dict, new_state)
+    #    - result_dict: exposed to callers and tracking
+    #    - new_state: returned by state.update() (creates new State object)
+    return {"output_key": output_value}, state.update(output_key=output_value)
+
+app = (
+    ApplicationBuilder()
+    .with_actions(my_action)
+    .with_transitions(("my_action", "next_action", default))
+    .with_state(input_key="initial_value")
+    .with_entrypoint("my_action")
+    .with_tracker("local", project="my_project")
+    .build()
+)
+
+# Run returns (action, result_dict, final_state)
+action, result, state = app.run(halt_after=["next_action"])
+```
+
+### 2. Reviewing Apache Burr Code
+
+When reviewing code:
+- ✅ Check that actions declare correct `reads` and `writes`
+- ✅ Verify state updates use `.update()` or `.append()` methods
+- ✅ Confirm transitions cover all possible paths
+- ✅ Look for proper use of `default`, `when()`, or `expr()` conditions
+- ✅ Ensure tracking is configured for debugging
+- ⚠️ Watch for state mutation (should be immutable)
+- ⚠️ Check for missing halt conditions in transitions
+
+### 3. Explaining Concepts
+
+When explaining Apache Burr features:
+- Use concrete examples from [examples.md](examples.md)
+- Reference the appropriate section in [api-reference.md](api-reference.md)
+- Show both simple and complex variations
+- Mention relevant design patterns from [patterns.md](patterns.md)
+- Link to official documentation at https://burr.apache.org/
+
+### 4.
Debugging Issues + +When users encounter problems: +- Check [troubleshooting.md](troubleshooting.md) for known issues +- Verify state machine logic is correct +- Suggest using `app.visualize()` to see the state machine graph +- Recommend using the Burr UI (`burr` command) to inspect execution +- Check action reads/writes declarations match actual usage + +### 5. Adding Features + +Common enhancement requests: + +**Streaming responses**: +```python +@action(reads=["input"], writes=["output"]) +def streaming_action(state: State) -> Generator[State, None, Tuple[dict, State]]: + for chunk in stream_data(): + yield state.update(current_chunk=chunk) + result = {"output": final_result} + return result, state.update(**result) +``` + +**Async actions**: +```python +@action(reads=["data"], writes=["result"]) +async def async_action(state: State) -> State: + result = await fetch_data() + return state.update(result=result) +``` + +**Parallel execution**: +```python +from burr.core.parallelism import MapStates, RunnableGraph + +# Apply same action to multiple states +class TestMultiplePrompts(MapStates): + def action(self, state: State, inputs: dict) -> Action | Callable | RunnableGraph: + return query_llm.with_name("query_llm") + + def states(self, state: State, context: ApplicationContext, inputs: dict): + for prompt in state["prompts"]: + yield state.update(prompt=prompt) + + def reduce(self, state: State, states): + results = [s["result"] for s in states] + return state.update(all_results=results) + + @property + def reads(self) -> list[str]: + return ["prompts"] + + @property + def writes(self) -> list[str]: + return ["all_results"] + +app = ApplicationBuilder().with_actions( + multi_prompt=TestMultiplePrompts() +).build() +``` + +## State Management Patterns Review Comment: This entire "State Management Patterns" section (lines 175-283) is duplicated almost verbatim in `patterns.md` and `api-reference.md`. 
Since SKILL.md already references those files in the "Reference Documentation" section, this block should be removed or significantly trimmed to avoid wasting context window tokens when Claude loads the skill.
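
One way to catch this kind of duplication before it lands is to compare section bodies across the skill's markdown files. The following is a minimal sketch, not part of the PR: the helper names `split_sections` and `find_duplicate_sections`, the sample strings, and the 0.9 similarity threshold are all assumptions made for illustration. It uses only the Python standard library.

```python
import difflib
import re


def split_sections(markdown: str) -> dict[str, str]:
    """Map each '## ' or '### ' heading to the text beneath it."""
    sections: dict[str, str] = {}
    current = None
    in_fence = False
    for line in markdown.splitlines():
        # Track fenced code blocks so lines inside them are never read as headings.
        if line.lstrip().startswith("```"):
            in_fence = not in_fence
        if not in_fence and re.match(r"#{2,3} ", line):
            current = line.lstrip("#").strip()
            sections[current] = ""
        elif current is not None:
            sections[current] += line + "\n"
    return sections


def find_duplicate_sections(
    a: str, b: str, threshold: float = 0.9
) -> list[tuple[str, str, float]]:
    """Return (heading_in_a, heading_in_b, ratio) for near-identical section bodies."""
    duplicates = []
    sections_b = split_sections(b)
    for head_a, body_a in split_sections(a).items():
        if not body_a.strip():
            continue  # skip headings with no content of their own
        for head_b, body_b in sections_b.items():
            matcher = difflib.SequenceMatcher(None, body_a, body_b, autojunk=False)
            ratio = matcher.ratio()
            if ratio >= threshold:
                duplicates.append((head_a, head_b, round(ratio, 2)))
    return duplicates


# Hypothetical stand-ins for the contents of SKILL.md and patterns.md.
skill = (
    "## State Management Patterns\nUse state.update() to write.\n\n"
    "## Other\nSomething else entirely.\n"
)
patterns = (
    "## State Patterns\nUse state.update() to write.\n\n"
    "## Hooks\nLifecycle hooks go here.\n"
)
print(find_duplicate_sections(skill, patterns))
```

To run it against the real files, one would pass the text of `SKILL.md` and each referenced file (for example via `pathlib.Path(...).read_text()`) in place of the sample strings. Character-level comparison is quadratic in section length, so this is only suitable for a handful of documentation files.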
