卖家指南

本指南帮助你在 AgentFlow 平台上出售算力服务、接单悬赏、赚取收益。


目录

  1. 创建节点
  2. 发布服务
  3. 接单悬赏
  4. 管理订单
  5. SDK 使用指南
  6. Cyber IO 3.0 交付协议
  7. 常见问题

1. 创建节点

节点是你的算力设备在平台上的身份标识。所有服务都必须关联到节点才能运行。

1.1 创建步骤

  1. 访问 节点管理 页面
  2. 点击"注册新节点"
  3. 输入节点名称(如"我的客厅 4090 主机")
  4. 点击确认

1.2 保存凭证

创建成功后,系统会生成:

凭证说明用途
NODE_ID节点唯一标识SDK 认证、API 调用
NODE_SECRET节点鉴权密钥SDK 认证、API 调用

⚠️ 重要提示: NODE_SECRET 只显示一次,请务必保存!

凭证格式示例:

NODE_ID: 550e8400-e29b-41d4-a716-446655440000
NODE_SECRET: 7c9e6679-7425-40de-944b-e07fc1f90ae7

1.3 节点管理

在节点管理页面可以:

操作说明
查看列表查看节点列表和承载的服务数量
复制凭证复制 NODE_ID 和 NODE_SECRET
编辑名称修改节点显示名称
删除节点删除节点(会同步删除该节点下的所有服务)

节点状态:

状态说明
online节点在线,SDK 正在运行
offline节点离线,SDK 未运行
maintenance维护中(预留)

2. 发布服务

服务管理 页面发布你的 AI 服务。

2.1 基础信息

字段说明限制
服务名称简洁明了的服务名称最多 20 字
版本号如 v1.0.0建议格式 v1.0.0
简短介绍展示在市场卡片上最多 200 字
详细描述支持 Markdown 的详细说明无限制

详细描述建议:

  • 功能介绍
  • 使用场景
  • 输入输出说明
  • 注意事项

2.2 运行设置

字段说明
选择节点运行此服务的节点(必须关联)
单次价格每次调用的积分价格

价格建议:

  • 参考市场同类服务定价
  • 考虑算力成本和时间成本
  • 新服务可以设置较低价格吸引买家

2.3 输入输出配置

简化配置

配置项说明
文本输入是否接受文本输入
字数限制文本最大字符数
文件输入是否接受文件上传
文件格式允许的文件类型(逗号分隔)
文件大小最大文件大小(MB)

高级配置(JSON Schema)

对于复杂需求,可直接填写自定义 Schema:

// Input Schema - 买家需要提供的参数
{
  "type": "object",
  "required": ["image_url"],
  "properties": {
    "image_url": {
      "type": "string",
      "format": "uri",
      "description": "待处理的图片URL"
    },
    "style": {
      "type": "string",
      "enum": ["cartoon", "oil_painting", "sketch"],
      "default": "cartoon",
      "description": "转换风格"
    }
  }
}

// Output Schema - 服务返回的数据结构
{
  "type": "object",
  "properties": {
    "result_url": {
      "type": "string",
      "format": "uri",
      "description": "处理后的图片URL"
    }
  }
}

详细说明请参考 高级配置指南

2.4 服务上线

发布服务后,需要运行 SDK 节点才能接收订单:

  1. 保存服务 ID(在服务管理页面查看)
  2. 使用 SDK 运行节点(参考下方 SDK 使用指南)
  3. 节点心跳上报后,服务状态变为"在线"

服务状态判定:

  • 节点启动时发送上线信号 → 服务状态变为"在线"
  • 超过 3 分钟无心跳 → 服务状态变为"离线"
  • 节点停止时发送下线信号 → 服务状态变为"离线"

2.5 管理服务

在服务管理页面可以:

操作说明
查看列表查看服务列表和状态
编辑服务修改服务配置
删除服务永久删除服务
查看数据查看销量、收藏、收益

3. 接单悬赏

悬赏大厅 浏览买家发布的需求。

3.1 浏览悬赏

悬赏大厅展示所有招募中的悬赏:

信息说明
标题悬赏标题
简短介绍需求概述
悬赏金额完成后可获得的积分
违约金超时未交付时扣除的积分
交付时限必须在多少小时内完成

筛选方式:

  • 状态筛选:招募中 / 已完结
  • 关键词搜索

3.2 查看详情

点击悬赏卡片,右侧弹出 Sheet 侧边栏展示:

区域内容
基础信息标题、悬赏金额、违约金、交付时限
详细描述Markdown 渲染的需求描述
输入输出期望的输入输出格式
发布者买家信息

3.3 抢单接单

  1. 点击"立即抢单"按钮
  2. 系统创建任务并锁定悬赏金额
  3. 显示任务凭证(Task ID)
  4. 保存 Task ID 用于后续交付

抢单规则:

  • 不能接自己发布的悬赏
  • 先到先得,同一时间只能有一个卖家接单
  • 接单后必须在交付时限内完成

3.4 开发交付

接单成功后,使用 SDK 或手动交付成果:

使用 SDK 交付

from agentflow import BountyWorker

worker = BountyWorker(
    node_id="your-node-id",
    node_secret="your-node-secret"
)

# 注册处理函数
worker.register_handler("评论", my_handler)

# 手动执行指定任务
worker.run_task("tsk_xxx-xxx-xxx")

手动交付

通过 API 直接交付:

curl -X POST https://agentflow.com/api/node/tasks/TASK_ID/deliver \
  -H "X-Node-ID: YOUR_NODE_ID" \
  -H "Authorization: Bearer YOUR_NODE_SECRET" \
  -H "Content-Type: application/json" \
  -d '{
    "machine_data": {"result": "..."},
    "ui_content": [{"type": "markdown", "content": "完成"}]
  }'

3.5 等待验收

买家验收后,悬赏金额(扣除平台手续费)打入你的钱包。

收益计算:

  • 悬赏金额 + 违约金 = 总收益
  • 平台手续费:10%
  • 实际到账:总收益 × 90%

4. 管理订单

任务交付 页面查看所有接到的订单。

4.1 订单类型

类型标识说明
服务订单买家购买你的服务产生
悬赏订单紫色"悬赏"标签你接单的悬赏任务

4.2 订单详情

点击"详情"查看:

信息说明
Task ID任务凭证,用于 SDK 交付
执行进度实时进度条
输入数据买家提供的输入参数
输出格式期望的输出格式(悬赏订单)
剩余时间悬赏订单显示交付倒计时
财务信息锁定积分、卖家收入

4.3 订单状态

状态说明操作
待处理等待节点拉取任务SDK 自动拉取
执行中正在处理汇报进度
待验收已交付等待买家验收
已完成验收通过积分到账
已失败执行失败查看错误日志

5. SDK 使用指南

安装 SDK

pip install agentflow

SDK v3.0 特性

特性说明
同步/异步双模式支持 ServiceWorkerAsyncServiceWorker
配置管理AgentFlowConfig 支持环境变量和 JSON 配置
结构化日志Python logging 模块,JSON 格式输出
异常层次结构清晰的异常类型,便于错误处理
上下文管理器with / async with 自动资源清理
重试机制指数退避重试,熔断器保护
优雅关闭跨平台信号处理,安全退出
智能轮询动态调整轮询间隔,降低 API 负载

同步 vs 异步:如何选择?

场景推荐模式原因
简单脚本、单任务同步模式代码简单,易于调试
高并发、多任务异步模式高性能,资源利用率高
需要同时处理多个请求异步模式避免阻塞等待
快速原型开发同步模式开发效率高

服务模式 (ServiceWorker)

适用于:持续监听服务订单,自动执行。

同步版本(推荐入门)

from agentflow import ServiceWorker, AgentFlowConfig

# 配置节点凭证(从节点管理页面获取)
NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"

# SDK v3.0 配置管理
config = AgentFlowConfig(
    api_base=API_BASE,
    api_timeout=30.0,
    retry_max=3,
    log_level="INFO"
)

# 创建 Worker(使用上下文管理器)
with ServiceWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    config=config
) as worker:
    # 定义处理函数
    def my_handler(input_data: dict, ctx) -> dict:
        """
        处理服务订单

        Args:
            input_data: 买家输入的参数
            ctx: 任务上下文,用于进度汇报

        Returns:
            Cyber IO 3.0 格式的交付物
        """
        # 汇报进度
        ctx.report_progress(
            percent=50,
            eta_seconds=60,
            current_step="正在处理..."
        )

        # 处理逻辑
        result = process(input_data)

        # 返回结果(Cyber IO 3.0 格式)
        return {
            "machine_data": {
                "result_url": result.url
            },
            "ui_content": [
                {
                    "type": "markdown",
                    "content": f"## 处理完成\n\n结果: {result.url}"
                },
                {
                    "type": "file",
                    "content": result.url,
                    "label": "结果文件"
                }
            ]
        }

    # 注册服务
    SERVICE_ID = "your-service-id"
    worker.register_service(SERVICE_ID, my_handler)

    # 启动守护进程(阻塞主线程)
    worker.start(
        poll_interval=5  # 初始任务拉取间隔(秒),智能退避最大 60s
    )

异步版本(高并发场景)

import asyncio
from agentflow.async_ import AsyncServiceWorker
from agentflow.config.settings import AgentFlowConfig

NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"

config = AgentFlowConfig(
    api_base=API_BASE,
    api_timeout=30.0,
    retry_max=3,
    log_level="INFO"
)

async def my_handler(input_data: dict, ctx) -> dict:
    """异步处理函数"""
    # 汇报进度
    await ctx.report_progress(
        percent=50,
        eta_seconds=60,
        current_step="正在处理..."
    )

    # 异步处理逻辑
    result = await async_process(input_data)

    return {
        "machine_data": {"result_url": result.url},
        "ui_content": [
            {"type": "markdown", "content": "## 处理完成"},
            {"type": "file", "content": result.url, "label": "结果文件"}
        ]
    }

async def main():
    # 使用异步上下文管理器
    async with AsyncServiceWorker(
        node_id=NODE_ID,
        node_secret=NODE_SECRET,
        config=config
    ) as worker:
        # 注册服务
        worker.register_service("service-id", my_handler)

        # 启动守护进程
        await worker.start(poll_interval=5)

if __name__ == "__main__":
    asyncio.run(main())

智能轮询机制 v3.2

SDK v3.2 引入智能轮询,动态调整轮询间隔:

情况行为
有任务时保持初始高频轮询(如 5 秒)
无任务时逐步退避(5s → 10s → 20s → 30s → 60s)
退避后遇到任务立即恢复高频轮询

优势: API 负载降低约 90%


悬赏模式 (BountyWorker)

适用于:主动抢单、手动执行悬赏任务。

同步版本

from agentflow import BountyWorker, AgentFlowConfig

NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"

config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")

with BountyWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    config=config
) as worker:
    # 注册处理函数(根据悬赏标题关键词匹配)
    worker.register_handler("评论", review_scraper_handler)
    worker.register_handler("抓取", scraper_handler)
    worker.register_handler("微调", fine_tuning_handler)
    worker.register_handler("default", default_handler)  # 默认处理函数

    # 运行模式 1:守护进程模式(自动抢单)
    worker.start(poll_interval=30)  # 初始间隔 30s,智能退避最大 120s

    # 运行模式 2:单次抢单模式
    # worker.run_once()

    # 运行模式 3:手动执行指定任务
    # success = worker.run_task("tsk_xxx-xxx-xxx")

异步版本

import asyncio
from agentflow.async_ import AsyncBountyWorker
from agentflow.config.settings import AgentFlowConfig

NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"

config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")

async def main():
    async with AsyncBountyWorker(
        node_id=NODE_ID,
        node_secret=NODE_SECRET,
        config=config
    ) as worker:
        # 注册处理函数
        worker.register_handler("评论", async_review_handler)
        worker.register_handler("抓取", async_scraper_handler)

        # 守护进程模式
        await worker.start(poll_interval=30)

if __name__ == "__main__":
    asyncio.run(main())

查看我的任务

# 拉取我接的所有悬赏任务
tasks = worker.fetch_my_tasks()

# 按状态过滤
processing_tasks = worker.fetch_my_tasks(status="processing")

输出示例:

============================================================
📋 我的悬赏任务 (3 个)
============================================================
  🆔 tsk_8f9a2b4c-1d3e... | 京东商品评论抓取... | processing
  🆔 tsk_7c8d9e0f-1a2b... | LLaMA 3 微调服务... | delivered
============================================================

6. Cyber IO 3.0 交付协议

所有交付结果必须遵循 Cyber IO 3.0 协议格式。

协议结构

{
    # 供 Agent/程序读取的结构化数据
    "machine_data": {
        "result_url": "https://...",
        "count": 100,
        "status": "success"
    },

    # 供前端渲染的多模态内容数组
    "ui_content": [
        {"type": "markdown", "content": "## 任务完成"},
        {"type": "file", "content": "https://...", "label": "下载文件"}
    ]
}

machine_data 字段

  • 必须是字典(dict)
  • 供 Agent 或程序自动读取
  • 建议与 Output Schema 结构一致
  • 可包含任意 JSON 可序列化的数据

ui_content 字段

  • 必须是数组(list)
  • 每个元素必须包含 typecontent 字段
  • 按数组顺序依次渲染

支持的类型:

type说明必填字段
markdownMarkdown 文本content
jsonJSON 数据content
file文件下载content, label

完整示例

def my_handler(input_data: dict, ctx) -> dict:
    # 处理逻辑
    result = process(input_data)

    # 返回 Cyber IO 3.0 格式
    return {
        "machine_data": {
            "summary": {
                "total_reviews": 1000,
                "average_rating": 4.2
            },
            "download_url": result.download_url
        },
        "ui_content": [
            {
                "type": "markdown",
                "content": """## 📊 评论抓取完成

**抓取数量**: 1000 条

### 情感分析
- 😊 好评: 720 条 (72%)
- 😠 差评: 180 条 (18%)
- ⭐ 平均评分: 4.2/5.0"""
            },
            {
                "type": "file",
                "content": result.download_url,
                "label": "完整评论数据.json"
            }
        ]
    }

进度汇报

对于长时间运行的任务,可以汇报进度让买家看到实时状态。

使用方法

def long_running_handler(input_data: dict, ctx) -> dict:
    # 阶段 1
    ctx.report_progress(
        percent=10,
        eta_seconds=3600,  # 预计剩余 1 小时
        current_step="正在下载数据..."
    )

    # 阶段 2
    ctx.report_progress(
        percent=50,
        eta_seconds=1800,
        current_step="正在处理数据 (500/1000)..."
    )

    # 阶段 3
    ctx.report_progress(
        percent=90,
        eta_seconds=300,
        current_step="正在生成报告..."
    )

    # 最终交付
    return {
        "machine_data": {...},
        "ui_content": [...]
    }

参数说明

参数类型说明
percentint进度百分比(0-100)
eta_secondsint预计剩余秒数(可选)
current_stepstr当前执行步骤描述(可选)

完整示例

示例 1:电商评论抓取(同步版本)

from agentflow import BountyWorker, AgentFlowConfig
import time

NODE_ID = "your-node-id"
NODE_SECRET = "your-node-secret"
API_BASE = "https://agentflow.com"

config = AgentFlowConfig(api_base=API_BASE, log_level="INFO")

def ecommerce_review_scraper(input_data: dict, ctx) -> dict:
    product_url = input_data.get("product_url", "")
    max_reviews = input_data.get("max_reviews", 1000)

    # 阶段 1:初始化
    ctx.report_progress(
        percent=10,
        eta_seconds=3600,
        current_step="正在初始化爬虫引擎..."
    )
    time.sleep(2)

    # 阶段 2:抓取
    for i in range(1, 5):
        ctx.report_progress(
            percent=10 + i * 20,
            eta_seconds=3600 - i * 600,
            current_step=f"正在抓取评论 ({i * 250}/{max_reviews})..."
        )
        time.sleep(1)

    # 返回 Cyber IO 3.0 格式
    return {
        "machine_data": {
            "total_reviews": 1000,
            "download_url": "https://storage.../reviews.json"
        },
        "ui_content": [
            {
                "type": "markdown",
                "content": "## 📊 评论抓取完成\n\n共抓取 1000 条评论"
            },
            {
                "type": "file",
                "content": "https://storage.../reviews.json",
                "label": "评论数据.json"
            }
        ]
    }

# 使用上下文管理器
with BountyWorker(
    node_id=NODE_ID,
    node_secret=NODE_SECRET,
    config=config
) as worker:
    worker.register_handler("评论", ecommerce_review_scraper)
    worker.register_handler("抓取", ecommerce_review_scraper)
    worker.start()

示例 2:手动执行任务

# 接单成功后保存 Task ID
task_id = "tsk_8f9a2b4c-1d3e-4f5a-9b8c-7d6e5f4a3b2c"

# 手动执行指定任务
success = worker.run_task(task_id)
if success:
    print("✅ 任务执行成功")
else:
    print("❌ 任务执行失败")

常见问题

Q: 如何调试节点?

设置日志级别:

LOG_LEVEL=debug python your_worker.py

Q: 任务执行失败怎么办?

SDK 会自动调用 fail() 接口报告失败,积分会退还给买家。你可以查看错误日志排查问题。

Q: 如何处理多个服务?

为每个服务注册不同的处理函数:

worker.register_service("service-id-1", image_generator)
worker.register_service("service-id-2", text_analyzer)

Q: 悬赏金额什么时候到账?

买家验收通过后,悬赏金额(扣除平台手续费)会立即打入你的钱包。

Q: 如何查看节点状态?

节点管理 页面查看节点状态和承载的服务数量。

Q: 节点离线了怎么办?

检查 SDK 是否正常运行,确保网络连接正常。重启 SDK 后节点会自动上线。

Q: 如何更新服务配置?

在服务管理页面点击编辑,修改后保存即可。正在执行的任务不受影响。

Q: 智能轮询是如何工作的?

  • 有任务时:保持高频轮询(如 5 秒)
  • 无任务时:逐步退避(5s → 10s → 20s → 30s → 60s)
  • 退避后遇到任务:立即恢复高频轮询

这样可以降低约 90% 的 API 负载。

Q: 如何实现优雅关闭?

使用 run_until_shutdown() 方法:

# 注册信号处理器,收到 SIGINT/SIGTERM 时优雅关闭
worker.run_until_shutdown(poll_interval=5)

Q: 处理函数可以抛出异常吗?

可以。SDK 会捕获异常并自动调用 fail() 接口报告失败。但建议在处理函数内部捕获异常,返回更友好的错误信息。

Q: 如何处理文件上传?

买家上传的文件会生成公开访问 URL,你可以在 input_data 中获取:

def my_handler(input_data: dict, ctx) -> dict:
    file_url = input_data.get("file_url")
    
    # 下载文件
    local_path = ctx._client.download_file(file_url, "./temp/file.ext")
    
    try:
        # 处理文件
        result = process_file(local_path)
        return {...}
    finally:
        # 清理临时文件
        import os
        if os.path.exists(local_path):
            os.remove(local_path)

Q: 违约金是如何计算的?

  • 悬赏超时未交付时,从卖家账户扣除违约金
  • 违约金 + 悬赏金额 = 买家验收后卖家获得的总额
  • 如果按时交付,违约金也支付给卖家