5-chunked-prefill

AI-摘要
User GPT
AI初始化中...
介绍自己 🙈
生成本文简介 👋
推荐相关文章 📖
前往主页 🏠
前往爱发电购买
5-chunked-prefill
gogongxt关于chunked prefill
NOTE
为什么要有chunked prefill?
- 减少prefill过程中的激活值占用空间,防止OOM,主要是
,这个矩阵乘的维度和序列长度有关,切成小块可以有效减少大小 - 另一个好处是在序列较长且混合时可以更好的利用单次推理的空余空间
在完整sglang的实现中,跟prefill分块的参数有两个:
--max-prefill-tokens单次forward能够跑的最大的长度(这个参数可以不管,这个算是历史遗留参数,在没有chunked-prefill-size时才有用)--chunked-prefill-size每个请求被chunk拆分的块长度(关键看这个参数,决定了prefill单个forward batch的tokens个数)
为什么要有这两个值:
- 第一个是限制系统总体prefill推理时的计算量,这个是可以有很多个req组成的(可以不关注)
- 第二个是为了把一个长请求拆分成更小的块,例如32K的请求拆分成4个8K的,这样在做prefill时可以防止激活值膨胀太大导致OOM。
在minisglang中的实现
然而在minisglang中的实现只有一个参数就是max_extend_tokens,这个参数和chunked_prefill_size一个意思
对标上面sglang参数其实也可以理解为
max_extend_tokens=chunked_prefill_size
下面代码中的prefill_budget就是固定参数max_extend_tokens
核心调度请求的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
def schedule_next_batch(self, prefill_budget: int) -> Batch | None:
"""
调度下一个 prefill 批次
调度策略:
1. 如果 pending_list 为空,返回 None
2. 创建 PrefillAdder,初始化预算和预留空间
3. 遍历 pending_list,尝试添加请求
4. 如果某个请求无法添加,break(保证 FIFO 顺序)
5. 如果没有请求被添加,返回 None
6. 更新 pending_list:分块请求优先,然后是未处理的请求
7. 返回 Batch 对象
Args:
prefill_budget: 本批次可处理的 token 预算
Returns:
Batch 包含本批次要处理的请求列表,phase="prefill"
None 如果没有请求可以处理
"""
# 如果没有待处理请求,直接返回 None
if len(self.pending_list) == 0:
return None
# estimated offset due to in-flight decode
# 创建 PrefillAdder,传入:
# - token_budget: 本批次的 token 预算
# - reserved_sizc: 正在 decode 的 token 数量(已占用但未释放的资源)
# - cache_manager 和 table_manager: 资源管理器
adder = PrefillAdder(
token_budget=prefill_budget,
reserved_size=self.decode_manager.inflight_tokens,
cache_manager=self.cache_manager,
table_manager=self.table_manager,
)
# 本批次成功添加的请求列表
reqs: List[Req] = []
# 需要继续 prefill 的分块请求列表(将重新加入 pending_list)
chunked_list: List[PendingReq] = []
# 遍历待处理队列,尝试添加请求
for pending_req in self.pending_list:
if req := adder.try_add_one(pending_req):
# 清空上次的 chunked_req 引用(避免错误复用)
pending_req.chunked_req = None
# 检查是否是分块请求
if isinstance(req, ChunkedReq):
# 保存当前的 chunked_req,供下次调度使用
pending_req.chunked_req = req
# 加入分块列表,稍后重新入队(优先级高)
chunked_list.append(pending_req)
# 加入本批次执行列表
reqs.append(req)
else:
# 无法添加当前请求(预算耗尽或资源不足)
# break 而不是 continue:保证 FIFO 顺序,避免后面的请求"插队"
break # We cannot add more requests
# 如果没有请求被添加,返回 None
if len(reqs) == 0:
return None
# 更新 pending_list:
# 1. chunked_list: 需要继续 prefill 的请求(优先级高)
# 2. pending_list[len(reqs):]: 未被处理的请求(本批次没轮到的)
self.pending_list = chunked_list + self.pending_list[len(reqs):]
# 返回 Batch 对象,标记为 prefill 阶段
return Batch(reqs=reqs, phase="prefill")根据代码:
一个请求会根据当前批次的剩余空间判断是否切块,最后转成类型
Req或者是ChunkedReqReq表示当前是完整的请求ChunkedReq表示当前请求被切开来了,还有没有做prefill计算的部分,下一轮还得接着算
ChunkedReq会放在靠前处理,下一批优先把之前留下来的ChunkedReq进行处理。这个也很好理解,因为这一轮prefill已经处理请求序列前面一部分,那么下一轮prefill肯定希望优先把这个请求做完
最后处理完都会加入到reqs列表中,这个就是当前prefill轮要处理的tokens(肯定是不大于
max_extend_tokens的)
sequenceDiagram
participant P as Pending List (队列)
participant GPU as GPU Forward (Budget: 2000)
participant D as Decode Pool (完成区)
Note over P: 初始: [A(5k), B(0.5k), C(1.2k)]
rect rgb(232, 245, 233)
Note over GPU: Round 1: 处理 A 的前 2k
P->>GPU: Req-A (Chunk 1: 2000/5000)
Note over GPU: 预算 2000 - 2000 = 0 (满了)
GPU-->>P: A (ChunkedReq) 剩余 3000 回队首
end
Note over P: 状态: [A(剩3k), B(0.5k), C(1.2k)]
rect rgb(232, 245, 233)
Note over GPU: Round 2: 处理 A 的中间 2k
P->>GPU: Req-A (Chunk 2: 2000/3000)
Note over GPU: 预算 2000 - 2000 = 0 (满了)
GPU-->>P: A (ChunkedReq) 剩余 1000 回队首
end
Note over P: 状态: [A(剩1k), B(0.5k), C(1.2k)]
rect rgb(225, 245, 254)
Note over GPU: Round 3: 蚕食 A 剩余 + 填入 B + 部分 C
P->>GPU: Req-A (Final: 1000/1000)
P->>GPU: Req-B (Full: 500/500)
P->>GPU: Req-C (Chunk 1: 500/1200)
Note over GPU: 预算 2000 - 1000 - 500 - 500 = 0
GPU-->>D: Req-A (Req) 完成, Req-B (Req) 完成
GPU-->>P: Req-C (ChunkedReq) 剩余 700 回队首
end
Note over P: 状态: [C(剩0.7k)]
rect rgb(255, 243, 224)
Note over GPU: Round 4: 处理 C 剩余
P->>GPU: Req-C (Final: 700/700)
Note over GPU: 预算剩余 1300 (空闲)
GPU-->>D: Req-C (Req) 完成
end
Note over D: 最终结果: [A, B, C] 全部进入 Decode
完整的PrefillAdder代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@dataclass
class PrefillAdder:
"""
Prefill 添加器:负责将待处理请求添加到 prefill 批次中
核心职责:
1. 管理本批次的 token 预算(token_budget)
2. 为新请求分配资源(KV cache 和物理存储)
3. 处理分块请求的续处理
4. 更新预留空间统计
工作流程:
- 尝试添加请求 → 检查预算和资源 → 分配/复用 cache_handle 和 table_idx
→ 复制 token 数据到 GPU → 返回 Req 或 ChunkedReq
"""
# 本批次可处理的 token 数量上限(每次 try_add_one 会扣减)
token_budget: int
# 已预留但未释放的 token 数量(包括正在 decode 的请求和本批次已添加的请求)
reserved_size: int
# KV cache 管理器:负责 RadixCache 匹配和页面分配
cache_manager: CacheManager
# 表管理器:负责 GPU 上的 token_pool 和 page_table 物理存储
table_manager: TableManager
def _try_allocate_one(self, req: PendingReq) -> Tuple[BaseCacheHandle, int] | None:
"""
尝试为一个新请求分配资源
分配步骤:
1. 检查是否有可用的 table 槽位
2. 在 RadixCache 中查找共同前缀(match_req)
3. 估算总需求(剩余 prefill + decode)
4. 检查是否有足够的 KV cache 空间
5. 锁定 cache handle(防止被驱逐)
6. 分配物理存储槽位(table_idx)
7. 如果有 RadixCache 命中,复制 cached 部分到物理存储
Args:
req: 待处理的请求
Returns:
(cache_handle, table_idx) 如果分配成功
None 如果资源不足
"""
# 检查物理存储是否已满
if self.table_manager.available_size == 0:
return None
# 在 RadixCache 中查找与当前请求匹配的前缀
# handle: 逻辑缓存句柄,包含 cached_len 等元信息
# match_indices: 匹配的页面索引,用于构建 page_table
handle, match_indices = self.cache_manager.match_req(req)
cached_len = handle.cached_len # RadixCache 命中的长度(可能是 0)
# TODO: better estimate policy
# 计算还需要 prefill 的 token 数量
extend_len = req.input_len - cached_len
# 估算总需求:剩余 prefill + decode 阶段需要的空间
estimated_len = extend_len + req.output_len
# 第一次检查:预估需求是否超过可用空间
# reserved_size 包含正在 decode 的请求和本批次已添加的请求
if estimated_len + self.reserved_size > self.cache_manager.available_size:
return None
# 锁定 cache handle,防止在调度期间被驱逐
self.cache_manager.lock(handle)
# 第二次检查:锁定后再次确认(可能期间有其他请求释放了空间)
if estimated_len + self.reserved_size > self.cache_manager.available_size:
# 空间不足,解锁并返回 None
return self.cache_manager.unlock(handle)
# 分配物理存储槽位(在 token_pool 和 page_table 中的索引)
table_idx = self.table_manager.allocate()
# 如果有 RadixCache 命中,需要将 cached 部分复制到物理存储
if cached_len > 0: # NOTE: set the cached part
# 获取物理存储的对应槽位(前 cached_len 个位置)
device_ids = self.table_manager.token_pool[table_idx][:cached_len]
page_entry = self.table_manager.page_table[table_idx][:cached_len]
# 复制 token IDs 到 GPU(非阻塞,异步复制)
device_ids.copy_(req.input_ids[:cached_len].pin_memory(), non_blocking=True)
# 复制页面索引(指向 RadixCache 的物理页)
page_entry.copy_(match_indices)
# 返回 cache handle 和物理槽位索引
return handle, table_idx
def _add_one_req(
self,
pending_req: PendingReq,
cache_handle: BaseCacheHandle,
table_idx: int,
cached_len: int,
) -> Req:
"""
创建一个请求对象(Req 或 ChunkedReq)并准备数据
处理逻辑:
1. 计算剩余需要 prefill 的 token 数量
2. 根据预算决定本次处理多少 token(chunk_size)
3. 判断是否需要分块
4. 扣减 token_budget,增加 reserved_size
5. 复制 token 数据到 GPU
6. 创建并返回 Req 或 ChunkedReq
Args:
pending_req: 待处理的请求
cache_handle: KV cache 句柄(新分配或复用的)
table_idx: 物理存储槽位索引
cached_len: RadixCache 命中的长度(注意:始终是初始值,不会累加)
Returns:
Req 如果 prefill 完成,ChunkedReq 如果需要继续 prefill
"""
# 计算剩余需要 prefill 的 token 数量
remain_len = pending_req.input_len - cached_len
# 本次实际处理的 token 数量:不超过预算和剩余长度
chunk_size = min(self.token_budget, remain_len)
# 判断是否需要分块
# 如果 chunk_size < remain_len,说明还有剩余部分需要下次处理
is_chunked = chunk_size < remain_len
# 根据是否分块选择类类型
CLS = ChunkedReq if is_chunked else Req
# 扣减本批次的 token 预算
self.token_budget -= chunk_size
# 增加预留空间:剩余 prefill 长度 + decode 需求
# 注意:这里加的是 remain_len(总剩余),而不是 chunk_size(本次处理)
# 这样可以确保整个请求的总需求被预留
self.reserved_size += remain_len + pending_req.output_len
# NOTE: update the tokens ids only; new pages will be allocated in the scheduler
# 计算本次处理的 token 在 input_ids 中的切片位置
# 从 cached_len 开始,处理 chunk_size 个 token
_slice = slice(cached_len, cached_len + chunk_size)
# 获取物理存储中对应的切片位置
device_ids = self.table_manager.token_pool[table_idx][_slice]
# 复制 token 数据到 GPU(非阻塞,异步复制)
device_ids.copy_(pending_req.input_ids[_slice].pin_memory(), non_blocking=True)
# 创建并返回请求对象
return CLS(
# input_ids 包含从开始到本次处理结束的所有 token
# 注意:这只是一个视图/切片,不是深拷贝
input_ids=pending_req.input_ids[: cached_len + chunk_size],
table_idx=table_idx,
# cached_len 始终是 RadixCache 命中的初始长度,不会累加已处理的长度
cached_len=cached_len,
output_len=pending_req.output_len,
uid=pending_req.uid,
cache_handle=cache_handle,
sampling_params=pending_req.sampling_params,
)
def try_add_one(self, pending_req: PendingReq) -> Req | None:
"""
尝试添加一个请求到本批次
处理两种情况:
1. 续处理:pending_req.chunked_req 不为空,说明上次被分块了
→ 直接复用已有的 cache_handle 和 table_idx
2. 新请求:pending_req.chunked_req 为空
→ 调用 _try_allocate_one 分配新资源
Args:
pending_req: 待处理的请求
Returns:
Req 如果成功添加且 prefill 完成
ChunkedReq 如果成功添加但需要继续 prefill
None 如果无法添加(预算耗尽或资源不足)
"""
# 首先检查本批次的 token 预算是否已耗尽
if self.token_budget <= 0:
return None
# 情况1:续处理已有的分块请求
# chunked_req 包含上次的 cache_handle、table_idx 和 cached_len
if chunked_req := pending_req.chunked_req:
# 直接复用已有的资源,继续处理剩余部分
return self._add_one_req(
pending_req=pending_req,
cache_handle=chunked_req.cache_handle,
table_idx=chunked_req.table_idx,
cached_len=chunked_req.cached_len,
)
# 情况2:新请求,尝试分配资源
if resource := self._try_allocate_one(pending_req):
cache_handle, table_idx = resource
# 使用新分配的资源创建请求对象
return self._add_one_req(
pending_req=pending_req,
cache_handle=cache_handle,
table_idx=table_idx,
cached_len=cache_handle.cached_len,
)
# 资源不足,无法添加
return Nonetry_add_one关键就在于之前对于分块的处理,分成两种情况:
续处理:
pending_req.chunked_req不为空,说明上次被分块了→ 直接复用已有的 cache_handle 和 table_idx
新请求:
pending_req.chunked_req为空→ 调用 _try_allocate_one 分配新资源
NOTE
- 对于ChunkedReq来说,计算一轮,就认为计算的那部分就是prefix命中的部分,这样可以方便管理kvcache,不用特殊对待ChunkedReq
- 直接某一次剩下的token可以在当前轮做完,就又变回来普通Req
- 普通Req和ChunkedReq计算过程都是一样的
- 计算完成后略有区别,普通请求prefill计算完成后会做处理,ChunkedReq会跳过处理
评论
匿名评论隐私政策






