前言
好了,铺垫了这么这么久,总算来到稍微正经一点的问题上了。
P.S.:毕竟咱是演示项目,做起来就很粗暴,搜索时间也从来没有优化,所以搜索时间复杂度也是相当的绝望呢。
通义服务
阿里灵积当然是有这样的服务的,使用的案例也可以在官方文档上找到。
由于咱选用的是pymilvus
,他的milvus-lite
可以直接放在本地,就不需要额外走阿里云的AnalyticDB
服务。当然,这样配置的话,存取的速度上限就成了磁盘的速度上限。
本地服务流程
如果需要将内容放在本地的话,流程其实需要走两步:
- 首先创建一个数据库,并在数据库中建一张表;
- 基于数据库表进行查询;
首先,阿里的通义服务无法给出存储的逻辑,只能够文本向量化。
然后,milvus-lite
存储的内容是向量,并提供向量相关的间索过程。
综合这两个问题,流程也就显而易见了。将两者结合在一起,用空间换时间,才是相对而言较优的解决方案。
于是,就可以进行分解与解耦了。
文本向量化
首先是文本向量化。在官方文档中,已经给出了完整的代码示例,可以直接粘贴使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def _embedding( self, input: str, model: str = TextEmbedding.Models.text_embedding_v3, dimension: int = 1024 ) -> List[float]: resp = TextEmbedding.call( model = model, input = input, dimension = dimension, api_key=st.secrets["DASHSCOPE_API_KEY"], ) if resp.status_code == HTTPStatus.OK: return resp.output["embeddings"][0]["embedding"] else: return []
|
这一段基本是照搬,没啥悬念。
不过,我们还是添油加醋一番:既然有一个str
的向量化,那么我对一个List[str]
做向量化没问题吧?
那么就添加一个:
1 2 3 4 5 6 7 8 9 10 11 12 13
| def embeddings( self, input: List[str], model: str = TextEmbedding.Models.text_embedding_v3, dimension: int = 1024 ) -> List[Dict[str, Union[int, str, List[float]]]]: embeddings = [ { "idx": idx, "text": s, "vector": self._embedding(s, model, dimension) } for idx, s in enumerate(input) ] return embeddings
|
看上去没啥问题。
向量持久化
接下来就是将向量存储下来了。由于向量本身想要编码回到原来的内容代价稍大,所以,在存储的过程中,我们直接使用map
进行存储:
1 2 3 4 5
| { "idx": 1, "text": "hello world", "vector": [0.1, 0.2, 0.3, ...] }
|
通过向量查询相似度较高的数组序列之后,还能够通过idx
获取到原本的文本,这样做也是相对而言方便一些。
于是,接下来的操作也就是按照建表流程建表:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| from pymilvus import MilvusClient, FieldSchema, CollectionSchema, DataType def _create_or_replace( self, milvus_client: MilvusClient, db_file_name: str, collection_name: str, dimension: int = 1024, data: List[str] = None ) -> MilvusClient: """ 创建搜索样本库 """ db_file_path = f"{db_file_name}.db" if milvus_client is None or not os.path.exists(db_file_path): milvus_client = MilvusClient(db_file_path) if milvus_client.has_collection(collection_name = collection_name): milvus_client.drop_collection(collection_name = collection_name) milvus_client.create_collection( collection_name = collection_name, dimension = dimension, metric_type="IP", consistency_level="Strong", schema = CollectionSchema(fields=[ FieldSchema( name = "id", dtype = DataType.INT64, is_primary = True, description = "primary key" ), FieldSchema( name = "embeddings", dtype = DataType.FLOAT_VECTOR, description = "embeddings", dim = dimension ), FieldSchema( name = "text", dtype = DataType.VARCHAR, description = "text", max_length = 65535 ) ], description = "data schema") ) index_params = IndexParams() index_params.add_index( field_name = "embeddings", index_type = "IVF_FLAT", index_name = "embeddings" ) milvus_client.create_index( collection_name = collection_name, index_params = index_params ) if data is not None: milvus_client.insert(collection_name = collection_name, data = data) return milvus_client
|
看着也没啥问题。
向量搜索
既然已经建好了数据库,接下来就是搜索这个数据库了。
搜索我们可以直接借助milvus
自带的搜索功能,就像这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| def _embedding_search( self, milvus_client: MilvusClient, collection_name: str, question: str, top_k: int = 5 ) -> List[Dict]: """ 搜索向量 """ result = milvus_client.search( collection_name = collection_name, anns_field = "embeddings", data = [self._embedding(input = question)], limit = top_k, output_fields = ["text"], ) return result[0]
|
P.S.:
result输出:
1 2 3 4 5
| ["[ {'id': 0, 'distance': 0.7548444271087646, 'entity': {'text': ...}}, {'id': 1, 'distance': 0.7238685488700867, 'entity': {'text': ...}}, {'id': 3, 'distance': 0.6937779784202576, 'entity': {'text': ...}}, ...] "]
|
因此解包需要指定返回第一个结果,第一个结果里面才是匹配列表。列表中的每个元素都按照先前规定的表头格式呈现。
rerank
那么整体逻辑就清晰了。
rerank
本质上作为搜索向量相似度的方法,我们只需要按照顺序拼接上述方法就好了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def rerank( self, query: str, model: str = TextEmbedding.Models.text_embedding_v3, documents: List[str], db_file: str = "test", collection_name: str = "demo", dimension: int = 1024, top_k: int = 5, ) -> ExtraList: embeddings = self.embeddings(documents, model) milvus_client = None milvus_client = self._create_or_replace( milvus_client, db_file, collection_name, dimension, embeddings ) response = self._embedding_search( milvus_client, collection_name, query, top_k ) return response
|
到这里,一个embeddings
,一个rerank
也就全部结束了。
本地部署
当然,你会想知道本地部署的东西应该怎么调用。
这里也给出基于昇腾解决方案给出的样例:
P.S.:使用AutoTokenizer
部署参数的过程就略过了。这里的代码是在已经完成部署的前提下写的请求逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| def rerank( self, query: str, documents: List[str], model: str, top_k: int = 5, return_documents: bool = True, ) -> Sequence: client: Client = Client( base_url = "http://api.example.com", headers = {"Authorization": f"Bearer {key}"} timeout = 60, ) try: response: Response = client.post( "/rerank", json = { "query": query, "documents": documents, "model": model, "top_k": top_k, "return_documents": return_documents, }, ) response.raise_for_status() results = response.json() if results.get("results", None) is not None: return results["results"] else: return [] except Exception as e: return {"error": str(e)} finally: client.close()
|