scheduler调度

NOTE

当前调度版本基于sglang tag 0.5.7

SGLang 请求优先级调度机制技术文档

文档概述

本文档系统性地分析了 SGLang (v0.5.7) 中请求优先级调度的实现机制,包括整体架构、配置参数、核心代码实现、以及当前机制的局限性。


一、整体概览

1.1 支持情况

SGLang 支持请求优先级调度,提供了灵活的调度策略和配置选项。

1.2 设计目标

  • 延迟优化: 通过 FCFS、优先级调度等策略降低请求延迟
  • 吞吐优化: 通过 LPM、批处理、缓存感知策略提升系统吞吐量
  • QoS 支持: 支持显式优先级调度,可抢占低优先级请求
  • 公平性: 在优先级相同的情况下采用 FCFS 保证公平性
  • 内存效率: 基于 RadixAttention 的前缀缓存,最大化内存利用率

1.3 调度层级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────┐
│  HTTP API Layer (OpenAI-compatible)                     │
│  ↓ 接收请求,解析优先级参数                               │
├─────────────────────────────────────────────────────────┤
│  Scheduler Layer (scheduler.py)                         │
│  ↓ 验证优先级 → 加入等待队列                              │
├─────────────────────────────────────────────────────────┤
│  Schedule Policy Layer (schedule_policy.py)             │
│  ↓ 应用调度策略 (LPM/FCFS/Priority) → 排序               │
├─────────────────────────────────────────────────────────┤
│  Batch Formation Layer (PrefillAdder)                   │
│  ↓ 内存预算检查 → 选择请求 → 构建 batch                  │
├─────────────────────────────────────────────────────────┤
│  Execution Layer (Model Executor)                       │
│  ↓ 执行 Prefill / Decode                                │
└─────────────────────────────────────────────────────────┘

二、已有实现方案梳理

2.1 显式优先级机制

实现方式: 基于 req.priority 整数值的显式优先级

关键代码位置:

  • 优先级验证: scheduler.py:1658 (_set_or_validate_priority)
  • 优先级排序: schedule_policy.py:276 (_sort_by_priority_and_fcfs)
  • 抢占逻辑: scheduler.py:1668 (preempt_to_schedule)

实现细节:

1
2
3
4
5
6
7
8
9
# scheduler.py:1658 - 优先级设置或验证
def _set_or_validate_priority(self, req: Req) -> bool:
    if self.enable_priority_scheduling and req.priority is None:
        # 设置默认优先级
        if self.schedule_low_priority_values_first:
            req.priority = sys.maxsize  # 默认最低优先级
        else:
            req.priority = -sys.maxsize - 1  # 默认最高优先级
    return True
1
2
3
4
5
6
7
8
9
# schedule_policy.py:276 - 优先级排序算法
@staticmethod
def _sort_by_priority_and_fcfs(waiting_queue: List[Req], priority_sign: int):
    waiting_queue.sort(
        key=lambda x: (
            x.priority * priority_sign,      # 主排序:优先级
            x.time_stats.wait_queue_entry_time,  # 次排序:到达时间(FCFS)
        )
    )

抢占机制:

1
2
3
4
5
6
7
8
9
10
11
# scheduler.py:1668 - 抢占调度
def preempt_to_schedule(self, req: Req, server_args: ServerArgs) -> bool:
    """抢占正在运行的低优先级请求"""
    priority_diff = (req.priority - running_req.priority) * (-priority_sign)

    if priority_diff > self.priority_scheduling_preemption_threshold:
        # 执行抢占:将低优先级请求移回等待队列
        self.running_batch.remove_request(running_req)
        self.waiting_queue.append(running_req)
        return True
    return False

2.2 隐式优先级机制

2.2.1 调度策略隐含的优先级

策略类型 (schedule_policy.py):

  1. FCFS (First Come First Serve) - 默认策略
    • 隐式优先级: 请求到达时间
    • 适用场景: 公平性要求高的场景
  2. LPM (Longest Prefix Match)
    • 隐式优先级: 前缀缓存命中长度
    • 适用场景: 高重复请求场景,最大化缓存利用率
  3. LOF (Longest Output First)
    • 隐式优先级: max_new_tokens
    • 适用场景: 避免长请求饥饿
  4. DFS_WEIGHT (Depth-First Search Weighting)
    • 隐式优先级: 基于 Radix Tree 的深度优先搜索权重
    • 核心思想: 优先处理缓存树中”热门”分支(请求密集的路径)
    • 适用场景: 复杂缓存优化、多层次请求场景、缓存分支不均衡的情况
    • 与 LPM 区别: LPM 只看单个请求的前缀匹配长度,DFS_WEIGHT 考虑整个缓存树的请求分布
  5. RANDOM
    • 隐式优先级: 无(随机)
    • 适用场景: 测试、基准测试

2.2.2 阶段隐含的优先级

Prefill vs Decode 阶段差异:

  • Prefill 阶段:
    • 内存受限,通常优先处理能快速完成的请求
    • 支持分块处理(chunked prefill)
    • max_prefill_tokens 限制
  • Decode 阶段:
    • 请求持续运行直到完成
    • 支持流式输出
    • max_total_tokens 限制

2.3 方案对比

机制 优先级依据 可配置性 适用场景 公平性
显式优先级 用户指定 priority 高,支持抢占 QoS 要求明确的场景 低,可能饥饿
FCFS 请求到达时间 低,默认策略 通用场景
LPM 前缀缓存长度 中,可配置 高重复请求 中,大队列降级为FCFS
DFS_WEIGHT 缓存树分支权重 中,可配置 复杂缓存层次、请求分布不均 中,偏向热门路径
LOF max_new_tokens 中,可配置 避免长请求饥饿
RANDOM 随机 低,可配置 测试

三、可配置参数与行为说明

3.1 优先级调度参数

参数名 默认值 CLI 参数 含义 影响范围
enable_priority_scheduling False --enable-priority-scheduling 启用优先级调度 全局调度行为
schedule_low_priority_values_first False --schedule-low-priority-values-first 低值优先(反转优先级方向) 优先级排序逻辑
priority_scheduling_preemption_threshold 10 --priority-scheduling-preemption-threshold 抢占阈值(值越大越保守) 抢占频率
abort_on_priority_when_disabled False --abort-on-priority-when-disabled 禁用优先级调度时拒绝带优先级的请求 请求验证

3.2 调度策略参数

参数名 默认值 可选值 CLI 参数 含义 影响范围
schedule_policy "fcfs" "lpm", "fcfs", "lof", "random", "dfs-weight" --schedule-policy 调度策略选择 全局调度行为

3.3 队列管理参数

参数名 默认值 CLI 参数 含义 影响范围
max_queued_requests None --max-queued-requests 等待队列最大长度 队列容量限制
max_running_requests None --max-running-requests 运行队列最大长度 并发请求数限制
max_total_tokens None --max-total-tokens 内存池最大 token 数 内存使用限制

3.4 批处理参数

参数名 默认值 CLI 参数 含义 影响范围
max_prefill_tokens 16384 --max-prefill-tokens Prefill batch 最大 token 数 Prefill batch 大小
prefill_max_requests None --prefill-max-requests Prefill batch 最大请求数 Prefill batch 大小
chunked_prefill_size None --chunked-prefill-size 分块 prefill 的 chunk 大小 长请求处理
enable_dynamic_chunking False --enable-dynamic-chunking 启用动态 chunk 大小 流水线并行优化

3.5 环境变量

环境变量 默认值 位置 含义
SGLANG_CLIP_MAX_NEW_TOKENS_ESTIMATION 4096 schedule_policy.py 限制 max_new_tokens 估计值
IN_BATCH_PREFIX_CACHING_CHECK_THRESHOLD 32 schedule_policy.py 批次内前缀缓存检查阈值
IN_BATCH_PREFIX_CACHING_DEPRIORITIZE_THRESHOLD 32 schedule_policy.py 前缀缓存去优先化阈值

3.6 硬编码行为(无法配置)

行为 代码位置 值/逻辑 影响
LPM 队列大小限制 schedule_policy.py:141-143 队列 > 128 时降级为 FCFS 大队列场景自动降级
NSA 上下文并行限制 schedule_policy.py:576-580 Prefill batch 限制为 1 NSA CP 模式批处理限制
忽略 EOS 保留 token schedule_policy.py:62 固定保留 1 token 内存计算精度
优先级符号计算 schedule_policy.py:96 固定逻辑 优先级方向控制

四、结合代码的详细解析

4.1 关键代码位置

文件路径 核心类/函数 行号 功能描述
python/sglang/srt/managers/scheduler.py Scheduler 235 主调度器类
python/sglang/srt/managers/schedule_policy.py SchedulePolicy 63 调度策略实现
python/sglang/srt/managers/schedule_policy.py PrefillAdder 362 Prefill batch 构建
python/sglang/srt/managers/schedule_batch.py ScheduleBatch - 批次数据结构
python/sglang/srt/server_args.py ServerArgs - 配置参数定义

4.2 请求生命周期完整流程

4.2.1 请求进入系统

入口: scheduler.py:1642 (_add_request_to_queue)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _add_request_to_queue(self, req: Req):
    # 1. 验证或设置优先级
    if not self._set_or_validate_priority(req):
        return False

    # 2. 检查队列限制
    if self._abort_on_queued_limit(req):
        return False

    # 3. 预取 KV Cache(可选)
    self._prefetch_kvcache(req)

    # 4. 加入等待队列
    self.waiting_queue.append(req)

    # 5. 记录入队时间
    req.time_stats.wait_queue_entry_time = time.perf_counter()

    return True

4.2.2 请求排队与调度策略计算

位置: schedule_policy.py:140-200 (calc_priority)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def calc_priority(self, waiting_queue: List[Req]) -> bool:
    # 1. 确定活动策略(处理大队列降级)
    active_policy = self._determine_active_policy(waiting_queue)

    # 2. 应用调度策略
    if active_policy == CacheAgnosticPolicy.FCFS:
        if self.enable_priority_scheduling:
            # 按优先级 + FCFS 排序
            SchedulePolicy._sort_by_priority_and_fcfs(
                waiting_queue, self.priority_sign
            )
        # 否则保持 FCFS(已按入队时间排序)

    elif active_policy == CacheAwarePolicy.LPM:
        # 计算每个请求的前缀匹配长度
        for req in waiting_queue:
            req.prefix_len = self._calc_prefix_match_len(req)
        # 按前缀长度降序排序
        waiting_queue.sort(key=lambda x: -x.prefix_len)

    elif active_policy == CacheAwarePolicy.DFS_WEIGHT:
        # 按深度优先搜索权重排序
        SchedulePolicy._sort_by_dfs_weight(waiting_queue, self.tree_cache)

    elif active_policy == CacheAgnosticPolicy.LOF:
        # 按 max_new_tokens 降序排序
        waiting_queue.sort(key=lambda x: -x.max_new_tokens)

    elif active_policy == CacheAgnosticPolicy.RANDOM:
        # 随机打乱
        random.shuffle(waiting_queue)

    return True

大队列降级逻辑 (schedule_policy.py:140-144):

1
2
3
4
5
def _determine_active_policy(self, waiting_queue: List[Req]) -> Policy:
    if self.policy == CacheAwarePolicy.LPM and len(waiting_queue) > 128:
        # LPM 在大队列时计算开销过大,降级为 FCFS
        return CacheAgnosticPolicy.FCFS
    return self.policy

4.2.2.1 DFS_WEIGHT 策略详解

位置: schedule_policy.py:233-252 (_sort_by_dfs_weight)

核心思想: DFS_WEIGHT 策略通过分析 Radix Tree 中每个节点的请求分布,优先调度”热门”路径上的请求。这种方法利用缓存层次结构,最大化缓存复用率。

实现步骤:

步骤 1: 建立节点到请求的映射 (schedule_policy.py:237-239)

1
2
3
4
5
6
@staticmethod
def _sort_by_dfs_weight(waiting_queue: List[Req], tree_cache: BasePrefixCache) -> None:
    """基于深度优先搜索权重排序等待队列"""
    last_node_to_reqs = defaultdict(list)
    for req in waiting_queue:
        last_node_to_reqs[req.last_node].append(req)

步骤 2: 计算每个节点的权重 (schedule_policy.py:241-244)

1
2
3
4
node_to_weight = defaultdict(int)
for node in last_node_to_reqs:
    node_to_weight[node] = len(last_node_to_reqs[node])
SchedulePolicy._calc_weight(tree_cache.root_node, node_to_weight)

步骤 3: 递归计算子树权重 (schedule_policy.py:289-292)

1
2
3
4
5
6
@staticmethod
def _calc_weight(cur_node: TreeNode, node_to_weight: Dict[TreeNode, int]) -> None:
    """递归计算每个节点的权重(自身请求数 + 所有子节点的请求数)"""
    for child in cur_node.children.values():
        SchedulePolicy._calc_weight(child, node_to_weight)
        node_to_weight[cur_node] += node_to_weight[child]

权重计算示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Radix Tree 结构:
           Root [权重: 10]
          /    \
        A[6]   B[4]
       /  \     \
     C[4] D[2]   E[4]
                /  \
              F[2]  G[2]

等待队列分布:
- 节点 C: 4 个请求 (prefix: "A-C")
- 节点 D: 2 个请求 (prefix: "A-D")
- 节点 F: 2 个请求 (prefix: "B-E-F")
- 节点 G: 2 个请求 (prefix: "B-E-G")

权重计算:
- 节点 C 权重 = 4
- 节点 D 权重 = 2
- 节点 A 权重 = 4 + 2 = 6
- 节点 F 权重 = 2
- 节点 G 权重 = 2
- 节点 E 权重 = 2 + 2 = 4
- 节点 B 权重 = 4
- Root 权重 = 6 + 4 = 10

步骤 4: 按权重深度优先遍历 (schedule_policy.py:295-307)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@staticmethod
def _get_dfs_priority(
    cur_node: TreeNode,
    node_to_priority: Dict[TreeNode, int],
    last_node_to_reqs: Dict[TreeNode, List[Req]],
    q: List,
) -> None:
    """按权重深度优先遍历,优先处理权重大的分支"""
    childs = [child for child in cur_node.children.values()]
    childs.sort(key=lambda x: -node_to_priority[x])  # 子节点按权重降序

    for child in childs:
        SchedulePolicy._get_dfs_priority(child, node_to_priority, last_node_to_reqs, q)

    q.extend(last_node_to_reqs[cur_node])  # 叶子节点的请求加入队列

调度结果 (按上述示例):

1
2
3
4
5
6
7
8
9
10
11
调度顺序:
1. 节点 A 分支 (权重 6) > 节点 B 分支 (权重 4)
   → 先处理 "A-C" 的 4 个请求
   → 再处理 "A-D" 的 2 个请求
2. 然后处理节点 B 分支
   → 先处理 "B-E-F" 的 2 个请求
   → 再处理 "B-E-G" 的 2 个请求

最终顺序:
[A-C-req1, A-C-req2, A-C-req3, A-C-req4, A-D-req1, A-D-req2,
 B-E-F-req1, B-E-F-req2, B-E-G-req1, B-E-G-req2]

DFS_WEIGHT vs LPM 对比:

特性 LPM DFS_WEIGHT
排序依据 单个请求的前缀匹配长度 整个缓存树的请求分布
缓存视角 局部:单个请求的缓存命中 全局:缓存分支的热度
适用场景 简单缓存场景 复杂层次结构、分支不均衡
计算复杂度 O(n log n) O(n + m log m),m 为节点数
缓存优化 优先处理长前缀 优先处理热门分支

实际应用场景:

  1. 多轮对话场景:

    1
    2
    3
    4
    5
    用户 A: [系统提示 + 对话历史(50轮) + 新问题]
    用户 B: [系统提示 + 对话历史(2轮) + 新问题]
    用户 C: [系统提示 + 对话历史(50轮) + 新问题,与A相似]
    
    DFS_WEIGHT 会优先调度 A 和 C(共享长历史缓存),再调度 B
  2. 多租户场景:

    1
    2
    3
    4
    租户 1: 100 个请求共享系统提示 A
    租户 2: 10 个请求共享系统提示 B
    
    DFS_WEIGHT 会优先调度租户 1 的请求(热门分支)

注意事项:

  • DFS_WEIGHT 也是 CacheAwarePolicy,需要启用 RadixCache
  • 不会自动降级为 FCFS(只有 LPM 有此降级逻辑)
  • 在大队列场景下可能比 LPM 开销更大

4.2.3 Scheduler 选择请求

位置: scheduler.py:1792 (get_next_batch_to_run)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_next_batch_to_run(self) -> Optional[ScheduleBatch]:
    # 主调度循环

    # 1. 合并已完成的 prefill batch 到 running batch
    if self.prefill_batch is not None:
        self._merge_prefill_to_running()

    # 2. 如果没有运行中的 batch 或需要新 prefill
    if self.running_batch is None or self.running_batch.can_add_new_req():
        # 获取新的 prefill batch
        prefill_batch = self.get_new_batch_prefill()

        if prefill_batch is not None:
            self.prefill_batch = prefill_batch
            return self.prefill_batch

    # 3. 否则运行 decode batch
    if self.running_batch is not None:
        return self.running_batch

    return None

4.2.4 构建 Prefill Batch

位置: schedule_policy.py:362 (PrefillAdder)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class PrefillAdder:
    def add_one_req(self, req: Req, has_chunked_req: bool,
                    truncation_align_size: Optional[int]) -> AddReqResult:
        """尝试添加一个请求到 prefill batch"""

        # 1. 检查内存预算
        result = self.budget_state()
        if result != AddReqResult.CONTINUE:
            return result

        # 2. 计算所需 token
        prefix_len = self.prefix_cache.match_prefix_len(req)
        input_tokens = req.req_input_indices.length()

        # 3. 检查是否需要分块 prefill
        if self.need_chunked_prefill(req, prefix_len, input_tokens):
            return self.handle_chunked_prefill(req, prefix_len, input_tokens)

        # 4. 检查 batch 大小限制
        if not self.can_add_more_requests():
            return AddReqResult.OTHER

        # 5. 添加到可运行列表
        self.can_run_list.append(req)

        # 6. 更新内存预算
        self._update_prefill_budget(prefix_len, input_tokens, req.max_new_tokens)

        return AddReqResult.CONTINUE

内存预算检查 (schedule_policy.py:430-436):

1
2
3
4
5
6
7
8
def budget_state(self) -> AddReqResult:
    if self.rem_total_tokens <= 0 or self.cur_rem_tokens <= 0:
        return AddReqResult.NO_TOKEN  # 总 token 预算不足

    if self.rem_input_tokens <= 0:
        return AddReqResult.OTHER  # 输入 token 预算不足

    return AddReqResult.CONTINUE  # 可以继续添加

4.2.5 Decode 阶段调度

特点:

  • Decode 阶段的请求持续在 running_batch 中运行
  • 不需要重新调度,直到请求完成或被抢占
  • 支持流式输出(基于 stream_interval

关键代码 (scheduler.py:728-729):

1
2
self.running_batch: Optional[ScheduleBatch] = None  # 当前运行的 decode batch
self.last_batch: Optional[ScheduleBatch] = None     # 上一个批次

4.3 优先级在不同阶段的表现

阶段 优先级作用方式 抢占支持 代码位置
Prefill 排序决定执行顺序 是(抢占 decode 请求) scheduler.py:1668
Decode 影响初始 batch 构成 是(可被抢占) scheduler.py:1668
等待队列 排序决定调度顺序 N/A schedule_policy.py:276

4.4 抢占机制详细流程

触发条件 (scheduler.py:1668):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def preempt_to_schedule(self, req: Req, server_args: ServerArgs) -> bool:
    """
    抢占正在运行的低优先级请求以服务高优先级新请求

    返回: True 如果成功抢占,False 否则
    """
    if not self.enable_priority_scheduling:
        return False

    if self.running_batch is None or len(self.running_batch.reqs) == 0:
        return False

    # 计算优先级差异
    priority_sign = -1 if self.schedule_low_priority_values_first else 1
    for running_req in self.running_batch.reqs:
        priority_diff = (req.priority - running_req.priority) * (-priority_sign)

        # 检查是否超过抢占阈值
        if priority_diff > self.priority_scheduling_preemption_threshold:
            # 执行抢占
            self.running_batch.remove_request(running_req)
            self.waiting_queue.append(running_req)
            running_req.time_stats.wait_queue_entry_time = time.perf_counter()
            return True

    return False

抢占参数说明:

  • priority_scheduling_preemption_threshold = 10:
    • 值越小,越容易触发抢占
    • 值越大,抢占越保守(避免频繁撤销请求)
    • 建议在频繁看到请求被撤销时增大此值

五、当前机制的局限性与潜在问题

5.1 已知问题

5.1.1 优先级反转

场景: 高优先级请求等待低优先级请求释放共享资源

当前缓解:

  • 抢占机制可以在一定程度上缓解
  • 但不能完全消除(受限于 preemption_threshold

5.1.2 饥饿问题

场景 1: 低优先级请求饥饿

  • 持续有高优先级请求到达时,低优先级请求可能长时间得不到服务
  • 当前无保护机制(如 aging、boosting)

场景 2: 短请求被长请求阻塞

  • 虽然 LOF 策略优先处理长请求,但可能导致短请求延迟
  • 当前无短请求优化策略

场景 3: LPM 策略的缓存冷启动问题

  • 新请求(无缓存命中)在 LPM 策略下可能被延迟
  • 大队列时自动降级为 FCFS,缓解此问题

5.1.3 大队列性能问题

问题:

  • LPM 策略在队列 > 128 时自动降级为 FCFS
  • 降级阈值硬编码,无法配置
  • DFS_WEIGHT 不会自动降级,在大队列时可能计算开销更大

影响:

  • 缓存优化效果减弱
  • 可能导致性能抖动
  • DFS_WEIGHT 在大队列时可能成为瓶颈(需要遍历整个缓存树)

建议:

  • 大队列场景(> 100 请求)优先使用 FCFS
  • 如果使用 DFS_WEIGHT,建议监控队列长度,必要时切换策略

5.2 高并发场景问题

5.2.1 内存压力下的调度退化

场景: GPU 内存不足时

表现:

  • 批次大小急剧减小
  • 吞吐量下降
  • 优先级可能失效(受内存预算限制)

代码位置: schedule_policy.py:430-436

5.2.2 抢占频繁导致性能抖动

场景:

  • 高优先级请求频繁到达
  • preemption_threshold 设置过小

表现:

  • 大量请求被撤销和重新调度
  • 缓存命中率下降
  • 端到端延迟增加

建议: 增大 priority_scheduling_preemption_threshold

5.3 Streaming 场景问题

5.3.1 流式请求的优先级处理

当前行为:

  • Streaming 请求在 decode 阶段持续占用资源
  • 一旦开始 streaming,优先级调整不再生效

潜在问题:

  • 高优先级请求可能需要等待长时间 streaming 完成
  • 缺乏中期优先级调整机制

5.3.2 Chunked Prefill 的优先级

当前行为:

  • 长请求分块 prefill 时,每个 chunk 独立调度
  • 可能与其他请求交错执行

潜在问题:

  • 长请求的实际执行顺序可能不符合预期优先级

5.4 Speculative Decucing 场景

当前状态:

  • SGLang 支持投机解码
  • 优先级调度与投机解码的交互未明确优化

潜在问题:

  • 投机解码的额外开销可能影响优先级调度的效果
  • Draft model 的调度与 main model 的调度可能不同步

5.5 硬编码限制

限制 影响 是否需要改进
LPM 降级阈值 (128) 大队列时缓存优化失效 ,应可配置
NSA CP batch size = 1 NSA CP 模式吞吐受限 ,未来版本可能支持
IGNORE_EOS_TOKENS = 1 内存计算精度 否,影响较小
优先级符号计算逻辑 优先级方向控制灵活性 ,可增加更多选项

六、总结与扩展建议

6.1 当前调度模型逻辑流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌─────────────────────────────────────────────────────────────┐
│  1. 请求到达 HTTP API Layer                                 │
│     ↓ 解析 priority 参数 (如果提供)                          │
├─────────────────────────────────────────────────────────────┤
│  2. Scheduler: _add_request_to_queue()                      │
│     ↓ 验证/设置优先级 → 检查队列限制 → 加入 waiting_queue    │
├─────────────────────────────────────────────────────────────┤
│  3. Schedule Policy: calc_priority()                        │
│     ↓ 应用调度策略 (LPM/FCFS/Priority/LOF/RANDOM)           │
│     ↓ 按策略对 waiting_queue 排序                            │
├─────────────────────────────────────────────────────────────┤
│  4. Scheduler: get_next_batch_to_run()                      │
│     ↓ 尝试抢占 (如果启用优先级调度)                          │
│     ↓ 调用 get_new_batch_prefill()                          │
├─────────────────────────────────────────────────────────────┤
│  5. PrefillAdder: add_one_req()                             │
│     ↓ 检查内存预算 → 检查 batch 限制 → 添加到 can_run_list  │
│     ↓ 构建 prefill_batch                                     │
├─────────────────────────────────────────────────────────────┤
│  6. Model Executor: 执行 Prefill                            │
│     ↓ 完成 prefill 后,请求移至 running_batch                │
├─────────────────────────────────────────────────────────────┤
│  7. Model Executor: 执行 Decode (持续直到完成)              │
│     ↓ 支持 streaming                                        │
│     ↓ 支持被高优先级请求抢占                                 │
├─────────────────────────────────────────────────────────────┤
│  8. 请求完成 → 返回结果 → 释放资源                          │
└─────────────────────────────────────────────────────────────┘

6.2 核心特性总结

特性 实现状态 评价
显式优先级调度 ✅ 完整支持 支持优先级设置、排序、抢占
多种调度策略 ✅ 支持 LPM、FCFS、LOF、RANDOM 等
抢占机制 ✅ 支持 可配置阈值,避免频繁抢占
公平性保证 ⚠️ 部分 FCFS 提供基本公平性,但无饥饿保护
QoS 支持 ⚠️ 基础 优先级调度 + 抢占,但无延迟 SLA
缓存优化 ✅ 优秀 LPM 策略 + RadixAttention
内存管理 ✅ 优秀 动态预算,分页对齐
流式支持 ✅ 支持 streaming + chunked prefill
可配置性 ⚠️ 中等 丰富参数,但部分行为硬编码

6.3 扩展建议

6.3.1 短期改进(易于实现)

  1. 可配置的 LPM 降级阈值
    • 当前硬编码为 128
    • 建议添加参数: --lpm-degrade-threshold
  2. 优先级老化机制
    • 防止低优先级请求饥饿
    • 实现: 随等待时间线性增加优先级
  3. 短请求优化
    • 添加调度策略: SHORT_FIRST
    • 基于 input_tokens + max_new_tokens 估计
  4. 优先级统计指标
    • 添加 Prometheus 指标:
      • sglang_priority_starvation_count
      • sglang_priority_preemption_count
      • sglang_priority_waiting_time_seconds

6.3.2 中期改进(需要架构调整)

  1. 多级优先级队列
    • 实现: 多个 waiting_queue(高/中/低优先级)
    • 调度: 加权轮转 (Weighted Round Robin)
  2. 延迟 SLA 支持
    • 添加参数: deadline_ms
    • 调度策略: EARLIEST_DEADLINE_FIRST
  3. 动态优先级调整
    • 基于: 等待时间、系统负载、用户类别
    • 实现: priority_adjustment_function hook
  4. Batch 内优先级
    • 当前: batch 内无优先级
    • 改进: batch 内按优先级排序,影响采样顺序

6.3.3 长期改进(需要重新设计)

  1. 完全抢占式调度
    • 支持请求暂停/恢复
    • 需要重新设计 checkpoint 机制
  2. 多租户隔离
    • 租户级别的资源配额
    • 租户级别的优先级策略
  3. 自适应调度
    • 基于历史数据自动选择调度策略
    • 机器学习驱动的调度优化
  4. 分布式优先级调度
    • 跨多个 worker 的全局优先级
    • 需要分布式协调服务

6.4 使用建议

场景 1: 在线服务(低延迟优先)

1
2
3
4
5
6
python -m sglang serve \
    --schedule-policy fcfs \
    --enable-priority-scheduling \
    --schedule-low-priority-values-first \
    --max-prefill-tokens 8192 \
    --prefill-max-requests 32

场景 2: 批处理(高吞吐优先)

1
2
3
4
python -m sglang serve \
    --schedule-policy lpm \
    --max-prefill-tokens 32768 \
    --max-total-tokens 1000000

场景 3: 混合工作负载

1
2
3
4
5
6
python -m sglang serve \
    --schedule-policy fcfs \
    --enable-priority-scheduling \
    --priority-scheduling-preemption-threshold 20 \
    --chunked-prefill-size 4096 \
    --enable-dynamic-chunking

附录 A: 配置示例

A.1 启用优先级调度(高值优先)

1
2
3
4
python -m sglang serve \
    --model-path meta-llama/Llama-3.1-8B-Instruct \
    --enable-priority-scheduling \
    --priority-scheduling-preemption-threshold 10

行为:

  • priority 值越大,优先级越高
  • priority 值相差 > 10 时触发抢占

A.2 启用优先级调度(低值优先)

1
2
3
4
python -m sglang serve \
    --model-path meta-llama/Llama-3.1-8B-Instruct \
    --enable-priority-scheduling \
    --schedule-low-priority-values-first

行为:

  • priority 值越小,优先级越高
  • 适合 0 = 最高优先级的语义

A.3 使用 LPM 策略

1
2
3
python -m sglang serve \
    --model-path meta-llama/Llama-3.1-8B-Instruct \
    --schedule-policy lpm

行为:

  • 优先调度与前缀缓存匹配度高的请求
  • 队列 > 128 时自动降级为 FCFS

A.4 使用 DFS_WEIGHT 策略

1
2
3
python -m sglang serve \
    --model-path meta-llama/Llama-3.1-8B-Instruct \
    --schedule-policy dfs-weight

行为:

  • 基于缓存树的深度优先搜索权重调度
  • 优先处理热门分支(请求密集的缓存路径)
  • 最大化缓存复用率,适合复杂缓存层次结构
  • 不会自动降级为 FCFS(与 LPM 不同)

典型应用场景:

  1. 多轮对话服务: 用户共享长对话历史
  2. 多租户系统: 不同租户使用不同的系统提示词
  3. 复杂 prompt 模板: 多个请求共享大量 prompt 前缀

附录 B: API 使用示例

B.1 OpenAI API(指定优先级)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from openai import OpenAI

client = OpenAI(
    base_url="http://localhost:8000/v1",
    api_key="EMPTY"
)

response = client.chat.completions.create(
    model="meta-llama/Llama-3.1-8B-Instruct",
    messages=[{"role": "user", "content": "Hello!"}],
    extra_body={
        "priority": 100  # 高优先级
    }
)

print(response.choices[0].message.content)

B.2 SGLang 原生 API(指定优先级)

1
2
3
4
5
6
7
8
9
10
11
import sglang as sgl

@sgl.function
def hello(s):
    s += sgl.user("Hello!")
    s += sgl.assistant(sgl.gen("response"))

result = hello.run(
    model_path="meta-llama/Llama-3.1-8B-Instruct",
    priority=100  # 高优先级
)

附录 C: 关键代码索引

功能 文件路径 类/函数 行号
主调度器 scheduler.py Scheduler 235
优先级验证 scheduler.py _set_or_validate_priority 1658
抢占逻辑 scheduler.py preempt_to_schedule 1668
调度策略 schedule_policy.py SchedulePolicy 63
优先级排序 schedule_policy.py _sort_by_priority_and_fcfs 276
DFS_WEIGHT 排序 schedule_policy.py _sort_by_dfs_weight 233
DFS_WEIGHT 权重计算 schedule_policy.py _calc_weight 289
DFS_WEIGHT 优先级遍历 schedule_policy.py _get_dfs_priority 295
Prefill Adder schedule_policy.py PrefillAdder 362
内存预算检查 schedule_policy.py budget_state 430
LPM 降级逻辑 schedule_policy.py _determine_active_policy 140
配置参数定义 server_args.py ServerArgs -