用FunctionCall实现文件解析(四):ChatOpenAI处理大模型返回结果
前言
在上一篇文章中,我们完成了ChatOpenAI
的实例化。接下来也就自然而然是使用它了。
如何选择
我们通过网络,可以查到ChatOpenAI
的调用方法有invoke
、generate
、stream
三种。当然,每种方法也有对应的async
调用,算上ainvoke
、agenerate
、astream
总共有6种方法。
我们先不考虑async
,因为原理上没有很大的差别。
invoke
我们首先观察invoke
方法:
1 | def invoke( |
暂时看不出来啥。
我们继续深挖,看看generate_prompt
方法:
1 | def generate_prompt( |
发现了吗?在这里,我们直接调用了generate
方法。哪怕我不去深究generate
方法里面是什么,我也知道invoke
方法就是generate
方法的上层封装了。
generate
当然,你也可以继续追究下去。但是generate
方法就是更底层的实现了。通过像Vue
一样的钩子函数,generate
方法通过on_chat_model_start
、on_llm_error
、on_llm_end
等钩子函数,确定了大模型从启动到终止这个全过程的处理过程。代码较长,就不在这里面详细展开了。
stream
那么,stream
又是怎么一回事呢?
我们先不去考虑方法本身,光看函数返回值就觉得不一般:
1 | def stream(...) -> Iterator[BaseMessageChunk] |
和invoke
方法返回的BaseMessage
不同,首先返回了一个BaseMessageChunk
,其次,返回的是一个迭代器。
我们一步步来。首先是BaseMessageChunk
。在官方源码中,有这么一句注释:
1 | # If both are (subclass of) BaseMessageChunk, |
所以,看上去,BaseMessageChunk
是一个BaseMessage
的子类,所需要实现的功能,就是将多个BaseMessageChunk
合并成一个BaseMessageChunk
。其中,BaseMessageChunk
中的__add__
方法中,也规定了合并过程中需要列表或者字符串。
那么迭代器又是为什么?
在方法一开始,_should_stream
方法规定:
- 当调用的
API
是同步API
,但是同步接口没实现 - 当调用的
API
是异步API
,但是异同步接口都没实现
这样的话,就会从stream
方法折回invoke
方法。
到这一步,似乎无法解释为什么是迭代器。没事,我们继续。
哪怕真的折回invoke
方法了,构建返回值的时候,也是直接这样返回:
1 | yield cast( |
也就是说,返回的其实就是迭代器。相比于整个返回结果全部一次性加载,这个迭代器的加入实际上就给了一个打字机的效果,看上去更有大模型的味道,而且主要是这样做看起来负载会更小一点,效率会更高一些。
那么,如果_should_stream
返回结果引导stream
方法不退回到invoke
,又该如何执行呢?
与generate
相同的是,同样采用了钩子函数,在stream
方法中,通过on_chat_model_start
、on_llm_error
、on_llm_end
等钩子函数,确定 larg模型从启动到终止这个全过程的处理过程;与generate
不同的是,由于stream
需要类似打字机的效果,所以在中间过程中还会有on_llm_new_token
钩子函数来监测每次输出的结果,从而选择推动继续生成还是终止。
async的ainvoke、agenerate和astream
为什么说基本一样的呢?
因为仔细观察他们的源码,可以发现,虽然说方法定义的时候有async
,但是内部方法只有相应的钩子函数或者关键业务函数通过asynio
控制或者直接await
返回,其他的业务逻辑基本没有变化。因此,可以理解为,整体上与同步方法没有太大区别。
总结
根据以上介绍,我们也就可以简单地认为:
当我们并不太在意结果的返回结果的时间以及显存空间的占用时,我们完全可以直接调用invoke
方法。尤其是当我们构建的ChatOpenAI
实例采用的是云服务器的时候。
而当我们需要在意显存占用或者返回结果时间中的任意一个时,我们需要更谨慎地决定是否可以使用invoke
方法。
当我们需要时刻为客户的使用体验考虑时,就没有使用invoke
或者generate
方法的必要了,只能采用astream
方法,让用户时刻注意到大模型的返回进度与内容,并且最大幅度通过async
的异步机制增加CPU
和GPU
的效率,从而更大程度上提升负载与并发。
使用
由于并发极限在我的演示项目中很难出现,所以,为了方便各位参考更接近产品的示例,接下来也将采用stream
方法演示。
比如,我们在测试脚本中,首先构造一个问答场景:
1 | from collections.abc import Iterable |
拿到返回结果先别慌,先思考:
我拿到的是迭代器还是数组?
虽然说我这里给了
response
的类型,但还是需要各位能够清楚地认识到,stream
的返回结果就是一个迭代器。
那也就是说,最终我其实并不需要再额外定义一个迭代器生成器函数,来将当前这个迭代器解包为一个新的迭代器,而是直接读取这个response
迭代器,然后解包输出即可。
所以,拿到response
之后,我们真正要做的,并不是再去def stream_response_iterator(response)
,而是直接用这个拿去迭代输出:
1 | for chunk in response: |
在这里,print
的行为通过end
和flush
两个参数进行了重定义。首先,end
参数使得print
函数在每次输出之后,不添加任何信息,而是直接接在后面继续输出。
其次,flush
参数使得每次输出之后,立即刷新缓冲区,而不是等待缓冲区满之后再刷新。
最后,结束后另起一行,清除end = ""
带来的效果。
这样,我们也就在终端里得到了打字机的效果了。
到这里,所有的功能也就调试的差不多了。准备上前端吧。