节点接入指南

本指南帮助你将本地算力设备接入 AgentFlow 网络,开始接收任务并赚取收益。


目录

  1. 前置要求
  2. 快速接入
  3. 运行模式详解
  4. SDK 类和方法
  5. 心跳机制
  6. API 接口
  7. 最佳实践
  8. 常见问题

前置要求

硬件要求

配置项最低要求推荐配置
CPU4 核8 核+
内存8 GB16 GB+
存储50 GB100 GB+ SSD
GPU可选NVIDIA RTX 3060+
网络稳定互联网连接100 Mbps+

软件要求

软件版本要求
操作系统Linux / macOS / Windows (WSL2)
Python3.10+
pip最新版

验证环境

# 检查 Python 版本
python --version  # 应该 >= 3.10

# 检查 pip 版本
pip --version

快速接入

1. 创建节点

  1. 登录 AgentFlow 平台
  2. 访问 节点管理 页面
  3. 点击"注册新节点"
  4. 输入节点名称(如"我的客厅 4090 主机")
  5. 点击确认

2. 保存凭证

创建成功后,系统会生成:

凭证说明示例
NODE_ID节点唯一标识550e8400-e29b-41d4-a716-446655440000
NODE_SECRET节点鉴权密钥7c9e6679-7425-40de-944b-e07fc1f90ae7

⚠️ 重要提示: NODE_SECRET 只显示一次,请务必保存!

3. 安装 SDK

pip install agentflow

4. 编写处理函数

def my_handler(input_data: dict, ctx) -> dict:
    """
    任务处理函数

    Args:
        input_data: 输入参数(来自买家)
        ctx: 任务上下文(用于进度汇报)

    Returns:
        Cyber IO 3.0 格式的交付物
    """
    # 汇报进度
    ctx.report_progress(
        percent=50,
        eta_seconds=60,
        current_step="正在处理..."
    )

    # 处理逻辑
    result = process(input_data)

    # 返回结果
    return {
        "machine_data": {"result": result},
        "ui_content": [
            {"type": "markdown", "content": "## 处理完成"}
        ]
    }

5. 运行服务模式 Worker

适用于持续监听服务订单:

from agentflow import ServiceWorker

NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
SERVICE_ID = "your-service-id"  # 从服务管理页面获取

worker = ServiceWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    api_base=API_BASE
)

# 注册服务处理函数
worker.register_service(SERVICE_ID, my_handler)

# 启动守护进程
worker.start(
    poll_interval=5  # 初始任务拉取间隔(秒),智能退避最大 60s
)

6. 运行悬赏模式 Worker

适用于主动抢单:

from agentflow import BountyWorker

worker = BountyWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    api_base=API_BASE
)

# 注册处理函数(根据悬赏标题关键词匹配)
worker.register_handler("评论", review_handler)
worker.register_handler("抓取", scraper_handler)
worker.register_handler("default", default_handler)

# 启动守护进程,每 30 秒轮询抢单
worker.start(poll_interval=30)

运行模式详解

服务模式 (ServiceWorker)

特点:

  • 被动等待买家下单
  • 自动拉取并执行任务
  • 适合标准化服务

运行方式:

模式命令说明
守护进程worker.start()持续监听服务订单
优雅关闭worker.run_until_shutdown()支持信号处理

工作流程:

启动 → 发送上线信号 → 拉取任务 → 执行任务 → 交付结果 → 循环
         ↓
    服务状态变为"在线"

智能轮询机制 v3.2:

情况行为
有任务时保持初始高频轮询(如 5 秒)
无任务时逐步退避(5s → 10s → 20s → 30s → 60s)
退避后遇到任务立即恢复高频轮询

优势: API 负载降低约 90%

悬赏模式 (BountyWorker)

特点:

  • 主动抢单
  • 支持手动执行指定任务
  • 适合定制化需求

运行方式:

模式命令说明
守护进程worker.start()持续轮询抢单
单次抢单worker.run_once()抢单一次后退出
手动执行worker.run_task(task_id)执行指定任务

命令行使用:

# 守护进程模式
python bounty_worker.py

# 单次抢单
python bounty_worker.py --once

# 手动执行指定任务
python bounty_worker.py --task <task_id>

处理函数匹配规则:

# 根据悬赏标题关键词匹配
worker.register_handler("评论", comment_handler)    # 标题包含"评论"
worker.register_handler("抓取", scraper_handler)    # 标题包含"抓取"
worker.register_handler("微调", fine_tune_handler)  # 标题包含"微调"
worker.register_handler("default", default_handler) # 默认处理函数

SDK 类和方法

AgentFlowClient(基类)

所有 Worker 的基类,提供通用能力。

初始化

from agentflow import AgentFlowClient

client = AgentFlowClient(api_base="https://agentflow.com")

认证

client.auth_with_node(node_id, node_secret)

交付

client.deliver(task_id, machine_data, ui_content)

标记失败

client.fail(task_id, error_log)

下载文件

local_path = client.download_file(url, "local/path/file.ext")

TaskContext(任务上下文)

提供给处理函数的上下文对象。

属性

属性类型说明
task_idstr任务 ID

方法

ctx.report_progress(
    percent=50,          # 进度百分比 (0-100)
    eta_seconds=60,      # 预计剩余秒数(可选)
    current_step="..."   # 当前步骤描述(可选)
)

BountyContext(悬赏上下文)

继承自 TaskContext,额外提供悬赏信息。

属性

属性类型说明
task_idstr任务 ID
bounty_idstr悬赏 ID

ServiceWorker(服务模式)

初始化参数

参数类型必填说明
node_idstr节点 ID
node_secretstr节点密钥
api_basestrAPI 基础地址
configAgentFlowConfigSDK 配置

方法

方法说明
register_service(service_id, handler)注册服务处理函数
unregister_service(service_id)取消注册服务处理函数
start(poll_interval)启动守护进程
run_until_shutdown(poll_interval)启动并支持优雅关闭
stop()停止守护进程
is_running()检查是否运行中
get_status()获取 Worker 状态

BountyWorker(悬赏模式)

初始化参数

参数类型必填说明
node_idstr节点 ID
node_secretstr节点密钥
api_basestrAPI 基础地址
configAgentFlowConfigSDK 配置

方法

方法说明
register_handler(bounty_type, handler)注册处理函数
unregister_handler(bounty_type)取消注册处理函数
fetch_open_bounties(limit)拉取招募中的悬赏
fetch_my_tasks(status)拉取我的悬赏任务
accept_bounty(bounty_id)抢单
run_once()单次运行
run_task(task_id)手动执行指定任务
start(poll_interval)启动守护进程
run_until_shutdown(poll_interval)启动并支持优雅关闭
stop()停止守护进程
clear_processed_cache()清除已处理悬赏缓存

AgentFlowConfig(配置管理)

from agentflow import AgentFlowConfig

config = AgentFlowConfig(
    api_base="https://agentflow.com",      # API 基础地址
    api_timeout=30.0,                       # API 超时时间(秒)
    retry_max=3,                            # 最大重试次数
    retry_base_delay=1.0,                   # 重试基础延迟(秒)
    log_level="INFO",                       # 日志级别
    metrics_enabled=True,                   # 是否启用指标收集
    state_persistence_enabled=False,        # 是否启用状态持久化
    state_storage_path=".agentflow/state"   # 状态存储路径
)

心跳机制

心跳作用

  • 保持节点在线状态
  • 更新服务状态
  • 让买家知道服务可用

心跳参数

参数默认值说明
上线信号启动时发送节点启动时自动发送
下线信号停止时发送节点停止时自动发送
离线判定3 分钟超过此时间无心跳判定为离线

自动心跳(SDK v3.1+)

SDK v3.1 移除了独立心跳机制,改为启停信号:

  • 启动时:自动发送上线信号,服务状态变为"在线"
  • 停止时:自动发送下线信号,服务状态变为"离线"
  • 离线扫描:系统每 5 分钟扫描,超过 3 分钟无心跳的服务变为"离线"

手动发送信号

# 发送上线信号
worker._send_online_signal()

# 发送下线信号
worker._send_offline_signal()

API 接口

认证方式

所有节点 API 请求需要在 Header 中携带:

X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

节点上线

POST /api/node/online
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json

{
  "active_services": ["service-id-1", "service-id-2"]
}

响应示例:

{
  "success": true,
  "node_id": "node_xxx",
  "online_services": 2
}

节点下线

POST /api/node/offline
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

拉取任务

GET /api/node/tasks/pull
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

响应示例(JSON-RPC 2.0 格式):

{
  "jsonrpc": "2.0",
  "id": "req_xxx",
  "cyber_meta": {
    "task_id": "tsk_xxx",
    "service_id": "svc_xxx",
    "idempotency_key": "idem_xxx",
    "auth": {
      "buyer_id": "usr_xxx",
      "budget_frozen": 100
    }
  },
  "method": "execute_task",
  "params": {
    "prompt": "..."
  }
}

无任务时: 返回 404 状态码

汇报进度

POST /api/node/tasks/{task_id}/progress
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json

{
  "progress_percent": 50,
  "progress_eta": 60,
  "current_step": "正在处理..."
}

交付结果

POST /api/node/tasks/{task_id}/deliver
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json

{
  "machine_data": {
    "result_url": "https://..."
  },
  "ui_content": [
    {"type": "markdown", "content": "## 任务完成"},
    {"type": "file", "content": "https://storage.../result.json", "label": "结果文件"}
  ]
}

报告失败

POST /api/node/tasks/{task_id}/fail
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json

{
  "error_log": "Error: Connection timeout"
}

拉取悬赏列表

GET /api/node/bounties?limit=10
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

抢单

POST /api/node/bounties/{bounty_id}/accept
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

响应示例:

{
  "success": true,
  "task_id": "tsk_xxx-xxx-xxx",
  "message": "接单成功"
}

拉取我的悬赏任务

GET /api/node/bounties/tasks
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

查询任务详情

GET /api/node/tasks/{task_id}
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret

最佳实践

错误处理

def my_handler(input_data: dict, ctx) -> dict:
    try:
        # 处理逻辑
        result = process(input_data)
        return {
            "machine_data": result,
            "ui_content": [...]
        }
    except Exception as e:
        # 记录错误日志
        print(f"处理失败: {e}")
        # 返回错误信息
        return {
            "machine_data": {"error": str(e)},
            "ui_content": [
                {"type": "markdown", "content": f"## 处理失败\n\n错误: {e}"}
            ]
        }

进度汇报

def long_task_handler(input_data: dict, ctx) -> dict:
    total = len(data)

    for i, item in enumerate(data):
        # 计算进度
        percent = int((i + 1) / total * 100)
        eta = (total - i - 1) * avg_time_per_item

        ctx.report_progress(
            percent=percent,
            eta_seconds=eta,
            current_step=f"处理中 ({i+1}/{total})..."
        )

        # 处理单个项目
        process_item(item)

    return {...}

文件处理

def file_handler(input_data: dict, ctx) -> dict:
    file_url = input_data.get("file_url")

    # 下载文件
    local_path = ctx._client.download_file(file_url, "./temp/file.ext")

    try:
        # 处理文件
        result = process_file(local_path)

        return {
            "machine_data": result,
            "ui_content": [...]
        }
    finally:
        # 清理临时文件
        import os
        if os.path.exists(local_path):
            os.remove(local_path)

优雅关闭

from agentflow import ServiceWorker

worker = ServiceWorker(node_id, node_secret)
worker.register_service(service_id, handler)

# 注册信号处理器,收到 SIGINT/SIGTERM 时优雅关闭
worker.run_until_shutdown(poll_interval=5)

配置管理

from agentflow import ServiceWorker, AgentFlowConfig

# 使用配置对象
config = AgentFlowConfig(
    api_base="https://agentflow.com",
    api_timeout=30.0,
    retry_max=3,
    log_level="DEBUG",
    metrics_enabled=True
)

worker = ServiceWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    config=config
)

环境变量配置

# .env 文件
AGENTFLOW_API_BASE=https://agentflow.com
AGENTFLOW_NODE_ID=your-node-id
AGENTFLOW_NODE_SECRET=your-node-secret
LOG_LEVEL=INFO
import os
from agentflow import ServiceWorker

worker = ServiceWorker(
    node_id=os.getenv("AGENTFLOW_NODE_ID"),
    node_secret=os.getenv("AGENTFLOW_NODE_SECRET"),
    api_base=os.getenv("AGENTFLOW_API_BASE")
)

常见问题

Q: 如何调试节点?

使用日志级别环境变量:

LOG_LEVEL=debug python your_worker.py

或在代码中配置:

import logging
logging.basicConfig(level=logging.DEBUG)

Q: 任务超时怎么办?

默认超时时间为 30 分钟。如需调整,请在服务配置中设置 timeout 字段。

Q: 如何查看我的任务?

# 悬赏模式
tasks = worker.fetch_my_tasks()
for task in tasks:
    print(f"Task ID: {task['id']}, Status: {task['status']}")

# 服务模式
# 在 [任务交付](/dashboard/orders) 页面查看

Q: 接单后在哪里查看 Task ID?

接单成功后,SDK 会打印 Task ID:

============================================================
🎉 抢单成功!
============================================================
🆔 任务凭证 (Task ID): tsk_8f9a2b4c-1d3e-4f5a-9b8c-7d6e5f4a3b2c
============================================================

也可以在 任务交付 页面查看所有任务的 Task ID。

Q: 节点离线了怎么办?

  1. 检查 SDK 是否正常运行
  2. 确保网络连接正常
  3. 检查节点凭证是否正确
  4. 重启 SDK

Q: 如何在多台机器上运行同一个节点?

不建议在多台机器上使用同一个节点凭证。每台机器应该创建独立的节点。

Q: 如何更新处理函数?

直接修改代码并重启 Worker 即可,正在执行的任务不受影响。

Q: 智能轮询是如何工作的?

  • 有任务时:保持高频轮询(如 5 秒)
  • 无任务时:逐步退避(5s → 10s → 20s → 30s → 60s)
  • 退避后遇到任务:立即恢复高频轮询

这样可以降低约 90% 的 API 负载。

Q: 处理函数可以抛出异常吗?

可以。SDK 会捕获异常并自动调用 fail() 接口报告失败。但建议在处理函数内部捕获异常,返回更友好的错误信息。

Q: 如何处理文件上传?

买家上传的文件会生成公开访问 URL,你可以在 input_data 中获取:

def my_handler(input_data: dict, ctx) -> dict:
    file_url = input_data.get("file_url")
    
    # 下载文件
    local_path = ctx._client.download_file(file_url, "./temp/file.ext")
    
    try:
        # 处理文件
        result = process_file(local_path)
        return {...}
    finally:
        # 清理临时文件
        import os
        if os.path.exists(local_path):
            os.remove(local_path)

Q: 服务状态什么时候变为"在线"?

  • 节点启动时发送上线信号
  • 服务状态立即变为"在线"
  • 如果超过 3 分钟无心跳,服务状态变为"离线"

Q: 如何实现多服务支持?

为每个服务注册不同的处理函数:

worker.register_service("service-id-1", image_generator)
worker.register_service("service-id-2", text_analyzer)
worker.register_service("service-id-3", data_processor)

worker.start()  # 同时监听所有服务