LangGraph StateGraph 编译装配:控制流编码与状态隔离
解析 StateGraph 编译期的图装配流程,理解控制流如何被编码为内部 Channel,以及 Global/Private 状态的隔离机制
背景与动机
前两篇文章分析了执行模型和 Channel 机制。但用户编写 LangGraph 应用时,并不直接操作 Pregel Loop 和 Channel,而是使用 StateGraph 这个 Builder API:
1
2
3
4
5
builder = StateGraph(state_schema=State)
builder.add_node("A", node_a)
builder.add_edge("A", "B")
builder.add_conditional_edges("B", route_fn, {"x": "A", "y": END})
compiled = builder.compile(checkpointer=...)
这段声明式代码如何转换为可执行的 Pregel 图? 控制流如何被编码? 状态如何隔离? 本文解析编译期的装配过程。
核心概念
StateGraph 是 Builder
StateGraph 不能执行,必须 .compile() 转换为 CompiledStateGraph。编译过程:
- 验证拓扑(中断节点是否存在、边是否有效)
- 计算输出 channels 和流输出 channels
- 创建
CompiledStateGraph实例(继承自Pregel) - 遍历节点、边、分支,调用 attach 方法装配
- 验证完整性,返回可执行图
CompiledStateGraph 是 Pregel
CompiledStateGraph 继承 Pregel,拥有执行所需的所有字段:
nodes: dict[str, PregelNode]: 编译后的节点channels: dict[str, BaseChannel]: 状态 Channel 和控制 Channelcheckpointer/store/cache: 扩展依赖
执行时调用 invoke/stream,由 PregelLoop 驱动。
控制流编码为内部 Channel
用户声明的控制流被转换为内部 Channel:
静态边: A -> B
生成 branch:to:B Channel(EphemeralValue)
A 的 writers 追加写入该 Channel
B 的 triggers 追加该 Channel
Join 边: A,C -> D
生成 join:A+C:D Channel(NamedBarrierValue)
A 写入 "A",C 写入 "C"
D 等待屏障释放后触发
条件边: route_fn(state) -> target
生成 branch writer,包含:
- Reader:
ChannelRead.do_read(fresh=True)读取最新状态 - Router: 调用
route_fn计算目标 - Writer: 根据目标写入
branch:to:<target>或Send
编译流程
编译期装配
flowchart TB
SG[StateGraph 声明] --> Compile["compile()"]
Compile --> Validate[验证拓扑]
Validate --> Create[创建 CompiledStateGraph]
Create --> AttachNodes[遍历 nodes]
AttachNodes --> AttachEdges[遍历 edges/waiting_edges]
AttachEdges --> AttachBranches[遍历 branches]
AttachBranches --> Validate2[验证完整性]
Validate2 --> Return[返回可执行图]
subgraph "attach 副作用"
AttachNodes --> |"attach_node"| PN[创建 PregelNode]
AttachNodes --> |"生成"| BT["branch:to:<node> Channel"]
AttachEdges --> |"静态边"| BT2["branch:to:<end>"]
AttachEdges --> |"join 边"| JC["join:* NamedBarrierValue"]
AttachBranches --> |"条件边"| BR[BranchSpec writer+reader]
end
attach_node 关键副作用
对每个节点:
- 创建
branch:to:<node>Channel(EphemeralValue) - 构造 PregelNode:
channels: 节点订阅的状态 keystriggers: 触发该节点的 Channel 列表bound: 节点 runnable(用户函数或子图)writers: 写入 state 和控制 Channel 的 writers
- 注册到
compiled.nodes[name]
attach_edge 控制流编码
静态边 start -> end:
1
2
3
4
5
6
7
8
9
10
11
def attach_edge(self, start: str, end: str):
# 生成控制 Channel 名称
channel_name = f"branch:to:{end}"
# 追加到 start 的 writers
self.nodes[start].writers.append(
ChannelWriteEntry(channel_name, None)
)
# 追加到 end 的 triggers
self.nodes[end].triggers.append(channel_name)
Join 边 starts -> end:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def attach_edge(self, starts: list[str], end: str):
# 生成 join Channel 名称
channel_name = f"join:{'+'.join(starts)}:{end}"
# 注册屏障 Channel
self.channels[channel_name] = NamedBarrierValue(
str,
names=set(starts)
)
# 每个 start 写入自己的名字
for start in starts:
self.nodes[start].writers.append(
ChannelWriteEntry(channel_name, start)
)
# end 等待屏障
self.nodes[end].triggers.append(channel_name)
attach_branch 条件路由
条件边生成包含 reader 和 writer 的 runnable:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def attach_branch(self, start: str, branch: BranchSpec):
# 构造 fresh reader
reader = ChannelRead.do_read(
select=state_keys,
fresh=True # 读取包含本节点写入的视图
)
# 构造 writer
def get_writes(dests):
writes = []
for dest in dests:
if isinstance(dest, Send):
writes.append((TASKS, dest))
else:
writes.append((f"branch:to:{dest}", None))
return writes
# 追加到 start 的 writers
branch_writer = branch.run(get_writes, reader)
self.nodes[start].writers.append(branch_writer)
Reducer 编译期降级
StateGraph 的 state_schema 支持 Annotated reducer:
1
2
3
4
class State(TypedDict):
messages: Annotated[list, add] # 累积
counter: Annotated[int, add] # 累加
result: str # 单值
编译期转换规则:
- 显式 Channel: 如果 Annotated 元数据中有
BaseChannel子类,直接使用 - Binary Operator: 如果元数据最后一个是可调用且签名为
(a, b) -> c,生成BinaryOperatorAggregate - Fallback: 默认使用
LastValue
1
2
3
4
5
6
7
8
9
10
11
12
def _get_channel(annotation):
if is_field_channel(annotation):
# 情况 1: 显式 Channel
return extract_channel(annotation)
if is_field_binop(annotation):
# 情况 2: Binary operator
op = extract_operator(annotation)
return BinaryOperatorAggregate(typ, op)
# 情况 3: Fallback
return LastValue(typ)
这意味着 “reducer 聚合” 不是运行时动态分派,而是编译期把 state key 映射到确定的 Channel 类型。
状态隔离
Global 状态
Channels: 所有 state keys、控制 Channel(branch:to:*/join:*)都在同一 Channel 表
可见性: 受 step 提交和 checkpoint 管控
持久化: 进入 checkpoint 快照(除非是 UntrackedValue)
Private 状态
Task-local Config: 每个任务有独立的 config,包含:
CONFIG_KEY_TASK_ID: 任务唯一 IDCONFIG_KEY_CHECKPOINT_NS: 命名空间(含 task_id)CONFIG_KEY_SEND: 写入回调CONFIG_KEY_READ: 读取回调CONFIG_KEY_RUNTIME: Runtime 实例(context/store/stream_writer)CONFIG_KEY_SCRATCHPAD: Scratchpad(step/resume/counters)
ManagedValue: 从 scratchpad 派生,不进入 channels
读取位置: local_read 检查 managed 映射,调用 get(scratchpad)
用途: 任务私有值、派生状态
Namespace 隔离:
1
2
3
4
5
# 任务级 namespace
task_checkpoint_ns = f"{checkpoint_ns}{NS_END}{task_id}"
# 子图可以在相同 thread_id 下隔离状态
# 通过 namespace 层次区分父子图
recast_checkpoint_ns 去除 task_id,用于比较和定位:
1
2
3
4
5
6
def recast_checkpoint_ns(ns: str) -> str:
# "parent:123|child:456" -> "parent|child"
return NS_SEP.join(
part.split(NS_END)[0]
for part in ns.split(NS_SEP)
)
使用场景
编译期注入扩展
1
2
3
4
5
6
7
compiled = StateGraph(State).compile(
checkpointer=MyCheckpointSaver(),
store=MyStore(),
cache=MyCache(),
interrupt_before=["human_feedback"],
interrupt_after=["planner"]
)
这些扩展在编译期注入到 CompiledStateGraph 字段,运行时通过 _defaults 选择和传递给 Loop。
子图复用
子图也是 CompiledStateGraph,可以作为节点嵌入:
1
2
subgraph = StateGraph(SubState).compile()
builder.add_node("sub", subgraph)
子图在父图的 task config 中获得独立的 checkpoint_ns,状态完全隔离。
扩展点
自定义 ManagedValue
实现 ManagedValueSpec:
1
2
3
4
class CustomManaged(ManagedValueSpec):
def get(self, scratchpad: PregelScratchpad) -> Any:
# 从 scratchpad 派生值
return derived_value
注册到 compiled graph 的 managed 字段,可被 local_read 读取。
自定义状态 Channel
在 state_schema 的 Annotated 中显式指定:
1
2
class State(TypedDict):
custom: Annotated[Value, CustomChannel(typ, ...)]
编译期会提取并使用自定义 Channel。
权衡与风险
控制流 Channel 化
收益: 执行内核只理解 “Channel 触发”,无需理解用户级 edge 语义
代价: 内部 Channel 命名复杂,迁移逻辑复杂(_migrate_checkpoint)
Fresh Read 的开销
问题: Conditional edge 求值时 copy channels 并 apply task writes
适用: 分支判断需要看到本节点刚写的状态
优化: 只在必需场景使用 fresh=True
Reducer 的确定性
问题: Reducer 应用顺序受 task.path 排序影响
保证: apply_writes 按 task.path[:3] 排序,确保确定性
局限: 顺序敏感的 reducer(列表拼接)仍需业务层理解排序规则
Namespace 复杂度
问题: 为隔离子图/任务,ns 中携带 task_id,引入 recast/migrate/父子映射等机制
收益: 可追溯性和可调试性
代价: 复杂度税
运行期时序
sequenceDiagram
participant Client
participant Pregel as Pregel Loop
participant NodeA as Node A
participant NodeB as Node B
participant Channels as Channels
participant CP as Checkpointer
Client->>Pregel: invoke(initial_state)
Pregel->>Channels: apply input writes
Pregel->>NodeA: 执行 bound
NodeA->>Channels: 节点 writers 写入 state
NodeA->>Channels: 写入 branch:to:B
Pregel->>Pregel: step 边界
alt Conditional Edge
Pregel->>Channels: fresh read (copy + apply)
Pregel->>NodeB: 执行 branch logic
NodeB->>Channels: 写入 branch:to:<target>
end
alt Multi-source Join
Pregel->>Channels: 等待 NamedBarrierValue
Note over Channels: seen == names
Pregel->>NodeB: 触发执行
end
Channels->>CP: save checkpoint
Pregel->>Client: return result
小结
StateGraph 编译装配的核心机制:
控制流编码: edge/join/branch 转换为内部 Channel,执行内核统一处理
Reducer 降级: 编译期把 Annotated reducer 映射到 Channel 类型
状态隔离: Global channels 共享,Private config/managed/namespace 隔离
理解编译期装配是理解运行期行为的前提。下一篇文章 条件边与扇入扇出 会深入解析 fresh read、Send 派发和屏障同步的完整语义。