7983 字
40 分钟
PyTorch中的CUDA API

PyTorch 把大部分 GPU 操作包装得和 CPU Tensor 一样简单:把 Tensor 放到 cuda 上,后面的算子就会自动派发到 CUDA 实现。但是,只要开始关心数据搬运、 并行执行、准确计时,或者需要自己写算子,就必须理解 PyTorch 与 CUDA 之间的边界。

本文是一份从 Python API 到 C++/CUDA 扩展的学习笔记,重点覆盖:

  • CUDA Tensor、设备和异步执行的基本模型;
  • pinned memory 与 CPU/GPU 数据传输;
  • CUDA Stream 的依赖管理与计算/拷贝重叠;
  • CUDA Event 的同步与性能计时;
  • 从 Python 调用 C++ 函数,以及 C++ 端如何访问 Tensor;
  • 注册一个包含自定义 CUDA kernel 的 PyTorch 运算;
  • autograd、torch.compile、CUDA Graph 和调试注意事项。

本文 API 依据 2026 年 5 月 25 日可访问的 PyTorch 官方文档整理。代码默认使用 NVIDIA GPU、支持 CUDA 的 PyTorch 以及可用的 CUDA Toolkit(编译扩展时需要 nvcc)。不同 PyTorch/CUDA 版本的编译器兼容性需要以实际环境为准。

1. 先建立正确的执行模型#

一段看似普通的 CUDA Tensor 代码,实际上涉及三层:

  1. Tensor 存储位置:数据是在 CPU 内存,还是某张 GPU 的显存中。
  2. 算子派发torch.addtorch.matmul 等算子根据 Tensor 的 device 选择 CPU 或 CUDA 实现。
  3. 主机与 CUDA 的异步关系:当 Tensor 位于 CUDA device 时,运行 Python/PyTorch 调用的 CPU 主机线程通常只是把 CUDA kernel 或拷贝任务提交 到一个 stream,而不是等待 GPU 立刻完成工作。

也就是说,下面的 y = x @ x 返回时,GPU 计算可能仍在进行中。只有 CPU 真正 需要结果,或者程序显式同步时,主机才会等待 GPU。

import torch
if not torch.cuda.is_available():
raise RuntimeError("本文示例需要支持 CUDA 的 PyTorch 环境")
device = torch.device("cuda:0")
print("GPU:", torch.cuda.get_device_name(device))
print("PyTorch 使用的 CUDA 版本:", torch.version.cuda)
# 数据直接在 GPU 上创建,避免先生成 CPU Tensor 再搬运。
x = torch.randn(4096, 4096, device=device)
y = x @ x # 向当前 CUDA stream 提交矩阵乘 kernel
# y.mean() 仍返回单元素 CUDA Tensor,其计算通常仍是异步提交的。
# item() 把单元素 Tensor 取为 Python 数值;CPU 需要真实结果,因此会等待 GPU。
mean_value = y.mean().item()
print(mean_value)

item() 只能用于单元素 Tensor,它返回的是 Python floatint,而不是 Tensor,因此该取值操作本身不能参与 autograd。对 CUDA Tensor 调用 item() 时,主机必须等到生成该元素的 CUDA 工作完成,并取得结果。训练循环中频繁执行 loss.item()print(loss) 会插入 CPU/GPU 等待,日志通常应降低频率。

1.1 算子派发不等于执行方式相同#

“根据 device 派发”只表示 PyTorch 选择哪个 backend 的实现,不表示 CPU 与 CUDA 算子具有同样的等待行为。

对于普通 eager 模式的 CPU Tensor 算子,从 Python 调用方看通常是同步的: 函数返回时,输出已经可以被后续 CPU 代码读取。算子内部可能使用 MKL、OpenMP 等多线程并行计算,但这不是 CUDA stream 意义上的异步提交。

import torch
cpu_x = torch.randn(4096, 4096) # CPU Tensor
cpu_y = cpu_x @ cpu_x # 返回时,cpu_y 已经能被 CPU 正确访问
print(cpu_y[0, 0])

对于 CUDA Tensor 算子,只要输入和结果仍留在 GPU 上,大多数计算操作会 相对于 CPU 主机线程异步提交到当前 stream。同一 stream 仍然保证先后顺序: relu 不会越过生成 cuda_y 的矩阵乘法提前读取结果。

import torch
cuda_x = torch.randn(4096, 4096, device="cuda")
cuda_y = cuda_x @ cuda_x # 通常异步提交矩阵乘 kernel
cuda_z = torch.relu(cuda_y) # 同一 stream 上排在矩阵乘之后
cuda_loss = cuda_z.mean() # 结果仍是 CUDA Tensor,通常仍可异步提交
value = cuda_loss.item() # 变为 Python float 时,CPU 才必须等待结果

常见操作的主机侧行为可以概括如下:

操作相对 CPU 主机线程的通常行为原因
CPU Tensor 上的普通 eager 算子同步返回后 CPU 输出可以直接读取
x_cuda + 1x_cuda @ w_cudarelu(x_cuda)异步提交输出仍留在 GPU 上
loss_cuda.backward()通常异步提交反向 CUDA kernel 被排入 stream
x_cuda.sum()通常异步提交结果仍是 CUDA Tensor
x_cuda.sum().item()会等待 GPUPython 需要真实标量值
print(x_cuda)通常会等待 GPU显示元素需要观察实际数据
x_cuda.cpu()默认表现为同步返回可由 CPU 安全读取的 Tensor
torch.cuda.synchronize()显式同步调用者要求等待已提交的 CUDA 工作

因此,不应笼统地表述为“GPU 上所有操作都异步”。更准确的说法是:CUDA Tensor 上的大多数计算会异步提交;当 CPU 要观察 CUDA 结果、默认同步传输发生,或者 程序显式同步时,主机线程会等待 GPU。

1.2 device 不等于“当前设备”#

Tensor 创建后,其 device 已经固定。切换当前设备会影响后续创建的 Tensor, 不会把已有 Tensor 自动搬走。

import torch
with torch.cuda.device(0):
a = torch.ones(4, device="cuda") # 等价于 cuda:0
if torch.cuda.device_count() >= 2:
with torch.cuda.device(1):
b = torch.ones(4, device="cuda") # 位于 cuda:1
# 普通逐元素运算不能默认跨 GPU 混算,需要显式把数据移动到同一 device。
b_on_0 = b.to(a.device)
c = a + b_on_0
print(c.device) # cuda:0

1.3 显存分配与缓存分配器#

PyTorch 会缓存释放后的显存,以便下次快速复用。因此 nvidia-smi 中看到的 显存占用不一定等于当前 Tensor 真正使用的显存。

import torch
device = torch.device("cuda:0")
torch.cuda.reset_peak_memory_stats(device)
x = torch.empty((8192, 8192), dtype=torch.float32, device=device)
used = torch.cuda.memory_allocated(device) / 1024**2
reserved = torch.cuda.memory_reserved(device) / 1024**2
peak = torch.cuda.max_memory_allocated(device) / 1024**2
print(f"Tensor 正在占用: {used:.1f} MiB")
print(f"缓存分配器保留: {reserved:.1f} MiB")
print(f"峰值实际占用: {peak:.1f} MiB")
del x
# 删除 Tensor 后 allocated 会下降,但 reserved 可能仍保留以便复用。
print(torch.cuda.memory_allocated(device) / 1024**2)

训练时通常应该关注 max_memory_allocated() 是否超预算,而不是频繁调用 torch.cuda.empty_cache();后者不能释放仍被 Tensor 引用的内存,也可能让后续 分配变慢。

2. 数据传输:CPU、Pinned Memory 与 GPU#

GPU 计算之前,输入往往来自 CPU。最直接的方式是:

import torch
cpu_x = torch.randn(1024, 1024) # 普通 pageable CPU memory
gpu_x = cpu_x.to("cuda") # 默认情况下,调用方等待传输完成
cpu_y = gpu_x.cpu() # 将结果取回 CPU

这可以正确运行,但不一定能充分利用 GPU。要理解优化点,需要区分两类 CPU 内存:

CPU 内存类型特点常见使用方式
pageable memory默认普通内存,操作系统可以换页临时数据、小规模传输
pinned/page-locked memory页面被锁定,CUDA 能更高效地 DMA 读取DataLoader batch、异步 H2D 传输

2.1 Pinned memory 与 non_blocking=True#

从 pinned CPU memory 复制到 GPU 时,non_blocking=True 允许主机不等待该次 拷贝完成,从而有机会将传输与其他 GPU 计算重叠。

import torch
device = torch.device("cuda:0")
# pin_memory=True 创建 page-locked 的 CPU 缓冲区。
cpu_batch = torch.randn((256, 1024), pin_memory=True)
assert cpu_batch.is_pinned()
# 异步提交 Host-to-Device (H2D) copy。
gpu_batch = cpu_batch.to(device, non_blocking=True)
# 如果下一条 GPU 算子使用同一个当前 stream,stream 顺序会保证:
# copy 完成之后才执行矩阵乘法,不需要在这里手动 synchronize。
weight = torch.randn((1024, 2048), device=device)
output = gpu_batch @ weight
# 只有在 CPU 确实要读取最终结果时才同步。
loss = output.square().mean()
print(loss.item())

实际训练数据一般由 DataLoader 提供 pinned batch:

import torch
from torch.utils.data import DataLoader, TensorDataset
dataset = TensorDataset(torch.randn(10_000, 128), torch.randint(0, 10, (10_000,)))
loader = DataLoader(
dataset,
batch_size=256,
shuffle=True,
num_workers=4,
pin_memory=True, # worker 产出的 batch 会被放到 pinned memory
persistent_workers=True,
)
model = torch.nn.Linear(128, 10).cuda()
for features, labels in loader:
features = features.cuda(non_blocking=True)
labels = labels.cuda(non_blocking=True)
logits = model(features)
loss = torch.nn.functional.cross_entropy(logits, labels)
loss.backward()

Pinned memory 不是越多越好:它会占用无法被轻易换出的主机内存。适合固定数量的 batch 缓冲区,不适合无界缓存整个大型数据集。

2.2 GPU 到 CPU 的异步复制要特别小心#

non_blocking=True 也可以提交 Device-to-Host (D2H) copy,但 CPU 读取返回 Tensor 前必须确认传输完成。

import torch
x = torch.arange(8, device="cuda", dtype=torch.float32)
# 准备 pinned CPU 输出缓冲区,再异步复制 GPU 数据。
host_out = torch.empty_like(x, device="cpu", pin_memory=True)
host_out.copy_(x, non_blocking=True)
# 此刻 copy 可能仍在进行;直接由 CPU 使用 host_out 存在读到未完成数据的风险。
torch.cuda.current_stream().synchronize()
print(host_out.tolist()) # 同步后才安全

3. CUDA Stream:管理异步工作队列#

CUDA Stream 可以理解成某张 GPU 上的一个有序任务队列:

  • 同一 stream 内的操作按提交顺序执行;
  • 不同 stream 的操作没有天然先后关系,硬件资源足够时可能并行;
  • 一个 stream 要使用另一个 stream 产生的数据时,必须声明依赖。

PyTorch 默认把操作提交到当前 device 的默认 stream。简单模型训练通常不用手写 stream;数据流水线、多个独立分支或自定义 CUDA 调度时才需要它。

3.1 一个错误示例:跨 stream 使用未完成的数据#

import torch
device = torch.device("cuda:0")
s = torch.cuda.Stream(device=device)
a = torch.empty((4096, 4096), device=device)
a.normal_() # 默认 stream 上写入 a
with torch.cuda.stream(s):
# 错误思路:s 并不知道默认 stream 何时完成 normal_()。
# b = a.sum() # 可能在 a 写完前读取 a
pass

正确做法是让消费方 stream 等待生产方 stream。

import torch
device = torch.device("cuda:0")
producer = torch.cuda.current_stream(device)
consumer = torch.cuda.Stream(device=device)
a = torch.empty((4096, 4096), device=device).normal_() # producer 写 a
consumer.wait_stream(producer) # consumer 此后的任务等待 producer 当前工作
with torch.cuda.stream(consumer):
b = a.sum() # 现在读取依赖正确
# a 创建于 producer,却正在被 consumer 异步读取。
# 记录这次使用,防止 a 被释放后其显存过早被缓存分配器复用。
a.record_stream(consumer)
# 如果默认 stream 后续需要使用 b,也要等待 consumer。
producer.wait_stream(consumer)
print(b.item())

这里的两个 API 负责不同问题:

  • consumer.wait_stream(producer) 负责执行依赖consumer 不能在 producer 写完 a 以前读取它。
  • a.record_stream(consumer) 负责显存生命周期:它告诉 CUDA caching allocator,虽然 a 原本关联于 producer,其存储还会被 consumer 上已 提交的任务使用。在这些任务完成前,即使 Python 侧释放了 a,这块存储也 不能被分配给其他 Tensor 覆盖。

这里需要记录的是输入 a,因为它跨 stream 被读取。结果 b 是在 consumer 上产生的,本例不需要用 b.record_stream(consumer) 来表达这个 问题。在这段短示例中,a 的 Python 引用实际保留到了同步之后; record_stream() 展示的是把此代码抽入函数、提前释放输入或复用缓冲区时仍需 保留的跨 stream 使用契约。

为了观察 record_stream() 防止的风险,可以看一个 Tensor 引用可能提前消失的 场景:

import torch
consumer = torch.cuda.Stream()
producer = torch.cuda.current_stream()
a = torch.randn(1_000_000, device="cuda") # producer 上产生 a
consumer.wait_stream(producer)
with torch.cuda.stream(consumer):
b = a * 2
a.record_stream(consumer) # consumer 使用结束前保护 a 的存储
del a # Python 变量可以删除
replacement = torch.empty(1_000_000, device="cuda")
# 有 record_stream 后,缓存分配器不会在 consumer 仍读取旧 a 时,
# 把同一块显存过早复用给 replacement。

另一种做法是由程序员自行通过 stream/event 同步保证 a 在释放前已经不再被 side stream 使用。不过,这需要更严格地管理引用释放位置,并可能过早阻塞可并行 工作。跨 stream 传递中间 Tensor 时,record_stream() 是更直观的生命周期声明。

3.2 用独立 stream 重叠数据拷贝和计算#

下面演示一个简化的 double buffering 流水线:copy stream 预取下一批数据, 默认 stream 计算当前批。实际能否加速,取决于模型计算量、PCIe/NVLink 带宽以及 GPU 是否支持所需并发。

import torch
device = torch.device("cuda:0")
copy_stream = torch.cuda.Stream(device=device)
compute_stream = torch.cuda.current_stream(device)
weight = torch.randn(1024, 1024, device=device)
# 模拟来自 DataLoader 的 pinned CPU batches。
cpu_batches = [
torch.randn((512, 1024), pin_memory=True)
for _ in range(6)
]
def preload(cpu_batch: torch.Tensor) -> torch.Tensor:
"""在 copy stream 上异步把一个 pinned batch 提交到 GPU。"""
with torch.cuda.stream(copy_stream):
gpu_batch = cpu_batch.to(device, non_blocking=True)
# gpu_batch 离开上下文后仍会在 copy_stream 上被 copy 使用。
gpu_batch.record_stream(copy_stream)
return gpu_batch
next_batch = preload(cpu_batches[0])
for index in range(len(cpu_batches)):
current_batch = next_batch
# 这里只等待当前 batch 先前已经提交的 copy。
# 等待插入 compute stream 后,再提交下一批 copy,才能与本轮计算重叠。
compute_stream.wait_stream(copy_stream)
if index + 1 < len(cpu_batches):
# CPU 提前提交下一批 H2D copy。
next_batch = preload(cpu_batches[index + 1])
output = current_batch @ weight
loss = output.square().mean()
# current_batch 的最后使用发生在 compute stream 上。
current_batch.record_stream(compute_stream)
print(index, loss.item()) # 演示输出;真实训练不要每步 item() 同步

上面为了展示结果每轮调用了 item(),它会造成 CPU/GPU 同步;性能测试时应降低 日志频率,或者先累计 GPU 上的统计量。

4. CUDA Event:建立依赖与准确计时#

Python 的 time.perf_counter() 只测到 CPU 提交 kernel 的时间,不能直接表示 GPU 的实际执行时间。torch.cuda.Event 是插入 stream 中的 GPU 标记,适合 测量一段 GPU 工作在 stream 时间线上的耗时。

4.1 计时一个 CUDA 操作#

import torch
device = torch.device("cuda:0")
x = torch.randn(4096, 4096, device=device)
# 首次运行可能包含 CUDA context 初始化、cuBLAS 算法选择等额外成本,先 warm up。
for _ in range(5):
_ = x @ x
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
start.record() # 记录到当前 stream
for _ in range(20):
y = x @ x
end.record()
# end.record() 也是异步提交;读取 elapsed_time 前等待 GPU 到达 end。
end.synchronize()
elapsed_ms = start.elapsed_time(end)
print(f"平均矩阵乘时间: {elapsed_ms / 20:.3f} ms")

4.2 用 Event 表示精细的 stream 依赖#

如果一个 stream 只需要等待另一个 stream 的某个阶段,而不是等待其所有已提交 工作,可以记录 event 并等待这个 event。

import torch
device = torch.device("cuda:0")
load_stream = torch.cuda.Stream(device=device)
work_stream = torch.cuda.Stream(device=device)
ready = torch.cuda.Event()
host_x = torch.randn((1024, 1024), pin_memory=True)
with torch.cuda.stream(load_stream):
x = host_x.to(device, non_blocking=True)
ready.record(load_stream) # event 表示 x 的 copy 已完成
with torch.cuda.stream(work_stream):
work_stream.wait_event(ready) # 只等待 ready 之前的 load_stream 工作
y = torch.relu(x @ x)
y.record_stream(work_stream)
torch.cuda.current_stream(device).wait_stream(work_stream)
print(y.norm().item())

4.3 Event 可以复用,但会覆盖“当前记录点”#

Event 不是一次性对象。对同一个 torch.cuda.Event 多次调用 record() 是允许 的,但后一次记录会覆盖 Event 当前保存的完成状态。之后调用 query()synchronize(),观察的是它最近一次被记录的位置。

import torch
stream = torch.cuda.Stream()
checkpoint = torch.cuda.Event()
with torch.cuda.stream(stream):
a = torch.randn(1024, device="cuda")
checkpoint.record(stream) # 第一次记录:a 的初始化完成点
b = torch.relu(a)
checkpoint.record(stream) # 第二次记录:覆盖为 b 的计算完成点
# 此时 checkpoint.synchronize() 等待的是第二次 record,即 b 完成。
# 它不再提供“只等到 a 初始化完成”为止的单独检查点。
checkpoint.synchronize()
print(b[:4])

可以把同一个 Event 想象成一张可以重复移动的书签:record() 把书签放在某个 stream 当前已经提交的工作之后;再次 record() 会把书签移动到新的位置。如果 需要同时保留“章节 A 的结束位置”和“章节 B 的结束位置”,就必须使用两张书签, 也就是两个 Event。

有一个细节尤其重要:重新记录 Event 不会回溯修改已经提交到其他 stream 的 等待命令wait_event() 被调用时,会把“等待此刻 Event 所代表的记录点”加入 消费 stream 的队列;随后复用该 Event,只影响之后新提交的等待。

import torch
producer = torch.cuda.Stream()
consumer = torch.cuda.Stream()
ready = torch.cuda.Event()
with torch.cuda.stream(producer):
first = torch.randn(1024, device="cuda")
ready.record(producer) # 记录点 A:first 已就绪
with torch.cuda.stream(consumer):
consumer.wait_event(ready) # 这次等待绑定到记录点 A
first_output = first.relu()
with torch.cuda.stream(producer):
second = torch.randn(1024, device="cuda")
ready.record(producer) # 覆盖当前状态为记录点 B:second 已就绪
with torch.cuda.stream(consumer):
consumer.wait_event(ready) # 新的等待绑定到记录点 B
second_output = second.relu()

这段代码中,第一次 wait_event(ready) 仍等待 first 就绪,不会因为后面将 ready 重录到 second 之后,就被改写成等待 second。因此,顺序生产、 顺序提交消费依赖的循环可以复用一个 Event;如果调用方还需要查询、同步或计时 先前的多个记录点,就不能过早覆盖它们。

4.4 什么时候需要多个 Event#

判断标准不是“有多少个 CUDA 操作”,而是:

同一时刻需要保留多少个独立且仍然有意义的检查点,就需要多少个 Event。

最明显的例子是计时。要测量一段工作耗时,必须同时保留起点和终点,因此需要 startend 两个启用了 timing 的 Event;用同一个 Event 连续记录两次会 丢掉起点。

对于多缓冲流水线,每个正在传输或等待使用的 slot 通常也需要自己的 ready Event。下面是双缓冲的数据预取轮廓:

import torch
device = torch.device("cuda:0")
copy_stream = torch.cuda.Stream(device=device)
compute_stream = torch.cuda.Stream(device=device)
buffers = [
torch.empty((256, 1024), device=device),
torch.empty((256, 1024), device=device),
]
ready = [
torch.cuda.Event(), # slot 0 数据就绪标记,不需要计时
torch.cuda.Event(), # slot 1 数据就绪标记
]
consumed = [
torch.cuda.Event(), # slot 0 已被计算读取完毕
torch.cuda.Event(), # slot 1 已被计算读取完毕
]
# 假设 cpu_batches 的 Tensor 来自 pin_memory=True 的 DataLoader。
for step, cpu_batch in enumerate(cpu_batches):
slot = step % 2
with torch.cuda.stream(copy_stream):
if step >= 2:
# 再次写入同一个 slot 前,先等待两轮之前的计算不再读取它。
copy_stream.wait_event(consumed[slot])
buffers[slot].copy_(cpu_batch, non_blocking=True)
ready[slot].record(copy_stream) # 只覆盖当前将被复用的 buffer slot
with torch.cuda.stream(compute_stream):
compute_stream.wait_event(ready[slot])
output = model(buffers[slot])
consumed[slot].record(compute_stream)

这里使用两个 ready Event,不是因为一次传输必须配一个永久新 Event,而是因为 两个 buffer 可以处于不同的在途阶段。consumed 则表示每个 slot 什么时候可以 被下一次传输安全覆盖:否则即使 ready 安排正确,写入缓冲区本身也会与上一轮 计算的读取产生竞争。

相反,如果多个 consumer 都只需要等待同一份输入就绪,一个 ready Event 就足够:

ready = torch.cuda.Event()
with torch.cuda.stream(producer):
x = load_to_gpu()
ready.record(producer)
consumer_a.wait_event(ready) # 两个 consumer 等待同一个检查点
consumer_b.wait_event(ready)

4.5 Event 的开销:适合阶段边界,不适合无节制打点#

Event 是轻量级同步标记,但不是免费的。首先区分两种用途:

ready = torch.cuda.Event() # enable_timing=False:只做依赖同步
start = torch.cuda.Event(enable_timing=True) # 需要记录时间戳
end = torch.cuda.Event(enable_timing=True)

只需要 stream 依赖时,默认的非计时 Event 已经足够;只有测量 GPU 时间时才启用 enable_timing=True。直观上:

  • 在一次 batch 传输完成处、一次较大计算区域两侧放少量 Event,通常是合理用法;
  • 为每一个极小 kernel 前后都插入 timing Event,会增加提交与记录工作,也可能 明显扰动要测量的短操作;
  • event.synchronize() 最显眼的性能影响通常不是标记本身,而是 CPU 在该位置 停下来等待此前尚未完成的 GPU 工作;
  • Event 的具体开销会随 GPU、驱动、CUDA 版本和负载变化,不能用一个固定数字 替代实际测试。

可以在自己的设备上用下面的代码获得数量级印象。它将同一个非计时 Event 重录 很多次,测量这些 marker 加入 GPU 时间线的总体开销:

import torch
iterations = 10_000
marker = torch.cuda.Event() # 同步用 marker,不启用 timing
start = torch.cuda.Event(enable_timing=True)
end = torch.cuda.Event(enable_timing=True)
torch.cuda.synchronize()
start.record()
for _ in range(iterations):
marker.record() # Event 可被连续复用
end.record()
end.synchronize()
total_ms = start.elapsed_time(end)
print(f"{iterations} 次 record 总时间: {total_ms:.3f} ms")
print(f"平均每次 record: {total_ms * 1000 / iterations:.3f} us")

还可以用“只包围整个循环一次”和“包围每一个小操作”的方式对比 timing Event 密度对结果的影响:

import torch
x = torch.randn(1024, device="cuda")
iterations = 1000
# 粗粒度计时:仅测整个工作区域。
coarse_start = torch.cuda.Event(enable_timing=True)
coarse_end = torch.cuda.Event(enable_timing=True)
coarse_start.record()
for _ in range(iterations):
y = x + 1
coarse_end.record()
coarse_end.synchronize()
coarse_ms = coarse_start.elapsed_time(coarse_end)
# 细粒度计时:在相同小操作之间加入大量计时 Event。
marks = [
(torch.cuda.Event(enable_timing=True), torch.cuda.Event(enable_timing=True))
for _ in range(iterations)
]
fine_start = torch.cuda.Event(enable_timing=True)
fine_end = torch.cuda.Event(enable_timing=True)
fine_start.record()
for before, after in marks:
before.record()
y = x + 1
after.record()
fine_end.record()
fine_end.synchronize()
fine_ms = fine_start.elapsed_time(fine_end)
print(f"粗粒度计时总时间: {coarse_ms:.3f} ms")
print(f"逐操作打点后的总时间: {fine_ms:.3f} ms")

这个对比的目的不是得到可以套用到所有机器的常数,而是确认 Event 的使用粒度: 它非常适合作为流水线阶段边界和较大性能区域的标记;对于细碎 kernel,应先明确 是否真的需要逐个同步或逐个计时。

5. 从 Python 调用 C++:先使用 ATen 已有 CUDA 算子#

ATenA Tensor Library 的缩写,是 PyTorch 底层的 C++ Tensor 与 数学运算库。它提供 C++ 侧的 at::Tensor 类型,以及 at::reluat::empty_likeat::matmul 等算子接口。Python 层调用的许多 torch.* 运算,最终都会进入 PyTorch 的 operator/dispatcher 体系,再根据 Tensor 的 device、dtype、autograd、autocast 等 dispatch key 选择具体实现。

可以把这一调用关系简化为:

Python: torch.relu(x)
PyTorch dispatcher:检查 device、dtype、autograd 等信息
ATen/operator 的 CPU kernel、CUDA kernel 或其他 backend 实现

因此,在 C++ 扩展中调用 ATen 操作并不表示“回到 CPU 计算”:输入为 CUDA Tensor 时,at::relu 和逐元素算术会派发到已有的 CUDA 实现;输入为 CPU Tensor 时,则会使用 CPU 实现。教程和旧式 pybind 扩展中也常写 torch::Tensor/torch::relu,对常规扩展编写而言,它们是面向 PyTorch 的 对应 C++ 入口;下文注册底层 custom operator 时使用 at::Tensor,以明确表示 依赖 ATen/libtorch 接口。

自定义 C++ 扩展不等于必须立刻写 CUDA kernel。先使用 ATen 组合已有运算,适合 以下场景:

  • 需要复用已有 C++ 库或把部分控制逻辑放到 C++;
  • 希望先验证接口和正确性,再决定是否融合成自定义 kernel;
  • 计算可以由已有 PyTorch 算子高效表达。

下面使用 torch.utils.cpp_extension.load_inline 即时编译一个扩展。第一次执行会 编译并缓存动态库,需要本机有可用 C++ 编译器。

import torch
from torch.utils.cpp_extension import load_inline
cpp_source = r"""
#include <ATen/ATen.h>
#include <torch/extension.h>
// at::Tensor 和 at::relu 来自 ATen;输入在 CUDA 上时,运算派发到 CUDA 实现。
at::Tensor scale_relu(const at::Tensor& input, double scale) {
TORCH_CHECK(input.is_cuda(), "scale_relu expects a CUDA tensor");
TORCH_CHECK(input.is_floating_point(), "input must be floating point");
return at::relu(input * scale + 1.0);
}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("scale_relu", &scale_relu, "scale then ReLU (ATen CUDA dispatch)");
}
"""
ext = load_inline(
name="aten_cuda_call_example",
cpp_sources=cpp_source,
functions=None, # C++ 源码已经显式写了 PYBIND11_MODULE
with_cuda=False, # 没有自行编译 .cu 文件;仍然可以处理 CUDA Tensor
extra_cflags=["-O3"],
verbose=True,
)
x = torch.tensor([-2.0, 0.0, 3.0], device="cuda")
y = ext.scale_relu(x, 2.0)
torch.testing.assert_close(y, torch.tensor([0.0, 1.0, 7.0], device="cuda"))
print(y)

这里的 C++ 函数复用了 PyTorch 已有的 ATen 运算,但没有保证把乘法、加法和 ReLU 三个操作融合为一个 CUDA kernel。ATen 的优势是容易正确复用现有能力; 如果目标是减少中间 Tensor、显存读写或 kernel launch 次数,或者实现 PyTorch 尚不存在的运算,就需要下一节的自定义 kernel,并通过 profile 验证收益。

6. C++ 端访问 Tensor 数据:指针属于哪里#

这是写扩展时最容易踩坑的地方:

tensor.data_ptr<float>() 返回底层存储的地址,但地址所在的设备由 tensor.device() 决定。CPU C++ 代码可以解引用 CPU Tensor 的地址; CUDA Tensor 的地址必须交给 CUDA kernel(或 CUDA API)在设备侧访问。

6.1 CPU Tensor:C++ 循环可以直接读取#

下面的扩展要求 CPU、连续且 float32 的二维 Tensor,然后用 accessor 做带 维度信息的索引访问。

import torch
from torch.utils.cpp_extension import load_inline
cpp_source = r"""
#include <torch/extension.h>
double sum_rows_cpu(torch::Tensor input) {
TORCH_CHECK(input.device().is_cpu(), "input must be on CPU");
TORCH_CHECK(input.scalar_type() == torch::kFloat32, "input must be float32");
TORCH_CHECK(input.dim() == 2, "input must be a matrix");
// contiguous() 保证逻辑相邻元素也在内存中连续排列。
auto x = input.contiguous();
auto view = x.accessor<float, 2>();
double result = 0.0;
for (int64_t row = 0; row < view.size(0); ++row) {
for (int64_t col = 0; col < view.size(1); ++col) {
result += view[row][col]; // CPU 地址可以在主机代码中直接读取
}
}
return result;
}
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
m.def("sum_rows_cpu", &sum_rows_cpu);
}
"""
ext = load_inline(
name="cpu_tensor_access_example",
cpp_sources=cpp_source,
extra_cflags=["-O3"],
verbose=True,
)
x = torch.arange(12, dtype=torch.float32).reshape(3, 4)
assert ext.sum_rows_cpu(x) == x.sum().item()

6.2 CUDA Tensor:主机只取得地址,kernel 才解引用#

在 CUDA 扩展中最常见的形式是:

// x 是 CUDA Tensor;这里只是取出设备地址作为 kernel 参数。
const float* x_ptr = x.data_ptr<float>();
float* out_ptr = out.data_ptr<float>();
// __global__ kernel 在 GPU 上运行,因此可以访问 x_ptr/out_ptr。
my_kernel<<<blocks, threads, 0, stream>>>(x_ptr, out_ptr, x.numel());

不要在普通 C++ 函数中写 x.data_ptr<float>()[0] 来读取 CUDA Tensor;那个 指针不属于 CPU 可直接访问的内存空间。

还要注意两个前提:

  • data_ptr<float>() 必须与 Tensor 实际 dtype 匹配;
  • 自定义按线性内存读取的 kernel 通常应要求 contiguous(),或者自行处理 stride。

7. 编写并调用一个自定义 CUDA Kernel#

这一节实现 SiLU(x) = x * sigmoid(x)float32 CUDA 算子。虽然 PyTorch 本身已经提供高质量的 SiLU 实现,但这个运算足够简单,适合观察完整扩展结构。

官方当前推荐将 C++/CUDA 实现注册为 PyTorch custom operator。相比仅通过 pybind 暴露函数,这样 torch.ops、autograd 和 torch.compile 能明确认识到 这个运算。

下面为了突出 Tensor、stream 与 kernel 的关系,使用常见的 ATen 接口编写即时 编译示例。若要把扩展打包发布给多个 PyTorch 版本使用,PyTorch 2.10 及更高版本 还提供了 LibTorch Stable ABI 方案,应按官方教程改用其稳定接口。

项目目录可以如下组织:

cuda_notes_ext/
├── ops.cpp
├── silu_kernel.cu
└── build_and_test.py

7.1 C++ 注册入口:ops.cpp#

#include <ATen/ATen.h>
#include <torch/library.h>
// 真正实现位于 silu_kernel.cu 中。
at::Tensor silu_cuda(const at::Tensor& x);
// 定义 Python 可见的算子 schema:torch.ops.cuda_notes.silu(x)。
TORCH_LIBRARY(cuda_notes, m) {
m.def("silu(Tensor x) -> Tensor");
}
// 当输入来自 CUDA backend 时,将调用绑定到 silu_cuda。
TORCH_LIBRARY_IMPL(cuda_notes, CUDA, m) {
m.impl("silu", &silu_cuda);
}

这里只注册了 CUDA 实现,因此传入 CPU Tensor 会得到“没有 CPU backend 实现”的报错。这比静默搬运数据更容易发现错误,也符合一个 CUDA 学习算子的定位。

7.2 CUDA 实现:silu_kernel.cu#

#include <ATen/ATen.h>
#include <c10/util/Exception.h>
#include <c10/cuda/CUDAGuard.h>
#include <c10/cuda/CUDAException.h>
#include <c10/cuda/CUDAStream.h>
// 每个 CUDA thread 处理一个 float 元素。
__global__ void silu_forward_kernel(
const float* input,
float* output,
int64_t numel) {
const int64_t index =
static_cast<int64_t>(blockIdx.x) * blockDim.x + threadIdx.x;
if (index < numel) {
const float value = input[index];
const float sigmoid = 1.0f / (1.0f + expf(-value));
output[index] = value * sigmoid;
}
}
at::Tensor silu_cuda(const at::Tensor& input) {
TORCH_CHECK(input.is_cuda(), "silu expects a CUDA tensor");
TORCH_CHECK(
input.scalar_type() == at::kFloat,
"this learning kernel currently supports float32 only");
// 本 kernel 按一维连续下标读写;非连续输入先生成连续副本。
auto x = input.contiguous();
auto output = at::empty_like(x);
if (x.numel() == 0) {
return output;
}
// 在输入所在 device 上启动 kernel,避免多 GPU 程序中启动到错误设备。
const c10::cuda::CUDAGuard device_guard(x.device());
constexpr int threads = 256;
const int blocks = static_cast<int>((x.numel() + threads - 1) / threads);
// 使用 PyTorch 当前 stream,而不是擅自使用 default stream。
// 这样 kernel 能正确参与 Python 侧的 stream 依赖关系。
const auto stream = c10::cuda::getCurrentCUDAStream(x.get_device());
silu_forward_kernel<<<blocks, threads, 0, stream.stream()>>>(
x.data_ptr<float>(),
output.data_ptr<float>(),
x.numel());
// 启动配置错误等问题会在离 kernel 最近的位置抛出,更方便调试。
C10_CUDA_KERNEL_LAUNCH_CHECK();
return output;
}

这里有三个非常重要的实现细节:

  1. CUDAGuard 选择输入所在 GPU,而不是假定永远只有 cuda:0
  2. getCurrentCUDAStream() 启动 kernel,尊重调用者当前的 PyTorch stream;
  3. C10_CUDA_KERNEL_LAUNCH_CHECK() 尽早发现 kernel launch 错误。

7.3 编译、加载和验证:build_and_test.py#

torch.utils.cpp_extension.load 会调用本机编译器和 nvcc,生成动态库并加载 注册信息。is_python_module=False 表示这不是 pybind Python 模块,而是加载后 通过 torch.ops 调用的算子库。

from pathlib import Path
import torch
import torch.nn.functional as F
from torch.utils.cpp_extension import load
root = Path(__file__).parent
load(
name="cuda_notes_ext",
sources=[str(root / "ops.cpp"), str(root / "silu_kernel.cu")],
extra_cflags=["-O3"],
extra_cuda_cflags=["-O3"],
is_python_module=False, # 加载 TORCH_LIBRARY 注册,而不是 import 扩展模块
verbose=True,
)
# 让 FakeTensor/torch.compile 知道输出的 shape、dtype 和 device。
@torch.library.register_fake("cuda_notes::silu")
def _fake_silu(x):
return torch.empty_like(x)
# 自定义 forward kernel 不会自动生成 backward;这里用已有 PyTorch 算子表达梯度。
def _setup_context(ctx, inputs, output):
(x,) = inputs
ctx.save_for_backward(x)
def _backward(ctx, grad_output):
(x,) = ctx.saved_tensors
sigmoid = torch.sigmoid(x)
grad_x = grad_output * (sigmoid + x * sigmoid * (1 - sigmoid))
return grad_x
torch.library.register_autograd(
"cuda_notes::silu",
_backward,
setup_context=_setup_context,
)
x = torch.randn(1024, device="cuda", dtype=torch.float32, requires_grad=True)
y = torch.ops.cuda_notes.silu(x)
reference = F.silu(x)
torch.testing.assert_close(y, reference, rtol=1e-5, atol=1e-6)
# 同时验证自定义 backward 注册是否与参考实现一致。
grad = torch.randn_like(y)
custom_grad = torch.autograd.grad(y, x, grad, retain_graph=True)[0]
reference_grad = torch.autograd.grad(reference, x, grad)[0]
torch.testing.assert_close(custom_grad, reference_grad, rtol=1e-5, atol=1e-6)
# 注册 fake kernel 后,该运算可以作为 torch.compile 图中的 opaque custom op。
compiled = torch.compile(lambda value: torch.ops.cuda_notes.silu(value))
torch.testing.assert_close(compiled(x.detach()), F.silu(x.detach()))
print("forward, backward and torch.compile checks passed")

一个生产级 kernel 还需要继续处理 float16/bfloat16、autocast、非连续布局的 性能策略、CPU fallback、测试矩阵和打包发布。学习阶段先把 device、stream、 dtype 和梯度契约写明确,比一开始追求复杂模板更重要。

8. 自定义 Kernel 与 Stream/Event 一起工作#

因为上面的 CUDA 实现使用 当前 stream 启动 kernel,所以调用它时可以像普通 PyTorch CUDA 运算一样安排依赖和计时。

import torch
# 假设上一节已经加载过扩展。
s = torch.cuda.Stream()
ready = torch.cuda.Event()
done = torch.cuda.Event(enable_timing=True)
start = torch.cuda.Event(enable_timing=True)
x = torch.randn(1_000_000, device="cuda", dtype=torch.float32)
ready.record() # 默认 stream 已经完成 x 的初始化时触发
with torch.cuda.stream(s):
s.wait_event(ready) # 自定义 kernel 不能提前读取 x
start.record(s)
y = torch.ops.cuda_notes.silu(x) # kernel 将提交到 s
done.record(s)
y.record_stream(s)
done.synchronize()
print(f"custom SiLU: {start.elapsed_time(done):.3f} ms")

如果扩展硬编码到 default stream,上面的 with torch.cuda.stream(s) 语义就会 被破坏,并可能导致竞态条件。因此,“使用当前 stream”不是微小的性能细节,而是 扩展与 PyTorch 正确组合所需的接口契约。

9. 进一步的 CUDA 能力#

9.1 Automatic Mixed Precision (AMP)#

训练中常用 torch.autocast 让一部分运算使用 float16bfloat16。本文的 kernel 明确只支持 float32,因此在 autocast 区域中不能假定它会自动支持半精度。

import torch
x = torch.randn(1024, device="cuda", dtype=torch.float32)
with torch.autocast(device_type="cuda", dtype=torch.float16):
# 对只支持 float32 的学习 kernel,显式退出 autocast 或实现对应 dtype kernel。
with torch.autocast(device_type="cuda", enabled=False):
y = torch.ops.cuda_notes.silu(x.float())

真正要用于模型训练的扩展,应实现所需 dtype 并为 autocast 注册合理行为,而不是 把所有输入无条件转换为 float32,否则会引入额外传输和显存开销。

9.2 CUDA Graph#

常规 eager 执行中,CPU 每次迭代都提交一批 kernel。模型很小、kernel 很碎时, 提交开销可能占比明显。CUDA Graph 可以捕获一段固定工作并反复 replay,减少 CPU 提交开销;代价是捕获阶段使用的内存地址和形状需要稳定。

import torch
model = torch.nn.Sequential(
torch.nn.Linear(128, 256),
torch.nn.ReLU(),
torch.nn.Linear(256, 10),
).cuda()
static_input = torch.randn(32, 128, device="cuda")
# capture 前预热,避免把懒初始化和首次分配放进捕获流程。
warmup_stream = torch.cuda.Stream()
warmup_stream.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(warmup_stream):
for _ in range(3):
_ = model(static_input)
torch.cuda.current_stream().wait_stream(warmup_stream)
graph = torch.cuda.CUDAGraph()
with torch.cuda.graph(graph):
static_output = model(static_input)
new_batch = torch.randn_like(static_input)
static_input.copy_(new_batch) # 保持 static_input 的显存地址不变,只替换内容
graph.replay()
result = static_output # replay 后输出已更新

自定义 operator 是否适合 graph capture,仍取决于内部是否在捕获阶段执行不允许的 操作,以及输入输出内存策略是否稳定。

10. 调试和性能验证清单#

10.1 正确性优先#

  • 与纯 PyTorch 参考实现比较 forward 输出;
  • 需要训练时比较 backward 梯度,并覆盖边界尺寸和空 Tensor;
  • 检查不同 dtype、contiguous/non-contiguous 输入和多 GPU device 行为;
  • 自定义运算可使用 torch.library.opcheck 检查注册是否满足 PyTorch 子系统契约。

10.2 异步错误定位#

CUDA kernel 错误可能因为异步执行而延后暴露,导致报错行看起来与根因无关。 调试阶段可以临时同步:

import os
# 应在 import torch 以及启动 CUDA 工作之前设置。
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
import torch

在自定义 .cu 代码中保留 C10_CUDA_KERNEL_LAUNCH_CHECK(),可以把启动错误 尽量定位到自定义 kernel 调用附近。

10.3 性能测量#

  • 先 warm up,再用 torch.cuda.Event(enable_timing=True) 计时;
  • 不要把频繁 item()print() 或无意的 synchronize() 算入稳定态吞吐;
  • 用 PyTorch Profiler 或 Nsight Systems 观察 copy 与 kernel 是否实际重叠;
  • 自定义 kernel 不一定比 PyTorch 内置算子快:融合是否减少显存往返、launch 数量和中间 Tensor,必须通过 profile 验证。

总结:从“会用 GPU”到“理解 CUDA 边界”#

PyTorch 的 CUDA API 可以按以下顺序学习:

  1. 先掌握 Tensor 的 device、显存和异步执行模型;
  2. 使用 pinned memory 与 non_blocking=True 组织数据传输;
  3. 使用 Stream 表达可并行任务,并用 Event 建立依赖和计时;
  4. 在 C++ 扩展中区分 CPU 地址与 CUDA 地址,始终遵守 dtype、布局和 device 契约;
  5. 用 custom operator 注册自定义 CUDA kernel,再补齐 autograd 与 torch.compile 支持;
  6. 最后再通过 AMP、CUDA Graph 和 profiler 解决真实工作负载中的性能问题。

最重要的原则是:CUDA 工作默认是异步的,跨 stream 的依赖不能靠直觉;自定义 kernel 也必须像 PyTorch 自带算子一样尊重当前 device、当前 stream 和 Tensor 契约。

参考资料#

PyTorch中的CUDA API
https://blog.gzher.com/posts/pytorch-cuda-api/
作者
中会
发布于
2026-05-25
许可协议
CC BY-NC-SA 4.0