/*
 * NOTE(review): this chunk of the scrape also contained truncated Java
 * (Apache Flink credit-based flow control: tryRequestBuffers,
 * notifyCreditAvailable, addCreditOrResumeConsumption).  Java cannot live in
 * a CUDA translation unit and every fragment was cut mid-definition, so it
 * is dropped here rather than guessed at.
 *
 * What remains are two cuSOLVER dense eigensolver samples that had been
 * pasted into each other (two `main` definitions; the first was cut off in
 * the middle of its eigenvector print loop).  They are reconstructed below
 * as two helpers plus a single `main`, with error checking added and the
 * leaks fixed: host `A` was malloc'd and never freed, `d_work` and the
 * cusolver handle were never released.
 */

/* Abort with a message if a CUDA runtime call fails. */
#define CUDA_CHECK(call)                                                      \
    do {                                                                      \
        cudaError_t err_ = (call);                                            \
        if (err_ != cudaSuccess) {                                            \
            fprintf(stderr, "CUDA error %s:%d: %s\n", __FILE__, __LINE__,     \
                    cudaGetErrorString(err_));                                \
            exit(EXIT_FAILURE);                                               \
        }                                                                     \
    } while (0)

/* Abort with a message if a cuSOLVER call fails. */
#define CUSOLVER_CHECK(call)                                                  \
    do {                                                                      \
        cusolverStatus_t st_ = (call);                                        \
        if (st_ != CUSOLVER_STATUS_SUCCESS) {                                 \
            fprintf(stderr, "cuSOLVER error %s:%d: status %d\n",              \
                    __FILE__, __LINE__, (int)st_);                            \
            exit(EXIT_FAILURE);                                               \
        }                                                                     \
    } while (0)

/*
 * Example 1: eigendecomposition of a 3x3 real symmetric matrix with
 * cusolverDnSsyevd.  Returns the devInfo status (0 on success).
 */
static int syevd_example(void)
{
    cusolverDnHandle_t handle = NULL;
    const int m = 3;
    const int lda = m;

    /* Column-major symmetric matrix:
     *   3.5 0.5 0.0
     *   0.5 3.5 0.0
     *   0.0 0.0 2.0                                                        */
    float A[9] = {3.5f, 0.5f, 0.0f,
                  0.5f, 3.5f, 0.0f,
                  0.0f, 0.0f, 2.0f};
    float W[3];        /* eigenvalues, ascending order */
    int info_gpu = 0;  /* devInfo result of the factorization */

    /* Step 1: create the handle. */
    CUSOLVER_CHECK(cusolverDnCreate(&handle));

    /* Step 2: device buffers for the matrix, eigenvalues, and status. */
    float *d_A = NULL, *d_W = NULL, *d_work = NULL;
    int *devInfo = NULL;
    CUDA_CHECK(cudaMalloc((void **)&d_A, sizeof(float) * lda * m));
    CUDA_CHECK(cudaMalloc((void **)&d_W, sizeof(float) * m));
    CUDA_CHECK(cudaMalloc((void **)&devInfo, sizeof(int)));
    CUDA_CHECK(cudaMemcpy(d_A, A, sizeof(float) * lda * m,
                          cudaMemcpyHostToDevice));

    /* Step 3: query and allocate the workspace. */
    cusolverEigMode_t jobz = CUSOLVER_EIG_MODE_VECTOR; /* values + vectors */
    cublasFillMode_t uplo = CUBLAS_FILL_MODE_LOWER;
    int lwork = 0;
    CUSOLVER_CHECK(cusolverDnSsyevd_bufferSize(handle, jobz, uplo, m, d_A,
                                               lda, d_W, &lwork));
    CUDA_CHECK(cudaMalloc((void **)&d_work, sizeof(float) * lwork));

    /* Step 4: eigendecomposition; d_A is overwritten by the eigenvectors. */
    CUSOLVER_CHECK(cusolverDnSsyevd(handle, jobz, uplo, m, d_A, lda, d_W,
                                    d_work, lwork, devInfo));
    CUDA_CHECK(cudaDeviceSynchronize());

    /* Step 5: copy results back. */
    CUDA_CHECK(cudaMemcpy(A, d_A, sizeof(float) * lda * m,
                          cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(W, d_W, sizeof(float) * m, cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(&info_gpu, devInfo, sizeof(int),
                          cudaMemcpyDeviceToHost));

    printf("%d\n", info_gpu);
    printf("eigenvalue = (matlab base-1), ascending order\n");
    for (int i = 0; i < m; i++) {
        printf("W[%d] = %E\n", i + 1, W[i]);
    }
    /* Eigenvectors (columns of A, column-major).  NOTE(review): the body of
     * this print loop was truncated in the original; reconstructed. */
    for (int i = 0; i < m; i++) {
        for (int j = 0; j < m; j++) {
            printf("%.4f\n", A[j * lda + i]);
        }
    }

    CUDA_CHECK(cudaFree(d_A));
    CUDA_CHECK(cudaFree(d_W));
    CUDA_CHECK(cudaFree(d_work));
    CUDA_CHECK(cudaFree(devInfo));
    CUSOLVER_CHECK(cusolverDnDestroy(handle));
    return info_gpu;
}

/*
 * Example 2: eigendecomposition of a 4x4 complex Hermitian matrix with
 * cusolverDnZheevd.  Returns the devInfo status (0 on success).
 */
static int heevd_example(void)
{
    cusolverDnHandle_t handle = NULL;
    const int m = 4;
    const int lda = m;

    /* Column-major Hermitian matrix (x100 the MATLAB comment below):
     * 1.9501+0.0000i 0.2049-0.1811i 0.5217-0.3123i 0.2681-0.3998i
     * 0.2049+0.1811i 1.8272+0.0000i 0.5115+0.0987i 0.4155+0.0435i
     * 0.5217+0.3123i 0.5115-0.0987i 2.3984+0.0000i 0.4549+0.0510i
     * 0.2681+0.3998i 0.4155-0.0435i 0.4549-0.0510i 2.2332+0.0000i        */
    cuDoubleComplex A[16];
    A[0]  = make_cuDoubleComplex(1.9501e2,  0);
    A[1]  = make_cuDoubleComplex(0.2049e2,  0.1811e2);
    A[2]  = make_cuDoubleComplex(0.5217e2,  0.3123e2);
    A[3]  = make_cuDoubleComplex(0.2681e2,  0.3998e2);
    A[4]  = make_cuDoubleComplex(0.2049e2, -0.1811e2);
    A[5]  = make_cuDoubleComplex(1.8272e2,  0);
    A[6]  = make_cuDoubleComplex(0.5115e2, -0.0987e2);
    A[7]  = make_cuDoubleComplex(0.4155e2, -0.0435e2);
    A[8]  = make_cuDoubleComplex(0.5217e2, -0.3123e2);
    A[9]  = make_cuDoubleComplex(0.5115e2,  0.0987e2);
    A[10] = make_cuDoubleComplex(2.3984e2,  0);
    A[11] = make_cuDoubleComplex(0.4549e2, -0.0510e2);
    A[12] = make_cuDoubleComplex(0.2681e2, -0.3998e2);
    A[13] = make_cuDoubleComplex(0.4155e2,  0.0435e2);
    A[14] = make_cuDoubleComplex(0.4549e2,  0.0510e2);
    A[15] = make_cuDoubleComplex(2.2332e2,  0);

    double W[4];       /* eigenvalues, ascending order */
    int info_gpu = 0;  /* devInfo result of the factorization */

    /* Step 1: create the handle. */
    CUSOLVER_CHECK(cusolverDnCreate(&handle));

    /* Step 2: device buffers and upload of A. */
    cuDoubleComplex *d_A = NULL, *d_work = NULL;
    double *d_W = NULL;
    int *devInfo = NULL;
    CUDA_CHECK(cudaMalloc((void **)&d_A, sizeof(cuDoubleComplex) * lda * m));
    CUDA_CHECK(cudaMalloc((void **)&d_W, sizeof(double) * m));
    CUDA_CHECK(cudaMalloc((void **)&devInfo, sizeof(int)));
    CUDA_CHECK(cudaMemcpy(d_A, A, sizeof(cuDoubleComplex) * lda * m,
                          cudaMemcpyHostToDevice));

    /* Step 3: query and allocate the workspace. */
    cusolverEigMode_t jobz = CUSOLVER_EIG_MODE_VECTOR; /* values + vectors */
    cublasFillMode_t uplo = CUBLAS_FILL_MODE_LOWER;
    int lwork = 0;
    CUSOLVER_CHECK(cusolverDnZheevd_bufferSize(handle, jobz, uplo, m, d_A,
                                               lda, d_W, &lwork));
    CUDA_CHECK(cudaMalloc((void **)&d_work, sizeof(cuDoubleComplex) * lwork));

    /* Step 4: eigendecomposition; d_A is overwritten by the eigenvectors. */
    CUSOLVER_CHECK(cusolverDnZheevd(handle, jobz, uplo, m, d_A, lda, d_W,
                                    d_work, lwork, devInfo));
    CUDA_CHECK(cudaDeviceSynchronize());

    /* Step 5: copy results back. */
    CUDA_CHECK(cudaMemcpy(A, d_A, sizeof(cuDoubleComplex) * lda * m,
                          cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(W, d_W, sizeof(double) * m,
                          cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(&info_gpu, devInfo, sizeof(int),
                          cudaMemcpyDeviceToHost));

    printf("%d\n", info_gpu);
    printf("eigenvalue = (matlab base-1), ascending order\n");
    for (int i = 0; i < m; i++) {
        printf("W[%d] = %E\n", i + 1, W[i]);
    }
    for (size_t i = 0; i < 4; i++) {
        for (size_t j = 0; j < 4; j++) {
            printf("%.4f + %.4f j\n", A[i * 4 + j].x, A[i * 4 + j].y);
        }
    }

    CUDA_CHECK(cudaFree(d_A));
    CUDA_CHECK(cudaFree(d_W));
    CUDA_CHECK(cudaFree(d_work));
    CUDA_CHECK(cudaFree(devInfo));
    CUSOLVER_CHECK(cusolverDnDestroy(handle));
    return info_gpu;
}

/* Run both eigensolver examples; non-zero exit if either reports failure. */
int main(void)
{
    int rc = syevd_example();
    rc |= heevd_example();
    return rc;
}
import queue
import random
import threading
import time

# 1. Initialize the thread-safe waiting queue.
waiting_queue = queue.Queue()
MAX_BATCH_SIZE = 3


# --- Simulated user-request thread (producer) ---
def user_request_producer():
    """Feed five requests into waiting_queue, one every 1-2 seconds.

    Each request is a dict: "id" (REQ-1 .. REQ-5) and "remain", a random
    token budget in [3, 8] that the inference loop decrements per step.
    """
    request_id = 1
    while True:
        # Users arrive at random: a new request every 1-2 seconds.
        time.sleep(random.uniform(1, 2))
        # Each request needs a random number of tokens (3 to 8).
        req = {"id": f"REQ-{request_id}", "remain": random.randint(3, 8)}
        waiting_queue.put(req)
        print(f"\n[用户端] 送入新请求: {req['id']} (预计长度: {req['remain']})")
        request_id += 1
        if request_id > 5:
            break


# --- Core inference loop (consumer / executor) ---
def inference_loop():
    """Continuous-batching executor: refill the batch, step, evict finished.

    Runs forever; intended to be interrupted with Ctrl-C.
    """
    running_batch = []
    print("--- 推理引擎已启动 ---")
    iteration = 0
    while True:
        # A. Refill: pull waiting requests while the batch has free slots.
        while len(running_batch) < MAX_BATCH_SIZE:
            try:
                # block=False: raise queue.Empty instead of stalling the loop.
                new_req = waiting_queue.get(block=False)
                running_batch.append(new_req)
                print(f" >>> [调度] {new_req['id']} 进入 Batch")
            except queue.Empty:
                break

        # B. Step: if the batch has any work, run one inference step.
        if running_batch:
            iteration += 1
            print("=" * 20 + f"{iteration=}" + "=" * 20)
            # Simulated GPU step latency.
            time.sleep(1.2)
            # Show the current batch state.
            active_ids = [f"{r['id']}(剩{r['remain']-1})" for r in running_batch]
            print(f"[GPU推理] 处理中: {active_ids}")
            # Every request's remaining length drops by one per step.
            finished_this_step = []
            for req in running_batch:
                req["remain"] -= 1
                if req["remain"] <= 0:
                    finished_this_step.append(req)
            # C. Evict: finished requests leave immediately so the next
            # iteration's refill can admit new ones.
            for req in finished_this_step:
                print(f" <<< [完成] {req['id']} 生成完毕,释放位置")
                running_batch.remove(req)
        else:
            # Batch and queue both empty: nap briefly to avoid busy-spin.
            time.sleep(0.5)


# --- Entry point ---
if __name__ == "__main__":
    # Start the user-request producer thread.
    t = threading.Thread(target=user_request_producer, daemon=True)
    t.start()
    # Main thread runs the inference loop until interrupted.
    # NOTE(review): the original tail was truncated (a stray '、' after
    # 'try:'); the KeyboardInterrupt guard below is a minimal reconstruction.
    try:
        inference_loop()
    except KeyboardInterrupt:
        pass
# ======================================================================
# Second demo: continuous batching with an explicit prefill/decode split.
# (Redefines waiting_queue / the demo functions from the first demo.)
# ======================================================================
import queue
import random
import threading
import time  # fix: original used time.sleep without importing time

# Core queues: requests wait here before admission; admitted requests
# that survive prefill live in running_queue until they finish decoding.
waiting_queue = queue.Queue()
running_queue = []
MAX_BATCH_SIZE = 4


def user_request_producer():
    """Burst-arrival producer, designed to trigger multi-request prefill.

    Wave 1: three requests arrive at once; wave 2: one late request 5s later.
    """
    # Wave 1: burst arrival (3 requests enter the queue together).
    print("\n[用户] --- 爆发式请求到达 (3个请求) ---")
    for i in range(1, 4):
        req = {"id": f"REQ-{i}", "remain": random.randint(2, 5)}
        waiting_queue.put(req)
        print(f"[用户] 请求 {req['id']} 进入等待队列")
    # Pause, then a second wave with a single request.
    time.sleep(5)
    print("\n[用户] --- 延迟请求到达 (1个请求) ---")
    req = {"id": "REQ-4", "remain": 3}
    waiting_queue.put(req)
    print(f"[用户] 请求 {req['id']} 进入等待队列")


def inference_loop():
    """Prefill-prioritised continuous-batching loop; runs until Ctrl-C."""
    print("--- 连续批处理引擎:多请求 Prefill 模式 ---")
    iteration = 0
    while True:
        current_batch = []
        is_prefill_stage = False

        # 1. Scheduling: build this iteration's batch.  Prefill takes
        #    priority — drain waiting_queue up to MAX_BATCH_SIZE.
        if not waiting_queue.empty():
            is_prefill_stage = True
            while not waiting_queue.empty() and len(current_batch) < MAX_BATCH_SIZE:
                req = waiting_queue.get()
                current_batch.append(req)
        elif running_queue:
            is_prefill_stage = False
            current_batch = list(running_queue)

        if not current_batch:
            time.sleep(0.5)
            continue

        # 2. Execute: simulate one inference step (prefill is slower).
        iteration += 1
        print(f"\n{'='*15} Iteration {iteration} {'='*15}")
        if is_prefill_stage:
            print(f"[PREFILL] 批量生成中: {[r['id'] for r in current_batch]}")
            time.sleep(1.5)
        else:
            # Hoisted out of the f-string: the original nested same-quote
            # f-string is a SyntaxError before Python 3.12.
            ids = [f"{r['id']}(剩{r['remain']})" for r in current_batch]
            print(f"[DECODE ] 批量生成中: {ids}")
            time.sleep(0.4)

        # 3. Unified state update: one token generated per request.
        for req in current_batch:
            req['remain'] -= 1

        # 4. Lifecycle: collect finished requests first so that
        #    running_queue is never mutated while being iterated.
        to_remove_from_running = []
        for req in current_batch:
            if req['remain'] <= 0:
                print(f" <<< [完成] {req['id']} 退出系统")
                if req in running_queue:
                    to_remove_from_running.append(req)
            else:
                if is_prefill_stage:
                    running_queue.append(req)
                    print(f" -> {req['id']} Prefill 完成,转入 running_queue")
                else:
                    pass  # decode requests already live in running_queue

        # Actually remove the finished requests from running_queue.
        for req in to_remove_from_running:
            running_queue.remove(req)


if __name__ == "__main__":
    t = threading.Thread(target=user_request_producer, daemon=True)
    t.start()
    # NOTE(review): the original 'try:' had no except clause (truncated);
    # the KeyboardInterrupt guard below is a minimal reconstruction.
    try:
        inference_loop()
    except KeyboardInterrupt:
        pass
# Paged KV-cache allocation.  NOTE(review): the closing parenthesis was
# missing in the original, and num_layers / num_blocks / block_size /
# num_kv_heads / tp_size / head_dim are defined elsewhere in the file —
# confirm against the surrounding config code.
kv_cache = torch.empty(
    2,                        # K and V planes
    num_layers,               # transformer layers
    num_blocks,               # total cache blocks
    block_size,               # tokens per block
    num_kv_heads // tp_size,  # KV heads after tensor parallelism
    head_dim,                 # per-head dimension
)


# NOTE(review): the decorator and the first two parameters of this kernel
# were cut out of the scrape; they are reconstructed exactly from the
# launch on the last line of store_kvcache below.  Requires `triton` and
# `triton.language as tl` to be imported elsewhere in the original file.
@triton.jit
def store_kvcache_kernel(
    key_ptr,
    key_stride,
    value_ptr,
    value_stride,
    k_cache_ptr,
    v_cache_ptr,
    slot_mapping_ptr,
    D: tl.constexpr,
):
    """Copy one token's flattened K and V vectors into the caches.

    One program instance handles one token: program id -> slot via
    slot_mapping, then a contiguous D-element copy into each cache.
    """
    idx = tl.program_id(0)
    slot = tl.load(slot_mapping_ptr + idx)
    # A slot of -1 marks a token with no cache slot assigned; skip it.
    if slot == -1:
        return
    key_offsets = idx * key_stride + tl.arange(0, D)
    value_offsets = idx * value_stride + tl.arange(0, D)
    key = tl.load(key_ptr + key_offsets)
    value = tl.load(value_ptr + value_offsets)
    cache_offsets = slot * D + tl.arange(0, D)
    tl.store(k_cache_ptr + cache_offsets, key)
    tl.store(v_cache_ptr + cache_offsets, value)


def store_kvcache(key: torch.Tensor, value: torch.Tensor,
                  k_cache: torch.Tensor, v_cache: torch.Tensor,
                  slot_mapping: torch.Tensor):
    """Launch one kernel program per token to write K/V into the caches.

    key/value are (N, num_heads, head_dim); slot_mapping holds the N
    destination slots (-1 = skip).  The asserts pin the layout the kernel
    relies on: each token's K/V must be one contiguous D = heads*dim run.
    """
    N, num_heads, head_dim = key.shape
    D = num_heads * head_dim
    assert key.stride(-1) == 1 and value.stride(-1) == 1
    assert key.stride(1) == head_dim and value.stride(1) == head_dim
    assert k_cache.stride(1) == D and v_cache.stride(1) == D
    assert slot_mapping.numel() == N
    store_kvcache_kernel[(N,)](key, key.stride(0), value, value.stride(0),
                               k_cache, v_cache, slot_mapping, D)
