用FunctionCall实现文件解析(九):用通义千问构建rerank方法

前言

好了,铺垫了这么这么久,总算来到稍微正经一点的问题上了。

P.S.:毕竟咱是演示项目,做起来就很粗暴,搜索时间也从来没有优化,所以搜索时间复杂度也是相当的绝望呢。

通义服务

阿里灵积当然是有这样的服务的,使用的案例也可以在官方文档上找到。

由于咱选用的是pymilvus,他的milvus-lite可以直接放在本地,就不需要额外走阿里云的AnalyticDB服务。当然,这样配置的话,存取的速度上限就成了磁盘的速度上限。

本地服务流程

如果需要将内容放在本地的话,流程其实需要走两步:

  1. 首先创建一个数据库,并在数据库中建一张表;
  2. 基于数据库表进行查询;

首先,阿里的通义服务无法给出存储的逻辑,只能够文本向量化。

然后,milvus-lite存储的内容是向量,并提供向量相关的间索过程。

综合这两个问题,流程也就显而易见了。将两者结合在一起,用空间换时间,才是相对而言较优的解决方案。

于是,就可以进行分解与解耦了。

文本向量化

首先是文本向量化。在官方文档中,已经给出了完整的代码示例,可以直接粘贴使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _embedding(
self, input: str,
model: str = TextEmbedding.Models.text_embedding_v3,
dimension: int = 1024
) -> List[float]:
resp = TextEmbedding.call(
model = model,
input = input,
dimension = dimension,
api_key=st.secrets["DASHSCOPE_API_KEY"],
)
if resp.status_code == HTTPStatus.OK:
return resp.output["embeddings"][0]["embedding"]
else:
return []

这一段基本是照搬,没啥悬念。

不过,我们还是添油加醋一番:既然有一个str的向量化,那么我对一个List[str]做向量化没问题吧?

那么就添加一个:

1
2
3
4
5
6
7
8
9
10
11
12
13
def embeddings(
self, input: List[str],
model: str = TextEmbedding.Models.text_embedding_v3,
dimension: int = 1024
) -> List[Dict[str, Union[int, str, List[float]]]]:
embeddings = [
{
"idx": idx,
"text": s,
"vector": self._embedding(s, model, dimension)
} for idx, s in enumerate(input)
]
return embeddings

看上去没啥问题。

向量持久化

接下来就是将向量存储下来了。由于向量本身想要编码回到原来的内容代价稍大,所以,在存储的过程中,我们直接使用map进行存储:

1
2
3
4
5
{
"idx": 1,
"text": "hello world",
"vector": [0.1, 0.2, 0.3, ...]
}

通过向量查询相似度较高的数组序列之后,还能够通过idx获取到原本的文本,这样做也是相对而言方便一些。

于是,接下来的操作也就是按照建表流程建表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType
def _create_or_replace(
self, milvus_client: MilvusClient, db_file_name: str, collection_name: str,
dimension: int = 1024, data: List[str] = None
) -> MilvusClient:
"""
创建搜索样本库
"""
# 为了简化操作,直接本地创建搜索样本库,并采用覆盖创建的方式
db_file_path = f"{db_file_name}.db"
if milvus_client is None or not os.path.exists(db_file_path):
milvus_client = MilvusClient(db_file_path)
if milvus_client.has_collection(collection_name = collection_name):
milvus_client.drop_collection(collection_name = collection_name)
# 创建集合
milvus_client.create_collection(
collection_name = collection_name, dimension = dimension,
metric_type="IP", # Inner product distance
consistency_level="Strong", # Strong consistency level
# 创建集合`schema`
schema = CollectionSchema(fields=[
## 下面三个字段来自`milvus`官网的`example`
### `id`字段,自增(手动)主键,标记当前句子是第几个
FieldSchema(
name = "id", dtype = DataType.INT64,
is_primary = True, description = "primary key"
),
### `embeddings`字段,存储当前句子的向量
FieldSchema(
name = "embeddings", dtype = DataType.FLOAT_VECTOR,
description = "embeddings", dim = dimension
),
### `text`字段,存储句子本身
FieldSchema(
name = "text", dtype = DataType.VARCHAR,
description = "text", max_length = 65535
)
], description = "data schema")
)
# 创建集合索引,创建后才可以使用`IP`搜索策略
index_params = IndexParams()
## 主要针对`embeddings`字段创建索引
index_params.add_index(
field_name = "embeddings", index_type = "IVF_FLAT",
index_name = "embeddings"
)
milvus_client.create_index(
collection_name = collection_name, index_params = index_params
)
# 插入数据
if data is not None:
milvus_client.insert(collection_name = collection_name, data = data)
return milvus_client

看着也没啥问题。

向量搜索

既然已经建好了数据库,接下来就是搜索这个数据库了。

搜索我们可以直接借助milvus自带的搜索功能,就像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def _embedding_search(
self,
milvus_client: MilvusClient, collection_name: str, question: str, top_k: int = 5
) -> List[Dict]:
"""
搜索向量
"""
result = milvus_client.search(
collection_name = collection_name,
anns_field = "embeddings", # 指定搜索的向量字段名(关键参数)
data = [self._embedding(input = question)], # 输入搜索向量
limit = top_k,
output_fields = ["text"],
)
return result[0]

P.S.:

result输出:

1
2
3
4
5
["[
{'id': 0, 'distance': 0.7548444271087646, 'entity': {'text': ...}},
{'id': 1, 'distance': 0.7238685488700867, 'entity': {'text': ...}},
{'id': 3, 'distance': 0.6937779784202576, 'entity': {'text': ...}}, ...]
"]

因此解包需要指定返回第一个结果,第一个结果里面才是匹配列表。列表中的每个元素都按照先前规定的表头格式呈现。

rerank

那么整体逻辑就清晰了。

rerank本质上作为搜索向量相似度的方法,我们只需要按照顺序拼接上述方法就好了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def rerank(
self, query: str,
model: str = TextEmbedding.Models.text_embedding_v3,
documents: List[str],
db_file: str = "test",
collection_name: str = "demo",
dimension: int = 1024,
top_k: int = 5,
) -> ExtraList:
embeddings = self.embeddings(documents, model)
milvus_client = None
milvus_client = self._create_or_replace(
milvus_client, db_file, collection_name, dimension, embeddings
)
response = self._embedding_search(
milvus_client, collection_name, query, top_k
)
return response

到这里,一个embeddings,一个rerank也就全部结束了。

本地部署

当然,你会想知道本地部署的东西应该怎么调用。

这里也给出基于昇腾解决方案给出的样例:

P.S.:使用AutoTokenizer部署参数的过程就略过了。这里的代码是在已经完成部署的前提下写的请求逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def rerank(
self, query: str, documents: List[str],
model: str, top_k: int = 5,
return_documents: bool = True,
) -> Sequence:
client: Client = Client(
base_url = "http://api.example.com",
headers = {"Authorization": f"Bearer {key}"}
timeout = 60,
)
try:
response: Response = client.post(
"/rerank",
json = {
"query": query,
"documents": documents,
"model": model,
"top_k": top_k,
"return_documents": return_documents,
},
)
response.raise_for_status()
results = response.json()
if results.get("results", None) is not None:
return results["results"]
else:
return []
except Exception as e:
return {"error": str(e)}
finally:
client.close()