langchain更新还体验:连接neo4j

前言

在前面的两篇文章(langchain更新再体验:加入一个promptlangchain更新初体验)中,我们完成了一些基础任务,能够回答,也能够植入自定义prompt,那么就再进一步吧,找出句子中的实体,再去知识图谱中查询是否存在。

目前源码已更新到了我的GitHub上,本文对应的是functions目录下的neo4disease.py文件。

配置

这次在配置文件中增加了一些内容:

1
2
3
4
5
DASHSCOPE_API_KEY="sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
NEO4J_ADDR="xxx.xxx.xxx.xxx"
NEO4J_PORT="xxxxx"
NEO4J_USER="xxxxx"
NEO4J_PASS="xxxxx"

利用配置文件加载可以减少耦合,顺便增加破解难度。

获得大模型对象

也不再赘述,仅给出代码:

1
2
3
4
5
6
7
8
9
10
11
class TongyiFactory(BaseLLMFactory):
def __init__(self, api_key: str = st.secrets["DASHSCOPE_API_KEY"], model_name: str = "qwen-max", top_p: float = 0.7):
self.api_key = api_key
self.model_name = model_name
self.top_p = top_p

def build(self):
from langchain_community.llms import Tongyi
llm = Tongyi(model = self.model_name, top_p = self.top_p, api_key = self.api_key)
return llm
llm = TongyiFactory().build()

PromptTemplate

也不再赘述,仅给出代码:

1
2
3
4
5
6
7
8
9
template = """
请你作为一名医学专家,请听取用户所说的话,好好理解用户的病症。
{chat_history}
在了解病症之后,请鼓励用户积极说明病症并积极参与治疗。
Human: {human_input}
"""
prompt_template = PromptTemplate(
template = template, input_variables=["chat_history", "human_input"]
)

chat_history

也不再赘述,仅给出代码:

1
memory = ConversationBufferMemory(memory_key="chat_history", input_key="human_input")

管道链接

虽然还是一样,但是在这里由于加入了不同的功能,所以有必要展开讲解一下。

按照流程上来说,肯定是先提取句子中的实体,再查询知识图谱,最后再回复。

那么,问题就应该分为三步进行:

  1. 构建句子实体提取Runnable对象,接收用户输入,并且提取其中可能为【病症】的实体;
  2. 构建知识图谱查询Runnable对象,接收上一步提取的实体,并且查询知识图谱;
  3. 构建回复Runnable对象,接收上一步查询的结果,并且回复用户。

那么,我们开始吧。

构建知识图谱查询Runnable对象

虽然说流程上是先提取,后查询。为了方便,就设计成了提取句子的Runnable里面调用查询知识图谱的Runnable。于是在介绍的时候,就优先构建知识图谱查询Runnable类。

由于我们已经给出了配置类,所以接下来构建类的过程可以构建为全局变量。

1
2
3
4
5
6
7
8
# 利用的是 `neo4j` 中的 `GraphDatabase`,而不是 `py2neo`
from neo4j import GraphDatabase

# 创建 Neo4j 驱动
URI = f'bolt://{st.secrets["NEO4J_ADDR"]}:{st.secrets["NEO4J_PORT"]}'
USERNAME = st.secrets["NEO4J_USER"]
PASSWORD = st.secrets["NEO4J_PASS"]
DRIVER = GraphDatabase.driver(uri, auth = (USERNAME, PASSWORD))

或者,我们可以作为类的配置项:

1
2
3
4
5
6
class DiseaseQueryRunnable(Runnable):
def __init__(self, uri: str = URI, username: str = USERNAME, password: str = PASSWORD, driver: GraphDatabase.driver = DRIVER):
self.uri = uri
self.username = username
self.password = password
self.driver = driver

为了方便,我们采用全局变量的方式。当然,再怎么全局也依然限制在当前文件的闭包中,所以也不必担心C/C++中未声明externel造成的问题。

然后,我们就需要实现Runnable中的invoke方法。

具体也就是遍历在提取过程中所获得的实体名称,并送入neo4j进行查询。即:

1
2
3
4
5
6
7
8
9
10
11
12
13
from typing import Optional
def invoke(self, entities: List[str], config: Optional[dict] = None) -> str:
neo4j_results = []
for entity in entities:
query = f"MATCH (d:Disease {{name: '{entity}'}}) RETURN d"
with driver.session() as session:
result = session.run(query)
records = [record["d"] for record in result]
if records:
neo4j_results.append(f"病症 '{entity}' 存在于数据库中。")
else:
neo4j_results.append(f"病症 '{entity}' 不存在于数据库中。")
return "\n".join(neo4j_results)

需要注意的是,invoke方法需要2参数:

  1. input,表示输入数据,这里是用entities表示提取的实体列表
  2. config,表示Runnable的配置

最后还有一个**kargs,可以忽略。

如果说需要构建一个可以被代理的对象,那么就需要指定输入与配置,就像上面给出来的一样。

构建句子实体提取Runnable对象

接下来就是如何提取句子中的Runnable对象了。

同样的,我们创建一个类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from langchain.schema.runnable import Runnable
from langchain.prompts.base import BasePromptTemplate
from langchain_core.memory import BaseMemory

class DiseaseExtractionRunnable(Runnable):
def __init__(self, chain: Runnable | BasePromptTemplate, memory: BaseMemory):
self.chain = chain
self.memory = memory
def invoke(self, inputs: dict, config: Optional[dict] = None) -> dict:
human_input = inputs["human_input"]
# 提取实体
extracted_entities = self.chain.invoke({"text": human_input})
entities = [e.strip() for e in extracted_entities.split(',') if e.strip()]
# 查询 Neo4j
neo4j_results = DiseaseQueryRunnable().invoke(entities)
# 获取对话历史
chat_history = self.memory.load_memory_variables({"human_input": human_input})["chat_history"]
# 返回 PromptTemplate 所需的输入
result = dict(
chat_history = chat_history,
human_input = human_input,
neo4j_results = neo4j_results
)

P.S.:其实BasePromptTemplate也继承自Runnable,但是这里为了明确说明可以接PromptTemplate,所以才写在这里。

其中,可以看到,在执行invoke方法的过程中,分为2个部分:

  1. 将用户输入的问题送给chain,并提交到大模型,得到结果
  2. 将结果送入刚刚构建的知识图谱查询Runnable对象,并得到结果

因此,一定会有一个chain会进入这个方法,用于提取实体。因此在__init__中就需要传入chain对象。同时,为了兼顾对话历史,也需要一个memory对象。

而在后续过程中,每次都将获取invoke方法返回的结果,从而逐步骤给出大模型结果。

完善过程

在完成上述步骤之后,把整个过程串起来作为一个流程,也就比较顺利了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
llm = get_llm_factory("tongyi")
memory = ConversationBufferMemory(memory_key="chat_history", input_key="human_input")
entity_extraction_prompt = PromptTemplate(
input_variables = ["text"],
template = "请从以下文本中提取病症名称,用逗号分隔多个病症:{text}"
)
entity_extraction_chain = entity_extraction_prompt | llm
template = """
请你作为一名医学专家,请听取用户所说的话,好好理解用户的病症。
{chat_history}
{neo4j_results}
在了解病症之后,请鼓励用户积极说明病症并积极参与治疗。
Human: {human_input}
"""
prompt_template = PromptTemplate(
template = template,
input_variables = ["chat_history", "human_input", "neo4j_results"]
)
chain = DiseaseExtractionRunnable(chain = entity_extraction_chain, memory = memory) | prompt_template | llm
human_input = "我最近头痛得厉害,不知道是不是感冒了。"
response = chain.invoke({"chat_history": memory, "human_input": human_input})
# print(response)
memory.save_context({"human_input": human_input}, {"output": response})

到这里,也就是一个比较基本的入门了。