mirror of
https://github.com/aljazceru/Tutorial-Codebase-Knowledge.git
synced 2025-12-19 15:34:23 +01:00
398 lines
20 KiB
Markdown
398 lines
20 KiB
Markdown
---
|
|
layout: default
|
|
title: "Checkpointer (BaseCheckpointSaver)"
|
|
parent: "LangGraph"
|
|
nav_order: 6
|
|
---
|
|
|
|
# Chapter 6: Checkpointer (`BaseCheckpointSaver`) - Saving Your Progress
|
|
|
|
In [Chapter 5: Pregel Execution Engine](05_pregel_execution_engine.md), we saw how the engine runs our graph step-by-step. But what happens if a graph takes hours to run, or if it needs to pause and wait for a human? If the program crashes or we need to stop it, do we lose all the progress?
|
|
|
|
That's where **Checkpointers** come to the rescue!
|
|
|
|
## What Problem Do Checkpointers Solve?
|
|
|
|
Imagine you're playing a long video game. You wouldn't want to start from the very beginning every time you stop playing, right? Games have save points or checkpoints that record your progress.
|
|
|
|
LangGraph's **Checkpointer** does the same thing for your graph execution. It automatically saves the graph's state at certain points, usually after each step completed by the [Pregel Execution Engine](05_pregel_execution_engine.md).
|
|
|
|
This is incredibly useful for:
|
|
|
|
1. **Long-Running Processes:** If your graph involves many steps or calls to slow tools/LLMs, you can stop it and resume later without losing work.
|
|
2. **Resilience:** If your program crashes unexpectedly, you can restart it from the last saved checkpoint.
|
|
3. **Human-in-the-Loop (HITL):** As we saw with `Interrupt` in [Chapter 4: Control Flow Primitives](04_control_flow_primitives___branch____send____interrupt__.md), pausing the graph requires saving its state so it can be perfectly restored when the human provides input. Checkpointers are essential for this.
|
|
|
|
**Analogy:** Think of a checkpointer as an automatic "Save" button for your graph's progress. It takes snapshots of the shared "whiteboard" ([Channels](03_channels.md)) so you can always pick up where you left off.
|
|
|
|
## Key Concepts
|
|
|
|
1. **What is Saved?** The checkpointer saves the current value and version of every [Channel](03_channels.md) in your graph's state. It also keeps track of which step the graph was on and any pending tasks (like those created by `Send`).
|
|
2. **When is it Saved?** The [Pregel Execution Engine](05_pregel_execution_engine.md) typically triggers the checkpointer to save after each "superstep" (a round of node executions and state updates).
|
|
3. **Where is it Saved?** This depends on the specific checkpointer implementation you choose. LangGraph provides several:
|
|
* `MemorySaver`: Stores checkpoints in your computer's RAM. Simple for testing, but **lost when your script ends**.
|
|
* `SqliteSaver`: Stores checkpoints in a local SQLite database file, making them persistent across script runs.
|
|
* Other savers might store checkpoints in cloud databases or other persistent storage.
|
|
4. **`thread_id` (The Save Slot Name):** To save and load progress correctly, you need a way to identify *which* specific run of the graph you want to work with. Think of this like naming your save file in a game. In LangGraph, this identifier is called the `thread_id`. You provide it in the `config` when you run the graph. Each unique `thread_id` represents an independent "conversation" or execution history.
|
|
|
|
## How to Use a Checkpointer
|
|
|
|
Using a checkpointer is straightforward. You just need to tell LangGraph *which* checkpointer to use when you compile your graph.
|
|
|
|
**Step 1: Import a Checkpointer**
|
|
|
|
Let's start with the simplest one, `MemorySaver`.
|
|
|
|
```python
|
|
# Import the simplest checkpointer
|
|
from langgraph.checkpoint.memory import MemorySaver
|
|
```
|
|
|
|
**Step 2: Instantiate the Checkpointer**
|
|
|
|
```python
|
|
# Create an instance of the memory checkpointer
|
|
memory_saver = MemorySaver()
|
|
```
|
|
|
|
**Step 3: Compile Your Graph with the Checkpointer**
|
|
|
|
Let's reuse our simple `adder -> multiplier` graph. The graph definition itself doesn't change.
|
|
|
|
```python
|
|
# --- Define State and Nodes (same as Chapter 1) ---
|
|
from typing import TypedDict
|
|
from langgraph.graph import StateGraph, END, START
|
|
|
|
class MyState(TypedDict):
|
|
value: int
|
|
|
|
def add_one(state: MyState) -> dict:
|
|
print(f"Adder: Adding 1 to {state['value']}")
|
|
return {"value": state['value'] + 1}
|
|
|
|
def multiply_by_two(state: MyState) -> dict:
|
|
print(f"Multiplier: Doubling {state['value']}")
|
|
return {"value": state['value'] * 2}
|
|
|
|
# --- Build the Graph (same as Chapter 1) ---
|
|
workflow = StateGraph(MyState)
|
|
workflow.add_node("adder", add_one)
|
|
workflow.add_node("multiplier", multiply_by_two)
|
|
workflow.set_entry_point("adder")
|
|
workflow.add_edge("adder", "multiplier")
|
|
workflow.add_edge("multiplier", END)
|
|
|
|
# --- Compile WITH the checkpointer ---
|
|
# Pass the checkpointer instance to the compile method
|
|
app = workflow.compile(checkpointer=memory_saver)
|
|
```
|
|
|
|
That's it! By passing `checkpointer=memory_saver` to `compile()`, you've enabled automatic checkpointing for this graph.
|
|
|
|
**Step 4: Run with a `thread_id`**
|
|
|
|
To use the checkpointer, you need to provide a configuration dictionary (`config`) containing a unique identifier for this specific execution thread.
|
|
|
|
```python
|
|
import uuid
|
|
|
|
# Create a unique ID for this run
|
|
thread_id = str(uuid.uuid4())
|
|
config = {"configurable": {"thread_id": thread_id}}
|
|
|
|
# Define the initial state
|
|
initial_state = {"value": 5}
|
|
|
|
print("--- Running Graph (First Time) ---")
|
|
# Run the graph with the config
|
|
final_state = app.invoke(initial_state, config=config)
|
|
|
|
print("\n--- Final State (First Run) ---")
|
|
print(final_state)
|
|
```
|
|
|
|
**Expected Output (First Run):**
|
|
|
|
```text
|
|
--- Running Graph (First Time) ---
|
|
Adder: Adding 1 to 5
|
|
Multiplier: Doubling 6
|
|
|
|
--- Final State (First Run) ---
|
|
{'value': 12}
|
|
```
|
|
|
|
Behind the scenes, `MemorySaver` saved the state after the `adder` step and after the `multiplier` step, associating it with the `thread_id` you provided.
|
|
|
|
**Step 5: Resume the Graph**
|
|
|
|
Now, let's imagine we stopped the process. If we run the *same graph* with the *same `thread_id`*, the checkpointer allows the [Pregel Execution Engine](05_pregel_execution_engine.md) to load the last saved state and continue. Since the first run finished completely, running `invoke` again will just load the final state.
|
|
|
|
```python
|
|
print("\n--- Running Graph Again with SAME thread_id ---")
|
|
# Use the SAME config (containing the same thread_id)
|
|
# Provide NO initial state, as it will be loaded from the checkpoint
|
|
resumed_state = app.invoke(None, config=config)
|
|
|
|
print("\n--- Final State (Resumed Run) ---")
|
|
print(resumed_state)
|
|
|
|
# Let's check the saved states using the checkpointer directly
|
|
print("\n--- Checkpoints Saved ---")
|
|
for checkpoint in memory_saver.list(config):
|
|
print(checkpoint)
|
|
```
|
|
|
|
**Expected Output (Second Run):**
|
|
|
|
```text
|
|
--- Running Graph Again with SAME thread_id ---
|
|
# Notice: No node printouts because the graph already finished!
|
|
# It just loads the final saved state.
|
|
|
|
--- Final State (Resumed Run) ---
|
|
{'value': 12}
|
|
|
|
--- Checkpoints Saved ---
|
|
# You'll see checkpoint objects representing saved states
|
|
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 6}, 'channel_versions': {'adder': 1}, 'versions_seen': {'adder': {}}}, metadata={'source': 'loop', 'step': 1, ...}, ...)
|
|
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 2, ...}, ...)
|
|
CheckpointTuple(config={'configurable': {'thread_id': '...'}}, checkpoint={'v': 1, 'ts': '...', 'id': '...', 'channel_values': {'value': 12}, 'channel_versions': {'adder': 1, 'multiplier': 2}, 'versions_seen': {'adder': {}, 'multiplier': {'adder': 1}}}, metadata={'source': 'loop', 'step': 3, ...}, ...)
|
|
```
|
|
|
|
The checkpointer successfully loaded the final state (`{'value': 12}`) associated with that `thread_id`.
|
|
|
|
**Checkpointers and `Interrupt` (Human-in-the-Loop)**
|
|
|
|
Remember the `Interrupt` example from [Chapter 4](04_control_flow_primitives___branch____send____interrupt__.md)?
|
|
|
|
```python
|
|
# (Simplified HITL example from Chapter 4)
|
|
from langgraph.types import interrupt, Command
|
|
# ... (State, Nodes: create_plan, request_approval, execute_plan) ...
|
|
|
|
# Compile WITH checkpointer (REQUIRED for interrupt)
|
|
memory_saver_hitl = MemorySaver()
|
|
app_hitl = workflow.compile(checkpointer=memory_saver_hitl)
|
|
|
|
# Run, get interrupted
|
|
config_hitl = {"configurable": {"thread_id": str(uuid.uuid4())}}
|
|
for chunk in app_hitl.stream({"plan": ""}, config=config_hitl):
|
|
# ... (detect interrupt) ...
|
|
print("Graph interrupted!")
|
|
break
|
|
|
|
# Resume after human decision
|
|
human_decision = "Approved"
|
|
for chunk in app_hitl.stream(Command(resume=human_decision), config=config_hitl):
|
|
# ... (process remaining steps) ...
|
|
print("Graph resumed and finished!")
|
|
```
|
|
|
|
When `interrupt()` was called inside the `request_approval` node, the [Pregel Execution Engine](05_pregel_execution_engine.md) automatically used the `memory_saver_hitl` checkpointer to save the *exact state* of the graph at that moment (including the plan). When we called `stream` again with `Command(resume=...)` and the *same* `config_hitl`, the engine loaded that saved state using the checkpointer, allowing the graph to continue exactly where it left off, now with the human's feedback.
|
|
|
|
**Without a checkpointer, `Interrupt` cannot work.**
|
|
|
|
## How Checkpointing Works Internally
|
|
|
|
What happens behind the scenes when a checkpointer is configured?
|
|
|
|
**Saving:**
|
|
|
|
1. **Step Complete:** The [Pregel Execution Engine](05_pregel_execution_engine.md) finishes a step (e.g., after running the `adder` node and updating the state).
|
|
2. **Signal Checkpointer:** The engine tells the configured checkpointer (`MemorySaver` in our example) that it's time to save.
|
|
3. **Gather State:** The checkpointer (or the engine on its behalf) accesses all the active [Channels](03_channels.md).
|
|
4. **Serialize State:** For each channel, it calls the channel's internal `checkpoint()` method to get a serializable representation of its current value (e.g., the number `6` for the `"value"` channel).
|
|
5. **Store Checkpoint:** The checkpointer bundles the serialized channel values, their versions, the current step number, and other metadata into a `Checkpoint` object. It then stores this `Checkpoint` associated with the current `thread_id` provided in the `config`. `MemorySaver` stores it in a dictionary in RAM; `SqliteSaver` writes it to a database table.
|
|
|
|
**Loading (Resuming):**
|
|
|
|
1. **Invoke with `thread_id`:** You call `app.invoke(None, config=config)` where `config` contains a `thread_id` that has been previously saved.
|
|
2. **Request Checkpoint:** The [Pregel Execution Engine](05_pregel_execution_engine.md) asks the checkpointer to load the latest checkpoint for the given `thread_id`.
|
|
3. **Retrieve Checkpoint:** The checkpointer retrieves the saved `Checkpoint` object (e.g., from its memory dictionary or the database).
|
|
4. **Restore State:** The engine takes the saved channel values from the checkpoint. For each channel, it calls the channel's `from_checkpoint()` method (or similar internal logic) to restore its state. The "whiteboard" ([Channels](03_channels.md)) is now exactly as it was when the checkpoint was saved.
|
|
5. **Continue Execution:** The engine looks at the saved step number and metadata to figure out where to resume execution, typically by preparing the tasks for the *next* step.
|
|
|
|
Here's a simplified view of the interaction:
|
|
|
|
```mermaid
|
|
sequenceDiagram
|
|
participant User
|
|
participant App as CompiledGraph
|
|
participant Engine as Pregel Engine
|
|
participant Saver as Checkpointer (e.g., MemorySaver)
|
|
participant Storage as Underlying Storage (RAM, DB)
|
|
|
|
%% Saving %%
|
|
Engine->>Engine: Finishes Step N
|
|
Engine->>Saver: Save checkpoint for config (thread_id)
|
|
Saver->>Engine: Request current channel states & versions
|
|
Engine-->>Saver: Provides states & versions
|
|
Saver->>Storage: Store Checkpoint(Step N, states, versions) linked to thread_id
|
|
Storage-->>Saver: Acknowledge Save
|
|
Saver-->>Engine: Save Complete
|
|
|
|
%% Loading %%
|
|
User->>App: invoke(None, config with thread_id)
|
|
App->>Engine: Start/Resume Execution
|
|
Engine->>Saver: Get latest checkpoint for config (thread_id)
|
|
Saver->>Storage: Retrieve Checkpoint linked to thread_id
|
|
Storage-->>Saver: Returns Checkpoint(Step N, states, versions)
|
|
Saver-->>Engine: Provides Checkpoint
|
|
Engine->>Engine: Restore channel states from checkpoint
|
|
Engine->>Engine: Prepare tasks for Step N+1
|
|
Engine->>App: Continue execution...
|
|
```
|
|
|
|
## A Peek at the Code (`checkpoint/base.py`, `checkpoint/memory.py`, `pregel/loop.py`)
|
|
|
|
Let's look at the core components:
|
|
|
|
* **`BaseCheckpointSaver` (`checkpoint/base.py`)**: This is the abstract base class (like a template) that all checkpointers must implement. It defines the essential methods the engine needs.
|
|
|
|
```python
|
|
# checkpoint/base.py (Highly Simplified)
|
|
from abc import ABC, abstractmethod
|
|
from typing import Any, Mapping, Optional, Sequence, Tuple, TypedDict
|
|
|
|
# Represents a saved checkpoint
|
|
class Checkpoint(TypedDict):
|
|
channel_values: Mapping[str, Any] # Saved state of channels
|
|
channel_versions: Mapping[str, int] # Internal versions
|
|
versions_seen: Mapping[str, Mapping[str, int]] # Tracking for node execution
|
|
# ... other metadata like v, ts, id, pending_sends ...
|
|
|
|
# Represents the checkpoint tuple retrieved from storage
|
|
class CheckpointTuple(NamedTuple):
|
|
config: dict # The config used (includes thread_id)
|
|
checkpoint: Checkpoint
|
|
metadata: dict
|
|
# ... other fields like parent_config, pending_writes ...
|
|
|
|
class BaseCheckpointSaver(ABC):
|
|
# --- Sync Methods ---
|
|
@abstractmethod
|
|
def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
|
|
"""Load the checkpoint tuple for the given config."""
|
|
...
|
|
|
|
@abstractmethod
|
|
def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
|
|
"""Save a checkpoint."""
|
|
...
|
|
|
|
# --- Async Methods (similar structure) ---
|
|
@abstractmethod
|
|
async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
|
|
"""Async load the checkpoint tuple."""
|
|
...
|
|
|
|
@abstractmethod
|
|
async def aput(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
|
|
"""Async save a checkpoint."""
|
|
...
|
|
|
|
# --- Other methods (list, put_writes) omitted for brevity ---
|
|
```
|
|
The key methods are `get_tuple` (to load) and `put` (to save), along with their async counterparts (`aget_tuple`, `aput`). Any specific checkpointer (like `MemorySaver`, `SqliteSaver`) must provide concrete implementations for these methods.
|
|
|
|
* **`MemorySaver` (`checkpoint/memory.py`)**: A simple implementation that uses an in-memory dictionary.
|
|
|
|
```python
|
|
# checkpoint/memory.py (Highly Simplified)
|
|
import threading
|
|
from collections import defaultdict
|
|
|
|
class MemorySaver(BaseCheckpointSaver):
|
|
def __init__(self):
|
|
# Use a dictionary to store checkpoints in RAM
|
|
# Key: thread_id, Value: List of CheckpointTuples
|
|
self._checkpoints: defaultdict[str, list[CheckpointTuple]] = defaultdict(list)
|
|
self._lock = threading.RLock() # To handle multiple threads safely
|
|
|
|
def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
|
|
thread_id = config["configurable"]["thread_id"]
|
|
with self._lock:
|
|
if checkpoints := self._checkpoints.get(thread_id):
|
|
# Return the latest checkpoint for this thread_id
|
|
return checkpoints[-1]
|
|
return None
|
|
|
|
def put(self, config: dict, checkpoint: Checkpoint, metadata: dict) -> dict:
|
|
thread_id = config["configurable"]["thread_id"]
|
|
with self._lock:
|
|
# Append the new checkpoint to the list for this thread_id
|
|
self._checkpoints[thread_id].append(
|
|
CheckpointTuple(config, checkpoint, metadata)
|
|
)
|
|
return {"configurable": {"thread_id": thread_id}}
|
|
|
|
# ... async methods (aget_tuple, aput) are similar using the same dict ...
|
|
# ... list method iterates through the dictionary ...
|
|
```
|
|
As you can see, `MemorySaver` just uses a standard Python dictionary (`self._checkpoints`) to store the `CheckpointTuple` for each `thread_id`. This is simple but not persistent.
|
|
|
|
* **Integration (`pregel/loop.py`)**: The [Pregel Execution Engine](05_pregel_execution_engine.md) (`PregelLoop` classes) interacts with the checkpointer during its execution cycle.
|
|
|
|
```python
|
|
# pregel/loop.py (Conceptual Snippets)
|
|
|
|
class PregelLoop: # Base class for Sync/Async loops
|
|
def __init__(self, ..., checkpointer: Optional[BaseCheckpointSaver], ...):
|
|
self.checkpointer = checkpointer
|
|
# ... other init ...
|
|
|
|
def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
|
|
# Called by the loop after a step or input processing
|
|
if self.checkpointer:
|
|
# 1. Create the Checkpoint object from current channels/state
|
|
checkpoint_data = create_checkpoint(self.checkpoint, self.channels, ...)
|
|
|
|
# 2. Call the checkpointer's put method (sync or async)
|
|
# (Uses self.submit to potentially run in background)
|
|
self.submit(self.checkpointer.put, self.checkpoint_config, checkpoint_data, metadata)
|
|
|
|
# 3. Update internal config with the new checkpoint ID
|
|
self.checkpoint_config = {"configurable": {"thread_id": ..., "checkpoint_id": checkpoint_data["id"]}}
|
|
|
|
def __enter__(self): # Or __aenter__ for async
|
|
# Called when the loop starts
|
|
if self.checkpointer:
|
|
# 1. Try to load an existing checkpoint tuple
|
|
saved = self.checkpointer.get_tuple(self.checkpoint_config)
|
|
else:
|
|
saved = None
|
|
|
|
if saved:
|
|
# 2. Restore state from the loaded checkpoint
|
|
self.checkpoint = saved.checkpoint
|
|
self.checkpoint_config = saved.config
|
|
# ... restore channels from saved.checkpoint['channel_values'] ...
|
|
else:
|
|
# Initialize with an empty checkpoint
|
|
self.checkpoint = empty_checkpoint()
|
|
|
|
# ... setup channels based on restored or empty checkpoint ...
|
|
return self
|
|
```
|
|
The `PregelLoop` uses the checkpointer's `get_tuple` method when it starts (in `__enter__` or `__aenter__`) to load any existing state. It uses the `put` method (inside `_put_checkpoint`) during execution to save progress.
|
|
|
|
## Conclusion
|
|
|
|
You've learned about **Checkpointers (`BaseCheckpointSaver`)**, the mechanism that gives your LangGraph applications memory and resilience.
|
|
|
|
* Checkpointers **save** the state of your graph's [Channels](03_channels.md) periodically.
|
|
* They **load** saved states to resume execution.
|
|
* This is crucial for **long-running graphs**, **human-in-the-loop** workflows (using `Interrupt`), and **recovering from failures**.
|
|
* You enable checkpointing by passing a `checkpointer` instance (like `MemorySaver` or `SqliteSaver`) to `graph.compile()`.
|
|
* You manage different execution histories using a unique `thread_id` in the `config`.
|
|
* `MemorySaver` is simple for testing but lost when the script ends; use database savers (like `SqliteSaver`) for true persistence.
|
|
|
|
This chapter concludes our tour of the core concepts in LangGraph! You now understand the fundamental building blocks: the blueprint ([`StateGraph`](01_graph___stategraph.md)), the workers ([`Nodes`](02_nodes___pregelnode__.md)), the communication system ([`Channels`](03_channels.md)), the traffic signals ([Control Flow Primitives](04_control_flow_primitives___branch____send____interrupt__.md)), the engine room ([Pregel Execution Engine](05_pregel_execution_engine.md)), and the save system ([Checkpointer](06_checkpointer___basecheckpointsaver__.md)).
|
|
|
|
With these concepts, you're well-equipped to start building your own sophisticated, stateful applications with LangGraph! Explore the documentation for more examples, advanced patterns, and different checkpointer implementations. Happy building!
|
|
|
|
---
|
|
|
|
Generated by [AI Codebase Knowledge Builder](https://github.com/The-Pocket/Tutorial-Codebase-Knowledge) |