用FunctionCall实现文件解析(四):ChatOpenAI处理大模型返回结果

前言

上一篇文章中,我们完成了ChatOpenAI的实例化。接下来也就自然而然是使用它了。

如何选择

我们通过网络,可以查到ChatOpenAI的调用方法有invokegeneratestream三种。当然,每种方法也有对应的async调用,算上ainvokeagenerateastream总共有6种方法。

我们先不考虑async,因为原理上没有很大的差别。

invoke

我们首先观察invoke方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def invoke(
self, input: LangguageModelInput,
config: Optional[RunnableConfig] = None,
*,
stop: Optional[List[str]] = None,
**kwargs
) -> BaseMessage:
config = ensure_config(config)
return cast(
ChatGereration,
self.generate_prompt(
[self._convert_input(input)],
stop=stop,
callbakcs=config.get("callbacks"),
tags=config.get("tags"),
metadata=config.get("metadata"),
run_name=config.get("run_name"),
run_id=config.pop("run_id", None),
**kwargs,
).generations[0][0],
).message

暂时看不出来啥。

我们继续深挖,看看generate_prompt方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def generate_prompt(
self,
prompts: list[PromptValue],
stop: Optional[List[str]] = None,
callbacks: Callbacks = None,
**kwargs
) -> LLMResult:
prompt_messages = [p.to_messages() for p in prompts]
return self.generate(
prompt_messages,
stop=stop,
callbacks=callbacks,
**kwargs
)

发现了吗?在这里,我们直接调用了generate方法。哪怕我不去深究generate方法里面是什么,我也知道invoke方法就是generate方法的上层封装了。

generate

当然,你也可以继续追究下去。但是generate方法就是更底层的实现了。通过像Vue一样的钩子函数,generate方法通过on_chat_model_starton_llm_erroron_llm_end等钩子函数,确定了大模型从启动到终止这个全过程的处理过程。代码较长,就不在这里面详细展开了。

stream

那么,stream又是怎么一回事呢?

我们先不去考虑方法本身,光看函数返回值就觉得不一般:

1
def stream(...) -> Iterator[BaseMessageChunk]

invoke方法返回的BaseMessage不同,首先返回了一个BaseMessageChunk,其次,返回的是一个迭代器。

我们一步步来。首先是BaseMessageChunk。在官方源码中,有这么一句注释:

1
2
# If both are (subclass of) BaseMessageChunk,
# concat into a single BaseMessageChunk

所以,看上去,BaseMessageChunk是一个BaseMessage的子类,所需要实现的功能,就是将多个BaseMessageChunk合并成一个BaseMessageChunk。其中,BaseMessageChunk中的__add__方法中,也规定了合并过程中需要列表或者字符串。

那么迭代器又是为什么?

在方法一开始,_should_stream方法规定:

  • 当调用的API是同步API,但是同步接口没实现
  • 当调用的API是异步API,但是异同步接口都没实现

这样的话,就会从stream方法折回invoke方法。

到这一步,似乎无法解释为什么是迭代器。没事,我们继续。

哪怕真的折回invoke方法了,构建返回值的时候,也是直接这样返回:

1
2
3
yield cast(
BaseMessageChunk, self.invoke(input, config=config, stop=stop, **kwargs)
)

也就是说,返回的其实就是迭代器。相比于整个返回结果全部一次性加载,这个迭代器的加入实际上就给了一个打字机的效果,看上去更有大模型的味道,而且主要是这样做看起来负载会更小一点,效率会更高一些。

那么,如果_should_stream返回结果引导stream方法不退回到invoke,又该如何执行呢?

generate相同的是,同样采用了钩子函数,在stream方法中,通过on_chat_model_starton_llm_erroron_llm_end等钩子函数,确定 larg模型从启动到终止这个全过程的处理过程;与generate不同的是,由于stream需要类似打字机的效果,所以在中间过程中还会有on_llm_new_token钩子函数来监测每次输出的结果,从而选择推动继续生成还是终止。

async的ainvoke、agenerate和astream

为什么说基本一样的呢?

因为仔细观察他们的源码,可以发现,虽然说方法定义的时候有async,但是内部方法只有相应的钩子函数或者关键业务函数通过asynio控制或者直接await返回,其他的业务逻辑基本没有变化。因此,可以理解为,整体上与同步方法没有太大区别。

总结

根据以上介绍,我们也就可以简单地认为:

当我们并不太在意结果的返回结果的时间以及显存空间的占用时,我们完全可以直接调用invoke方法。尤其是当我们构建的ChatOpenAI实例采用的是云服务器的时候。

而当我们需要在意显存占用或者返回结果时间中的任意一个时,我们需要更谨慎地决定是否可以使用invoke方法。

当我们需要时刻为客户的使用体验考虑时,就没有使用invoke或者generate方法的必要了,只能采用astream方法,让用户时刻注意到大模型的返回进度与内容,并且最大幅度通过async的异步机制增加CPUGPU的效率,从而更大程度上提升负载与并发。

使用

由于并发极限在我的演示项目中很难出现,所以,为了方便各位参考更接近产品的示例,接下来也将采用stream方法演示。

比如,我们在测试脚本中,首先构造一个问答场景:

1
2
3
4
5
6
from collections.abc import Iterable
from langchian_core.messages import BaseMessageChunk, SystemMessage, HumanMessage
response: Iterable[BaseMessageChunk] = llm.stream([
SystemMessage(content="You are a helpful assistant."),
HumanMessage(content="What is the capital of France?"),
])

拿到返回结果先别慌,先思考:

我拿到的是迭代器还是数组?

虽然说我这里给了response的类型,但还是需要各位能够清楚地认识到,stream的返回结果就是一个迭代器。

那也就是说,最终我其实并不需要再额外定义一个迭代器生成器函数,来将当前这个迭代器解包为一个新的迭代器,而是直接读取这个response迭代器,然后解包输出即可。

所以,拿到response之后,我们真正要做的,并不是再去def stream_response_iterator(response),而是直接用这个拿去迭代输出:

1
2
3
for chunk in response:
print(chunk.content, end = "", flush = True)
print()

在这里,print的行为通过endflush两个参数进行了重定义。首先,end参数使得print函数在每次输出之后,不添加任何信息,而是直接接在后面继续输出。

其次,flush参数使得每次输出之后,立即刷新缓冲区,而不是等待缓冲区满之后再刷新。

最后,结束后另起一行,清除end = ""带来的效果。

这样,我们也就在终端里得到了打字机的效果了。

到这里,所有的功能也就调试的差不多了。准备上前端吧。