PyTorch 把大部分 GPU 操作包装得和 CPU Tensor 一样简单:把 Tensor 放到
cuda 上,后面的算子就会自动派发到 CUDA 实现。但是,只要开始关心数据搬运、
并行执行、准确计时,或者需要自己写算子,就必须理解 PyTorch 与 CUDA 之间的边界。
本文是一份从 Python API 到 C++/CUDA 扩展的学习笔记,重点覆盖:
- CUDA Tensor、设备和异步执行的基本模型;
- pinned memory 与 CPU/GPU 数据传输;
- CUDA Stream 的依赖管理与计算/拷贝重叠;
- CUDA Event 的同步与性能计时;
- 从 Python 调用 C++ 函数,以及 C++ 端如何访问 Tensor;
- 注册一个包含自定义 CUDA kernel 的 PyTorch 运算;
- autograd、
torch.compile、CUDA Graph 和调试注意事项。
本文 API 依据 2026 年 5 月 25 日可访问的 PyTorch 官方文档整理。代码默认使用 NVIDIA GPU、支持 CUDA 的 PyTorch 以及可用的 CUDA Toolkit(编译扩展时需要
nvcc)。不同 PyTorch/CUDA 版本的编译器兼容性需要以实际环境为准。
1. 先建立正确的执行模型
一段看似普通的 CUDA Tensor 代码,实际上涉及三层:
- Tensor 存储位置:数据是在 CPU 内存,还是某张 GPU 的显存中。
- 算子派发:
torch.add、torch.matmul等算子根据 Tensor 的 device 选择 CPU 或 CUDA 实现。 - 主机与 CUDA 的异步关系:当 Tensor 位于 CUDA device 时,运行 Python/PyTorch 调用的 CPU 主机线程通常只是把 CUDA kernel 或拷贝任务提交 到一个 stream,而不是等待 GPU 立刻完成工作。
也就是说,下面的 y = x @ x 返回时,GPU 计算可能仍在进行中。只有 CPU 真正
需要结果,或者程序显式同步时,主机才会等待 GPU。
import torch
if not torch.cuda.is_available(): raise RuntimeError("本文示例需要支持 CUDA 的 PyTorch 环境")
device = torch.device("cuda:0")print("GPU:", torch.cuda.get_device_name(device))print("PyTorch 使用的 CUDA 版本:", torch.version.cuda)
# 数据直接在 GPU 上创建,避免先生成 CPU Tensor 再搬运。x = torch.randn(4096, 4096, device=device)y = x @ x # 向当前 CUDA stream 提交矩阵乘 kernel
# y.mean() 仍返回单元素 CUDA Tensor,其计算通常仍是异步提交的。# item() 把单元素 Tensor 取为 Python 数值;CPU 需要真实结果,因此会等待 GPU。mean_value = y.mean().item()print(mean_value)item() 只能用于单元素 Tensor,它返回的是 Python float 或 int,而不是
Tensor,因此该取值操作本身不能参与 autograd。对 CUDA Tensor 调用 item()
时,主机必须等到生成该元素的 CUDA 工作完成,并取得结果。训练循环中频繁执行
loss.item() 或 print(loss) 会插入 CPU/GPU 等待,日志通常应降低频率。
1.1 算子派发不等于执行方式相同
“根据 device 派发”只表示 PyTorch 选择哪个 backend 的实现,不表示 CPU 与 CUDA 算子具有同样的等待行为。
对于普通 eager 模式的 CPU Tensor 算子,从 Python 调用方看通常是同步的: 函数返回时,输出已经可以被后续 CPU 代码读取。算子内部可能使用 MKL、OpenMP 等多线程并行计算,但这不是 CUDA stream 意义上的异步提交。
import torch
cpu_x = torch.randn(4096, 4096) # CPU Tensorcpu_y = cpu_x @ cpu_x # 返回时,cpu_y 已经能被 CPU 正确访问print(cpu_y[0, 0])对于 CUDA Tensor 算子,只要输入和结果仍留在 GPU 上,大多数计算操作会
相对于 CPU 主机线程异步提交到当前 stream。同一 stream 仍然保证先后顺序:
relu 不会越过生成 cuda_y 的矩阵乘法提前读取结果。
import torch
cuda_x = torch.randn(4096, 4096, device="cuda")cuda_y = cuda_x @ cuda_x # 通常异步提交矩阵乘 kernelcuda_z = torch.relu(cuda_y) # 同一 stream 上排在矩阵乘之后cuda_loss = cuda_z.mean() # 结果仍是 CUDA Tensor,通常仍可异步提交value = cuda_loss.item() # 变为 Python float 时,CPU 才必须等待结果常见操作的主机侧行为可以概括如下:
| 操作 | 相对 CPU 主机线程的通常行为 | 原因 |
|---|---|---|
| CPU Tensor 上的普通 eager 算子 | 同步 | 返回后 CPU 输出可以直接读取 |
x_cuda + 1、x_cuda @ w_cuda、relu(x_cuda) | 异步提交 | 输出仍留在 GPU 上 |
loss_cuda.backward() | 通常异步提交 | 反向 CUDA kernel 被排入 stream |
x_cuda.sum() | 通常异步提交 | 结果仍是 CUDA Tensor |
x_cuda.sum().item() | 会等待 GPU | Python 需要真实标量值 |
print(x_cuda) | 通常会等待 GPU | 显示元素需要观察实际数据 |
x_cuda.cpu() | 默认表现为同步 | 返回可由 CPU 安全读取的 Tensor |
torch.cuda.synchronize() | 显式同步 | 调用者要求等待已提交的 CUDA 工作 |
因此,不应笼统地表述为“GPU 上所有操作都异步”。更准确的说法是:CUDA Tensor 上的大多数计算会异步提交;当 CPU 要观察 CUDA 结果、默认同步传输发生,或者 程序显式同步时,主机线程会等待 GPU。
1.2 device 不等于“当前设备”
Tensor 创建后,其 device 已经固定。切换当前设备会影响后续创建的 Tensor, 不会把已有 Tensor 自动搬走。
import torch
with torch.cuda.device(0): a = torch.ones(4, device="cuda") # 等价于 cuda:0
if torch.cuda.device_count() >= 2: with torch.cuda.device(1): b = torch.ones(4, device="cuda") # 位于 cuda:1
# 普通逐元素运算不能默认跨 GPU 混算,需要显式把数据移动到同一 device。 b_on_0 = b.to(a.device) c = a + b_on_0 print(c.device) # cuda:01.3 显存分配与缓存分配器
PyTorch 会缓存释放后的显存,以便下次快速复用。因此 nvidia-smi 中看到的
显存占用不一定等于当前 Tensor 真正使用的显存。
import torch
device = torch.device("cuda:0")torch.cuda.reset_peak_memory_stats(device)
x = torch.empty((8192, 8192), dtype=torch.float32, device=device)used = torch.cuda.memory_allocated(device) / 1024**2reserved = torch.cuda.memory_reserved(device) / 1024**2peak = torch.cuda.max_memory_allocated(device) / 1024**2
print(f"Tensor 正在占用: {used:.1f} MiB")print(f"缓存分配器保留: {reserved:.1f} MiB")print(f"峰值实际占用: {peak:.1f} MiB")
del x# 删除 Tensor 后 allocated 会下降,但 reserved 可能仍保留以便复用。print(torch.cuda.memory_allocated(device) / 1024**2)训练时通常应该关注 max_memory_allocated() 是否超预算,而不是频繁调用
torch.cuda.empty_cache();后者不能释放仍被 Tensor 引用的内存,也可能让后续
分配变慢。
2. 数据传输:CPU、Pinned Memory 与 GPU
GPU 计算之前,输入往往来自 CPU。最直接的方式是:
import torch
cpu_x = torch.randn(1024, 1024) # 普通 pageable CPU memorygpu_x = cpu_x.to("cuda") # 默认情况下,调用方等待传输完成cpu_y = gpu_x.cpu() # 将结果取回 CPU这可以正确运行,但不一定能充分利用 GPU。要理解优化点,需要区分两类 CPU 内存:
| CPU 内存类型 | 特点 | 常见使用方式 |
|---|---|---|
| pageable memory | 默认普通内存,操作系统可以换页 | 临时数据、小规模传输 |
| pinned/page-locked memory | 页面被锁定,CUDA 能更高效地 DMA 读取 | DataLoader batch、异步 H2D 传输 |
2.1 Pinned memory 与 non_blocking=True
从 pinned CPU memory 复制到 GPU 时,non_blocking=True 允许主机不等待该次
拷贝完成,从而有机会将传输与其他 GPU 计算重叠。
import torch
device = torch.device("cuda:0")
# pin_memory=True 创建 page-locked 的 CPU 缓冲区。cpu_batch = torch.randn((256, 1024), pin_memory=True)assert cpu_batch.is_pinned()
# 异步提交 Host-to-Device (H2D) copy。gpu_batch = cpu_batch.to(device, non_blocking=True)
# 如果下一条 GPU 算子使用同一个当前 stream,stream 顺序会保证:# copy 完成之后才执行矩阵乘法,不需要在这里手动 synchronize。weight = torch.randn((1024, 2048), device=device)output = gpu_batch @ weight
# 只有在 CPU 确实要读取最终结果时才同步。loss = output.square().mean()print(loss.item())实际训练数据一般由 DataLoader 提供 pinned batch:
import torchfrom torch.utils.data import DataLoader, TensorDataset
dataset = TensorDataset(torch.randn(10_000, 128), torch.randint(0, 10, (10_000,)))loader = DataLoader( dataset, batch_size=256, shuffle=True, num_workers=4, pin_memory=True, # worker 产出的 batch 会被放到 pinned memory persistent_workers=True,)
model = torch.nn.Linear(128, 10).cuda()
for features, labels in loader: features = features.cuda(non_blocking=True) labels = labels.cuda(non_blocking=True) logits = model(features) loss = torch.nn.functional.cross_entropy(logits, labels) loss.backward()Pinned memory 不是越多越好:它会占用无法被轻易换出的主机内存。适合固定数量的 batch 缓冲区,不适合无界缓存整个大型数据集。
2.2 GPU 到 CPU 的异步复制要特别小心
non_blocking=True 也可以提交 Device-to-Host (D2H) copy,但 CPU 读取返回
Tensor 前必须确认传输完成。
import torch
x = torch.arange(8, device="cuda", dtype=torch.float32)
# 准备 pinned CPU 输出缓冲区,再异步复制 GPU 数据。host_out = torch.empty_like(x, device="cpu", pin_memory=True)host_out.copy_(x, non_blocking=True)
# 此刻 copy 可能仍在进行;直接由 CPU 使用 host_out 存在读到未完成数据的风险。torch.cuda.current_stream().synchronize()print(host_out.tolist()) # 同步后才安全3. CUDA Stream:管理异步工作队列
CUDA Stream 可以理解成某张 GPU 上的一个有序任务队列:
- 同一 stream 内的操作按提交顺序执行;
- 不同 stream 的操作没有天然先后关系,硬件资源足够时可能并行;
- 一个 stream 要使用另一个 stream 产生的数据时,必须声明依赖。
PyTorch 默认把操作提交到当前 device 的默认 stream。简单模型训练通常不用手写 stream;数据流水线、多个独立分支或自定义 CUDA 调度时才需要它。
3.1 一个错误示例:跨 stream 使用未完成的数据
import torch
device = torch.device("cuda:0")s = torch.cuda.Stream(device=device)
a = torch.empty((4096, 4096), device=device)a.normal_() # 默认 stream 上写入 a
with torch.cuda.stream(s): # 错误思路:s 并不知道默认 stream 何时完成 normal_()。 # b = a.sum() # 可能在 a 写完前读取 a pass正确做法是让消费方 stream 等待生产方 stream。
import torch
device = torch.device("cuda:0")producer = torch.cuda.current_stream(device)consumer = torch.cuda.Stream(device=device)
a = torch.empty((4096, 4096), device=device).normal_() # producer 写 a
consumer.wait_stream(producer) # consumer 此后的任务等待 producer 当前工作with torch.cuda.stream(consumer): b = a.sum() # 现在读取依赖正确
# a 创建于 producer,却正在被 consumer 异步读取。 # 记录这次使用,防止 a 被释放后其显存过早被缓存分配器复用。 a.record_stream(consumer)
# 如果默认 stream 后续需要使用 b,也要等待 consumer。producer.wait_stream(consumer)print(b.item())这里的两个 API 负责不同问题:
consumer.wait_stream(producer)负责执行依赖:consumer不能在producer写完a以前读取它。a.record_stream(consumer)负责显存生命周期:它告诉 CUDA caching allocator,虽然a原本关联于producer,其存储还会被consumer上已 提交的任务使用。在这些任务完成前,即使 Python 侧释放了a,这块存储也 不能被分配给其他 Tensor 覆盖。
这里需要记录的是输入 a,因为它跨 stream 被读取。结果 b 是在
consumer 上产生的,本例不需要用 b.record_stream(consumer) 来表达这个
问题。在这段短示例中,a 的 Python 引用实际保留到了同步之后;
record_stream() 展示的是把此代码抽入函数、提前释放输入或复用缓冲区时仍需
保留的跨 stream 使用契约。
为了观察 record_stream() 防止的风险,可以看一个 Tensor 引用可能提前消失的
场景:
import torch
consumer = torch.cuda.Stream()producer = torch.cuda.current_stream()a = torch.randn(1_000_000, device="cuda") # producer 上产生 a
consumer.wait_stream(producer)with torch.cuda.stream(consumer): b = a * 2 a.record_stream(consumer) # consumer 使用结束前保护 a 的存储
del a # Python 变量可以删除replacement = torch.empty(1_000_000, device="cuda")# 有 record_stream 后,缓存分配器不会在 consumer 仍读取旧 a 时,# 把同一块显存过早复用给 replacement。另一种做法是由程序员自行通过 stream/event 同步保证 a 在释放前已经不再被
side stream 使用。不过,这需要更严格地管理引用释放位置,并可能过早阻塞可并行
工作。跨 stream 传递中间 Tensor 时,record_stream() 是更直观的生命周期声明。
3.2 用独立 stream 重叠数据拷贝和计算
下面演示一个简化的 double buffering 流水线:copy stream 预取下一批数据, 默认 stream 计算当前批。实际能否加速,取决于模型计算量、PCIe/NVLink 带宽以及 GPU 是否支持所需并发。
import torch
device = torch.device("cuda:0")copy_stream = torch.cuda.Stream(device=device)compute_stream = torch.cuda.current_stream(device)weight = torch.randn(1024, 1024, device=device)
# 模拟来自 DataLoader 的 pinned CPU batches。cpu_batches = [ torch.randn((512, 1024), pin_memory=True) for _ in range(6)]
def preload(cpu_batch: torch.Tensor) -> torch.Tensor: """在 copy stream 上异步把一个 pinned batch 提交到 GPU。""" with torch.cuda.stream(copy_stream): gpu_batch = cpu_batch.to(device, non_blocking=True) # gpu_batch 离开上下文后仍会在 copy_stream 上被 copy 使用。 gpu_batch.record_stream(copy_stream) return gpu_batch
next_batch = preload(cpu_batches[0])
for index in range(len(cpu_batches)): current_batch = next_batch
# 这里只等待当前 batch 先前已经提交的 copy。 # 等待插入 compute stream 后,再提交下一批 copy,才能与本轮计算重叠。 compute_stream.wait_stream(copy_stream)
if index + 1 < len(cpu_batches): # CPU 提前提交下一批 H2D copy。 next_batch = preload(cpu_batches[index + 1])
output = current_batch @ weight loss = output.square().mean()
# current_batch 的最后使用发生在 compute stream 上。 current_batch.record_stream(compute_stream) print(index, loss.item()) # 演示输出;真实训练不要每步 item() 同步上面为了展示结果每轮调用了 item(),它会造成 CPU/GPU 同步;性能测试时应降低
日志频率,或者先累计 GPU 上的统计量。
4. CUDA Event:建立依赖与准确计时
Python 的 time.perf_counter() 只测到 CPU 提交 kernel 的时间,不能直接表示
GPU 的实际执行时间。torch.cuda.Event 是插入 stream 中的 GPU 标记,适合
测量一段 GPU 工作在 stream 时间线上的耗时。
4.1 计时一个 CUDA 操作
import torch
device = torch.device("cuda:0")x = torch.randn(4096, 4096, device=device)
# 首次运行可能包含 CUDA context 初始化、cuBLAS 算法选择等额外成本,先 warm up。for _ in range(5): _ = x @ x
start = torch.cuda.Event(enable_timing=True)end = torch.cuda.Event(enable_timing=True)
start.record() # 记录到当前 streamfor _ in range(20): y = x @ xend.record()
# end.record() 也是异步提交;读取 elapsed_time 前等待 GPU 到达 end。end.synchronize()elapsed_ms = start.elapsed_time(end)print(f"平均矩阵乘时间: {elapsed_ms / 20:.3f} ms")4.2 用 Event 表示精细的 stream 依赖
如果一个 stream 只需要等待另一个 stream 的某个阶段,而不是等待其所有已提交 工作,可以记录 event 并等待这个 event。
import torch
device = torch.device("cuda:0")load_stream = torch.cuda.Stream(device=device)work_stream = torch.cuda.Stream(device=device)ready = torch.cuda.Event()
host_x = torch.randn((1024, 1024), pin_memory=True)
with torch.cuda.stream(load_stream): x = host_x.to(device, non_blocking=True) ready.record(load_stream) # event 表示 x 的 copy 已完成
with torch.cuda.stream(work_stream): work_stream.wait_event(ready) # 只等待 ready 之前的 load_stream 工作 y = torch.relu(x @ x) y.record_stream(work_stream)
torch.cuda.current_stream(device).wait_stream(work_stream)print(y.norm().item())4.3 Event 可以复用,但会覆盖“当前记录点”
Event 不是一次性对象。对同一个 torch.cuda.Event 多次调用 record() 是允许
的,但后一次记录会覆盖 Event 当前保存的完成状态。之后调用 query() 或
synchronize(),观察的是它最近一次被记录的位置。
import torch
stream = torch.cuda.Stream()checkpoint = torch.cuda.Event()
with torch.cuda.stream(stream): a = torch.randn(1024, device="cuda") checkpoint.record(stream) # 第一次记录:a 的初始化完成点
b = torch.relu(a) checkpoint.record(stream) # 第二次记录:覆盖为 b 的计算完成点
# 此时 checkpoint.synchronize() 等待的是第二次 record,即 b 完成。# 它不再提供“只等到 a 初始化完成”为止的单独检查点。checkpoint.synchronize()print(b[:4])可以把同一个 Event 想象成一张可以重复移动的书签:record() 把书签放在某个
stream 当前已经提交的工作之后;再次 record() 会把书签移动到新的位置。如果
需要同时保留“章节 A 的结束位置”和“章节 B 的结束位置”,就必须使用两张书签,
也就是两个 Event。
有一个细节尤其重要:重新记录 Event 不会回溯修改已经提交到其他 stream 的
等待命令。wait_event() 被调用时,会把“等待此刻 Event 所代表的记录点”加入
消费 stream 的队列;随后复用该 Event,只影响之后新提交的等待。
import torch
producer = torch.cuda.Stream()consumer = torch.cuda.Stream()ready = torch.cuda.Event()
with torch.cuda.stream(producer): first = torch.randn(1024, device="cuda") ready.record(producer) # 记录点 A:first 已就绪
with torch.cuda.stream(consumer): consumer.wait_event(ready) # 这次等待绑定到记录点 A first_output = first.relu()
with torch.cuda.stream(producer): second = torch.randn(1024, device="cuda") ready.record(producer) # 覆盖当前状态为记录点 B:second 已就绪
with torch.cuda.stream(consumer): consumer.wait_event(ready) # 新的等待绑定到记录点 B second_output = second.relu()这段代码中,第一次 wait_event(ready) 仍等待 first 就绪,不会因为后面将
ready 重录到 second 之后,就被改写成等待 second。因此,顺序生产、
顺序提交消费依赖的循环可以复用一个 Event;如果调用方还需要查询、同步或计时
先前的多个记录点,就不能过早覆盖它们。
4.4 什么时候需要多个 Event
判断标准不是“有多少个 CUDA 操作”,而是:
同一时刻需要保留多少个独立且仍然有意义的检查点,就需要多少个 Event。
最明显的例子是计时。要测量一段工作耗时,必须同时保留起点和终点,因此需要
start 与 end 两个启用了 timing 的 Event;用同一个 Event 连续记录两次会
丢掉起点。
对于多缓冲流水线,每个正在传输或等待使用的 slot 通常也需要自己的 ready Event。下面是双缓冲的数据预取轮廓:
import torch
device = torch.device("cuda:0")copy_stream = torch.cuda.Stream(device=device)compute_stream = torch.cuda.Stream(device=device)
buffers = [ torch.empty((256, 1024), device=device), torch.empty((256, 1024), device=device),]ready = [ torch.cuda.Event(), # slot 0 数据就绪标记,不需要计时 torch.cuda.Event(), # slot 1 数据就绪标记]consumed = [ torch.cuda.Event(), # slot 0 已被计算读取完毕 torch.cuda.Event(), # slot 1 已被计算读取完毕]
# 假设 cpu_batches 的 Tensor 来自 pin_memory=True 的 DataLoader。for step, cpu_batch in enumerate(cpu_batches): slot = step % 2
with torch.cuda.stream(copy_stream): if step >= 2: # 再次写入同一个 slot 前,先等待两轮之前的计算不再读取它。 copy_stream.wait_event(consumed[slot]) buffers[slot].copy_(cpu_batch, non_blocking=True) ready[slot].record(copy_stream) # 只覆盖当前将被复用的 buffer slot
with torch.cuda.stream(compute_stream): compute_stream.wait_event(ready[slot]) output = model(buffers[slot]) consumed[slot].record(compute_stream)这里使用两个 ready Event,不是因为一次传输必须配一个永久新 Event,而是因为
两个 buffer 可以处于不同的在途阶段。consumed 则表示每个 slot 什么时候可以
被下一次传输安全覆盖:否则即使 ready 安排正确,写入缓冲区本身也会与上一轮
计算的读取产生竞争。
相反,如果多个 consumer 都只需要等待同一份输入就绪,一个 ready Event 就足够:
ready = torch.cuda.Event()
with torch.cuda.stream(producer): x = load_to_gpu() ready.record(producer)
consumer_a.wait_event(ready) # 两个 consumer 等待同一个检查点consumer_b.wait_event(ready)4.5 Event 的开销:适合阶段边界,不适合无节制打点
Event 是轻量级同步标记,但不是免费的。首先区分两种用途:
ready = torch.cuda.Event() # enable_timing=False:只做依赖同步start = torch.cuda.Event(enable_timing=True) # 需要记录时间戳end = torch.cuda.Event(enable_timing=True)只需要 stream 依赖时,默认的非计时 Event 已经足够;只有测量 GPU 时间时才启用
enable_timing=True。直观上:
- 在一次 batch 传输完成处、一次较大计算区域两侧放少量 Event,通常是合理用法;
- 为每一个极小 kernel 前后都插入 timing Event,会增加提交与记录工作,也可能 明显扰动要测量的短操作;
event.synchronize()最显眼的性能影响通常不是标记本身,而是 CPU 在该位置 停下来等待此前尚未完成的 GPU 工作;- Event 的具体开销会随 GPU、驱动、CUDA 版本和负载变化,不能用一个固定数字 替代实际测试。
可以在自己的设备上用下面的代码获得数量级印象。它将同一个非计时 Event 重录 很多次,测量这些 marker 加入 GPU 时间线的总体开销:
import torch
iterations = 10_000marker = torch.cuda.Event() # 同步用 marker,不启用 timingstart = torch.cuda.Event(enable_timing=True)end = torch.cuda.Event(enable_timing=True)
torch.cuda.synchronize()
start.record()for _ in range(iterations): marker.record() # Event 可被连续复用end.record()
end.synchronize()total_ms = start.elapsed_time(end)print(f"{iterations} 次 record 总时间: {total_ms:.3f} ms")print(f"平均每次 record: {total_ms * 1000 / iterations:.3f} us")还可以用“只包围整个循环一次”和“包围每一个小操作”的方式对比 timing Event 密度对结果的影响:
import torch
x = torch.randn(1024, device="cuda")iterations = 1000
# 粗粒度计时:仅测整个工作区域。coarse_start = torch.cuda.Event(enable_timing=True)coarse_end = torch.cuda.Event(enable_timing=True)coarse_start.record()for _ in range(iterations): y = x + 1coarse_end.record()coarse_end.synchronize()coarse_ms = coarse_start.elapsed_time(coarse_end)
# 细粒度计时:在相同小操作之间加入大量计时 Event。marks = [ (torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True)) for _ in range(iterations)]fine_start = torch.cuda.Event(enable_timing=True)fine_end = torch.cuda.Event(enable_timing=True)fine_start.record()for before, after in marks: before.record() y = x + 1 after.record()fine_end.record()fine_end.synchronize()fine_ms = fine_start.elapsed_time(fine_end)
print(f"粗粒度计时总时间: {coarse_ms:.3f} ms")print(f"逐操作打点后的总时间: {fine_ms:.3f} ms")这个对比的目的不是得到可以套用到所有机器的常数,而是确认 Event 的使用粒度: 它非常适合作为流水线阶段边界和较大性能区域的标记;对于细碎 kernel,应先明确 是否真的需要逐个同步或逐个计时。
5. 从 Python 调用 C++:先使用 ATen 已有 CUDA 算子
ATen 是 A Tensor Library 的缩写,是 PyTorch 底层的 C++ Tensor 与
数学运算库。它提供 C++ 侧的 at::Tensor 类型,以及 at::relu、
at::empty_like、at::matmul 等算子接口。Python 层调用的许多 torch.*
运算,最终都会进入 PyTorch 的 operator/dispatcher 体系,再根据 Tensor 的
device、dtype、autograd、autocast 等 dispatch key 选择具体实现。
可以把这一调用关系简化为:
Python: torch.relu(x) ↓PyTorch dispatcher:检查 device、dtype、autograd 等信息 ↓ATen/operator 的 CPU kernel、CUDA kernel 或其他 backend 实现因此,在 C++ 扩展中调用 ATen 操作并不表示“回到 CPU 计算”:输入为 CUDA
Tensor 时,at::relu 和逐元素算术会派发到已有的 CUDA 实现;输入为 CPU
Tensor 时,则会使用 CPU 实现。教程和旧式 pybind 扩展中也常写
torch::Tensor/torch::relu,对常规扩展编写而言,它们是面向 PyTorch 的
对应 C++ 入口;下文注册底层 custom operator 时使用 at::Tensor,以明确表示
依赖 ATen/libtorch 接口。
自定义 C++ 扩展不等于必须立刻写 CUDA kernel。先使用 ATen 组合已有运算,适合 以下场景:
- 需要复用已有 C++ 库或把部分控制逻辑放到 C++;
- 希望先验证接口和正确性,再决定是否融合成自定义 kernel;
- 计算可以由已有 PyTorch 算子高效表达。
下面使用 torch.utils.cpp_extension.load_inline 即时编译一个扩展。第一次执行会
编译并缓存动态库,需要本机有可用 C++ 编译器。
import torchfrom torch.utils.cpp_extension import load_inline
cpp_source = r"""#include <ATen/ATen.h>#include <torch/extension.h>
// at::Tensor 和 at::relu 来自 ATen;输入在 CUDA 上时,运算派发到 CUDA 实现。at::Tensor scale_relu(const at::Tensor& input, double scale) { TORCH_CHECK(input.is_cuda(), "scale_relu expects a CUDA tensor"); TORCH_CHECK(input.is_floating_point(), "input must be floating point"); return at::relu(input * scale + 1.0);}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { m.def("scale_relu", &scale_relu, "scale then ReLU (ATen CUDA dispatch)");}"""
ext = load_inline( name="aten_cuda_call_example", cpp_sources=cpp_source, functions=None, # C++ 源码已经显式写了 PYBIND11_MODULE with_cuda=False, # 没有自行编译 .cu 文件;仍然可以处理 CUDA Tensor extra_cflags=["-O3"], verbose=True,)
x = torch.tensor([-2.0, 0.0, 3.0], device="cuda")y = ext.scale_relu(x, 2.0)torch.testing.assert_close(y, torch.tensor([0.0, 1.0, 7.0], device="cuda"))print(y)这里的 C++ 函数复用了 PyTorch 已有的 ATen 运算,但没有保证把乘法、加法和 ReLU 三个操作融合为一个 CUDA kernel。ATen 的优势是容易正确复用现有能力; 如果目标是减少中间 Tensor、显存读写或 kernel launch 次数,或者实现 PyTorch 尚不存在的运算,就需要下一节的自定义 kernel,并通过 profile 验证收益。
6. C++ 端访问 Tensor 数据:指针属于哪里
这是写扩展时最容易踩坑的地方:
tensor.data_ptr<float>()返回底层存储的地址,但地址所在的设备由tensor.device()决定。CPU C++ 代码可以解引用 CPU Tensor 的地址; CUDA Tensor 的地址必须交给 CUDA kernel(或 CUDA API)在设备侧访问。
6.1 CPU Tensor:C++ 循环可以直接读取
下面的扩展要求 CPU、连续且 float32 的二维 Tensor,然后用 accessor 做带
维度信息的索引访问。
import torchfrom torch.utils.cpp_extension import load_inline
cpp_source = r"""#include <torch/extension.h>
double sum_rows_cpu(torch::Tensor input) { TORCH_CHECK(input.device().is_cpu(), "input must be on CPU"); TORCH_CHECK(input.scalar_type() == torch::kFloat32, "input must be float32"); TORCH_CHECK(input.dim() == 2, "input must be a matrix");
// contiguous() 保证逻辑相邻元素也在内存中连续排列。 auto x = input.contiguous(); auto view = x.accessor<float, 2>();
double result = 0.0; for (int64_t row = 0; row < view.size(0); ++row) { for (int64_t col = 0; col < view.size(1); ++col) { result += view[row][col]; // CPU 地址可以在主机代码中直接读取 } } return result;}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) { m.def("sum_rows_cpu", &sum_rows_cpu);}"""
ext = load_inline( name="cpu_tensor_access_example", cpp_sources=cpp_source, extra_cflags=["-O3"], verbose=True,)
x = torch.arange(12, dtype=torch.float32).reshape(3, 4)assert ext.sum_rows_cpu(x) == x.sum().item()6.2 CUDA Tensor:主机只取得地址,kernel 才解引用
在 CUDA 扩展中最常见的形式是:
// x 是 CUDA Tensor;这里只是取出设备地址作为 kernel 参数。const float* x_ptr = x.data_ptr<float>();float* out_ptr = out.data_ptr<float>();
// __global__ kernel 在 GPU 上运行,因此可以访问 x_ptr/out_ptr。my_kernel<<<blocks, threads, 0, stream>>>(x_ptr, out_ptr, x.numel());不要在普通 C++ 函数中写 x.data_ptr<float>()[0] 来读取 CUDA Tensor;那个
指针不属于 CPU 可直接访问的内存空间。
还要注意两个前提:
data_ptr<float>()必须与 Tensor 实际dtype匹配;- 自定义按线性内存读取的 kernel 通常应要求
contiguous(),或者自行处理 stride。
7. 编写并调用一个自定义 CUDA Kernel
这一节实现 SiLU(x) = x * sigmoid(x) 的 float32 CUDA 算子。虽然 PyTorch
本身已经提供高质量的 SiLU 实现,但这个运算足够简单,适合观察完整扩展结构。
官方当前推荐将 C++/CUDA 实现注册为 PyTorch custom operator。相比仅通过
pybind 暴露函数,这样 torch.ops、autograd 和 torch.compile 能明确认识到
这个运算。
下面为了突出 Tensor、stream 与 kernel 的关系,使用常见的 ATen 接口编写即时 编译示例。若要把扩展打包发布给多个 PyTorch 版本使用,PyTorch 2.10 及更高版本 还提供了 LibTorch Stable ABI 方案,应按官方教程改用其稳定接口。
项目目录可以如下组织:
cuda_notes_ext/├── ops.cpp├── silu_kernel.cu└── build_and_test.py7.1 C++ 注册入口:ops.cpp
#include <ATen/ATen.h>#include <torch/library.h>
// 真正实现位于 silu_kernel.cu 中。at::Tensor silu_cuda(const at::Tensor& x);
// 定义 Python 可见的算子 schema:torch.ops.cuda_notes.silu(x)。TORCH_LIBRARY(cuda_notes, m) { m.def("silu(Tensor x) -> Tensor");}
// 当输入来自 CUDA backend 时,将调用绑定到 silu_cuda。TORCH_LIBRARY_IMPL(cuda_notes, CUDA, m) { m.impl("silu", &silu_cuda);}这里只注册了 CUDA 实现,因此传入 CPU Tensor 会得到“没有 CPU backend 实现”的报错。这比静默搬运数据更容易发现错误,也符合一个 CUDA 学习算子的定位。
7.2 CUDA 实现:silu_kernel.cu
#include <ATen/ATen.h>#include <c10/util/Exception.h>#include <c10/cuda/CUDAGuard.h>#include <c10/cuda/CUDAException.h>#include <c10/cuda/CUDAStream.h>
// 每个 CUDA thread 处理一个 float 元素。__global__ void silu_forward_kernel( const float* input, float* output, int64_t numel) { const int64_t index = static_cast<int64_t>(blockIdx.x) * blockDim.x + threadIdx.x;
if (index < numel) { const float value = input[index]; const float sigmoid = 1.0f / (1.0f + expf(-value)); output[index] = value * sigmoid; }}
at::Tensor silu_cuda(const at::Tensor& input) { TORCH_CHECK(input.is_cuda(), "silu expects a CUDA tensor"); TORCH_CHECK( input.scalar_type() == at::kFloat, "this learning kernel currently supports float32 only");
// 本 kernel 按一维连续下标读写;非连续输入先生成连续副本。 auto x = input.contiguous(); auto output = at::empty_like(x);
if (x.numel() == 0) { return output; }
// 在输入所在 device 上启动 kernel,避免多 GPU 程序中启动到错误设备。 const c10::cuda::CUDAGuard device_guard(x.device());
constexpr int threads = 256; const int blocks = static_cast<int>((x.numel() + threads - 1) / threads);
// 使用 PyTorch 当前 stream,而不是擅自使用 default stream。 // 这样 kernel 能正确参与 Python 侧的 stream 依赖关系。 const auto stream = c10::cuda::getCurrentCUDAStream(x.get_device()); silu_forward_kernel<<<blocks, threads, 0, stream.stream()>>>( x.data_ptr<float>(), output.data_ptr<float>(), x.numel());
// 启动配置错误等问题会在离 kernel 最近的位置抛出,更方便调试。 C10_CUDA_KERNEL_LAUNCH_CHECK(); return output;}这里有三个非常重要的实现细节:
- 用
CUDAGuard选择输入所在 GPU,而不是假定永远只有cuda:0; - 用
getCurrentCUDAStream()启动 kernel,尊重调用者当前的 PyTorch stream; - 用
C10_CUDA_KERNEL_LAUNCH_CHECK()尽早发现 kernel launch 错误。
7.3 编译、加载和验证:build_and_test.py
torch.utils.cpp_extension.load 会调用本机编译器和 nvcc,生成动态库并加载
注册信息。is_python_module=False 表示这不是 pybind Python 模块,而是加载后
通过 torch.ops 调用的算子库。
from pathlib import Path
import torchimport torch.nn.functional as Ffrom torch.utils.cpp_extension import load
root = Path(__file__).parent
load( name="cuda_notes_ext", sources=[str(root / "ops.cpp"), str(root / "silu_kernel.cu")], extra_cflags=["-O3"], extra_cuda_cflags=["-O3"], is_python_module=False, # 加载 TORCH_LIBRARY 注册,而不是 import 扩展模块 verbose=True,)
# 让 FakeTensor/torch.compile 知道输出的 shape、dtype 和 device。@torch.library.register_fake("cuda_notes::silu")def _fake_silu(x): return torch.empty_like(x)
# 自定义 forward kernel 不会自动生成 backward;这里用已有 PyTorch 算子表达梯度。def _setup_context(ctx, inputs, output): (x,) = inputs ctx.save_for_backward(x)
def _backward(ctx, grad_output): (x,) = ctx.saved_tensors sigmoid = torch.sigmoid(x) grad_x = grad_output * (sigmoid + x * sigmoid * (1 - sigmoid)) return grad_x
torch.library.register_autograd( "cuda_notes::silu", _backward, setup_context=_setup_context,)
x = torch.randn(1024, device="cuda", dtype=torch.float32, requires_grad=True)y = torch.ops.cuda_notes.silu(x)reference = F.silu(x)torch.testing.assert_close(y, reference, rtol=1e-5, atol=1e-6)
# 同时验证自定义 backward 注册是否与参考实现一致。grad = torch.randn_like(y)custom_grad = torch.autograd.grad(y, x, grad, retain_graph=True)[0]reference_grad = torch.autograd.grad(reference, x, grad)[0]torch.testing.assert_close(custom_grad, reference_grad, rtol=1e-5, atol=1e-6)
# 注册 fake kernel 后,该运算可以作为 torch.compile 图中的 opaque custom op。compiled = torch.compile(lambda value: torch.ops.cuda_notes.silu(value))torch.testing.assert_close(compiled(x.detach()), F.silu(x.detach()))
print("forward, backward and torch.compile checks passed")一个生产级 kernel 还需要继续处理 float16/bfloat16、autocast、非连续布局的
性能策略、CPU fallback、测试矩阵和打包发布。学习阶段先把 device、stream、
dtype 和梯度契约写明确,比一开始追求复杂模板更重要。
8. 自定义 Kernel 与 Stream/Event 一起工作
因为上面的 CUDA 实现使用 当前 stream 启动 kernel,所以调用它时可以像普通 PyTorch CUDA 运算一样安排依赖和计时。
import torch
# 假设上一节已经加载过扩展。s = torch.cuda.Stream()ready = torch.cuda.Event()done = torch.cuda.Event(enable_timing=True)start = torch.cuda.Event(enable_timing=True)
x = torch.randn(1_000_000, device="cuda", dtype=torch.float32)ready.record() # 默认 stream 已经完成 x 的初始化时触发
with torch.cuda.stream(s): s.wait_event(ready) # 自定义 kernel 不能提前读取 x start.record(s) y = torch.ops.cuda_notes.silu(x) # kernel 将提交到 s done.record(s) y.record_stream(s)
done.synchronize()print(f"custom SiLU: {start.elapsed_time(done):.3f} ms")如果扩展硬编码到 default stream,上面的 with torch.cuda.stream(s) 语义就会
被破坏,并可能导致竞态条件。因此,“使用当前 stream”不是微小的性能细节,而是
扩展与 PyTorch 正确组合所需的接口契约。
9. 进一步的 CUDA 能力
9.1 Automatic Mixed Precision (AMP)
训练中常用 torch.autocast 让一部分运算使用 float16 或 bfloat16。本文的
kernel 明确只支持 float32,因此在 autocast 区域中不能假定它会自动支持半精度。
import torch
x = torch.randn(1024, device="cuda", dtype=torch.float32)
with torch.autocast(device_type="cuda", dtype=torch.float16): # 对只支持 float32 的学习 kernel,显式退出 autocast 或实现对应 dtype kernel。 with torch.autocast(device_type="cuda", enabled=False): y = torch.ops.cuda_notes.silu(x.float())真正要用于模型训练的扩展,应实现所需 dtype 并为 autocast 注册合理行为,而不是
把所有输入无条件转换为 float32,否则会引入额外传输和显存开销。
9.2 CUDA Graph
常规 eager 执行中,CPU 每次迭代都提交一批 kernel。模型很小、kernel 很碎时, 提交开销可能占比明显。CUDA Graph 可以捕获一段固定工作并反复 replay,减少 CPU 提交开销;代价是捕获阶段使用的内存地址和形状需要稳定。
import torch
model = torch.nn.Sequential( torch.nn.Linear(128, 256), torch.nn.ReLU(), torch.nn.Linear(256, 10),).cuda()
static_input = torch.randn(32, 128, device="cuda")
# capture 前预热,避免把懒初始化和首次分配放进捕获流程。warmup_stream = torch.cuda.Stream()warmup_stream.wait_stream(torch.cuda.current_stream())with torch.cuda.stream(warmup_stream): for _ in range(3): _ = model(static_input)torch.cuda.current_stream().wait_stream(warmup_stream)
graph = torch.cuda.CUDAGraph()with torch.cuda.graph(graph): static_output = model(static_input)
new_batch = torch.randn_like(static_input)static_input.copy_(new_batch) # 保持 static_input 的显存地址不变,只替换内容graph.replay()result = static_output # replay 后输出已更新自定义 operator 是否适合 graph capture,仍取决于内部是否在捕获阶段执行不允许的 操作,以及输入输出内存策略是否稳定。
10. 调试和性能验证清单
10.1 正确性优先
- 与纯 PyTorch 参考实现比较 forward 输出;
- 需要训练时比较 backward 梯度,并覆盖边界尺寸和空 Tensor;
- 检查不同 dtype、contiguous/non-contiguous 输入和多 GPU device 行为;
- 自定义运算可使用
torch.library.opcheck检查注册是否满足 PyTorch 子系统契约。
10.2 异步错误定位
CUDA kernel 错误可能因为异步执行而延后暴露,导致报错行看起来与根因无关。 调试阶段可以临时同步:
import os
# 应在 import torch 以及启动 CUDA 工作之前设置。os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
import torch在自定义 .cu 代码中保留 C10_CUDA_KERNEL_LAUNCH_CHECK(),可以把启动错误
尽量定位到自定义 kernel 调用附近。
10.3 性能测量
- 先 warm up,再用
torch.cuda.Event(enable_timing=True)计时; - 不要把频繁
item()、print()或无意的synchronize()算入稳定态吞吐; - 用 PyTorch Profiler 或 Nsight Systems 观察 copy 与 kernel 是否实际重叠;
- 自定义 kernel 不一定比 PyTorch 内置算子快:融合是否减少显存往返、launch 数量和中间 Tensor,必须通过 profile 验证。
总结:从“会用 GPU”到“理解 CUDA 边界”
PyTorch 的 CUDA API 可以按以下顺序学习:
- 先掌握 Tensor 的 device、显存和异步执行模型;
- 使用 pinned memory 与
non_blocking=True组织数据传输; - 使用 Stream 表达可并行任务,并用 Event 建立依赖和计时;
- 在 C++ 扩展中区分 CPU 地址与 CUDA 地址,始终遵守 dtype、布局和 device 契约;
- 用 custom operator 注册自定义 CUDA kernel,再补齐 autograd 与
torch.compile支持; - 最后再通过 AMP、CUDA Graph 和 profiler 解决真实工作负载中的性能问题。
最重要的原则是:CUDA 工作默认是异步的,跨 stream 的依赖不能靠直觉;自定义 kernel 也必须像 PyTorch 自带算子一样尊重当前 device、当前 stream 和 Tensor 契约。
参考资料
- PyTorch: CUDA semantics
- PyTorch:
torch.cudaAPI - PyTorch:
Tensor.item() - PyTorch:
Tensor.record_stream() - PyTorch:
torch.cuda.Event - PyTorch C++ API: ATen Tensor Library
- PyTorch: Custom C++ and CUDA Operators
- PyTorch C++ API: CUDA Streams
- PyTorch: C++ Extension utilities
- NVIDIA CUDA Runtime API: Event Management