scheduler调度

scheduler调度
gogongxt当前调度版本基于sglang tag 0.5.7
SGLang 请求优先级调度机制技术文档
文档概述
本文档系统性地分析了 SGLang (v0.5.7) 中请求优先级调度的实现机制,包括整体架构、配置参数、核心代码实现、以及当前机制的局限性。
一、整体概览
1.1 支持情况
SGLang 支持请求优先级调度,提供了灵活的调度策略和配置选项。
1.2 设计目标
- 延迟优化: 通过 FCFS、优先级调度等策略降低请求延迟
- 吞吐优化: 通过 LPM、批处理、缓存感知策略提升系统吞吐量
- QoS 支持: 支持显式优先级调度,可抢占低优先级请求
- 公平性: 在优先级相同的情况下采用 FCFS 保证公平性
- 内存效率: 基于 RadixAttention 的前缀缓存,最大化内存利用率
1.3 调度层级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
┌─────────────────────────────────────────────────────────┐
│ HTTP API Layer (OpenAI-compatible) │
│ ↓ 接收请求,解析优先级参数 │
├─────────────────────────────────────────────────────────┤
│ Scheduler Layer (scheduler.py) │
│ ↓ 验证优先级 → 加入等待队列 │
├─────────────────────────────────────────────────────────┤
│ Schedule Policy Layer (schedule_policy.py) │
│ ↓ 应用调度策略 (LPM/FCFS/Priority) → 排序 │
├─────────────────────────────────────────────────────────┤
│ Batch Formation Layer (PrefillAdder) │
│ ↓ 内存预算检查 → 选择请求 → 构建 batch │
├─────────────────────────────────────────────────────────┤
│ Execution Layer (Model Executor) │
│ ↓ 执行 Prefill / Decode │
└─────────────────────────────────────────────────────────┘二、已有实现方案梳理
2.1 显式优先级机制
实现方式: 基于 req.priority
整数值的显式优先级
关键代码位置:
- 优先级验证:
scheduler.py:1658(_set_or_validate_priority) - 优先级排序:
schedule_policy.py:276(_sort_by_priority_and_fcfs) - 抢占逻辑:
scheduler.py:1668(preempt_to_schedule)
实现细节:
1
2
3
4
5
6
7
8
9
# scheduler.py:1658 - 优先级设置或验证
def _set_or_validate_priority(self, req: Req) -> bool:
if self.enable_priority_scheduling and req.priority is None:
# 设置默认优先级
if self.schedule_low_priority_values_first:
req.priority = sys.maxsize # 默认最低优先级
else:
req.priority = -sys.maxsize - 1 # 默认最高优先级
return True1
2
3
4
5
6
7
8
9
# schedule_policy.py:276 - 优先级排序算法
@staticmethod
def _sort_by_priority_and_fcfs(waiting_queue: List[Req], priority_sign: int):
waiting_queue.sort(
key=lambda x: (
x.priority * priority_sign, # 主排序:优先级
x.time_stats.wait_queue_entry_time, # 次排序:到达时间(FCFS)
)
)抢占机制:
1
2
3
4
5
6
7
8
9
10
11
# scheduler.py:1668 - 抢占调度
def preempt_to_schedule(self, req: Req, server_args: ServerArgs) -> bool:
"""抢占正在运行的低优先级请求"""
priority_diff = (req.priority - running_req.priority) * (-priority_sign)
if priority_diff > self.priority_scheduling_preemption_threshold:
# 执行抢占:将低优先级请求移回等待队列
self.running_batch.remove_request(running_req)
self.waiting_queue.append(running_req)
return True
return False2.2 隐式优先级机制
2.2.1 调度策略隐含的优先级
策略类型 (schedule_policy.py):
- FCFS (First Come First Serve) - 默认策略
- 隐式优先级: 请求到达时间
- 适用场景: 公平性要求高的场景
- LPM (Longest Prefix Match)
- 隐式优先级: 前缀缓存命中长度
- 适用场景: 高重复请求场景,最大化缓存利用率
- LOF (Longest Output First)
- 隐式优先级:
max_new_tokens值 - 适用场景: 避免长请求饥饿
- 隐式优先级:
- DFS_WEIGHT (Depth-First Search Weighting)
- 隐式优先级: 基于 Radix Tree 的深度优先搜索权重
- 核心思想: 优先处理缓存树中”热门”分支(请求密集的路径)
- 适用场景: 复杂缓存优化、多层次请求场景、缓存分支不均衡的情况
- 与 LPM 区别: LPM 只看单个请求的前缀匹配长度,DFS_WEIGHT 考虑整个缓存树的请求分布
- RANDOM
- 隐式优先级: 无(随机)
- 适用场景: 测试、基准测试
2.2.2 阶段隐含的优先级
Prefill vs Decode 阶段差异:
- Prefill 阶段:
- 内存受限,通常优先处理能快速完成的请求
- 支持分块处理(chunked prefill)
- 受
max_prefill_tokens限制
- Decode 阶段:
- 请求持续运行直到完成
- 支持流式输出
- 受
max_total_tokens限制
2.3 方案对比
| 机制 | 优先级依据 | 可配置性 | 适用场景 | 公平性 |
|---|---|---|---|---|
| 显式优先级 | 用户指定 priority 值 |
高,支持抢占 | QoS 要求明确的场景 | 低,可能饥饿 |
| FCFS | 请求到达时间 | 低,默认策略 | 通用场景 | 高 |
| LPM | 前缀缓存长度 | 中,可配置 | 高重复请求 | 中,大队列降级为FCFS |
| DFS_WEIGHT | 缓存树分支权重 | 中,可配置 | 复杂缓存层次、请求分布不均 | 中,偏向热门路径 |
| LOF | max_new_tokens |
中,可配置 | 避免长请求饥饿 | 中 |
| RANDOM | 随机 | 低,可配置 | 测试 | 低 |
三、可配置参数与行为说明
3.1 优先级调度参数
| 参数名 | 默认值 | CLI 参数 | 含义 | 影响范围 |
|---|---|---|---|---|
enable_priority_scheduling |
False |
--enable-priority-scheduling |
启用优先级调度 | 全局调度行为 |
schedule_low_priority_values_first |
False |
--schedule-low-priority-values-first |
低值优先(反转优先级方向) | 优先级排序逻辑 |
priority_scheduling_preemption_threshold |
10 |
--priority-scheduling-preemption-threshold |
抢占阈值(值越大越保守) | 抢占频率 |
abort_on_priority_when_disabled |
False |
--abort-on-priority-when-disabled |
禁用优先级调度时拒绝带优先级的请求 | 请求验证 |
3.2 调度策略参数
| 参数名 | 默认值 | 可选值 | CLI 参数 | 含义 | 影响范围 |
|---|---|---|---|---|---|
schedule_policy |
"fcfs" |
"lpm", "fcfs", "lof",
"random", "dfs-weight" |
--schedule-policy |
调度策略选择 | 全局调度行为 |
3.3 队列管理参数
| 参数名 | 默认值 | CLI 参数 | 含义 | 影响范围 |
|---|---|---|---|---|
max_queued_requests |
None |
--max-queued-requests |
等待队列最大长度 | 队列容量限制 |
max_running_requests |
None |
--max-running-requests |
运行队列最大长度 | 并发请求数限制 |
max_total_tokens |
None |
--max-total-tokens |
内存池最大 token 数 | 内存使用限制 |
3.4 批处理参数
| 参数名 | 默认值 | CLI 参数 | 含义 | 影响范围 |
|---|---|---|---|---|
max_prefill_tokens |
16384 |
--max-prefill-tokens |
Prefill batch 最大 token 数 | Prefill batch 大小 |
prefill_max_requests |
None |
--prefill-max-requests |
Prefill batch 最大请求数 | Prefill batch 大小 |
chunked_prefill_size |
None |
--chunked-prefill-size |
分块 prefill 的 chunk 大小 | 长请求处理 |
enable_dynamic_chunking |
False |
--enable-dynamic-chunking |
启用动态 chunk 大小 | 流水线并行优化 |
3.5 环境变量
| 环境变量 | 默认值 | 位置 | 含义 |
|---|---|---|---|
SGLANG_CLIP_MAX_NEW_TOKENS_ESTIMATION |
4096 |
schedule_policy.py |
限制 max_new_tokens 估计值 |
IN_BATCH_PREFIX_CACHING_CHECK_THRESHOLD |
32 |
schedule_policy.py |
批次内前缀缓存检查阈值 |
IN_BATCH_PREFIX_CACHING_DEPRIORITIZE_THRESHOLD |
32 |
schedule_policy.py |
前缀缓存去优先化阈值 |
3.6 硬编码行为(无法配置)
| 行为 | 代码位置 | 值/逻辑 | 影响 |
|---|---|---|---|
| LPM 队列大小限制 | schedule_policy.py:141-143 |
队列 > 128 时降级为 FCFS | 大队列场景自动降级 |
| NSA 上下文并行限制 | schedule_policy.py:576-580 |
Prefill batch 限制为 1 | NSA CP 模式批处理限制 |
| 忽略 EOS 保留 token | schedule_policy.py:62 |
固定保留 1 token | 内存计算精度 |
| 优先级符号计算 | schedule_policy.py:96 |
固定逻辑 | 优先级方向控制 |
四、结合代码的详细解析
4.1 关键代码位置
| 文件路径 | 核心类/函数 | 行号 | 功能描述 |
|---|---|---|---|
python/sglang/srt/managers/scheduler.py |
Scheduler |
235 | 主调度器类 |
python/sglang/srt/managers/schedule_policy.py |
SchedulePolicy |
63 | 调度策略实现 |
python/sglang/srt/managers/schedule_policy.py |
PrefillAdder |
362 | Prefill batch 构建 |
python/sglang/srt/managers/schedule_batch.py |
ScheduleBatch |
- | 批次数据结构 |
python/sglang/srt/server_args.py |
ServerArgs |
- | 配置参数定义 |
4.2 请求生命周期完整流程
4.2.1 请求进入系统
入口: scheduler.py:1642
(_add_request_to_queue)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def _add_request_to_queue(self, req: Req):
# 1. 验证或设置优先级
if not self._set_or_validate_priority(req):
return False
# 2. 检查队列限制
if self._abort_on_queued_limit(req):
return False
# 3. 预取 KV Cache(可选)
self._prefetch_kvcache(req)
# 4. 加入等待队列
self.waiting_queue.append(req)
# 5. 记录入队时间
req.time_stats.wait_queue_entry_time = time.perf_counter()
return True4.2.2 请求排队与调度策略计算
位置: schedule_policy.py:140-200
(calc_priority)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def calc_priority(self, waiting_queue: List[Req]) -> bool:
# 1. 确定活动策略(处理大队列降级)
active_policy = self._determine_active_policy(waiting_queue)
# 2. 应用调度策略
if active_policy == CacheAgnosticPolicy.FCFS:
if self.enable_priority_scheduling:
# 按优先级 + FCFS 排序
SchedulePolicy._sort_by_priority_and_fcfs(
waiting_queue, self.priority_sign
)
# 否则保持 FCFS(已按入队时间排序)
elif active_policy == CacheAwarePolicy.LPM:
# 计算每个请求的前缀匹配长度
for req in waiting_queue:
req.prefix_len = self._calc_prefix_match_len(req)
# 按前缀长度降序排序
waiting_queue.sort(key=lambda x: -x.prefix_len)
elif active_policy == CacheAwarePolicy.DFS_WEIGHT:
# 按深度优先搜索权重排序
SchedulePolicy._sort_by_dfs_weight(waiting_queue, self.tree_cache)
elif active_policy == CacheAgnosticPolicy.LOF:
# 按 max_new_tokens 降序排序
waiting_queue.sort(key=lambda x: -x.max_new_tokens)
elif active_policy == CacheAgnosticPolicy.RANDOM:
# 随机打乱
random.shuffle(waiting_queue)
return True大队列降级逻辑
(schedule_policy.py:140-144):
1
2
3
4
5
def _determine_active_policy(self, waiting_queue: List[Req]) -> Policy:
if self.policy == CacheAwarePolicy.LPM and len(waiting_queue) > 128:
# LPM 在大队列时计算开销过大,降级为 FCFS
return CacheAgnosticPolicy.FCFS
return self.policy4.2.2.1 DFS_WEIGHT 策略详解
位置: schedule_policy.py:233-252
(_sort_by_dfs_weight)
核心思想: DFS_WEIGHT 策略通过分析 Radix Tree 中每个节点的请求分布,优先调度”热门”路径上的请求。这种方法利用缓存层次结构,最大化缓存复用率。
实现步骤:
步骤 1: 建立节点到请求的映射
(schedule_policy.py:237-239)
1
2
3
4
5
6
@staticmethod
def _sort_by_dfs_weight(waiting_queue: List[Req], tree_cache: BasePrefixCache) -> None:
"""基于深度优先搜索权重排序等待队列"""
last_node_to_reqs = defaultdict(list)
for req in waiting_queue:
last_node_to_reqs[req.last_node].append(req)步骤 2: 计算每个节点的权重
(schedule_policy.py:241-244)
1
2
3
4
node_to_weight = defaultdict(int)
for node in last_node_to_reqs:
node_to_weight[node] = len(last_node_to_reqs[node])
SchedulePolicy._calc_weight(tree_cache.root_node, node_to_weight)步骤 3: 递归计算子树权重
(schedule_policy.py:289-292)
1
2
3
4
5
6
@staticmethod
def _calc_weight(cur_node: TreeNode, node_to_weight: Dict[TreeNode, int]) -> None:
"""递归计算每个节点的权重(自身请求数 + 所有子节点的请求数)"""
for child in cur_node.children.values():
SchedulePolicy._calc_weight(child, node_to_weight)
node_to_weight[cur_node] += node_to_weight[child]权重计算示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Radix Tree 结构:
Root [权重: 10]
/ \
A[6] B[4]
/ \ \
C[4] D[2] E[4]
/ \
F[2] G[2]
等待队列分布:
- 节点 C: 4 个请求 (prefix: "A-C")
- 节点 D: 2 个请求 (prefix: "A-D")
- 节点 F: 2 个请求 (prefix: "B-E-F")
- 节点 G: 2 个请求 (prefix: "B-E-G")
权重计算:
- 节点 C 权重 = 4
- 节点 D 权重 = 2
- 节点 A 权重 = 4 + 2 = 6
- 节点 F 权重 = 2
- 节点 G 权重 = 2
- 节点 E 权重 = 2 + 2 = 4
- 节点 B 权重 = 4
- Root 权重 = 6 + 4 = 10步骤 4: 按权重深度优先遍历
(schedule_policy.py:295-307)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@staticmethod
def _get_dfs_priority(
cur_node: TreeNode,
node_to_priority: Dict[TreeNode, int],
last_node_to_reqs: Dict[TreeNode, List[Req]],
q: List,
) -> None:
"""按权重深度优先遍历,优先处理权重大的分支"""
childs = [child for child in cur_node.children.values()]
childs.sort(key=lambda x: -node_to_priority[x]) # 子节点按权重降序
for child in childs:
SchedulePolicy._get_dfs_priority(child, node_to_priority, last_node_to_reqs, q)
q.extend(last_node_to_reqs[cur_node]) # 叶子节点的请求加入队列调度结果 (按上述示例):
1
2
3
4
5
6
7
8
9
10
11
调度顺序:
1. 节点 A 分支 (权重 6) > 节点 B 分支 (权重 4)
→ 先处理 "A-C" 的 4 个请求
→ 再处理 "A-D" 的 2 个请求
2. 然后处理节点 B 分支
→ 先处理 "B-E-F" 的 2 个请求
→ 再处理 "B-E-G" 的 2 个请求
最终顺序:
[A-C-req1, A-C-req2, A-C-req3, A-C-req4, A-D-req1, A-D-req2,
B-E-F-req1, B-E-F-req2, B-E-G-req1, B-E-G-req2]DFS_WEIGHT vs LPM 对比:
| 特性 | LPM | DFS_WEIGHT |
|---|---|---|
| 排序依据 | 单个请求的前缀匹配长度 | 整个缓存树的请求分布 |
| 缓存视角 | 局部:单个请求的缓存命中 | 全局:缓存分支的热度 |
| 适用场景 | 简单缓存场景 | 复杂层次结构、分支不均衡 |
| 计算复杂度 | O(n log n) | O(n + m log m),m 为节点数 |
| 缓存优化 | 优先处理长前缀 | 优先处理热门分支 |
实际应用场景:
多轮对话场景:
1
2
3
4
5用户 A: [系统提示 + 对话历史(50轮) + 新问题] 用户 B: [系统提示 + 对话历史(2轮) + 新问题] 用户 C: [系统提示 + 对话历史(50轮) + 新问题,与A相似] DFS_WEIGHT 会优先调度 A 和 C(共享长历史缓存),再调度 B多租户场景:
1
2
3
4租户 1: 100 个请求共享系统提示 A 租户 2: 10 个请求共享系统提示 B DFS_WEIGHT 会优先调度租户 1 的请求(热门分支)
注意事项:
- DFS_WEIGHT 也是 CacheAwarePolicy,需要启用 RadixCache
- 不会自动降级为 FCFS(只有 LPM 有此降级逻辑)
- 在大队列场景下可能比 LPM 开销更大
4.2.3 Scheduler 选择请求
位置: scheduler.py:1792
(get_next_batch_to_run)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def get_next_batch_to_run(self) -> Optional[ScheduleBatch]:
# 主调度循环
# 1. 合并已完成的 prefill batch 到 running batch
if self.prefill_batch is not None:
self._merge_prefill_to_running()
# 2. 如果没有运行中的 batch 或需要新 prefill
if self.running_batch is None or self.running_batch.can_add_new_req():
# 获取新的 prefill batch
prefill_batch = self.get_new_batch_prefill()
if prefill_batch is not None:
self.prefill_batch = prefill_batch
return self.prefill_batch
# 3. 否则运行 decode batch
if self.running_batch is not None:
return self.running_batch
return None4.2.4 构建 Prefill Batch
位置: schedule_policy.py:362
(PrefillAdder)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class PrefillAdder:
def add_one_req(self, req: Req, has_chunked_req: bool,
truncation_align_size: Optional[int]) -> AddReqResult:
"""尝试添加一个请求到 prefill batch"""
# 1. 检查内存预算
result = self.budget_state()
if result != AddReqResult.CONTINUE:
return result
# 2. 计算所需 token
prefix_len = self.prefix_cache.match_prefix_len(req)
input_tokens = req.req_input_indices.length()
# 3. 检查是否需要分块 prefill
if self.need_chunked_prefill(req, prefix_len, input_tokens):
return self.handle_chunked_prefill(req, prefix_len, input_tokens)
# 4. 检查 batch 大小限制
if not self.can_add_more_requests():
return AddReqResult.OTHER
# 5. 添加到可运行列表
self.can_run_list.append(req)
# 6. 更新内存预算
self._update_prefill_budget(prefix_len, input_tokens, req.max_new_tokens)
return AddReqResult.CONTINUE内存预算检查
(schedule_policy.py:430-436):
1
2
3
4
5
6
7
8
def budget_state(self) -> AddReqResult:
if self.rem_total_tokens <= 0 or self.cur_rem_tokens <= 0:
return AddReqResult.NO_TOKEN # 总 token 预算不足
if self.rem_input_tokens <= 0:
return AddReqResult.OTHER # 输入 token 预算不足
return AddReqResult.CONTINUE # 可以继续添加4.2.5 Decode 阶段调度
特点:
- Decode 阶段的请求持续在
running_batch中运行 - 不需要重新调度,直到请求完成或被抢占
- 支持流式输出(基于
stream_interval)
关键代码 (scheduler.py:728-729):
1
2
self.running_batch: Optional[ScheduleBatch] = None # 当前运行的 decode batch
self.last_batch: Optional[ScheduleBatch] = None # 上一个批次4.3 优先级在不同阶段的表现
| 阶段 | 优先级作用方式 | 抢占支持 | 代码位置 |
|---|---|---|---|
| Prefill | 排序决定执行顺序 | 是(抢占 decode 请求) | scheduler.py:1668 |
| Decode | 影响初始 batch 构成 | 是(可被抢占) | scheduler.py:1668 |
| 等待队列 | 排序决定调度顺序 | N/A | schedule_policy.py:276 |
4.4 抢占机制详细流程
触发条件 (scheduler.py:1668):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def preempt_to_schedule(self, req: Req, server_args: ServerArgs) -> bool:
"""
抢占正在运行的低优先级请求以服务高优先级新请求
返回: True 如果成功抢占,False 否则
"""
if not self.enable_priority_scheduling:
return False
if self.running_batch is None or len(self.running_batch.reqs) == 0:
return False
# 计算优先级差异
priority_sign = -1 if self.schedule_low_priority_values_first else 1
for running_req in self.running_batch.reqs:
priority_diff = (req.priority - running_req.priority) * (-priority_sign)
# 检查是否超过抢占阈值
if priority_diff > self.priority_scheduling_preemption_threshold:
# 执行抢占
self.running_batch.remove_request(running_req)
self.waiting_queue.append(running_req)
running_req.time_stats.wait_queue_entry_time = time.perf_counter()
return True
return False抢占参数说明:
priority_scheduling_preemption_threshold = 10:- 值越小,越容易触发抢占
- 值越大,抢占越保守(避免频繁撤销请求)
- 建议在频繁看到请求被撤销时增大此值
五、当前机制的局限性与潜在问题
5.1 已知问题
5.1.1 优先级反转
场景: 高优先级请求等待低优先级请求释放共享资源
当前缓解:
- 抢占机制可以在一定程度上缓解
- 但不能完全消除(受限于
preemption_threshold)
5.1.2 饥饿问题
场景 1: 低优先级请求饥饿
- 持续有高优先级请求到达时,低优先级请求可能长时间得不到服务
- 当前无保护机制(如 aging、boosting)
场景 2: 短请求被长请求阻塞
- 虽然 LOF 策略优先处理长请求,但可能导致短请求延迟
- 当前无短请求优化策略
场景 3: LPM 策略的缓存冷启动问题
- 新请求(无缓存命中)在 LPM 策略下可能被延迟
- 大队列时自动降级为 FCFS,缓解此问题
5.1.3 大队列性能问题
问题:
- LPM 策略在队列 > 128 时自动降级为 FCFS
- 降级阈值硬编码,无法配置
- DFS_WEIGHT 不会自动降级,在大队列时可能计算开销更大
影响:
- 缓存优化效果减弱
- 可能导致性能抖动
- DFS_WEIGHT 在大队列时可能成为瓶颈(需要遍历整个缓存树)
建议:
- 大队列场景(> 100 请求)优先使用 FCFS
- 如果使用 DFS_WEIGHT,建议监控队列长度,必要时切换策略
5.2 高并发场景问题
5.2.1 内存压力下的调度退化
场景: GPU 内存不足时
表现:
- 批次大小急剧减小
- 吞吐量下降
- 优先级可能失效(受内存预算限制)
代码位置:
schedule_policy.py:430-436
5.2.2 抢占频繁导致性能抖动
场景:
- 高优先级请求频繁到达
preemption_threshold设置过小
表现:
- 大量请求被撤销和重新调度
- 缓存命中率下降
- 端到端延迟增加
建议: 增大
priority_scheduling_preemption_threshold
5.3 Streaming 场景问题
5.3.1 流式请求的优先级处理
当前行为:
- Streaming 请求在 decode 阶段持续占用资源
- 一旦开始 streaming,优先级调整不再生效
潜在问题:
- 高优先级请求可能需要等待长时间 streaming 完成
- 缺乏中期优先级调整机制
5.3.2 Chunked Prefill 的优先级
当前行为:
- 长请求分块 prefill 时,每个 chunk 独立调度
- 可能与其他请求交错执行
潜在问题:
- 长请求的实际执行顺序可能不符合预期优先级
5.4 Speculative Decucing 场景
当前状态:
- SGLang 支持投机解码
- 但优先级调度与投机解码的交互未明确优化
潜在问题:
- 投机解码的额外开销可能影响优先级调度的效果
- Draft model 的调度与 main model 的调度可能不同步
5.5 硬编码限制
| 限制 | 影响 | 是否需要改进 |
|---|---|---|
| LPM 降级阈值 (128) | 大队列时缓存优化失效 | 是,应可配置 |
| NSA CP batch size = 1 | NSA CP 模式吞吐受限 | 是,未来版本可能支持 |
| IGNORE_EOS_TOKENS = 1 | 内存计算精度 | 否,影响较小 |
| 优先级符号计算逻辑 | 优先级方向控制灵活性 | 是,可增加更多选项 |
六、总结与扩展建议
6.1 当前调度模型逻辑流程图
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌─────────────────────────────────────────────────────────────┐
│ 1. 请求到达 HTTP API Layer │
│ ↓ 解析 priority 参数 (如果提供) │
├─────────────────────────────────────────────────────────────┤
│ 2. Scheduler: _add_request_to_queue() │
│ ↓ 验证/设置优先级 → 检查队列限制 → 加入 waiting_queue │
├─────────────────────────────────────────────────────────────┤
│ 3. Schedule Policy: calc_priority() │
│ ↓ 应用调度策略 (LPM/FCFS/Priority/LOF/RANDOM) │
│ ↓ 按策略对 waiting_queue 排序 │
├─────────────────────────────────────────────────────────────┤
│ 4. Scheduler: get_next_batch_to_run() │
│ ↓ 尝试抢占 (如果启用优先级调度) │
│ ↓ 调用 get_new_batch_prefill() │
├─────────────────────────────────────────────────────────────┤
│ 5. PrefillAdder: add_one_req() │
│ ↓ 检查内存预算 → 检查 batch 限制 → 添加到 can_run_list │
│ ↓ 构建 prefill_batch │
├─────────────────────────────────────────────────────────────┤
│ 6. Model Executor: 执行 Prefill │
│ ↓ 完成 prefill 后,请求移至 running_batch │
├─────────────────────────────────────────────────────────────┤
│ 7. Model Executor: 执行 Decode (持续直到完成) │
│ ↓ 支持 streaming │
│ ↓ 支持被高优先级请求抢占 │
├─────────────────────────────────────────────────────────────┤
│ 8. 请求完成 → 返回结果 → 释放资源 │
└─────────────────────────────────────────────────────────────┘6.2 核心特性总结
| 特性 | 实现状态 | 评价 |
|---|---|---|
| 显式优先级调度 | ✅ 完整支持 | 支持优先级设置、排序、抢占 |
| 多种调度策略 | ✅ 支持 | LPM、FCFS、LOF、RANDOM 等 |
| 抢占机制 | ✅ 支持 | 可配置阈值,避免频繁抢占 |
| 公平性保证 | ⚠️ 部分 | FCFS 提供基本公平性,但无饥饿保护 |
| QoS 支持 | ⚠️ 基础 | 优先级调度 + 抢占,但无延迟 SLA |
| 缓存优化 | ✅ 优秀 | LPM 策略 + RadixAttention |
| 内存管理 | ✅ 优秀 | 动态预算,分页对齐 |
| 流式支持 | ✅ 支持 | streaming + chunked prefill |
| 可配置性 | ⚠️ 中等 | 丰富参数,但部分行为硬编码 |
6.3 扩展建议
6.3.1 短期改进(易于实现)
- 可配置的 LPM 降级阈值
- 当前硬编码为 128
- 建议添加参数:
--lpm-degrade-threshold
- 优先级老化机制
- 防止低优先级请求饥饿
- 实现: 随等待时间线性增加优先级
- 短请求优化
- 添加调度策略:
SHORT_FIRST - 基于
input_tokens + max_new_tokens估计
- 添加调度策略:
- 优先级统计指标
- 添加 Prometheus 指标:
sglang_priority_starvation_countsglang_priority_preemption_countsglang_priority_waiting_time_seconds
- 添加 Prometheus 指标:
6.3.2 中期改进(需要架构调整)
- 多级优先级队列
- 实现: 多个 waiting_queue(高/中/低优先级)
- 调度: 加权轮转 (Weighted Round Robin)
- 延迟 SLA 支持
- 添加参数:
deadline_ms - 调度策略:
EARLIEST_DEADLINE_FIRST
- 添加参数:
- 动态优先级调整
- 基于: 等待时间、系统负载、用户类别
- 实现:
priority_adjustment_functionhook
- Batch 内优先级
- 当前: batch 内无优先级
- 改进: batch 内按优先级排序,影响采样顺序
6.3.3 长期改进(需要重新设计)
- 完全抢占式调度
- 支持请求暂停/恢复
- 需要重新设计 checkpoint 机制
- 多租户隔离
- 租户级别的资源配额
- 租户级别的优先级策略
- 自适应调度
- 基于历史数据自动选择调度策略
- 机器学习驱动的调度优化
- 分布式优先级调度
- 跨多个 worker 的全局优先级
- 需要分布式协调服务
6.4 使用建议
场景 1: 在线服务(低延迟优先)
1
2
3
4
5
6
python -m sglang serve \
--schedule-policy fcfs \
--enable-priority-scheduling \
--schedule-low-priority-values-first \
--max-prefill-tokens 8192 \
--prefill-max-requests 32场景 2: 批处理(高吞吐优先)
1
2
3
4
python -m sglang serve \
--schedule-policy lpm \
--max-prefill-tokens 32768 \
--max-total-tokens 1000000场景 3: 混合工作负载
1
2
3
4
5
6
python -m sglang serve \
--schedule-policy fcfs \
--enable-priority-scheduling \
--priority-scheduling-preemption-threshold 20 \
--chunked-prefill-size 4096 \
--enable-dynamic-chunking附录 A: 配置示例
A.1 启用优先级调度(高值优先)
1
2
3
4
python -m sglang serve \
--model-path meta-llama/Llama-3.1-8B-Instruct \
--enable-priority-scheduling \
--priority-scheduling-preemption-threshold 10行为:
- priority 值越大,优先级越高
- priority 值相差 > 10 时触发抢占
A.2 启用优先级调度(低值优先)
1
2
3
4
python -m sglang serve \
--model-path meta-llama/Llama-3.1-8B-Instruct \
--enable-priority-scheduling \
--schedule-low-priority-values-first行为:
- priority 值越小,优先级越高
- 适合 0 = 最高优先级的语义
A.3 使用 LPM 策略
1
2
3
python -m sglang serve \
--model-path meta-llama/Llama-3.1-8B-Instruct \
--schedule-policy lpm行为:
- 优先调度与前缀缓存匹配度高的请求
- 队列 > 128 时自动降级为 FCFS
A.4 使用 DFS_WEIGHT 策略
1
2
3
python -m sglang serve \
--model-path meta-llama/Llama-3.1-8B-Instruct \
--schedule-policy dfs-weight行为:
- 基于缓存树的深度优先搜索权重调度
- 优先处理热门分支(请求密集的缓存路径)
- 最大化缓存复用率,适合复杂缓存层次结构
- 不会自动降级为 FCFS(与 LPM 不同)
典型应用场景:
- 多轮对话服务: 用户共享长对话历史
- 多租户系统: 不同租户使用不同的系统提示词
- 复杂 prompt 模板: 多个请求共享大量 prompt 前缀
附录 B: API 使用示例
B.1 OpenAI API(指定优先级)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from openai import OpenAI
client = OpenAI(
base_url="http://localhost:8000/v1",
api_key="EMPTY"
)
response = client.chat.completions.create(
model="meta-llama/Llama-3.1-8B-Instruct",
messages=[{"role": "user", "content": "Hello!"}],
extra_body={
"priority": 100 # 高优先级
}
)
print(response.choices[0].message.content)B.2 SGLang 原生 API(指定优先级)
1
2
3
4
5
6
7
8
9
10
11
import sglang as sgl
@sgl.function
def hello(s):
s += sgl.user("Hello!")
s += sgl.assistant(sgl.gen("response"))
result = hello.run(
model_path="meta-llama/Llama-3.1-8B-Instruct",
priority=100 # 高优先级
)附录 C: 关键代码索引
| 功能 | 文件路径 | 类/函数 | 行号 |
|---|---|---|---|
| 主调度器 | scheduler.py |
Scheduler |
235 |
| 优先级验证 | scheduler.py |
_set_or_validate_priority |
1658 |
| 抢占逻辑 | scheduler.py |
preempt_to_schedule |
1668 |
| 调度策略 | schedule_policy.py |
SchedulePolicy |
63 |
| 优先级排序 | schedule_policy.py |
_sort_by_priority_and_fcfs |
276 |
| DFS_WEIGHT 排序 | schedule_policy.py |
_sort_by_dfs_weight |
233 |
| DFS_WEIGHT 权重计算 | schedule_policy.py |
_calc_weight |
289 |
| DFS_WEIGHT 优先级遍历 | schedule_policy.py |
_get_dfs_priority |
295 |
| Prefill Adder | schedule_policy.py |
PrefillAdder |
362 |
| 内存预算检查 | schedule_policy.py |
budget_state |
430 |
| LPM 降级逻辑 | schedule_policy.py |
_determine_active_policy |
140 |
| 配置参数定义 | server_args.py |
ServerArgs |
- |




