使用url自定义LLM并接入langchain

前言

现在有一个新的需求,要把MindIE部署的大模型封装成 LLM。于是我就摸索了一下。

本文从MindIE的简介开始逐步介绍如何自定义封装。有关MindIE的详细介绍就在后续单开一章,因为东西太多了。

另:本章代码已开源到代码库,对应库中的llms/custom/cmdi.py

MindIE为我们提供了什么

我们就不再细究到底什么是MindIE以及原理是什么,一篇文章可讲不完。

总之,MindIE为我们提供了三个接口:

1
2
3
v1/chat/completions
generate
generate_stream

这三个接口从上到下分别是:

  • 兼容OpenAI的接口
  • 返回值仅包含generated_text的返回结果,一次性给出所有回复
  • 逐字给出回复,并且附上每个字的token

返回结果长什么样呢?

让我们来给大模型一个输入:

1
我靠,您就是大名鼎鼎的V吧?

于是,

generate接口返回值就是这样的:

1
2
3
{
"generated_text": "哈哈,你可以这么认为,但在不同的文化和语境中,我可能有着不同的身份和角色。我是来自阿里云的大规模语言模型,我叫通义千问,我在不断学习和进步中,努力成为一个更加优秀和全面的语言模型。如果您有任何问题或需要帮助,请随时告诉我,我会尽力提供支持!不过,说到“大名鼎鼎的V”,您可能是指《V字仇杀队》中的角色吧?那部作品确实非常经典,您喜欢这部电影吗?我们可以聊聊它,或者您想聊些什么,我都愿意倾听和分享。😊"
}

generate_stream接口返回值是这样的:

1
2
3
4
5
{"token":{"id":100323,"text":"哈哈","logprob":null,"special":null},"generated_text":null,"details":null}
{"token":{"id":3837,"text":",","logprob":null,"special":null},"generated_text":null,"details":null}
{"token":{"id":101473,"text":"算是","logprob":null,"special":null},"generated_text":null,"details":null}
{"token":{"id":100003,"text":"吧","logprob":null,"special":null},"generated_text":null,"details":null}
{"token":{"id":3837,"text":"。","logprob":null,"special":null},"generated_text":null,"details":null}

逐字输出的话,后面太长了,就不贴了。

v1/chat/completions接口返回值是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
{
"id": "endpoint_common_1082",
"object": "chat.completion",
"created": 1734364022,
"model": "qwen-plus",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "哈哈,您可能把我跟其他人搞混了。我是Qwen,阿里巴巴云推出的一种超大规模语言模型。我在2023年7月正式与大家见面,我的目标是帮助用户生成各种类型的文本,比如文章、故事、诗歌等,并能进行流畅的对话交流。如果您有任何问题或需要帮助,随时可以告诉我哦!"
},
"finish_reason": "stop"
}
],
"usage": {
"prompt_tokens": 42,
"completion_tokens": 75,
"total_tokens": 117
}
}

可以看得出来,每个接口都有自己的返回值,并且返回值也不尽相同,在不同的场合下有着完全不同的作用。

就比如说,当我们只需要关注结果时,generate接口就够了。当我们需要作为一个产品逐字展示时,可以用generate_stream接口。而当我们需要利用OpenAI的各种工具时,就需要v1/chat/completions接口来进行适配。

为什么偏偏是OpenAI接口?其实最终的目的只是完成工作流,并不是为了接入OpenAI

或者说,说的更绝对一点,兼容OpenAI并不是目的,目的是为了适配OpenAI的一系列生态,比如,langchain

如何将接口MindIE接入langchain

非流式调用

其实,在之前的很多文章中(统一放在博客标签:langchain),讲了很多如何将各种工具接入langchain的方法,也讲了如何应用。

怎么样我们才能够将自己的url集成进去呢?

我们可以模仿官方文档的做法:

其中,必须实现的就只有两个:_call_llm_type。官方文档中出现的_stream_identifying_params都是附带的。

那么是不是说,我们只需要在_call中完成我们调用url的方法就可以了?

试试就逝世。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import json
import requests

import streamlit as st

from langchain_core.language_models.llms import LLM
from langchain_core.callbacks.manager import CallbackManagerForLLMRun

from typing import Any, Dict, Tuple, List, Optional

class MYLLM(LLM):
def _request_builder_(self, prompt: str) -> Tuple[Dict, Dict]:
"""
创建请求参数
"""
url: str = f"{st.secrets['BASE_URL']}/{st.secrets['MORE_URL']}/{st.secrets['OPEN_URL']}"
payload: Dict = {
"model": "qwen-plus",
"messages": [],
"stream": False
}
payload["messages"].append({"role": "user", "content": prompt})
headers: Dict = {
"Content-Type": "application/json",
"Authorization": f"{st.secrets['APPCODE_TYP']} {st.secrets['APPCODE_KEY']}"
}
return url, payload, headers

def _call(
self,
prompt: str,
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> str:
url, payload, headers = self._request_builder_(prompt)
results = requests.post(url, json=payload, headers=headers)
return str(json.loads(results.text)["choices"][0]["message"]["content"])
def _llm_type(self) -> str: return "MyLLM"

于是呢,我们就尝试着按照这篇文章简单调用一下:

1
2
3
4
5
6
7
prompt = PromptTemplate(template="""
{chat_history}
Human: {input}\n\nAssistant:\n
""", input_variables=["input", "chat_history"])
llm = CMDILLM()
chain = prompt | llm
results = chain.invoke({"input": "我靠,您就是大名鼎鼎的V吧?", "chat_history": memory})

于是,控制台输出:

1
哈哈,看起来您可能把我跟别人搞混了。我是Qwen,阿里巴巴云推出的一种超大规模语言模型。如果您有任何问题或需要帮助,请随时告诉我!

嗯,这样我们就把MindIE接入langchain了。

流式调用

当然,这个是一种generate的实现。我们又应该如何实现流式输出呢?

为了集成LangChain,我这边还是采用兼容OpenAI接口的v1/chat/completions

在这里呢,我们首先需要将底层数据适配到适合流式调用的方案,也就是对应到这个片段:

1
2
3
4
5
payload: Dict = {
"model": "qwen-plus",
"messages": [],
"stream": False
}

我们要做的也就只是将stream设置为True即可。

值得注意的是,这样请求的结果与你所期望的可能有点差距。他的输出大概是这样:

1
b'data: {"id":"endpoint_common_29","object":"chat.completion.chunk","created":1745564259,"model":"***","choices":[{"index":0,"delta":{"role":"assistant","content":"<think>"},"finish_reason":null}]}\n\ndata: {"id":"endpoint_common_29", ...

不难发现,这个跟平常我们所能见到的json完全不一样。甚至,这一整串还不是字符串,是一段二进制数据。

那么,如何接入LangChain呢?这里只描述两种方案。

非流式的流式调用

这句话应当这么理解:你实际上使用的是流式调用,但由于核心处理逻辑是收集所有结果之后,提取关键内容,再统一返回,反而感觉像非流式。

于是呢,要做的事情就很简单了:

  1. 解析二进制数据
  2. 提取json字符串
  3. 处理json字符串中的关键信息
  4. 拼接所有内容
  5. 返回拼接结果

那么,我们也就只需要对最后的response做亿点点处理:

1
2
3
4
5
6
pattern = r'data:.*?(?=\ndata:|$)'
matches = [item.strip() for item in re.findall(pattern, result["data"], re.DOTALL)]
for chunk in matches:
deal = chunk.replace("data: ", "")
if "[DONE]" not in deal:
print(json.loads(deal)["choices"][0]["delta"]["content"])

老实的流式调用

这块本质上还是一字一句的返回结果,只不过,在返回的时候需要反复拼接当前字段并输出。具体而言,就是:

  1. 解析二进制数据
  2. 提取json字符串
  3. 处理json字符串中的关键信息
  4. 每次处理json后都要拼接内容
  5. 每次拼接结束后都返回结果

也就是说,需要改成:

1
2
3
4
5
6
7
8
pattern = r'data:.*?(?=\ndata:|$)'
matches = [item.strip() for item in re.findall(pattern, result["data"], re.DOTALL)]
whole_sentence = ""
for chunk in matches:
deal = chunk.replace("data: ", "")
if "[DONE]" not in deal:
whole_sentence = whole_sentence + json.loads(deal)["choices"][0]["delta"]["content"]
print(whole_sentence)

如果你在print上加上end='\r',那样就会不停的刷新这一行的信息,从而实现打字机的效果。

如果你需要做成迭代器,用于后续的灵活变量处理,实际上也就是需要将print改成yield就好了。