Documents
Dify Agent Server 模块分析 (Commit 55f95dbc)
Dify Agent Server 模块分析 (Commit 55f95dbc)
Type
Document
Status
Published
Created
May 14, 2026
Updated
May 25, 2026
Created by
Dosu Bot
Updated by
Dosu Bot

Dify Agent Server 模块分析 (Commit 55f95dbc)#

概述#

Commit 55f95dbc 于 2026-05-14 由 BeautyyuYanli 合并,对应 PR #36087 feat(agent): init agent server。该提交向 Dify monorepo 引入了一个全新的独立子包 dify-agent,总计新增 128 个文件、17,357 行代码(含约 6,000 行以上测试代码),是 Dify 迄今为止规模最大的单次功能提交之一

dify-agent 是一个基于 FastAPI 的生产级 AI Agent 后端服务,为异步 Agent 运行提供完整的生命周期管理能力,包括:

  • 异步运行执行:运行在后台 asyncio 任务中,客户端断连不影响 Agent 执行
  • Redis 事件流:所有运行记录和事件以 Redis Stream 形式持久化,支持游标分页和 SSE 回放
  • Python 客户端库:提供同步 / 异步 HTTP 客户端,含 SSE 重连逻辑
  • Agenton 框架:一个可独立使用的无状态 Layer 图组合框架,支持可序列化运行时状态和会话快照

整个包由三大顶层命名空间组成:

命名空间位置职责
agentonsrc/agenton/核心框架:无状态图组合、Layer 基类、会话快照
agenton_collectionssrc/agenton_collections/通用 Layer 实现:纯文本、Pydantic AI 桥接
dify_agentsrc/dify_agent/Dify 专用运行时:FastAPI 服务器、Redis 存储、Plugin Daemon 集成

该提交以多个子提交递进完成,涵盖项目脚手架(feat(agent): add dify agent project setup)、LLM 适配器(feat(agent): add dify llm adapter)、Agenton 引擎初始化(initialize the agenton engine)及结构调整(move the collections folder structure)等阶段

与 Dify 现有架构的关系#

旧版 Agent 系统#

dify-agent 包引入之前,Dify 的 Agent 执行依赖于主应用(api/)中的一套同步 runner 体系:

BaseAgentRunner 是所有 Agent runner 的抽象基类,负责工具初始化与转换、Agent 思考链条(AgentThought)的数据库持久化、对话历史组织,以及与 QueueManager 的事件发布集成。

CotAgentRunner 继承自 BaseAgentRunner,实现链式思维(Chain-of-Thought)执行模式:通过 thought → action → observation 的迭代循环驱动 Agent,使用 CotAgentOutputParser 解析 ReAct 格式输出,并强制执行最大迭代次数限制。

FunctionCallAgentRunner 同样继承自 BaseAgentRunner,利用 LLM 的原生函数调用能力(tool_calls),支持流式和非流式两种工具调用提取路径,每次迭代动态更新工具参数。

此外还有 CotChatAgentRunner CotCompletionAgentRunner 两个针对不同对话模式的 CoT 变体。

工作流 Agent 节点#

AgentNode 是更新的工作流集成点,继承自 graphon Node 基类,可嵌入 Dify 工作流图中。它使用 AgentStrategyResolver 动态加载 Agent 策略实现,通过消息转换适配工作流格式,并管理 Plugin 凭证。

Plugin Agent Strategy 层#

PluginAgentStrategy 实现了策略模式,将具体 Agent 执行委托给 PluginAgentClient,支持流式响应。

PluginAgentClient 通过 HTTP 与 Plugin Daemon(默认端口 5002)通信,负责:

  • 获取 Agent 策略提供者和策略定义
  • 使用结构化参数调用 Agent 策略
  • 将内部格式与 Plugin Daemon 格式互转

Plugin Daemon 承载着 LLM 提供者插件的完整运行时(如 langgenius/openai),是 Agent 实际发起 LLM 调用的入口。

新模块的定位#

dify-agent 包并不替换上述现有组件,而是在其旁边建立了一个独立的微服务边界

  • 集成方式:PR #36284 将 dify-agent 从开发依赖提升至生产依赖 ,API 端通过 DifyAgentBackendRunClient 实现完整的 Agent 后端集成
  • 与 Plugin Daemon 的关系:新模块直接与 Plugin Daemon(端口 5002)通信,复用了同一个 LLM 提供者运行时基础设施
  • 与 Redis 的关系:Redis 仅作为持久化存储使用(非作业队列),保存 run 记录和 Redis Stream 事件,两端均可独立扩展
┌─────────────────────────────────────────────────────┐
│ Dify API (api/) │
│ ┌──────────────┐ ┌──────────────────────────┐ │
│ │ 旧版 Runner │ │ dify-agent Client │ │
│ │ (CoT / FC) │ │ (HTTP → Agent Server) │ │
│ └──────┬───────┘ └──────────┬───────────────┘ │
└─────────┼─────────────────────┼─────────────────────┘
          │ │
          ▼ ▼
   ┌─────────────┐ ┌──────────────────┐
   │Plugin Daemon│ │ dify-agent │
   │ :5002 │◄─────│ Server :8000 │
   └─────────────┘ └────────┬─────────┘
                            ┌────▼─────┐
                            │ Redis │
                            └──────────┘

这一设计标志着 Dify 从单体同步执行模型异步微服务化 Agent 执行的架构演进。

核心架构设计#

设计原则#

dify-agent 服务器遵循三条核心设计原则,这些原则在 app.py 的模块文档字符串中有明确阐述

1. 进程内执行(In-Process Execution)

所有 Agent 运行作为后台 asyncio.Task 在同一 FastAPI 进程内执行,而非通过 Celery、RQ 等外部任务队列分发。这意味着客户端断开 HTTP 连接不会取消正在执行的 Agent 运行 —— 执行独立于请求处理器生命周期之外。

2. 无作业队列(No Job Queue)

Redis 在此系统中不作为作业队列使用。创建 run 的请求负载(包含 Layer 配置和模型凭证)从不写入 Redis,仅将运行记录和事件流持久化到 Redis 。执行调度完全由进程本地的 RunScheduler 负责。

3. 请求先验证后持久化(Validate-Before-Persist)

每次创建 run 请求会先进入一次轻量级的 Agenton 运行进行语义验证 —— 检查用户提示、结构化输出契约和退出策略 —— 之后才持久化 run 记录 。由于 Dify 的默认 Layer 生命周期钩子是无副作用的,这一验证过程不会触发任何外部服务调用。

三大核心组件#

HTTP 进程通过 FastAPI lifespan 管理三个共享资源,确保有序的启动和关闭

1. Redis Run Store(RedisRunStore#

负责所有持久化操作:

  • Run 记录:以 JSON 字符串存储在 Redis Key 中,记录 run_idstatuscreated_atupdated_aterror
  • 事件流:每个 run 对应一条 Redis Stream,事件以 xadd 追加写入
  • TTL 刷新:每次写入状态或事件时,同步刷新 run 记录和事件流的过期时间(默认保留 3 天)

Redis Stream ID 作为公开的事件游标,0-0 表示从头回放,支持断点续传。

2. 共享 Plugin Daemon HTTP 客户端#

整个 FastAPI 进程共享一个 httpx.AsyncClient 实例用于与 Plugin Daemon 通信,配置了完整的连接超时、读取超时和连接池限制。该客户端由 lifespan 拥有,Agenton Layer 和 Provider 借用它而不持有所有权,确保资源的确定性释放。

关键配置参数(来自 ServerSettings):

  • read_timeout:默认 600 秒(LLM 推理可能耗时较长)
  • max_connections:默认 100
  • max_keepalive_connections:默认 20

3. 进程本地 Run Scheduler(RunScheduler#

调度器维护一个进程本地的 active_tasks 字典(run_id → asyncio.Task),通过 asyncio.Lock 保护关键区域,确保以下操作的原子性:

  • 检查 stopping 标志
  • 执行 run 请求验证
  • 持久化 run 记录
  • 注册后台任务

FastAPI Lifespan 生命周期#

@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
    redis = Redis.from_url(...) # 1. 初始化 Redis 客户端
    plugin_daemon_http_client = ... # 2. 创建共享 HTTP 客户端
    store = RedisRunStore(redis, ...) # 3. 创建 Run 存储
    scheduler = RunScheduler(store, ...) # 4. 创建进程本地调度器
    try:
        yield # 5. 服务运行中
    finally:
        await scheduler.shutdown() # 6. 优雅停机(等待运行中任务)
        await plugin_daemon_http_client.aclose() # 7. 释放 HTTP 连接
        await redis.aclose() # 8. 关闭 Redis 连接

这一设计使得所有资源生命周期显式且可预测,便于测试和部署。

Agenton 框架#

Agenton 是随 dify-agent 包一同引入的、可独立使用的状态仅有的(State-Only)Layer 组合框架。其设计理念明确:核心框架仅管理三件事 —— 无状态 Layer 图组合、可序列化 runtime_state 生命周期和会话快照(session snapshots)。HTTP 客户端、进程句柄、清理回调等所有 "活跃资源" 均存在于框架之外

Layer 抽象#

Layer 是 Agenton 的核心抽象,是一个泛型基类

class Layer(ABC, Generic[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT]):
    deps: DepsT # 依赖的其他 Layer 实例(当前运行绑定)
    config: ConfigT # 验证过的 Pydantic 配置 DTO
    runtime_state: RuntimeStateT # 唯一由 Agenton 管理、可序列化的可变状态

每个 Layer 实例是调用范围的业务对象,不是跨会话的复用定义。CompositorRun 在每次 enter() 调用时创建全新的 Layer 实例,绑定依赖并水化 runtime_state(如有会话快照)。

Layer 生命周期状态#

NEW ──── on_context_create() ──── ACTIVE
                    ┌────────────────┤
                    │ │
         on_context_suspend() on_context_delete()
                    │ │
               SUSPENDED CLOSED
         on_context_resume()
                 ACTIVE

ExitIntent 决定退出行为:SUSPEND(默认,保存快照以便恢复)或 DELETE(丢弃状态)。

LayerDeps 类型化依赖#

Layer 通过 LayerDeps 子类声明依赖形状,注解成员必须是具体的 Layer 子类或可选类型

class MyLayerDeps(LayerDeps):
    plugin: DifyPluginLayer # 必须依赖
    output: OutputLayer | None # 可选依赖

依赖在 Compositor 构建 CompositorRun 时直接解析并绑定为 Layer 实例引用,无需依赖注入容器或查找 API。

Layer 体系结构#

dify-agent 提供了四个层次的 Layer 实现:

1. PlainLayer(agenton_collections#

最简单的 Layer 类型,直接提供纯文本提示和工具:

  • PromptLayer(type_id: plain.prompt):接受 prefix/user/suffix 提示配置
  • ToolsLayer:携带 Python callable 工具列表
  • ObjectLayer:携带任意类型对象供下游 Layer 访问

2. Pydantic AI Bridge Layer#

将 Agenton Layer 的提示和工具转换为 Pydantic AI Agent 所期望的格式,作为 Agenton 与 Pydantic AI 之间的适配桥梁。

PydanticAIHistoryLayer(type_id: pydantic_ai.history):状态专用历史层,在 runtime_state.messages 中存储 pydantic-ai 的 ModelMessage 序列,使 Agenton 会话快照可持久化和恢复类型化消息历史。该层不提供任何系统提示、用户提示或工具,仅通过 message_history 属性暴露存储的历史,并提供 replace_messages()append_messages()clear() 方法用于状态管理。Dify Agent 运行时将当前系统提示渲染为临时 message_history 前缀,因此模型看到的顺序为:当前系统提示 → 存储的历史 → 当前用户提示。

3. Dify Plugin Layer(dify_agent#

DifyPluginLayer(type_id: dify.plugin):共享的 Plugin Daemon 租户 / 用户上下文层,携带 tenant_iduser_id,不持有 HTTP 客户端。Plugin Daemon URL 和 API Key 由服务端 Provider 工厂注入,确保这些敏感配置不出现在公开的 HTTP 请求体中。该层作为共享依赖被 LLM 和工具层复用,具体的 plugin_id 由业务层(LLM、工具)各自携带

DifyPluginLLMLayer(type_id: dify.plugin.llm):依赖 DifyPluginLayer,持有插件包 ID(plugin_id)、模型提供者(model_provider)、模型名称(model)和模型凭证配置。其 get_model() 方法接受共享的 httpx.AsyncClient,返回 Pydantic AI 兼容的模型实例。

DifyPluginToolsLayer(type_id: dify.plugin.tools):插件工具暴露层,将 API 侧预备的插件工具声明注册为 Pydantic AI 工具。API 侧负责解析 daemon 声明、合并参数和生成模型可见 JSON schema;Agent 侧负责验证隐藏输入、应用默认值、调用 daemon 并转换响应。配置包含工具列表(DifyPluginToolConfig),每个工具携带自己的 plugin_id、提供者名称(provider)、daemon 工具名称(tool_name)、凭证类型(credential_type)、运行时隐藏参数(runtime_parameters)以及 API 预备的参数声明(parametersparameters_json_schema)。

4. Output Layer#

OutputLayer(type_id: dify.output):无状态的结构化输出层,基于 JSON Schema 构建动态 Pydantic 类型,该类型同时编码模型侧输出模式和运行时 jsonschema 验证逻辑。结构化输出工具名称固定为 final_output(不可配置),调用者仅可控制 json_schemadescriptionstrict 参数。无效的模型输出将触发 Pydantic AI 的内置重试机制。

Compositor 与 CompositorRun#

Compositor 存储不可变的图节点计划,from_config() 通过 type_id 解析 Provider 。每次调用 enter() 创建独立的 CompositorRun

async with compositor.enter(configs=layer_configs, session_snapshot=snapshot) as run:
    # run.user_prompts - 聚合的用户提示
    # run.prompts - 聚合的系统提示
    # run.tools - 聚合的工具列表
    # run.get_layer() - 按名称获取 Layer 实例
    # run.session_snapshot - 退出后填充

会话快照机制#

CompositorSessionSnapshot 按 Compositor 顺序保存每个 Layer 的生命周期状态和 runtime_state。成功运行的终端事件(run_succeeded)包含快照,消费者可将其原样传入下次 CreateRunRequestsession_snapshot 字段以恢复运行。

重要:快照不包含 Output Layer 配置,因此恢复使用结构化输出的运行时必须重新提交相同的 output Layer。

HTTP API 接口#

dify-agent 服务器提供四个 HTTP 端点,全部挂载在 /runs 前缀下,由 create_runs_router() 工厂函数生成

端点总览#

方法路径功能状态码
POST/runs创建并调度一个 Agent 运行202 / 422 / 503
GET/runs/{run_id}查询运行状态200 / 404
GET/runs/{run_id}/events轮询事件列表(游标分页)200 / 404
GET/runs/{run_id}/events/sseSSE 事件流(实时推送)200 / 404

POST /runs — 创建运行#

请求体CreateRunRequest

{
  "composition": {
    "schema_version": 1,
    "layers": [
      { "name": "prompt", "type": "plain.prompt", "config": { "user": "你好!" } },
      { "name": "plugin", "type": "dify.plugin", "config": { "tenant_id": "...", "plugin_id": "langgenius/openai" } },
      { "name": "llm", "type": "dify.plugin.llm", "deps": {"plugin": "plugin"}, "config": { "model_provider": "openai", "model": "gpt-4o-mini", "credentials": {} } }
    ]
  },
  "session_snapshot": null,
  "on_exit": { "default": "suspend", "layers": {} }
}

响应体(202 Accepted)CreateRunResponse

{ "run_id": "01JVE...", "status": "running" }

错误响应

  • 422 Unprocessable Entity:请求未通过语义验证(如空用户提示、不支持的 output layer 图形状)
  • 503 Service Unavailable:调度器正在关闭,不再接受新运行

控制器将已知的 RunRequestValidationErrorSchedulerStoppingError 映射为对应 HTTP 状态码,其他基础设施异常则交由 FastAPI 默认错误处理以避免将内部问题误报为客户端错误


GET /runs/{run_id} — 查询运行状态#

返回指定运行的当前状态和元数据

响应体RunStatusResponse

{
  "run_id": "01JVE...",
  "status": "succeeded",
  "created_at": "2026-05-14T06:20:00Z",
  "updated_at": "2026-05-14T06:20:05Z",
  "error": null
}

status 枚举值:running | succeeded | failed


GET /runs/{run_id}/events — 轮询事件#

基于 Redis Stream ID 的游标分页事件查询,适用于短轮询或单次历史查询

查询参数

  • after:上次返回的最后一个事件 ID(默认 0-0,表示从头开始)
  • limit:单次返回最大事件数(范围 1–500,默认 100)

响应体RunEventsResponse

{
  "run_id": "01JVE...",
  "events": [ { "id": "1747202401000-0", "run_id": "...", "type": "run_started", "data": {}, "created_at": "..." } ],
  "next_cursor": "1747202401000-0"
}

next_cursor 作为下次请求的 after 参数即可实现分页追踪。


GET /runs/{run_id}/events/sse — SSE 事件流#

服务端推送事件(Server-Sent Events)端点,适用于需要实时感知运行进度的客户端

支持的游标传递方式(按优先级):

  1. Query 参数 after
  2. HTTP Header Last-Event-ID(SSE 标准重连头)
  3. 默认 0-0(从头回放)

响应的 Content-Typetext/event-stream,每条 SSE 帧携带一个 JSON 事件。底层通过 store.iter_events() 先回放已有事件,然后以 xread(block=30_000) 阻塞等待新事件

Python 客户端库(dify_agent.client)对 SSE 端点实现了带重连的完整封装:瞬时流 / 连接 / 读取失败、超时和 HTTP 5xx 响应将携带最后已知事件 ID 重连;HTTP 4xx、DTO 验证失败和格式错误的帧则立即抛出异常。

运行执行流程#

从 HTTP 请求到终端事件,一次完整的 Agent 运行经历验证、持久化、调度、执行四个阶段,涉及 RunSchedulerAgentRunRunner 两个核心类

阶段一:验证(Validate)#

在任何数据写入 Redis 之前,validate_run_request() 对请求进行完整的语义检查

1. validate_output_layer_composition()
   → 检查 output layer 图形状(至多一个 dify.output layer,且位置合法)

2. validate_history_layer_composition()
   → 检查 history layer 图形状(至多一个 pydantic_ai.history layer,名称必须为保留 ID,不支持依赖)

3. normalize_composition()
   → 将公开 RunComposition DTO 拆分为:
     - CompositorConfig(图节点定义)
     - dict[str, LayerConfigInput](各节点的 per-run 配置)

4. build_pydantic_ai_compositor()
   → 通过 type_id 解析 Provider,构建 Compositor 实例

5. validate_layer_exit_signals()
   → 验证 on_exit 中引用的所有 layer 名称确实存在于图中

6. compositor.enter(configs, session_snapshot) ← 轻量级 Agenton 进入
   → apply_layer_exit_signals() 检查退出信号
   → has_non_blank_user_prompt() 确保有非空用户提示
   → resolve_run_output_contract() 构建结构化输出契约(如有)

验证过程中会遇到的异常均被规范化为 RunRequestValidationError,对应 HTTP 422 响应。

阶段二:持久化(Persist)#

验证通过后,store.create_run() 生成唯一 run_id(基于时间排序的 ID),创建状态为 running 的 run 记录,并以配置的 TTL 写入 Redis

安全设计:创建 run 的请求体(含 Layer 配置和模型凭证)从不持久化到 Redis,仅作为内存对象传入后台任务,防止凭证泄露到存储层

阶段三:调度(Schedule)#

asyncio.create_task() 将执行逻辑包装为后台任务,注册到 active_tasks 字典

task = asyncio.create_task(
    self._run_record(record, request),
    name=f"dify-agent-run-{record.run_id}"
)
self.active_tasks[record.run_id] = task
task.add_done_callback(lambda _task, run_id=record.run_id: self.active_tasks.pop(run_id, None))

任务完成(无论成功或失败)后自动从 active_tasks 中移除。此后,POST /runs 请求立即返回 202 给客户端,执行在后台异步进行。

阶段四:执行(Execute)#

AgentRunRunner.run() 按以下顺序执行

1. update_status(run_id, "running")
2. emit_run_started → 写入 run_started 事件到 Redis Stream

3. _run_agent() ← 核心执行
   a. 重建 Compositor 并验证
   b. compositor.enter(configs, session_snapshot)
   c. apply_layer_exit_signals(run, on_exit)
   d. 验证 user_prompts 非空
   e. resolve_run_output_contract(run) ← 解析结构化输出契约
   f. get_history_layer(run) ← 获取可选历史层
   g. build_run_message_history() ← 将当前系统提示渲染为临时前缀 + 存储的历史
   h. llm_layer.get_model(http_client) ← 获取 Pydantic AI 模型实例
   i. create_agent(model, tools, output_type) ← 不再接受 system_prompts 参数
   j. agent.run(user_input, message_history, event_stream_handler)
      → handle_events 回调中:每个 AgentStreamEvent 触发 emit_pydantic_ai_event
   k. append_successful_run_history() ← 将 result.new_messages() 追加到历史层
   l. 序列化输出为 JSON-safe 格式
   m. 读取 run.session_snapshot

4. 成功路径:emit_run_succeeded(output, session_snapshot)
             update_status(run_id, "succeeded")

5. 失败路径:emit_run_failed(error, reason)
             update_status(run_id, "failed", error)
             re-raise(使 asyncio.Task 以失败状态结束)

错误分类#

AgentRunRunner 区分两类错误:

类型描述处理方式
AgentRunValidationError请求合法但无法执行(如已关闭的 snapshot layer 状态)映射为 run_failed 事件,error 字段包含描述
其他运行时异常执行期间的意外失败(网络、LLM 超时等)同上,但调度器还会在日志中记录 exception

事件模型#

dify-agent 使用追加写入的事件日志作为运行观察的唯一接口。所有事件通过 Redis Streams 持久化,客户端通过轮询或 SSE 消费

事件信封结构#

每个事件共享相同的基础字段

字段类型说明
idstr | NoneRedis Stream ID(格式:{毫秒时间戳}-{序号}),如 1747202401234-0
run_idstr所属运行的唯一标识
typestr事件类型(判别器字段)
data见下表类型化载荷
created_atdatetimeUTC 时间戳(事件创建时刻)

事件类型使用 Pydantic 的 discriminator="type" 机制,客户端可通过 RUN_EVENT_ADAPTER.validate_json() 反序列化为正确的具体类型

四种事件类型#

1. run_started — 运行开始#

{
  "id": "1747202401000-0",
  "run_id": "01JVE...",
  "type": "run_started",
  "data": {},
  "created_at": "2026-05-14T06:20:01Z"
}

AgentRunRunner.run() 开始执行时立即发出,data 为空对象,表示 Agent 运行时已就绪

2. pydantic_ai_event — Pydantic AI 流式事件#

{
  "id": "1747202401500-0",
  "run_id": "01JVE...",
  "type": "pydantic_ai_event",
  "data": { "type": "text-delta", "delta": "你好" },
  "created_at": "2026-05-14T06:20:01.5Z"
}

data 字段为 Pydantic AI 原生的 AgentStreamEvent,包含文本增量、工具调用等原始流式事件。每个 Pydantic AI 流式事件触发一条此类记录

3. run_succeeded — 运行成功(终端事件)#

{
  "id": "1747202405000-0",
  "run_id": "01JVE...",
  "type": "run_succeeded",
  "data": {
    "output": "你好!我是 Dify Agent,很高兴为你服务。",
    "session_snapshot": {
      "schema_version": 1,
      "layers": [ { "name": "prompt", "lifecycle_state": "suspended", "runtime_state": {} }, ... ]
    }
  },
  "created_at": "2026-05-14T06:20:05Z"
}

终端成功事件的设计要点):最终 JSON-safe 输出和可恢复的 Agenton 会话快照一同包含在同一事件中。消费者可将终端事件视为运行的完整摘要,无需关联多个不同的载荷事件。

outputJsonValue 类型,对于非结构化文本输出即为字符串,对于使用了 OutputLayer 的结构化输出则为 JSON 对象。

4. run_failed — 运行失败(终端事件)#

{
  "id": "1747202402000-0",
  "run_id": "01JVE...",
  "type": "run_failed",
  "data": {
    "error": "LLM returned invalid structured output after retries",
    "reason": null
  },
  "created_at": "2026-05-14T06:20:02Z"
}

reason 字段在优雅停机取消时为 "shutdown",其他运行时错误为 null

完整事件流示意#

成功运行

run_started
pydantic_ai_event (text-delta: "你")
pydantic_ai_event (text-delta: "好")
...
pydantic_ai_event (text-delta: "。")
run_succeeded { output: "你好...", session_snapshot: {...} }

失败运行

run_started
pydantic_ai_event (text-delta: "...")
...
run_failed { error: "...", reason: null }

服务关闭取消

run_started
...
run_failed { error: "run cancelled during server shutdown", reason: "shutdown" }

消费者应将收到 run_succeededrun_failed 事件作为终止读取事件流的信号。

包目录结构#

以下是 dify-agent 包的完整目录树(共约 128 个文件),按功能分组并附注关键文件的职责说明

dify-agent/
├── src/ # 源代码根目录
│ │
│ ├── agenton/ # 核心框架(可独立使用)
│ │ ├── __init__.py
│ │ ├── compositor/ # 图组合引擎
│ │ │ ├── core.py # Compositor & LayerNode — 不可变图计划
│ │ │ ├── providers.py # LayerProvider — 工厂抽象
│ │ │ ├── run.py # CompositorRun — 单次调用上下文
│ │ │ ├── schemas.py # CompositorConfig、SessionSnapshot 模式定义
│ │ │ └── types.py # 泛型类型别名
│ │ └── layers/ # Layer 基础抽象
│ │ ├── base.py # Layer、LayerDeps、LifecycleState、ExitIntent
│ │ └── types.py # PlainLayer 等类型化 Layer 家族
│ │
│ ├── agenton_collections/ # 通用 Layer 实现集合
│ │ ├── layers/
│ │ │ ├── plain/
│ │ │ │ ├── basic.py # PromptLayer、ToolsLayer、ObjectLayer
│ │ │ │ └── dynamic_tools.py # 动态工具 Layer
│ │ │ └── pydantic_ai/
│ │ │ ├── bridge.py # Pydantic AI 桥接 Layer
│ │ │ └── history.py # PydanticAIHistoryLayer(可选历史层)
│ │ └── transformers/
│ │ └── pydantic_ai.py # Pydantic AI 格式转换器
│ │
│ └── dify_agent/ # Dify 专用运行时
│ ├── adapters/
│ │ └── llm/
│ │ ├── model.py # DifyLLMAdapterModel — Pydantic AI 兼容 LLM 模型
│ │ └── provider.py # DifyPluginDaemonProvider — Plugin Daemon 连接
│ ├── client/
│ │ ├── __init__.py # 公开 Client 类
│ │ └── _client.py # 同步/异步 HTTP 客户端,含 SSE 重连逻辑
│ ├── layers/
│ │ ├── dify_plugin/
│ │ │ ├── configs.py # DifyPluginLayerConfig、DifyPluginLLMLayerConfig
│ │ │ ├── llm_layer.py # DifyPluginLLMLayer(type_id: dify.plugin.llm)
│ │ │ └── plugin_layer.py # DifyPluginLayer(type_id: dify.plugin)
│ │ └── output/
│ │ ├── configs.py # OutputLayerConfig(JSON Schema 定义)
│ │ └── output_layer.py # OutputLayer(type_id: dify.output)
│ ├── protocol/
│ │ └── schemas.py # 公开 HTTP 线协议 DTO(CreateRunRequest 等)
│ ├── runtime/
│ │ ├── agent_factory.py # create_agent() — Pydantic AI Agent 构建器
│ │ ├── agenton_validation.py # Agenton 进入时验证错误分类
│ │ ├── compositor_factory.py # build_pydantic_ai_compositor()
│ │ ├── event_sink.py # RunEventSink 协议 & emit_* 工具函数
│ │ ├── history.py # History layer 助手(验证、获取、message_history 构建)
│ │ ├── layer_exit_signals.py # on_exit 信号验证与应用
│ │ ├── output_type.py # 结构化输出契约解析
│ │ ├── run_scheduler.py # RunScheduler — 进程本地 asyncio 任务调度
│ │ ├── runner.py # AgentRunRunner — 单次运行执行
│ │ └── user_prompt_validation.py # 用户提示非空检查
│ ├── server/
│ │ ├── app.py # FastAPI 应用工厂(lifespan 管理)
│ │ ├── routes/
│ │ │ └── runs.py # POST/GET /runs 路由
│ │ ├── schemas.py # RunRecord、new_run_id 服务端模式
│ │ ├── settings.py # ServerSettings(DIFY_AGENT_ 环境变量)
│ │ └── sse.py # SSE 事件流生成器
│ └── storage/
│ ├── redis_keys.py # Redis 键名生成函数
│ └── redis_run_store.py # RedisRunStore — 运行记录 & 事件流存储
├── tests/ # 测试套件(约 42 个文件,6000+ 行)
│ ├── docs/ # 文档示例测试
│ │ ├── test_examples.py # 可执行文档代码片段测试
│ │ └── test_snippets.py
│ └── local/ # 本地单元测试(镜像 src/ 结构)
│ ├── agenton/ # Compositor & Layer 测试
│ ├── agenton_collections/ # 通用 Layer 实现测试
│ └── dify_agent/ # Dify 运行时、服务器、存储、客户端测试
├── docs/ # MkDocs 文档源文件
│ ├── dify-agent/ # Dify Agent 操作指南
│ │ ├── get-started/index.md # 快速入门指南
│ │ └── guide/index.md # 操作手册
│ └── agenton/ # Agenton 框架文档
│ ├── index.md
│ └── guide/index.md
├── examples/ # 使用示例
│ ├── agenton/agenton_examples/ # Agenton 基础示例
│ │ ├── basics.py # 基本 Layer 组合
│ │ ├── pydantic_ai_bridge.py # Pydantic AI 桥接示例
│ │ └── session_snapshot.py # 会话快照恢复示例
│ └── dify_agent/dify_agent_examples/ # Dify Agent 服务器客户端示例
│ ├── run_server_consumer.py # 短轮询客户端
│ ├── run_server_sse_consumer.py # SSE 客户端
│ └── run_server_sync_client.py # 同步客户端
├── .example.env # 环境变量配置模板
├── AGENTS.md # 开发规范(编码风格、测试、文档)
├── Makefile # 开发命令(serve、dev、test、typecheck)
├── mkdocs.yml # MkDocs 文档构建配置
└── pyproject.toml # Python 包配置(依赖、工具链)

配置说明#

所有服务器配置通过以 DIFY_AGENT_ 为前缀的环境变量进行管理,对应 ServerSettings Pydantic 设置类 。设置文件读取顺序:当前目录的 .env,然后是 dify-agent/.env(从仓库根目录运行时)

完整环境变量参考#

Redis 配置#

环境变量默认值说明
DIFY_AGENT_REDIS_URLredis://localhost:6379/0Redis 连接 URL,用于存储运行记录和事件流
DIFY_AGENT_REDIS_PREFIXdify-agentRedis 键名前缀,用于多实例部署时的命名空间隔离

关闭与数据保留#

环境变量默认值说明
DIFY_AGENT_SHUTDOWN_GRACE_SECONDS30优雅停机等待时间(秒),超时后强制取消活跃任务
DIFY_AGENT_RUN_RETENTION_SECONDS259200(3 天)Redis 运行记录和事件流的保留时长(秒)

Plugin Daemon 连接#

环境变量默认值说明
DIFY_AGENT_PLUGIN_DAEMON_URLhttp://localhost:5002Plugin Daemon 的基础 URL
DIFY_AGENT_PLUGIN_DAEMON_API_KEY""(空)访问 Plugin Daemon 的 API 密钥,对应 Dify Docker 中的 PLUGIN_DAEMON_KEY

Plugin Daemon HTTP 超时配置#

环境变量默认值(秒)说明
DIFY_AGENT_PLUGIN_DAEMON_CONNECT_TIMEOUT10.0TCP 建立连接超时
DIFY_AGENT_PLUGIN_DAEMON_READ_TIMEOUT600.0等待响应数据超时(LLM 推理可能较慢)
DIFY_AGENT_PLUGIN_DAEMON_WRITE_TIMEOUT30.0发送请求数据超时
DIFY_AGENT_PLUGIN_DAEMON_POOL_TIMEOUT10.0等待连接池空闲连接超时

Plugin Daemon HTTP 连接池配置#

环境变量默认值说明
DIFY_AGENT_PLUGIN_DAEMON_MAX_CONNECTIONS100最大总 HTTP 连接数
DIFY_AGENT_PLUGIN_DAEMON_MAX_KEEPALIVE_CONNECTIONS20最大空闲 Keep-Alive 连接数
DIFY_AGENT_PLUGIN_DAEMON_KEEPALIVE_EXPIRY30.0空闲连接的 Keep-Alive 过期时间(秒)

最小必要配置#

根据快速入门指南,本地开发仅需以下四个变量

DIFY_AGENT_REDIS_URL=redis://localhost:6379/0
DIFY_AGENT_REDIS_PREFIX=dify-agent
DIFY_AGENT_PLUGIN_DAEMON_URL=http://localhost:5002
DIFY_AGENT_PLUGIN_DAEMON_API_KEY=replace-with-plugin-daemon-server-key

其余参数均有合理默认值,生产环境按需调整。

安全提示DIFY_AGENT_PLUGIN_DAEMON_API_KEY 应通过 secrets 管理工具(如 Kubernetes Secret、Vault)注入,避免硬编码在配置文件中。

部署与扩展#

本地开发#

dify-agent 提供 Makefile 封装所有常用开发操作

前提条件

  • Python 3.12 或更高版本
  • uv(Python 包管理器)
  • Redis 实例(本地 Docker 或远端)
  • 可访问的 Dify Plugin Daemon

安装依赖

cd dify-agent
uv sync --all-extras --all-groups

启动 Redis(如本地未运行)

docker run -d --name dify-agent-redis -p 6379:6379 redis:7-alpine

启动服务器

# 生产模式
make serve

# 开发模式(uvicorn 热重载)
make dev

两个命令均将服务监听在 http://127.0.0.1:8000。等效的手动命令为

uv run --extra server uvicorn dify_agent.server.app:app --host 127.0.0.1 --port 8000 --reload

其他开发命令

命令功能
make checkRuff 代码检查
make fixRuff 格式化 + 自动修复
make typecheckbasedpyright 静态类型检查
make test运行全部 pytest 测试
make docs构建 MkDocs 文档
make docs-serve本地预览文档

水平扩展#

dify-agent 服务器设计支持水平扩展,但需了解其进程本地调度的约束:

可扩展方面

  • 多个服务实例可共享同一 Redis 实例和 Plugin Daemon,每个实例独立管理自己的 active_tasks
  • Redis Streams 天然支持多消费者并发读取,事件轮询和 SSE 端点可在任意实例上服务任意 run(只要 run 的 Redis 数据存在)
  • 通过负载均衡器分发 POST /runs 请求,各实例均匀承接新运行

约束方面

  • 每次 POST /runs 将 run 绑定到处理该请求的进程,执行与进程生命周期耦合
  • 进程崩溃时,该进程正在执行的所有 run 状态丢失,无法自动迁移到其他实例
  • GET /runs/{id} 和事件端点可在任意实例上处理(通过共享 Redis),但 run 的实际执行只在创建它的进程中进行

推荐扩展模式

  • 将读取密集型端点(GET /runs/*)与创建端点(POST /runs)通过负载均衡器统一入口对外暴露
  • 使用多个 dify-agent 进程实例,通过 DIFY_AGENT_REDIS_PREFIX 区分不同服务组(如按租户隔离)

优雅停机#

当 FastAPI 进程接收终止信号时,lifespan 的 finally 块触发有序关闭流程

1. scheduler.shutdown() 被调用
   a. 设置 stopping=True(阻止新 run 创建)
   b. 快照当前 active_tasks 列表
   c. asyncio.wait(tasks, timeout=SHUTDOWN_GRACE_SECONDS)
      → 等待所有活跃任务自然完成

2. 超时后:
   a. 取消未完成的 asyncio.Task
   b. 对每个被取消的 run 写入 run_failed 事件(reason="shutdown")
   c. 更新 run 状态为 "failed"

3. plugin_daemon_http_client.aclose() → 关闭 HTTP 连接池
4. redis.aclose() → 关闭 Redis 连接

这确保了正在运行的 Agent 有机会完成,同时为被迫中断的运行留下可观察的失败记录。


故障恢复限制#

dify-agent 有意保持简单的故障模型,不实现自动恢复

"If the process crashes, currently active runs are lost until an external operator marks or retries them."

当前限制

  • 进程崩溃active_tasks 是纯内存结构,进程崩溃后无法恢复正在执行的 run
  • 无自动重试:失败的 run 不会自动重新调度到其他进程
  • 状态残留:崩溃的进程可能留下状态为 running 的 Redis 记录,这些记录在 TTL 到期前会误导客户端

运维应对策略

  • 通过监控系统检测长时间处于 running 状态的 run(超过预期最大执行时间)
  • 使用外部脚本将孤立的 running run 标记为 failed
  • 配合 Kubernetes 的 Pod 重启策略保证服务进程的存活性

关键设计决策分析#

以下表格汇总了 dify-agent 中的核心技术选型,并分析每个决策的原因和影响。

决策选择备选方案原因影响
Web 框架FastAPIFlask、Django原生 async/await 支持、自动 OpenAPI 文档、lifespan 管理、类型提示驱动的依赖注入所有 I/O 操作(Redis、Plugin Daemon)均异步,无需额外的异步适配层
调度模型进程本地 asyncio.TaskCelery、RQ、Redis Queue避免分布式任务队列的复杂性;执行与请求解耦,不需要额外的 Worker 进程扩展性受限于单进程;进程崩溃丢失 in-flight run
事件存储Redis StreamsPostgreSQL、Kafka、内存队列追加写入语义、基于游标的重放、内置 block 读取支持 SSE;无需引入新的基础设施(Dify 本身已使用 Redis)TTL 统一管理存储生命周期;高并发事件消费无锁冲突
LLM 执行引擎Pydantic AILangChain、直接 OpenAI SDK原生 Pydantic v2 集成、结构化输出重试机制、类型安全的工具定义;与 Dify 已有 Pydantic v2 技术栈一致结构化输出无效时自动触发 LLM 重试,减少业务层错误处理代码
配置管理pydantic-settingspython-decouple、dynaconf与 Pydantic v2 一体化,自动类型转换和验证;env_prefix 统一 DIFY_AGENT_ 命名空间配置错误在服务启动时即报错,而非运行时
验证时机先验证后持久化先持久化后验证避免无法执行的 run 占用 Redis 存储;模型凭证不持久化到 Redis 减少安全风险HTTP 202 响应前略有延迟(轻量级 Agenton 进入),但无外部 I/O
会话连续性SessionSnapshot有状态长连接、数据库状态无状态设计,跨运行无共享内存;快照通过终端事件回传客户端,存储层无需额外快照 API客户端持有快照,可在任意实例上恢复运行
凭证安全请求体不持久化加密存储到 Redis模型凭证(API Key 等)仅存在于请求内存,不写入任何持久化存储需要客户端在每次创建 run 时重新提交凭证
Layer 依赖注入直接实例绑定DI 容器(依赖注入框架)保持 Agenton 核心简单;依赖关系在图构建时静态确定,运行时无查找开销无法在运行时动态替换依赖;适合有明确图结构的 Agent

FastAPI vs 其他框架#

FastAPI 的 lifespan 机制是选型的关键因素 —— 它提供了精确的资源初始化和清理钩子,使得 Redis 连接、HTTP 客户端和调度器的生命周期管理代码仅需约 25 行即可完整实现

进程本地调度的权衡#

这一选择有意降低了系统复杂度:与 Celery/RQ 等方案相比,省去了 Worker 进程管理、任务序列化和分布式锁等问题。代价是单进程容量有限,且崩溃恢复需要外部运维干预。对于 Dify 的初期微服务化尝试而言,这是合理的渐进式复杂度控制

Redis Streams 的设计价值#

相比简单的 Redis List 或 Pub/Sub,Redis Streams 提供了:

  • 持久化重放:客户端可以从任意偏移量重放历史事件(SSE 断线重连)
  • 游标分页:Stream ID 天然作为不需要额外状态的游标
  • block 读取xread(block=30000) 实现高效的实时等待,无需轮询

Pydantic AI 与结构化输出#

将 Pydantic AI 的重试机制与自定义 jsonschema 验证钩子结合,使得结构化输出验证成为 LLM 推理管道的一部分,无效输出自动触发模型重试,对 AgentRunRunner 透明,降低了业务层错误处理的负担

开发规范#

dify-agent 包通过 AGENTS.md 文件确立了完整的开发规范,要求在修改任何源代码之前必须阅读相关的 docstring 和注释

类型注解规范#

代码应包含完整的类型注解,遵循以下原则

  • 使用现代 Python 形式list[str]dict[str, int],避免 List[str]Dict[str, int]
  • 避免 Any:除非有充分理由,否则不使用 Any 类型
  • 结构化数据:已知键集合的字典数据用 TypedDict;可选键使用 NotRequired[...];真正动态键空间才用 dict[...]
  • 类成员显式声明:在类体顶部(__init__ 之前)用类型注解声明所有成员变量,使类的形状一目了然
class Example:
    user_id: str
    created_at: datetime

    def __init__(self, user_id: str, created_at: datetime) -> None:
        self.user_id = user_id
        self.created_at = created_at

类型检查使用 basedpyright,须在每次编辑后保持通过:

make typecheck

代码风格#

  • 格式化工具:Ruff(含格式化和 lint)
  • 行宽限制:120 字符(含空格)
  • 命名规范:变量和函数用 snake_case,类用 PascalCase,常量用 UPPER_CASE
  • 通用原则:优先使用简单函数而非小型工具类;避免不必要的 dunder 方法;代码应清晰直观,避免巧妙技巧
make check # 仅检查
make fix # 格式化 + 自动修复

测试方法#

遵循 TDD(测试驱动开发) 原则

  1. 先写失败测试
  2. 使实现通过
  3. 在测试和类型检查均通过的前提下重构

测试组织规则

  • 本地测试放在 dify-agent/tests/local/
  • 目录结构镜像 dify-agent/src/ 结构,确保测试位置可预测

有价值的测试应覆盖

  • 调用者和下游代码可以观察到并依赖的行为
  • 单元对其依赖在边界上的使用方式(成功、失败、空响应、错误响应)
  • 文档化的不变量、错误映射和输入 / 输出形状保证

无价值的测试包括(应避免)

  • 仅镜像当前实现的测试(内部重构就需要更新)
  • 对私有辅助函数、内部状态的测试
  • 发明出来仅为使当前实现通过的 mock 行为

文档规范#

模块、类、函数三级 docstring 均为必要项

范围应包含内容
模块(文件)docstring目的、边界、关键不变量、"坑",关键协作者的交叉引用
类 docstring职责、生命周期、不变量、使用方式(含并发 / 异步假设)
函数 / 方法 docstring行为契约:参数、返回形状、副作用(DB 写入、I/O、任务分发)、抛出的领域异常
段落 / 块注释解释 "为什么"(权衡、历史约束、令人惊讶的边缘情况),而非 "是什么"

日志规范#

  • 禁止使用 print()
  • 使用模块级 logger:logger = logging.getLogger(__name__)
  • 日志级别:可重试事件用 warning,终端失败用 error
  • 日志上下文:相关时附加 tenant_idapp_idrun_id 等标识符

Pydantic 使用规范#

  • 使用 Pydantic v2 约定
  • 默认设置 extra="forbid" 防止意外字段
  • 使用 @field_validator / @model_validator 编写领域规则
class TriggerConfig(BaseModel):
    endpoint: HttpUrl
    secret: str

    model_config = ConfigDict(extra="forbid")

    @field_validator("secret")
    def ensure_secret_prefix(cls, value: str) -> str:
        if not value.startswith("dify_"):
            raise ValueError("secret must start with dify_")
        return value

包管理#

使用 uv 管理 Python 包(--project dify-agent 标志),确保与仓库其他部分的工具链一致。

总结#

Commit 55f95dbcPR #36087)是 Dify 平台架构演进史上的一个重要里程碑。它在不破坏现有系统的前提下,以 128 个文件、17,357 行代码从零构建了一个独立、生产级的异步 Agent 执行后端。

关注点分离#

该提交最突出的设计贡献是对关注点的清晰分离,体现在三个层次:

层次命名空间职责边界
框架层agenton纯粹的 Layer 图组合、可序列化状态、会话快照 —— 不持有任何活跃资源
通用层agenton_collections与 Dify 无关的通用 Layer 实现,可在其他 Python 项目中复用
平台层dify_agentDify 专用集成:FastAPI 服务器、Redis 存储、Plugin Daemon 适配、结构化输出

这种分层设计使得 Agenton 框架具备独立演化的能力,dify_agent 层可随 Dify 平台需求变化而调整,互不干扰。

与现有系统的关系#

新模块与旧版 BaseAgentRunner / CotAgentRunner / FunctionCallAgentRunner 体系并存而非替代

  • 旧版 runner 仍承担现有会话式 Agent 的同步执行
  • dify-agent 服务器提供了面向微服务架构的新执行路径,PR #36284 通过 DifyAgentBackendRunClient 完成了 API 端的生产集成,工作流节点现已能够创建 run、流式消费事件并管理 Agent 配置
  • 两者共用同一个 Plugin Daemon 基础设施(端口 5002),无需重复部署

架构演进的意义#

dify-agent 代表了 Dify 从同步、进程内、与主应用紧耦合的 Agent 执行异步、可独立部署、事件驱动的微服务化 Agent 执行的演进方向:

  1. 执行异步化:Agent 运行不再绑定 HTTP 请求生命周期,长时间运行的任务不再受 HTTP 超时限制
  2. 可观察性:Redis Streams 提供完整的事件历史,支持回放、SSE 实时推送和游标分页
  3. 状态可恢复:SessionSnapshot 机制使 Agent 运行天然支持中断恢复,为未来的暂停 / 继续功能奠定基础
  4. 凭证安全:模型凭证不持久化,降低了分布式部署中的安全攻击面
  5. 可测试性:完整的测试套件(42 个测试文件,6,000+ 行)确保了生产部署的可靠性

随着 Dify 平台继续向分布式、多租户、高并发方向演进,dify-agent 所建立的这套架构模式 —— 进程内调度 + Redis 持久化 + Plugin Daemon 委托 + Agenton 组合框架 —— 将成为后续功能迭代的重要基础。

PR #36284 - API 端集成#

PR #36284 feat: add new agent 于 2026-05-19 完成了 Agent 后端在 API 端的全面集成,建立了从工作流节点到 Agent 服务器的完整调用链路。该提交新增约 50 个文件、5,000+ 行代码,涵盖客户端包装、领域模型、REST 控制器和业务服务四大组件。

Agent Backend Client 集成#

api/clients/agent_backend/#

API 端通过薄包装层调用 dify-agent 公开客户端,确保工作流代码依赖本地协议而非跨服务契约:

DifyAgentBackendRunClientclient.py

  • 包装 dify_agent.client.Client 的同步方法(create_run_synccancel_run_syncstream_events_syncwait_run_sync
  • dify-agent 异常规范化为 API 原生错误类型(AgentBackendError 子类)
  • 定义 AgentBackendRunClient 协议作为工作流集成的稳定边界

错误映射errors.py):将跨服务异常转换为 API 端异常层次结构:

  • DifyAgentValidationErrorAgentBackendValidationError
  • DifyAgentHTTPErrorAgentBackendHTTPError(含 HTTP 状态码)
  • DifyAgentTimeoutErrorAgentBackendTransportError
  • DifyAgentStreamErrorAgentBackendStreamError

事件适配器event_adapter.py):将 dify-agent 的事件流转换为工作流节点可消费的格式

请求构建器request_builder.py):封装 CreateRunRequest 构造逻辑,处理 Layer 配置、执行上下文和退出策略

Domain Models(领域模型)#

api/models/agent.py#

引入四个核心数据库模型,建立 Agent 的完整生命周期管理:

Agent:工作区范围内的 Agent 身份标识

  • scope: AgentScoperoster(团队共享)或 workflow_only(工作流专用)
  • source: AgentSourceagent_appworkflowimportedsystem
  • active_config_version_id:指向当前生效的配置版本
  • roster_unique_name:计算列,确保 roster 中 active 状态的 Agent 名称唯一
  • 索引:tenant_id + status + updated_attenant_id + scope + statustenant_id + workflow_id

AgentConfigVersion:不可变的 Agent Soul 快照版本

  • config_snapshot: LongText:存储 JSON 字符串,不含明文凭证
  • version: int:每个 Agent 内单调递增的语义版本号
  • version_note:版本说明(可选)
  • 唯一约束:(agent_id, version)
  • @property config_snapshot_dict:按需解析 JSON

AgentConfigVersionRevision:配置版本的审计快照

  • 记录每次保存操作(含对现有版本的覆盖写)
  • operation: AgentConfigVersionOperationCREATE_VERSIONSAVE_CURRENT_VERSIONSAVE_NEW_VERSIONSAVE_NEW_AGENTSAVE_TO_ROSTER
  • previous_config_snapshot:用于 diff 对比(仅在覆盖现有版本时非空)
  • revision: int:每个 ConfigVersion 内单调递增
  • 唯一约束:(agent_config_version_id, revision)

WorkflowAgentNodeBinding:工作流节点与 Agent 配置版本的绑定关系

  • binding_type: WorkflowAgentBindingTyperoster_agent(引用 Roster Agent)或 inline_agent(内联专用 Agent)
  • node_job_config: LongText:存储工作流节点作业配置的 JSON,不含 Agent Soul
  • workflow_version: str:关联工作流版本(如 draft
  • 唯一约束:(tenant_id, workflow_id, workflow_version, node_id)

设计要点:

  • Agent Soul(配置快照)与 Node Job(节点作业配置)严格分离,避免数据冗余
  • Roster Agent 可被多个工作流节点引用,配置版本在所有绑定间共享
  • 配置快照明确禁止存储明文凭证,仅保存凭证引用
  • Revision 表实现完整的变更审计链

Controllers(REST 端点)#

api/controllers/console/agent/#

两个控制器模块提供 Agent Composer 和 Roster 的 HTTP 访问接口:

composer.py:Agent Composer 端点(工作流节点和 Agent App 两种变体)

  • GET /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer:加载工作流节点的 Composer 状态
  • PUT /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer:保存工作流节点的 Composer 配置
  • POST /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer/validate:验证保存载荷合法性
  • GET /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer/candidates:查询可用的候选资源
  • POST /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer/impact:计算配置版本影响范围
  • POST /apps/{app_id}/workflows/draft/nodes/{node_id}/agent-composer/save-to-roster:将工作流 Agent 保存到 Roster
  • GET /apps/{app_id}/agent-composer:加载 Agent App 的 Composer 状态
  • PUT /apps/{app_id}/agent-composer:保存 Agent App 的 Composer 配置
  • POST /apps/{app_id}/agent-composer/validate:验证 Agent App 保存载荷
  • GET /apps/{app_id}/agent-composer/candidates:查询 Agent App 可用候选

roster.py:Agent Roster 管理端点

  • GET /agents:列出租户内所有 roster Agent(分页、搜索、排序)
  • POST /agents:创建新 roster Agent
  • GET /agents/{agent_id}:查询单个 Agent 详情
  • PUT /agents/{agent_id}:更新 Agent 元数据(名称、描述、图标)
  • DELETE /agents/{agent_id}:归档 Agent
  • GET /agents/{agent_id}/config-versions:列出 Agent 的所有配置版本
  • POST /agents/{agent_id}/config-versions:创建新配置版本
  • GET /agents/{agent_id}/config-versions/{version_id}:查询单个配置版本详情
  • PUT /agents/{agent_id}/config-versions/{version_id}:更新现有配置版本(覆盖)
  • POST /agents/{agent_id}/config-versions/{version_id}/activate:激活指定配置版本

设计特点:

  • 所有端点均需 setup_requiredlogin_requiredaccount_initialization_required 认证
  • 编辑操作额外要求 edit_permission_required
  • 使用 flask_restx 自动生成 OpenAPI 文档(console_ns.expect 注册 Pydantic 模型)
  • 返回体为 JSON 字典(由 Service 层序列化)

Services(业务逻辑)#

api/services/agent/#

三个服务模块封装 Agent 的业务逻辑和数据库操作:

AgentComposerServicecomposer_service.py):Agent Composer 业务逻辑中心

  • load_workflow_composer():加载工作流节点的 Composer 状态,返回 agent、version、binding、soul_lock、node_job、save_options
  • save_workflow_composer():根据 save_strategy 执行不同保存路径:
    • NODE_JOB_ONLY:仅保存节点作业配置
    • SAVE_TO_CURRENT_VERSION:覆盖当前配置版本(创建新 Revision)
    • SAVE_AS_NEW_VERSION:创建新配置版本(递增版本号)
    • SAVE_AS_NEW_AGENT:创建新 roster Agent 并绑定
    • SAVE_TO_ROSTER:将 workflow-only Agent 提升为 roster Agent
  • load_agent_app_composer():加载 Agent App 的 Composer 状态
  • save_agent_app_composer():保存 Agent App 配置(支持 SAVE_TO_CURRENT_VERSION 和 SAVE_AS_NEW_VERSION)
  • calculate_impact():统计指定配置版本被引用的工作流节点数量

RosterServiceroster_service.py):Roster Agent CRUD 和版本管理

  • list_agents():分页查询、全文搜索、多字段排序
  • create_agent():创建新 Agent 并初始化首个配置版本
  • get_agent():查询 Agent 详情(含活跃配置版本)
  • update_agent():更新 Agent 元数据(名称、描述、图标)
  • archive_agent():软删除(状态置为 archived,记录 archived_byarchived_at
  • list_config_versions():列出 Agent 的所有配置版本(逆序分页)
  • create_config_version():创建新配置版本(自动递增版本号)
  • get_config_version():查询单个配置版本详情
  • update_config_version():覆盖现有配置版本(创建新 Revision)
  • activate_config_version():激活指定配置版本(更新 Agent.active_config_version_id)
  • list_config_version_revisions():查询配置版本的所有审计 Revision
  • duplicate_agent():复制 Agent 及其活跃配置版本

ComposerConfigValidatorcomposer_validator.py):Composer 保存载荷验证

  • validate_save_payload():检查 ComposerSavePayload 的语义一致性(如 SAVE_AS_NEW_AGENT 必须提供 new_agent_name

设计模式:

  • Service 层直接操作 SQLAlchemy ORM 模型,返回序列化后的 JSON 字典
  • 使用 IntegrityError 捕获唯一约束冲突(如 Agent 名称重复)
  • 配置快照存储为 JSON 字符串(_json_dump 确保一致的格式)
  • 所有写操作通过 db.session.commit() 统一提交

Entities(数据传输对象)#

api/services/entities/agent_entities.py#

定义 Agent 域的 Pydantic DTO,服务于 HTTP 请求 / 响应和业务逻辑:

AgentSoulConfig:Agent Soul 配置(对应 dify-agentRunComposition

  • instruction: str:Agent 指令(系统提示)
  • app_features: dict:应用特性配置
  • app_variables: list:应用变量列表

WorkflowNodeJobConfig:工作流节点作业配置

  • declare_output_type: str | None:声明的输出类型
  • previous_node_outputs: list:前置节点输出映射

ComposerSavePayload:Composer 保存请求体

  • variant: ComposerVariantworkflowagent_app
  • save_strategy: ComposerSaveStrategy:保存策略枚举
  • agent_soul: AgentSoulConfig | None:Agent Soul 配置(按策略可选)
  • node_job: WorkflowNodeJobConfig | None:节点作业配置(仅工作流变体)
  • new_agent_name: str | None:新 Agent 名称(SAVE_AS_NEW_AGENT 时必需)
  • version_note: str | None:版本说明
  • binding: dict | None:现有绑定上下文

ComposerCandidatesResponse:Composer 候选资源响应

  • allowed_node_job_candidates: dict:节点作业候选(前置节点输出、输出类型等)
  • allowed_soul_candidates: dict:Soul 候选(技能文件、工具、数据集、人工联系人)

协议扩展(dify-agent 包)#

PR #36284 对 dify-agent 协议进行了扩展,新增取消运行和执行上下文字段:

ExecutionContextdify_agent/protocol/schemas.py):Dify 执行上下文

  • tenant_idapp_idworkflow_idworkflow_run_idnode_idnode_execution_id
  • conversation_idagent_idagent_config_version_id
  • invoke_from: InvokeFrom:调用来源(workflow_runsingle_stepagent_appbabysitfasten
  • trace_id:分布式追踪 ID

CreateRunRequest 扩展字段

  • execution_context: ExecutionContext | None:执行上下文(用于可观察性和产品关联)
  • purpose: RunPurpose:运行目的(workflow_nodesingle_stepagent_appbabysitfasten_preview
  • idempotency_key: str | None:幂等键
  • metadata: dict[str, JsonValue]:自由键值元数据

CancelRunRequest / CancelRunResponse:新增取消运行协议

  • reason: str | None:取消原因
  • message: str | None:取消消息
  • 响应体包含 run_idstatus: "cancelled"

新增事件类型

  • run_paused:可恢复暂停事件(含 reasonmessagesession_snapshot
  • run_cancelled:终端取消事件(由显式取消请求触发)

RunStatus 扩展:新增 pausedcancelled 状态(原有 runningsucceededfailed

dify-agent 客户端扩展dify_agent/client/_client.py):

  • cancel_run_sync()cancel_run():同步和异步取消运行方法
  • wait_run_sync()wait_run():轮询等待运行终止方法

依赖变更#

api/pyproject.toml:将 dify-agentdev 依赖组移至生产依赖,标志 API 端集成正式启用

架构意义#

PR #36284 实现了从 dify-agent 服务器(Commit 55f95dbc)API 生产集成 的完整闭环:

层次Commit 55f95dbc(基础设施)PR #36284(API 集成)
Agent 执行dify-agent 服务器、Agenton 框架DifyAgentBackendRunClient 客户端包装
数据模型Redis run 记录、事件流数据库 Agent / ConfigVersion / Binding 模型
协议层CreateRunRequestRunEventExecutionContextCancelRunRequestrun_paused/run_cancelled 事件
控制层POST /runsGET /runs/{id}/events/ssePUT /agent-composerPOST /agents REST 端点
业务逻辑RunSchedulerAgentRunRunnerAgentComposerServiceRosterService

现在,工作流节点可通过以下路径调用 Agent 后端:

Workflow Node
    → AgentComposerService.load_workflow_composer()
        → WorkflowAgentNodeBinding (DB)
            → Agent.active_config_version_id
                → AgentConfigVersion.config_snapshot
                    → DifyAgentBackendRunClient.create_run(CreateRunRequest)
                        → dify_agent.client.Client.create_run_sync()
                            → POST http://agent-backend:8000/runs
                                → RunScheduler.schedule_run()
                                    → AgentRunRunner.run()

这一集成建立了数据库持久化的 Agent 身份系统(Agent Roster)和工作流可引用的配置版本机制,使 Agent 配置从 "一次性请求负载" 演进为 "版本化、可审计、可复用的领域资产"。

Dify Agent Server 模块分析 (Commit 55f95dbc) | Dosu