需要根据积压申请 Buffer 并更新 Credit 值

来源:这里教程网 时间:2026-03-02 13:09:48 作者:

#include <stdio.h>
#include <stdlib.h>
#include <cuda_runtime.h>
#include <cusolverDn.h>

/* Abort with file/line info on any CUDA runtime error. */
#define CUDA_CHECK(call)                                                      \
    do {                                                                      \
        cudaError_t err_ = (call);                                            \
        if (err_ != cudaSuccess) {                                            \
            fprintf(stderr, "CUDA error %s:%d: %s\n", __FILE__, __LINE__,     \
                    cudaGetErrorString(err_));                                 \
            exit(EXIT_FAILURE);                                               \
        }                                                                     \
    } while (0)

/* Abort with file/line info on any cuSOLVER status error. */
#define CUSOLVER_CHECK(call)                                                  \
    do {                                                                      \
        cusolverStatus_t st_ = (call);                                        \
        if (st_ != CUSOLVER_STATUS_SUCCESS) {                                 \
            fprintf(stderr, "cuSOLVER error %s:%d: status %d\n", __FILE__,    \
                    __LINE__, (int)st_);                                      \
            exit(EXIT_FAILURE);                                               \
        }                                                                     \
    } while (0)

/*
 * Example 1: eigendecomposition of a 3x3 real symmetric matrix in single
 * precision with cusolverDnSsyevd. On exit d_A holds the eigenvectors
 * (column-major) and d_W the eigenvalues in ascending order.
 */
static void runSsyevdExample(void)
{
    cusolverDnHandle_t cusolverH = NULL;
    const int m = 3;
    const int lda = m;

    /* Column-major symmetric input:
     *   3.5 0.5 0.0
     *   0.5 3.5 0.0
     *   0.0 0.0 2.0
     */
    float *A = (float *)malloc((size_t)lda * m * sizeof(float));
    if (A == NULL) {
        fprintf(stderr, "host allocation failed\n");
        exit(EXIT_FAILURE);
    }
    A[0] = 3.5f; A[1] = 0.5f; A[2] = 0.0f;
    A[3] = 0.5f; A[4] = 3.5f; A[5] = 0.0f;
    A[6] = 0.0f; A[7] = 0.0f; A[8] = 2.0f;

    float W[3];       /* eigenvalues, ascending, copied back from the device */
    int info_gpu = 0; /* syevd status: 0 means convergence */

    /* Step 1: create the cuSOLVER dense handle. */
    CUSOLVER_CHECK(cusolverDnCreate(&cusolverH));

    /* Step 2: allocate device buffers and upload A.
     * d_A is both the input matrix and, after the solve, the eigenvectors. */
    float *d_A = NULL;
    float *d_W = NULL;
    int *devInfo = NULL;
    CUDA_CHECK(cudaMalloc((void **)&d_A, sizeof(float) * lda * m));
    CUDA_CHECK(cudaMalloc((void **)&d_W, sizeof(float) * m));
    CUDA_CHECK(cudaMalloc((void **)&devInfo, sizeof(int)));
    CUDA_CHECK(cudaMemcpy(d_A, A, sizeof(float) * lda * m,
                          cudaMemcpyHostToDevice));

    /* Step 3: query the workspace size required by syevd and allocate it. */
    cusolverEigMode_t jobz = CUSOLVER_EIG_MODE_VECTOR; /* values AND vectors */
    cublasFillMode_t uplo = CUBLAS_FILL_MODE_LOWER;    /* lower triangle read */
    int lwork = 0;
    float *d_work = NULL;
    CUSOLVER_CHECK(cusolverDnSsyevd_bufferSize(cusolverH, jobz, uplo, m, d_A,
                                               lda, d_W, &lwork));
    CUDA_CHECK(cudaMalloc((void **)&d_work, sizeof(float) * lwork));

    /* Step 4: eigendecomposition. */
    CUSOLVER_CHECK(cusolverDnSsyevd(cusolverH, jobz, uplo, m, d_A, lda, d_W,
                                    d_work, lwork, devInfo));
    CUDA_CHECK(cudaDeviceSynchronize());

    /* Step 5: copy results back to the host. */
    CUDA_CHECK(cudaMemcpy(A, d_A, sizeof(float) * lda * m,
                          cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(W, d_W, sizeof(float) * m, cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(&info_gpu, devInfo, sizeof(int),
                          cudaMemcpyDeviceToHost));

    printf("%d\n", info_gpu);
    printf("eigenvalue = (matlab base-1), ascending order\n");
    for (int i = 0; i < m; i++) {
        printf("W[%d] = %E\n", i + 1, W[i]);
    }
    /* NOTE(review): the original eigenvector print loop was truncated in the
     * source; this prints the eigenvector matrix stored column-major in A. */
    for (int i = 0; i < m; i++) {
        for (int j = 0; j < m; j++) {
            printf("%.4f ", A[j * lda + i]);
        }
        printf("\n");
    }

    /* Cleanup: the original leaked every buffer and the handle. */
    CUDA_CHECK(cudaFree(d_A));
    CUDA_CHECK(cudaFree(d_W));
    CUDA_CHECK(cudaFree(devInfo));
    CUDA_CHECK(cudaFree(d_work));
    CUSOLVER_CHECK(cusolverDnDestroy(cusolverH));
    free(A);
}

/*
 * Example 2: eigendecomposition of a 4x4 complex Hermitian matrix in double
 * precision with cusolverDnZheevd. Same 5-step structure as the real case.
 */
static void runZheevdExample(void)
{
    cusolverDnHandle_t cusolverH = NULL;
    const int m = 4;
    const int lda = m;

    /* Column-major Hermitian input (conjugate-symmetric):
     * 1.9501 + 0.0000i   0.2049 - 0.1811i   0.5217 - 0.3123i   0.2681 - 0.3998i
     * 0.2049 + 0.1811i   1.8272 + 0.0000i   0.5115 + 0.0987i   0.4155 + 0.0435i
     * 0.5217 + 0.3123i   0.5115 - 0.0987i   2.3984 + 0.0000i   0.4549 + 0.0510i
     * 0.2681 + 0.3998i   0.4155 - 0.0435i   0.4549 - 0.0510i   2.2332 + 0.0000i
     */
    cuDoubleComplex *A =
        (cuDoubleComplex *)malloc((size_t)lda * m * sizeof(cuDoubleComplex));
    if (A == NULL) {
        fprintf(stderr, "host allocation failed\n");
        exit(EXIT_FAILURE);
    }
    A[0]  = make_cuDoubleComplex(1.9501e2,  0);
    A[1]  = make_cuDoubleComplex(0.2049e2,  0.1811e2);
    A[2]  = make_cuDoubleComplex(0.5217e2,  0.3123e2);
    A[3]  = make_cuDoubleComplex(0.2681e2,  0.3998e2);
    A[4]  = make_cuDoubleComplex(0.2049e2, -0.1811e2);
    A[5]  = make_cuDoubleComplex(1.8272e2,  0);
    A[6]  = make_cuDoubleComplex(0.5115e2, -0.0987e2);
    A[7]  = make_cuDoubleComplex(0.4155e2, -0.0435e2);
    A[8]  = make_cuDoubleComplex(0.5217e2, -0.3123e2);
    A[9]  = make_cuDoubleComplex(0.5115e2,  0.0987e2);
    A[10] = make_cuDoubleComplex(2.3984e2,  0);
    A[11] = make_cuDoubleComplex(0.4549e2, -0.0510e2);
    A[12] = make_cuDoubleComplex(0.2681e2, -0.3998e2);
    A[13] = make_cuDoubleComplex(0.4155e2,  0.0435e2);
    A[14] = make_cuDoubleComplex(0.4549e2,  0.0510e2);
    A[15] = make_cuDoubleComplex(2.2332e2,  0);

    double W[4];      /* eigenvalues, ascending */
    int info_gpu = 0; /* heevd status: 0 means convergence */

    /* Step 1: create the cuSOLVER dense handle. */
    CUSOLVER_CHECK(cusolverDnCreate(&cusolverH));

    /* Step 2: allocate device buffers and upload A. */
    cuDoubleComplex *d_A = NULL;
    double *d_W = NULL;
    int *devInfo = NULL;
    CUDA_CHECK(cudaMalloc((void **)&d_A, sizeof(cuDoubleComplex) * lda * m));
    CUDA_CHECK(cudaMalloc((void **)&d_W, sizeof(double) * m));
    CUDA_CHECK(cudaMalloc((void **)&devInfo, sizeof(int)));
    CUDA_CHECK(cudaMemcpy(d_A, A, sizeof(cuDoubleComplex) * lda * m,
                          cudaMemcpyHostToDevice));

    /* Step 3: query and allocate the heevd workspace. */
    cusolverEigMode_t jobz = CUSOLVER_EIG_MODE_VECTOR; /* values AND vectors */
    cublasFillMode_t uplo = CUBLAS_FILL_MODE_LOWER;    /* lower triangle read */
    int lwork = 0;
    cuDoubleComplex *d_work = NULL;
    CUSOLVER_CHECK(cusolverDnZheevd_bufferSize(cusolverH, jobz, uplo, m, d_A,
                                               lda, d_W, &lwork));
    CUDA_CHECK(cudaMalloc((void **)&d_work, sizeof(cuDoubleComplex) * lwork));

    /* Step 4: compute the spectrum. */
    CUSOLVER_CHECK(cusolverDnZheevd(cusolverH, jobz, uplo, m, d_A, lda, d_W,
                                    d_work, lwork, devInfo));
    CUDA_CHECK(cudaDeviceSynchronize());

    /* Step 5: copy results back to the host. */
    CUDA_CHECK(cudaMemcpy(A, d_A, sizeof(cuDoubleComplex) * lda * m,
                          cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(W, d_W, sizeof(double) * m, cudaMemcpyDeviceToHost));
    CUDA_CHECK(cudaMemcpy(&info_gpu, devInfo, sizeof(int),
                          cudaMemcpyDeviceToHost));

    printf("%d\n", info_gpu);
    printf("eigenvalue = (matlab base-1), ascending order\n");
    for (int i = 0; i < m; i++) {
        printf("W[%d] = %E\n", i + 1, W[i]);
    }
    for (size_t i = 0; i < 4; i++) {
        for (size_t j = 0; j < 4; j++) {
            printf("%.4f + %.4f j\n", A[i * 4 + j].x, A[i * 4 + j].y);
        }
    }

    /* Cleanup: the original freed d_A/d_W/devInfo but leaked d_work, the
     * host buffer and the handle. */
    CUDA_CHECK(cudaFree(d_A));
    CUDA_CHECK(cudaFree(d_W));
    CUDA_CHECK(cudaFree(devInfo));
    CUDA_CHECK(cudaFree(d_work));
    CUSOLVER_CHECK(cusolverDnDestroy(cusolverH));
    free(A);
}

int main(void)
{
    runSsyevdExample();
    runZheevdExample();
    return 0;
}

import queue
import random
import threading
import time  # was missing in the original; time.sleep is used throughout

# 1. Thread-safe waiting queue shared between producer and consumer.
waiting_queue = queue.Queue()
MAX_BATCH_SIZE = 3


# --- Simulated user-request thread (producer) ---
def user_request_producer():
    """Push 5 synthetic requests into waiting_queue at random 1-2s intervals.

    Each request is a dict {"id": str, "remain": int} where "remain" is the
    number of decode steps the request still needs.
    """
    request_id = 1
    while True:
        # Simulate random user arrival: one new request every 1-2 seconds.
        time.sleep(random.uniform(1, 2))
        # Each request needs a random number of tokens (3 to 8).
        req = {"id": f"REQ-{request_id}", "remain": random.randint(3, 8)}
        waiting_queue.put(req)
        print(f"\n[用户端] 送入新请求: {req['id']} (预计长度: {req['remain']})")
        request_id += 1
        if request_id > 5:
            break


# --- Core inference loop (consumer/executor) ---
def inference_loop():
    """Continuous-batching loop: refill the batch, step it, evict finished.

    Runs forever; intended to be interrupted with Ctrl-C.
    """
    running_batch = []
    print("--- 推理引擎已启动 ---")
    iteration = 0
    while True:
        # A. Refill: pull waiting requests in while the batch has free slots.
        while len(running_batch) < MAX_BATCH_SIZE:
            try:
                # block=False: an empty queue raises immediately instead of
                # stalling the inference loop.
                new_req = waiting_queue.get(block=False)
                running_batch.append(new_req)
                print(f"  >>> [调度] {new_req['id']} 进入 Batch")
            except queue.Empty:
                break

        # B. Step: if the batch has work, run one inference step.
        if running_batch:
            iteration += 1
            print("=" * 20 + f"{iteration=}" + "=" * 20)
            # Simulate GPU step latency.
            time.sleep(1.2)

            # Show the batch state after this step (hence remain - 1).
            active_ids = [f"{r['id']}(剩{r['remain']-1})" for r in running_batch]
            print(f"[GPU推理] 处理中: {active_ids}")

            # Each request consumes one token per step.
            finished_this_step = []
            for req in running_batch:
                req["remain"] -= 1
                if req["remain"] <= 0:
                    finished_this_step.append(req)

            # C. Evict: finished requests free their slot so the next loop
            # iteration can admit new requests.
            for req in finished_this_step:
                print(f"  <<< [完成] {req['id']} 生成完毕,释放位置")
                running_batch.remove(req)
        else:
            # Batch and queue both empty: back off briefly to avoid spinning.
            time.sleep(0.5)


# --- Entry point ---
if __name__ == "__main__":
    # Start the user-request producer thread.
    t = threading.Thread(target=user_request_producer, daemon=True)
    t.start()
    # The main thread runs the inference loop.
    try:
        inference_loop()
    except KeyboardInterrupt:
        # NOTE(review): the source was truncated after "try:"; Ctrl-C exit is
        # the presumed intent of the original handler.
        pass

import queue
import random
import threading
import time  # was missing in the original; time.sleep is used throughout

# Core queues.
waiting_queue = queue.Queue()  # requests not yet prefilled
running_queue = []             # requests in the decode phase
MAX_BATCH_SIZE = 4


def user_request_producer():
    """
    Changed point: simulate a burst of arrivals to trigger multi-request
    prefill, followed by a single delayed request.
    """
    # Wave 1: burst arrival (3 requests enter the queue together).
    print("\n[用户] --- 爆发式请求到达 (3个请求) ---")
    for i in range(1, 4):
        req = {"id": f"REQ-{i}", "remain": random.randint(2, 5)}
        waiting_queue.put(req)
        print(f"[用户] 请求 {req['id']} 进入等待队列")

    # Wait a while, then a second wave with a single request.
    time.sleep(5)
    print("\n[用户] --- 延迟请求到达 (1个请求) ---")
    req = {"id": "REQ-4", "remain": 3}
    waiting_queue.put(req)
    print(f"[用户] 请求 {req['id']} 进入等待队列")


def inference_loop():
    """Continuous batching with a dedicated multi-request prefill stage.

    Prefill has priority: whenever waiting_queue is non-empty, a prefill
    batch is built from it; otherwise the decode batch is running_queue.
    """
    print("--- 连续批处理引擎:多请求 Prefill 模式 ---")
    iteration = 0
    while True:
        current_batch = []
        is_prefill_stage = False

        # 1. Scheduling: build the current batch.
        # Whenever waiting_queue has entries, fill up to MAX_BATCH_SIZE.
        if not waiting_queue.empty():
            is_prefill_stage = True
            while not waiting_queue.empty() and len(current_batch) < MAX_BATCH_SIZE:
                req = waiting_queue.get()
                current_batch.append(req)
        elif running_queue:
            is_prefill_stage = False
            current_batch = list(running_queue)

        if not current_batch:
            time.sleep(0.5)
            continue

        # 2. Execution: simulate one inference step.
        iteration += 1
        print(f"\n{'='*15} Iteration {iteration} {'='*15}")

        if is_prefill_stage:
            print(f"[PREFILL] 批量生成中: {[r['id'] for r in current_batch]}")
            time.sleep(1.5)
        else:
            # Original used quotes nested inside a same-quote f-string (a
            # SyntaxError before Python 3.12); build the list first instead.
            decode_ids = [f"{r['id']}(剩{r['remain']})" for r in current_batch]
            print(f"[DECODE ] 批量生成中: {decode_ids}")
            time.sleep(0.4)

        # 3. Uniform state update: every request consumes one token.
        for req in current_batch:
            req['remain'] -= 1

        # 4. Uniform lifecycle check.
        # Collect objects first to avoid mutating running_queue mid-iteration.
        to_remove_from_running = []
        for req in current_batch:
            if req['remain'] <= 0:
                print(f"  <<< [完成] {req['id']} 退出系统")
                if req in running_queue:
                    to_remove_from_running.append(req)
            else:
                if is_prefill_stage:
                    running_queue.append(req)
                    print(f"  -> {req['id']} Prefill 完成,转入 running_queue")
                else:
                    pass  # decode requests are already in running_queue

        # Actually remove finished requests from running_queue.
        for req in to_remove_from_running:
            running_queue.remove(req)


if __name__ == "__main__":
    t = threading.Thread(target=user_request_producer, daemon=True)
    t.start()
    try:
        inference_loop()
    except KeyboardInterrupt:
        # NOTE(review): source truncated after inference_loop(); Ctrl-C exit
        # is the presumed intent.
        pass
def allocate_kv_cache(num_layers, num_blocks, block_size, num_kv_heads,
                      head_dim, tp_size=1, dtype=None, device=None):
    """Allocate a paged KV cache tensor.

    The original snippet was an unterminated module-level ``torch.empty``
    call with free variables; it is generalized here into a parameterized
    allocator with the same layout.

    Args:
        num_layers: number of transformer layers.
        num_blocks: total number of cache blocks.
        block_size: tokens per block.
        num_kv_heads: total KV heads; divided by ``tp_size`` for the local
            shard under tensor parallelism.
        head_dim: per-head hidden dimension.
        tp_size: tensor-parallel world size (default 1 = no sharding).
        dtype/device: forwarded to ``torch.empty`` (None = torch defaults).

    Returns:
        An uninitialized tensor of shape
        (2, num_layers, num_blocks, block_size, num_kv_heads // tp_size,
        head_dim), where dim 0 indexes K vs. V.
    """
    assert num_kv_heads % tp_size == 0, "KV heads must divide evenly across TP ranks"
    return torch.empty(
        2,                        # K and V
        num_layers,               # layer count
        num_blocks,               # total block count
        block_size,               # tokens per block
        num_kv_heads // tp_size,  # KV heads on this tensor-parallel shard
        head_dim,                 # head dimension
        dtype=dtype,
        device=device,
    )

@triton.jit
def store_kvcache_kernel(
    # NOTE(review): the decorator and the first two parameters were missing
    # from the scraped source; they are reconstructed from the launch in
    # store_kvcache below.
    key_ptr,
    key_stride,
    value_ptr,
    value_stride,
    k_cache_ptr,
    v_cache_ptr,
    slot_mapping_ptr,
    D: tl.constexpr,
):
    """Copy one token's flattened K/V vectors (length D) into its cache slot.

    Launched with a 1-D grid of N programs, one per token; slot_mapping maps
    token index -> cache slot, with -1 meaning "no slot, skip this token".
    """
    idx = tl.program_id(0)
    slot = tl.load(slot_mapping_ptr + idx)
    if slot == -1:
        return
    key_offsets = idx * key_stride + tl.arange(0, D)
    value_offsets = idx * value_stride + tl.arange(0, D)
    key = tl.load(key_ptr + key_offsets)
    value = tl.load(value_ptr + value_offsets)
    cache_offsets = slot * D + tl.arange(0, D)
    tl.store(k_cache_ptr + cache_offsets, key)
    tl.store(v_cache_ptr + cache_offsets, value)


def store_kvcache(key: torch.Tensor, value: torch.Tensor,
                  k_cache: torch.Tensor, v_cache: torch.Tensor,
                  slot_mapping: torch.Tensor):
    """Scatter per-token K/V tensors into the paged cache via the kernel.

    Args:
        key/value: (N, num_heads, head_dim) tensors; the last two dims must
            be contiguous (checked by the stride asserts below).
        k_cache/v_cache: caches whose dim-1 stride equals the flattened
            per-token width D = num_heads * head_dim.
        slot_mapping: (N,) tensor of destination slots, -1 to skip a token.
    """
    N, num_heads, head_dim = key.shape
    D = num_heads * head_dim
    # The kernel indexes K/V as flat length-D rows, so the head and dim axes
    # must be densely packed.
    assert key.stride(-1) == 1 and value.stride(-1) == 1
    assert key.stride(1) == head_dim and value.stride(1) == head_dim
    assert k_cache.stride(1) == D and v_cache.stride(1) == D
    assert slot_mapping.numel() == N
    store_kvcache_kernel[(N,)](key, key.stride(0), value, value.stride(0),
                               k_cache, v_cache, slot_mapping, D)

相关推荐