一、Functions简介
可以把Tools作为依赖于外部服务的插件,Functions就是内部插件,二者都是用来增强open webui的能力的。Functions是轻量的,高度可定制的,并且是用纯Python编写的,所以你可以自由地创建任何东西——从新的人工智能工作流到与你使用的任何东西的集成,比如谷歌搜索或家庭助理。
在open webui中的Function包括三种类型:Pipe Function、Filter Function和Action Function。
Pipe类型的Function用于自定义Agent或模型,用户在对话中可以像普通的模型那样选择使用。
Filter类型的Function用于对往返大模型的数据进行处理,从而可以在不中断对话的前提下,拦截对话内容并进行修改或其他处理,比如日志。过滤器一般用于轻量级处理,包括:发送数据到监控平台、记录日志、修改用户输入、阻断有害消息、翻译和限流等。
Action类型的Function用来对聊天界面的按钮进行定制。这些按钮出现在单个聊天消息下方,让您可以方便地一键访问您定义的操作。
本文仅对Action类型的Function进行解析。
二、导入一个Function
1)进入open webui社区的Functions页面,选择一个Function,这里以Save Outputs为例
2)点击Save Outputs,进入如下页面
3)点击Get,在对话框填写你的open webui的地址
4)点击Import to WebUI
进入open webui页面,显示函数代码,核心代码为把大模型的输出写入本地文件中,完整源码如下:
"""
title: save_outputs
author: stefanpietrusky
author_url: https://downchurch.studio/
inspiration: add_to_memories_action_button @pad4651
instruction: you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source="FOLDER PATH\docker_data",target=/app/data“
icon_url: 
version: 0.1
"""import os
from pydantic import BaseModel, Field
from typing import Optional
class Action:
class Valves(BaseModel):
passclass UserValves(BaseModel):
show_status: bool = Field(
default=True, description="Show status of the action."
)
passdef __init__(self):
self.valves = self.Valves()
passasync def action(
self,
body: dict,
__user__=None,
__event_emitter__=None,
__event_call__=None,
) -> Optional[dict]:
print(f"action:{__name__}")user_valves = __user__.get("valves")
if not user_valves:
user_valves = self.UserValves()if __event_emitter__:
last_assistant_message = body["messages"][-1]if user_valves.show_status:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Saving to file", "done": False},
}
)try:
directory = "/app/data"
if not os.path.exists(directory):
os.makedirs(directory)file_path = os.path.join(directory, "saved_outputs.txt")
with open(file_path, "a") as file:
file.write(f"{last_assistant_message['content']}\n\n")
print("Output saved to file in the container, accessible on the host.")except Exception as e:
print(f"Error saving output to file: {str(e)}")
if user_valves.show_status:
await __event_emitter__(
{
"type": "status",
"data": {
"description": "Error Saving to File",
"done": True,
},
}
)if user_valves.show_status:
await __event_emitter__(
{
"type": "status",
"data": {"description": "Output Saved", "done": True},
}
)
因为可能是恶意代码,所以需要阅读检查代码。检查无误后,可以保存,该函数便作为插件进入open webui体系中。
三、具体使用
函数生效后,在大模型返回对一个问题的应答后,在工具栏显示该函数图标。
用户点击该链接,则保存当前大模型输出写入到文件中。
三、源码分析
1)数据模型
Function数据保存在Function表中,表定义如下:
其中:
id:函数唯一标识
userid:用户唯一标识
name:函数名
type:函数类型 filter|pipe|action
content:方法源代码
meta:元数据
valves:阈值
is_active:是否被激活(激活后才可见)
is_global:全局还是局部(仅某个用户使用)
2)导入函数
从open webui社区页面点击 Import to WebUI时,浏览器启动一个新页面,并提交代码格式化请求到/app/v1/utils/code/format,后端调用black模块进行严格格式化处理,并把格式化后的代码返回前端。
@router.post("/code/format")
async def format_code(form_data: CodeForm, user=Depends(get_admin_user)):
try:
formatted_code = black.format_str(form_data.code, mode=black.Mode())
return {"code": formatted_code}
except black.NothingChanged:
return {"code": form_data.code}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
完成格式化处理后,后端再提交创建Function请求到 /api/v1/functions/create。请求数据为:
{
"id": "save_outputs",
"name": "Save Outputs",
"meta": {
"description": "Save outputs locally on your computer.",
"manifest": {
"title": "save_outputs",
"author": "stefanpietrusky",
"author_url": "https://downchurch.studio/",
"inspiration": "add_to_memories_action_button @pad4651",
"instruction": "you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"",
"icon_url": "",
"version": "0.1"
},
"type": "action",
"user": {
"id": "9e4f4854-71d9-429a-99b9-9338a393de9e",
"username": "pietrusky",
"name": "",
"createdAt": 1724186428,
"role": null,
"verified": false
},
"id": "542145b0-59a0-44f2-86f1-dd2f1e64d705"
},#content由注释和源代码组成
"content": "\"\"\"\ntitle: save_outputs\nauthor: stefanpietrusky\nauthor_url: https://downchurch.studio/\ninspiration: add_to_memories_action_button @pad4651\ninstruction: you need to mount the container folder /app/data with a local folder when creating the container! „--mount type=bind,source=\"FOLDER PATH\\docker_data\",target=/app/data\"\nicon_url: \nversion: 0.1\n\"\"\"\n\nimport os\nfrom pydantic import BaseModel, Field\nfrom typing import Optional\n\n\nclass Action:\n class Valves(BaseModel):\n pass\n\n class UserValves(BaseModel):\n show_status: bool = Field(\n default=True, description=\"Show status of the action.\"\n )\n pass\n\n def __init__(self):\n self.valves = self.Valves()\n pass\n\n async def action(\n self,\n body: dict,\n __user__=None,\n __event_emitter__=None,\n __event_call__=None,\n ) -> Optional[dict]:\n print(f\"action:{__name__}\")\n\n user_valves = __user__.get(\"valves\")\n if not user_valves:\n user_valves = self.UserValves()\n\n if __event_emitter__:\n last_assistant_message = body[\"messages\"][-1]\n\n if user_valves.show_status:\n await __event_emitter__(\n {\n \"type\": \"status\",\n \"data\": {\"description\": \"Saving to file\", \"done\": False},\n }\n )\n\n try:\n directory = \"/app/data\"\n if not os.path.exists(directory):\n os.makedirs(directory)\n\n file_path = os.path.join(directory, \"saved_outputs.txt\")\n with open(file_path, \"a\") as file:\n file.write(f\"{last_assistant_message['content']}\\n\\n\")\n print(\"Output saved to file in the container, accessible on the host.\")\n\n except Exception as e:\n print(f\"Error saving output to file: {str(e)}\")\n if user_valves.show_status:\n await __event_emitter__(\n {\n \"type\": \"status\",\n \"data\": {\n \"description\": \"Error Saving to File\",\n \"done\": True,\n },\n }\n )\n\n if user_valves.show_status:\n await __event_emitter__(\n {\n \"type\": \"status\",\n \"data\": {\"description\": \"Output Saved\", \"done\": True},\n }\n )"
}
对应函数源码如下:
处理流程如下:
1)防错处理,判断函数名是否符合python标识符的命名规则,不符合则报错
2)对源中import的模块名进行替换
3)加载源码成为可使用的模块
4)把该函数加载到全局FUNCTIONS中,供后继使用
5)为函数创建缓存目录
@router.post("/create", response_model=Optional[FunctionResponse])
async def create_new_function(
request: Request, form_data: FunctionForm, user=Depends(get_admin_user)
):
if not form_data.id.isidentifier(): #对id进行校验
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only alphanumeric characters and underscores are allowed in the id",
)form_data.id = form_data.id.lower()
#从Function表查询该函数是否已经入库
function = Functions.get_function_by_id(form_data.id)
if function is None:
try:#用本地模块名,替换源码中的模块名,比如用from open_webui.utils替换from utils
form_data.content = replace_imports(form_data.content)#把函数加载为模块
function_module, function_type, frontmatter = load_function_module_by_id(
form_data.id,
content=form_data.content,
)
form_data.meta.manifest = frontmatter#把Function实例增加到全局FUNCTIONS中
FUNCTIONS = request.app.state.FUNCTIONS
FUNCTIONS[form_data.id] = function_module#把函数数据插入到FUNCTION表中
function = Functions.insert_new_function(user.id, function_type, form_data)
#为该方法创建目录/app/backend/data/cache/functions/{函数名}
function_cache_dir = CACHE_DIR / "functions" / form_data.id
function_cache_dir.mkdir(parents=True, exist_ok=True)if function:
return function
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT("Error creating function"),
)
except Exception as e:
log.exception(f"Failed to create a new function: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(e),
)
else: #如果已经入库,则报错
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ID_TAKEN,
)
在该方法中的核心代码是load_function_module_by_id,load_function_module_by_id实现代码动态加载,重点分析一下。
def load_function_module_by_id(function_id: str, content: str | None = None):
#如果参数content为None,则从数据库查询
if content is None:
function = Functions.get_function_by_id(function_id)
if not function:
raise Exception(f"Function not found: {function_id}")
content = function.contentcontent = replace_imports(content)#替换源码中的导入的模块名
Functions.update_function_by_id(function_id, {"content": content})#更新数据库content
else:#从content提取元数据
frontmatter = extract_frontmatter(content)#安装依赖模块
install_frontmatter_requirements(frontmatter.get("requirements", ""))
module_name = f"function_{function_id}"
#创建function_{function_id}模块,比如function_save_outputs
module = types.ModuleType(module_name)#加载模块到sys_modules
sys.modules[module_name] = module# 创建临时文件,用于存储函数的源代码
temp_file = tempfile.NamedTemporaryFile(delete=False)
temp_file.close()
try:#把源代码写入临时文件
with open(temp_file.name, "w", encoding="utf-8") as f:
f.write(content)
module.__dict__["__file__"] = temp_file.name #设置模块的__file__为临时文件名# 在本模块的命名空间运行源代码,完成模块源码的载入
exec(content, module.__dict__)
frontmatter = extract_frontmatter(content)
log.info(f"Loaded module: {module.__name__}")# 根据Function类型,返回对应类的实例
if hasattr(module, "Pipe"):#返回管道实例
return module.Pipe(), "pipe", frontmatter
elif hasattr(module, "Filter"): #返回过滤器实例
return module.Filter(), "filter", frontmatter
elif hasattr(module, "Action"):
return module.Action(), "action", frontmatter #返回Action实例
else:
raise Exception("No Function class found in the module")
except Exception as e:
log.error(f"Error loading module: {function_id}: {e}")
# Cleanup by removing the module in case of error
del sys.modules[module_name]Functions.update_function_by_id(function_id, {"is_active": False})
raise e
finally:
os.unlink(temp_file.name)
3)执行函数
用户在对话界面点击按钮执行函数时,后端入口为http://{ip:port}/api/chat/actions/{函数名},后端调用该函数执行对应的操作。对应入口函数为chat_action。
该方法和简洁,主要是调用chat_action_handle。
@app.post("/api/chat/actions/{action_id}")
async def chat_action(
request: Request, action_id: str, form_data: dict, user=Depends(get_verified_user)
):
try:
model_item = form_data.pop("model_item", {})if model_item.get("direct", False):
request.state.direct = True
request.state.model = model_itemreturn await chat_action_handler(request, action_id, form_data, user)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
chat_action_handle实际对应 open_webui.utils.chat模块中的chat_action方法,具体源码如下:
async def chat_action(request: Request, action_id: str, form_data: dict, user: Any):
if "." in action_id: #如果action_id是多层,则用'.'分割
action_id, sub_action_id = action_id.split(".")
else:
sub_action_id = Noneaction = Functions.get_function_by_id(action_id)#从数据库查找Function是否存在
if not action:
raise Exception(f"Action not found: {action_id}")#以下代码确定使用的模型
if not request.app.state.MODELS:
await get_all_models(request, user=user)if getattr(request.state, "direct", False) and hasattr(request.state, "model"):
models = {
request.state.model["id"]: request.state.model,
}
else:
models = request.app.state.MODELSdata = form_data
model_id = data["model"]if model_id not in models:
raise Exception("Model not found")
model = models[model_id]#通过websocket发送数据到前端
__event_emitter__ = get_event_emitter(
{
"chat_id": data["chat_id"],
"message_id": data["id"],
"session_id": data["session_id"],
"user_id": user.id,
}
)
__event_call__ = get_event_call(
{
"chat_id": data["chat_id"],
"message_id": data["id"],
"session_id": data["session_id"],
"user_id": user.id,
}
)#根据action_id获取模块
function_module, _, _ = get_function_module_from_cache(request, action_id)
#阀门处理
if hasattr(function_module, "valves") and hasattr(function_module, "Valves"):
valves = Functions.get_function_valves_by_id(action_id)
function_module.valves = function_module.Valves(**(valves if valves else {}))if hasattr(function_module, "action"):
try:
action = function_module.action#从Action类中获取action方法# 得到函数签名
sig = inspect.signature(action)
params = {"body": data}# Extra parameters to be passed to the function
extra_params = {
"__model__": model,
"__id__": sub_action_id if sub_action_id is not None else action_id,
"__event_emitter__": __event_emitter__,
"__event_call__": __event_call__,
"__request__": request,
}#把extra_params中的项中与函数签名中的参数匹配的项加入到params中
for key, value in extra_params.items():
if key in sig.parameters:
params[key] = valueif "__user__" in sig.parameters:
#如果函数签名中有__user__,则在调用参数中增加用户相关阀门设置
__user__ = user.model_dump() if isinstance(user, UserModel) else {}try:
if hasattr(function_module, "UserValves"):
__user__["valves"] = function_module.UserValves(
**Functions.get_user_valves_by_id_and_user_id(
action_id, user.id
)
)
except Exception as e:
log.exception(f"Failed to get user values: {e}")params = {**params, "__user__": __user__}
if inspect.iscoroutinefunction(action): #如果action方法是协程,则await调用
data = await action(**params)
else: #非协程则直接调用
data = action(**params)except Exception as e:
return Exception(f"Error: {e}")return data