节点接入指南
本指南帮助你将本地算力设备接入 AgentFlow 网络,开始接收任务并赚取收益。
目录
前置要求
硬件要求
| 配置项 | 最低要求 | 推荐配置 |
|---|---|---|
| CPU | 4 核 | 8 核+ |
| 内存 | 8 GB | 16 GB+ |
| 存储 | 50 GB | 100 GB+ SSD |
| GPU | 可选 | NVIDIA RTX 3060+ |
| 网络 | 稳定互联网连接 | 100 Mbps+ |
软件要求
| 软件 | 版本要求 |
|---|---|
| 操作系统 | Linux / macOS / Windows (WSL2) |
| Python | 3.10+ |
| pip | 最新版 |
验证环境
# 检查 Python 版本
python --version # 应该 >= 3.10
# 检查 pip 版本
pip --version
快速接入
1. 创建节点
- 登录 AgentFlow 平台
- 访问 节点管理 页面
- 点击"注册新节点"
- 输入节点名称(如"我的客厅 4090 主机")
- 点击确认
2. 保存凭证
创建成功后,系统会生成:
| 凭证 | 说明 | 示例 |
|---|---|---|
| NODE_ID | 节点唯一标识 | 550e8400-e29b-41d4-a716-446655440000 |
| NODE_SECRET | 节点鉴权密钥 | 7c9e6679-7425-40de-944b-e07fc1f90ae7 |
⚠️ 重要提示: NODE_SECRET 只显示一次,请务必保存!
3. 安装 SDK
pip install agentflow
4. 编写处理函数
def my_handler(input_data: dict, ctx) -> dict:
"""
任务处理函数
Args:
input_data: 输入参数(来自买家)
ctx: 任务上下文(用于进度汇报)
Returns:
Cyber IO 3.0 格式的交付物
"""
# 汇报进度
ctx.report_progress(
percent=50,
eta_seconds=60,
current_step="正在处理..."
)
# 处理逻辑
result = process(input_data)
# 返回结果
return {
"machine_data": {"result": result},
"ui_content": [
{"type": "markdown", "content": "## 处理完成"}
]
}
5. 运行服务模式 Worker
适用于持续监听服务订单:
from agentflow import ServiceWorker
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"
SERVICE_ID = "your-service-id" # 从服务管理页面获取
worker = ServiceWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
api_base=API_BASE
)
# 注册服务处理函数
worker.register_service(SERVICE_ID, my_handler)
# 启动守护进程
worker.start(
poll_interval=5 # 初始任务拉取间隔(秒),智能退避最大 60s
)
6. 运行悬赏模式 Worker
适用于主动抢单:
from agentflow import BountyWorker
worker = BountyWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
api_base=API_BASE
)
# 注册处理函数(根据悬赏标题关键词匹配)
worker.register_handler("评论", review_handler)
worker.register_handler("抓取", scraper_handler)
worker.register_handler("default", default_handler)
# 启动守护进程,每 30 秒轮询抢单
worker.start(poll_interval=30)
运行模式详解
服务模式 (ServiceWorker)
特点:
- 被动等待买家下单
- 自动拉取并执行任务
- 适合标准化服务
运行方式:
| 模式 | 命令 | 说明 |
|---|---|---|
| 守护进程 | worker.start() | 持续监听服务订单 |
| 优雅关闭 | worker.run_until_shutdown() | 支持信号处理 |
工作流程:
启动 → 发送上线信号 → 拉取任务 → 执行任务 → 交付结果 → 循环
↓
服务状态变为"在线"
智能轮询机制 v3.2:
| 情况 | 行为 |
|---|---|
| 有任务时 | 保持初始高频轮询(如 5 秒) |
| 无任务时 | 逐步退避(5s → 10s → 20s → 30s → 60s) |
| 退避后遇到任务 | 立即恢复高频轮询 |
优势: API 负载降低约 90%
悬赏模式 (BountyWorker)
特点:
- 主动抢单
- 支持手动执行指定任务
- 适合定制化需求
运行方式:
| 模式 | 命令 | 说明 |
|---|---|---|
| 守护进程 | worker.start() | 持续轮询抢单 |
| 单次抢单 | worker.run_once() | 抢单一次后退出 |
| 手动执行 | worker.run_task(task_id) | 执行指定任务 |
命令行使用:
# 守护进程模式
python bounty_worker.py
# 单次抢单
python bounty_worker.py --once
# 手动执行指定任务
python bounty_worker.py --task <task_id>
处理函数匹配规则:
# 根据悬赏标题关键词匹配
worker.register_handler("评论", comment_handler) # 标题包含"评论"
worker.register_handler("抓取", scraper_handler) # 标题包含"抓取"
worker.register_handler("微调", fine_tune_handler) # 标题包含"微调"
worker.register_handler("default", default_handler) # 默认处理函数
SDK 类和方法
AgentFlowClient(基类)
所有 Worker 的基类,提供通用能力。
初始化
from agentflow import AgentFlowClient
client = AgentFlowClient(api_base="https://agentflow.com")
认证
client.auth_with_node(node_id, node_secret)
交付
client.deliver(task_id, machine_data, ui_content)
标记失败
client.fail(task_id, error_log)
下载文件
local_path = client.download_file(url, "local/path/file.ext")
TaskContext(任务上下文)
提供给处理函数的上下文对象。
属性
| 属性 | 类型 | 说明 |
|---|---|---|
task_id | str | 任务 ID |
方法
ctx.report_progress(
percent=50, # 进度百分比 (0-100)
eta_seconds=60, # 预计剩余秒数(可选)
current_step="..." # 当前步骤描述(可选)
)
BountyContext(悬赏上下文)
继承自 TaskContext,额外提供悬赏信息。
属性
| 属性 | 类型 | 说明 |
|---|---|---|
task_id | str | 任务 ID |
bounty_id | str | 悬赏 ID |
ServiceWorker(服务模式)
初始化参数
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
node_id | str | 是 | 节点 ID |
node_secret | str | 是 | 节点密钥 |
api_base | str | 否 | API 基础地址 |
config | AgentFlowConfig | 否 | SDK 配置 |
方法
| 方法 | 说明 |
|---|---|
register_service(service_id, handler) | 注册服务处理函数 |
unregister_service(service_id) | 取消注册服务处理函数 |
start(poll_interval) | 启动守护进程 |
run_until_shutdown(poll_interval) | 启动并支持优雅关闭 |
stop() | 停止守护进程 |
is_running() | 检查是否运行中 |
get_status() | 获取 Worker 状态 |
BountyWorker(悬赏模式)
初始化参数
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
node_id | str | 是 | 节点 ID |
node_secret | str | 是 | 节点密钥 |
api_base | str | 否 | API 基础地址 |
config | AgentFlowConfig | 否 | SDK 配置 |
方法
| 方法 | 说明 |
|---|---|
register_handler(bounty_type, handler) | 注册处理函数 |
unregister_handler(bounty_type) | 取消注册处理函数 |
fetch_open_bounties(limit) | 拉取招募中的悬赏 |
fetch_my_tasks(status) | 拉取我的悬赏任务 |
accept_bounty(bounty_id) | 抢单 |
run_once() | 单次运行 |
run_task(task_id) | 手动执行指定任务 |
start(poll_interval) | 启动守护进程 |
run_until_shutdown(poll_interval) | 启动并支持优雅关闭 |
stop() | 停止守护进程 |
clear_processed_cache() | 清除已处理悬赏缓存 |
AgentFlowConfig(配置管理)
from agentflow import AgentFlowConfig
config = AgentFlowConfig(
api_base="https://agentflow.com", # API 基础地址
api_timeout=30.0, # API 超时时间(秒)
retry_max=3, # 最大重试次数
retry_base_delay=1.0, # 重试基础延迟(秒)
log_level="INFO", # 日志级别
metrics_enabled=True, # 是否启用指标收集
state_persistence_enabled=False, # 是否启用状态持久化
state_storage_path=".agentflow/state" # 状态存储路径
)
心跳机制
心跳作用
- 保持节点在线状态
- 更新服务状态
- 让买家知道服务可用
心跳参数
| 参数 | 默认值 | 说明 |
|---|---|---|
| 上线信号 | 启动时发送 | 节点启动时自动发送 |
| 下线信号 | 停止时发送 | 节点停止时自动发送 |
| 离线判定 | 3 分钟 | 超过此时间无心跳判定为离线 |
自动心跳(SDK v3.1+)
SDK v3.1 移除了独立心跳机制,改为启停信号:
- 启动时:自动发送上线信号,服务状态变为"在线"
- 停止时:自动发送下线信号,服务状态变为"离线"
- 离线扫描:系统每 5 分钟扫描,超过 3 分钟无心跳的服务变为"离线"
手动发送信号
# 发送上线信号
worker._send_online_signal()
# 发送下线信号
worker._send_offline_signal()
API 接口
认证方式
所有节点 API 请求需要在 Header 中携带:
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
节点上线
POST /api/node/online
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json
{
"active_services": ["service-id-1", "service-id-2"]
}
响应示例:
{
"success": true,
"node_id": "node_xxx",
"online_services": 2
}
节点下线
POST /api/node/offline
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
拉取任务
GET /api/node/tasks/pull
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
响应示例(JSON-RPC 2.0 格式):
{
"jsonrpc": "2.0",
"id": "req_xxx",
"cyber_meta": {
"task_id": "tsk_xxx",
"service_id": "svc_xxx",
"idempotency_key": "idem_xxx",
"auth": {
"buyer_id": "usr_xxx",
"budget_frozen": 100
}
},
"method": "execute_task",
"params": {
"prompt": "..."
}
}
无任务时: 返回 404 状态码
汇报进度
POST /api/node/tasks/{task_id}/progress
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json
{
"progress_percent": 50,
"progress_eta": 60,
"current_step": "正在处理..."
}
交付结果
POST /api/node/tasks/{task_id}/deliver
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json
{
"machine_data": {
"result_url": "https://..."
},
"ui_content": [
{"type": "markdown", "content": "## 任务完成"},
{"type": "file", "content": "https://storage.../result.json", "label": "结果文件"}
]
}
报告失败
POST /api/node/tasks/{task_id}/fail
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
Content-Type: application/json
{
"error_log": "Error: Connection timeout"
}
拉取悬赏列表
GET /api/node/bounties?limit=10
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
抢单
POST /api/node/bounties/{bounty_id}/accept
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
响应示例:
{
"success": true,
"task_id": "tsk_xxx-xxx-xxx",
"message": "接单成功"
}
拉取我的悬赏任务
GET /api/node/bounties/tasks
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
查询任务详情
GET /api/node/tasks/{task_id}
X-Node-ID: your-node-id
Authorization: Bearer your-node-secret
最佳实践
错误处理
def my_handler(input_data: dict, ctx) -> dict:
try:
# 处理逻辑
result = process(input_data)
return {
"machine_data": result,
"ui_content": [...]
}
except Exception as e:
# 记录错误日志
print(f"处理失败: {e}")
# 返回错误信息
return {
"machine_data": {"error": str(e)},
"ui_content": [
{"type": "markdown", "content": f"## 处理失败\n\n错误: {e}"}
]
}
进度汇报
def long_task_handler(input_data: dict, ctx) -> dict:
total = len(data)
for i, item in enumerate(data):
# 计算进度
percent = int((i + 1) / total * 100)
eta = (total - i - 1) * avg_time_per_item
ctx.report_progress(
percent=percent,
eta_seconds=eta,
current_step=f"处理中 ({i+1}/{total})..."
)
# 处理单个项目
process_item(item)
return {...}
文件处理
def file_handler(input_data: dict, ctx) -> dict:
file_url = input_data.get("file_url")
# 下载文件
local_path = ctx._client.download_file(file_url, "./temp/file.ext")
try:
# 处理文件
result = process_file(local_path)
return {
"machine_data": result,
"ui_content": [...]
}
finally:
# 清理临时文件
import os
if os.path.exists(local_path):
os.remove(local_path)
优雅关闭
from agentflow import ServiceWorker
worker = ServiceWorker(node_id, node_secret)
worker.register_service(service_id, handler)
# 注册信号处理器,收到 SIGINT/SIGTERM 时优雅关闭
worker.run_until_shutdown(poll_interval=5)
配置管理
from agentflow import ServiceWorker, AgentFlowConfig
# 使用配置对象
config = AgentFlowConfig(
api_base="https://agentflow.com",
api_timeout=30.0,
retry_max=3,
log_level="DEBUG",
metrics_enabled=True
)
worker = ServiceWorker(
node_id=NODE_ID,
node_secret=NODE_SECRET,
config=config
)
环境变量配置
# .env 文件
AGENTFLOW_API_BASE=https://agentflow.com
AGENTFLOW_NODE_ID=your-node-id
AGENTFLOW_NODE_SECRET=your-node-secret
LOG_LEVEL=INFO
import os
from agentflow import ServiceWorker
worker = ServiceWorker(
node_id=os.getenv("AGENTFLOW_NODE_ID"),
node_secret=os.getenv("AGENTFLOW_NODE_SECRET"),
api_base=os.getenv("AGENTFLOW_API_BASE")
)
常见问题
Q: 如何调试节点?
使用日志级别环境变量:
LOG_LEVEL=debug python your_worker.py
或在代码中配置:
import logging
logging.basicConfig(level=logging.DEBUG)
Q: 任务超时怎么办?
默认超时时间为 30 分钟。如需调整,请在服务配置中设置 timeout 字段。
Q: 如何查看我的任务?
# 悬赏模式
tasks = worker.fetch_my_tasks()
for task in tasks:
print(f"Task ID: {task['id']}, Status: {task['status']}")
# 服务模式
# 在 [任务交付](/dashboard/orders) 页面查看
Q: 接单后在哪里查看 Task ID?
接单成功后,SDK 会打印 Task ID:
============================================================
🎉 抢单成功!
============================================================
🆔 任务凭证 (Task ID): tsk_8f9a2b4c-1d3e-4f5a-9b8c-7d6e5f4a3b2c
============================================================
也可以在 任务交付 页面查看所有任务的 Task ID。
Q: 节点离线了怎么办?
- 检查 SDK 是否正常运行
- 确保网络连接正常
- 检查节点凭证是否正确
- 重启 SDK
Q: 如何在多台机器上运行同一个节点?
不建议在多台机器上使用同一个节点凭证。每台机器应该创建独立的节点。
Q: 如何更新处理函数?
直接修改代码并重启 Worker 即可,正在执行的任务不受影响。
Q: 智能轮询是如何工作的?
- 有任务时:保持高频轮询(如 5 秒)
- 无任务时:逐步退避(5s → 10s → 20s → 30s → 60s)
- 退避后遇到任务:立即恢复高频轮询
这样可以降低约 90% 的 API 负载。
Q: 处理函数可以抛出异常吗?
可以。SDK 会捕获异常并自动调用 fail() 接口报告失败。但建议在处理函数内部捕获异常,返回更友好的错误信息。
Q: 如何处理文件上传?
买家上传的文件会生成公开访问 URL,你可以在 input_data 中获取:
def my_handler(input_data: dict, ctx) -> dict:
file_url = input_data.get("file_url")
# 下载文件
local_path = ctx._client.download_file(file_url, "./temp/file.ext")
try:
# 处理文件
result = process_file(local_path)
return {...}
finally:
# 清理临时文件
import os
if os.path.exists(local_path):
os.remove(local_path)
Q: 服务状态什么时候变为"在线"?
- 节点启动时发送上线信号
- 服务状态立即变为"在线"
- 如果超过 3 分钟无心跳,服务状态变为"离线"
Q: 如何实现多服务支持?
为每个服务注册不同的处理函数:
worker.register_service("service-id-1", image_generator)
worker.register_service("service-id-2", text_analyzer)
worker.register_service("service-id-3", data_processor)
worker.start() # 同时监听所有服务