[拆解LangChain执行引擎] ManagedValue——一种特殊的只读虚拟通道
我们一直在强调Pregel对象的状态是通过Channel维护和传递的,其实承载传递状态功能的组件除了Channel,还有 ManagedValue。我们可以将ManagedValue视为虚拟Channel,Node不仅采用与读取Channel完全一样的方式读取ManagedValue,而且注册的ManagedValue也直接存放在Pregel的channels字段中。
如果我们仔细查看Pregel类的定义,可以看出其channels字段返回一个字典,字典的值的类型联合了BaseChannel和ManagedValueSpec两种类型,前者是Channel的基类,后者就是ManagedValue类的别名。
class Pregel(
PregelProtocol[StateT, ContextT, InputT, OutputT],
Generic[StateT, ContextT, InputT, OutputT]):
channels : dict[str, BaseChannel | ManagedValueSpec]
ManagedValueSpec = type[ManagedValue]
如果说Channel存储的是的业务状态,那么ManagedValue传递的就是Pregel这个执行引擎的运行时状态。一般来说,ManagedValue自身不负责存储状态,其提供的值可以实时计算得出,所以它不参与基于Checkpoint的持久化。从如下所示的代码片段可以看出,ManagedValue仅仅定义了一个唯一的静态抽象方法get返回对应的值,由于作为输入的PregelScratchpad对象提供的信息有限,所以ManagedValue能够发挥的空间其实很有限,在大部分情况下用不到它。
class ManagedValue(ABC, Generic[V]):
@staticmethod
@abstractmethod
def get(scratchpad: PregelScratchpad) -> V: ...
1. PregelScratchpad
ManagedValue提供的值是通过其get方法根据PregelScratchpad对象计算所得。当确定后续待执行的Node后,引擎会为每个Node创建一个任务,每个任务都会附加一个PregelScratchpad对象。PregelScratchpad的step和stop字段就返回当前Superstep的序号和针对迭代的限制(最大超步数),其它字段与持久化有关。
@dataclasses.dataclass(**_DC_KWARGS)
class PregelScratchpad:
step : int
stop : int
call_counter : Callable[[], int]
interrupt_counter : Callable[[], int]
get_null_resume : Callable[[bool], Any]
resume : list[Any]
subgraph_counter : Callable[[], int]
PregelScratchpad的call_counter、interrupt_counter和subgraph_counter字段以闭包的形式返回一个计数器。call_counter计数器用于为当前Superstep内产生的所有任务分配唯一的内部序列号。
1.1 Resume Value和中断计数器
interrupt_counter、get_null_resume和resume字段与Pregel基于 “中断(Interrupt)/恢复(Resume)” 的执行方式有关。假设Pregel的对应一个需要人工介入的多级审批流程,在每次需要以人工介入的方式收集审批者决定的时候,流程进入一个中断,当前的状态被持久化。当审批决定给出后,流程以 “恢复” 的形式开始执行,中断时持久化的快照被提取出来 “恢复现场” ,审批决定以Resume Value的形式提供给引擎。为了匹配多个中断点与对应的Resume Value,后者会按照顺序被持久化,并在恢复执行的时候连同当前提供的Resume Value一并填充到PregelScratchpad的resume列表中。
恢复执行做不到在中断点出开始执行,它总是从头执行Node的处理函数,所以定义 幂等Node 应该成为Agent编程的 “金科玉律”。由于PregelScratchpad的resume字段会按照中断的顺序存放Resume Value,所以在恢复执行的时候,每遇到一个中断,引擎可以利用interrupt_counter字段返回的计数器作为位置索引从resume列表中将匹配的Resume Value提取出来。如果提取的Resume Value为None,或者计数器返回的索引越界,get_null_resume字段提供的回调就会执行。这个回调函数具有一个bool类型的参数is_called,调用时该参数被设置为True,表示该中断确实被触发了,但没有对应的数据。这会消耗掉这个中断位,确保流程不至于永远得不到恢复。
1.2 子图调用计数器
如果说interrupt_counter计数器旨在解决每次中断与提供的Resume Value的匹配问题,那么subgraph_counter计数器解决的每次“子图调用”与对应Pregel实例的匹配问题。如果站在“图”的视角,每个Pregel对象就是由多个Node组成的图,而Pregel也可以作为一个Node出现在另一个Pregel构建的图中,两个Pregel之间就称为了“父子”关系,子Pregel构建的图就是“子图”,针对它的调用就是子图调用。
虽然在同一个图中,每个Pregel会独自完成自身的持久化。在恢复执行场景中,引擎会率先加载作为“根”的Pregel对应的Checkpoint来恢复现场。当遇到“子图”形式调用另一个Pregel时,引擎会加载对应的Checkpoint来恢复子图在中断那个时间点的状态。现在问题来了:在子Pregel众多持久化的Checkpoint中,怎么知道该加载哪一个呢?
这个问题本质上是如何解决作为子图执行的Pregel在执行持久化时,如何将生成的Checkpoint与当前执行上下文进行匹配的问题,这个问题是利用Checkpoint命名空间来解决的。Node是以任务的形式被执行的,每个任务具有唯一的ID,并且在恢复时保持不变,如果命名空间由执行链路上每个任务的节点名称+任务ID组成,那么子图的Checkpoint就能利用此命名空间关联起来。
但是问题还是没有完全解决,如果同一个任务涉及针对同一子图的多次调用,如命名空间只包含基于任务的执行路径,此时两个子图会共享相同的命名空间,具体对应哪个Checkpoint依然无法解决。因此若涉及同一个Node针对同一个Pregel对象的多次调用,持久化这个Pregel的Checkpoint的命名空间还应该包含调用顺序。
Checkpoint的命名空间的规则可以通过如下这个演示实例来证实。如代码片段所示,我们创建了一个由单一Node组成的Pregel对象(sub_graph),命名为 “baz” 的Node在执行的时候会从当前的RunnableConfig配置中提取并输出当前的Checkpoint命名空间。
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.pregel._write import ChannelWrite, ChannelWriteTupleEntry
from langgraph.types import RunnableConfig
from typing import Any
def handle(args:dict[str,Any], config:RunnableConfig)->None:
print(config["configurable"]["checkpoint_ns"])
sub_node = (NodeBuilder()
.subscribe_to("start")
.do(handle))
sub_graph = Pregel(
nodes={"baz": sub_node},
channels={
"start": LastValue(None),
},
input_channels=["start"],
output_channels=[])
def handle1(args:dict[str,Any])->None:
sub_graph.invoke(input={"start": None})
def handle2(args:dict[str,Any])->str:
sub_graph.invoke(input={"start": None})
sub_graph.invoke(input={"start": None})
foo = (NodeBuilder()
.subscribe_to("foo")
.do(handle1)
.write_to(bar=None))
bar = (NodeBuilder()
.subscribe_to("bar")
.do(handle2))
graph = Pregel(
nodes={"foo": foo, "bar": bar},
channels={
"foo": LastValue(None),
"bar": LastValue(str),
},
input_channels=["foo"],
output_channels=[],
checkpointer= InMemorySaver())
config = {"configurable": {"thread_id": "123"}}
graph.invoke(input={"foo": None}, config=config)
在另一个Pregel中,我们为它设置了两个先后执行的Node(foo和bar),前者调用sub_graph一次,后者调用两次。针对三次调用,sub_graph为自身持久化设置的Checkpoint命名会以如下的形式输出,可以看出命名空间同时体现了调用链路和次序。
foo:36817c76-c3f7-643f-7924-0d29b39f469a|baz:311cc911-96a0-56b6-225b-28e4cece7cd9
bar:97be6a71-1b71-7364-e691-a122cfef1a92|baz:789287de-869f-42b8-dd03-7518820daaa6
bar:97be6a71-1b71-7364-e691-a122cfef1a92|1|baz:dd1ddd1b-fc62-b46a-c2ec-6a1d8344b793
基于Pregel“中断/恢复”的执行方式,让我们对Pregel实例会有特别的理解。我们习惯了将一个通过调用某个类构造函数创建的对象视为该类型的一个实例,但是在Node的处理函数中,即使针对同一Pregel实例的连续两次调用都有可能出现中断,一旦恢复执行,后一个实例就有可能使根据另一个Checkpoint的状态创建的,它自然也就不是原来的那个实例了。在不断的“中断/恢复”执行流程中,所谓Pregel实例有时候表示成对应的Checkpoint可能更准确。
对于同一个节点任务来说,如果涉及针对同一个子Pregel的多次调用,从第二次调用开始,对方持久化生成的Checkpoint会将调用次序包含在命名空间中。与之相对的,在恢复执行的时候,也需要根据当前的执行上下文提供包含此序号的命名空间采用加载对应的Checkpoint,并最终恢复对应的Pregel对象,PregelScratchpad的subgraph_counter字段返回的计数器就是为了提供这个序号。
2. 两个原生的ManagedValue
由于ManagedValue所能提供的值是根据PregelScratchpad计算生成,而后者可用的唯有表示当前和最大Superstep序号的step和stop字段,所以我们采用ManagedValue的应用场景其实很窄。我从只找到如下两个原生的ManagedValue类型,它们都定义在langgraph.managed.is_last_step这个包中。其中一个IsLastStepManager用于判断是否为最后一个Superstep,而RemainingStepsManager则用来确定余下的Superstep数。具体的实现非常简单,仅仅是针对PregelScratchpad的step和stop字段的简单运算而已。
class IsLastStepManager(ManagedValue[bool]):
@staticmethod
def get(scratchpad: PregelScratchpad) -> bool:
return scratchpad.step == scratchpad.stop - 1
class RemainingStepsManager(ManagedValue[int]):
@staticmethod
def get(scratchpad: PregelScratchpad) -> int:
return scratchpad.stop - scratchpad.step
由于ManagedValue属于一个计算属性,所以它只能作为Node的输入。它可以被视为一种虚拟的Channel,Node针对ManagedValue和常规Channel的读取方式完全一致。在创建Pregel对象时,所用到的ManagedValue需要在channels字段中显式声明,但是不能将其添加到输入和输出Channel列表中。
如下的实例演示了RemainingStepsManager的使用方式,创建的Pregel由两个先后执行的Node构成(foo和bar),它们会将命名为remaining_steps的ManagedValue作为输入,并将其分别输出到remaining_steps_after_foo和remaining_steps_after_bar这两个Channel中,分别表示在这两个Node完成执行后所剩的Superstep数。
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.managed.is_last_step import RemainingStepsManager
from langgraph.channels import LastValue
foo = (NodeBuilder()
.subscribe_to("foo")
.read_from("remaining_steps")
.do(lambda args: args["remaining_steps"])
.write_to(remaining_steps_after_foo= lambda args:args, bar=None))
bar = (NodeBuilder()
.subscribe_to("bar")
.read_from("remaining_steps")
.do(lambda args: args["remaining_steps"])
.write_to("remaining_steps_after_bar"))
app = Pregel(
nodes={"foo":foo, "bar":bar},
channels={
"foo": LastValue(None),
"bar":LastValue(None),
"remaining_steps_after_foo": LastValue(int),
"remaining_steps_after_bar":LastValue(int),
"remaining_steps": RemainingStepsManager,
},
input_channels=["foo"],
output_channels=["remaining_steps_after_foo", "remaining_steps_after_bar"])
config = {"recursion_limit": 10}
result = app.invoke({"foo":None}, config=config)
assert result["remaining_steps_after_foo"] == 10
assert result["remaining_steps_after_bar"] == 9
在根据两个Node创建Pregel对象时,我们将针对命名为remaining_steps的ManagedValue的声明添加到channels字段中,对应的类型被设置为RemainingStepsManager。由于在调用Pregel对象时利用RunnableConfig配置将Superstep迭代限制为10,所以先后执行的两个Node后剩余步数分别为10和9。









