from os import path
import torch.distributed as dist
import torch.autograd as autograd
_src_path = path.join(path.dirname(path.abspath(__file__)), "src")
# Activation names
ACT_RELU = "relu"
ACT_LEAKY_RELU = "leaky_relu"
ACT_ELU = "elu"
ACT_NONE = "none"
def normalize_shape(x):
if x.ndimension() == 1:
return x.view(1, -1, 1)
else:
return x.view(x.size(0), x.size(1), -1)
import torch
def broadcast_to(v, x):
if x.ndimension() == 2:
return v
else:
broadcast_size = [1, -1]
for i in range(2, x.ndimension()):
broadcast_size.append(1)
return v.view(broadcast_size)
def forward_cpu(x, mean, var, weight, bias, affine, eps):
gamma = torch.abs(weight) + eps if affine else torch.ones_like(var)
mul = torch.rsqrt(var + eps) * gamma
x.sub_(broadcast_to(mean, x))
x.mul_(broadcast_to(mul, x))
if affine:
x.add_(broadcast_to(bias, x))
return x
def _check(fn, *args, **kwargs):
success = fn(*args, **kwargs)
if not success:
raise RuntimeError("CUDA Error encountered in {}".format(fn))
def _reduce(x):
if len(x.size()) == 2:
return x.sum(dim=0)
else:
n, c = x.size()[0:2]
return x.contiguous().view((n, c, -1)).sum(2).sum(0)
def _count_samples(x):
count = 1
for i, s in enumerate(x.size()):
if i != 1:
count *= s
return count
def leaky_relu_forward(z, slope):
return torch.nn.functional.leaky_relu(z, negative_slope=slope)
def _act_forward(ctx, x):
if ctx.activation == ACT_LEAKY_RELU:
leaky_relu_forward(x, ctx.slope)
class InPlaceABN(autograd.Function):
@staticmethod
def forward(ctx, x, weight, bias, running_mean, running_var,
training=False, momentum=0.1, eps=1e-05, activation=ACT_LEAKY_RELU, slope=0.01):
# Save context
ctx.training = training
ctx.momentum = momentum
ctx.eps = eps
ctx.activation = activation
ctx.slope = slope
ctx.affine = weight is not None and bias is not None
# Prepare inputs
count = _count_samples(x)
x = x.contiguous()
weight = weight.contiguous() if ctx.affine else x.new_empty(0)
bias = bias.contiguous() if ctx.affine else x.new_empty(0)
mean, var = running_mean.contiguous(), running_var.contiguous()
ctx.mark_dirty(x)
# BN forward + activation
forward_cpu(x, mean, var, weight, bias, ctx.affine,ctx.eps)
_act_forward(ctx, x)
# Output
ctx.var = var
ctx.save_for_backward(x, var, weight, bias)
ctx.mark_non_differentiable(running_mean, running_var)
return x, running_mean, running_var
class InPlaceABNSync(autograd.Function):
@classmethod
def forward(cls, ctx, x, weight, bias, running_mean, running_var,
training=False, momentum=0.1, eps=1e-05, activation=ACT_LEAKY_RELU, slope=0.01, equal_batches=True):
# Save context
ctx.training = training
ctx.momentum = momentum
ctx.eps = eps
ctx.activation = activation
ctx.slope = slope
ctx.affine = weight is not None and bias is not None
# Prepare inputs
ctx.world_size = dist.get_world_size() if dist.is_initialized() else 1
# count = _count_samples(x)
batch_size = x.new_tensor([x.shape[0]], dtype=torch.long)
x = x.contiguous()
weight = weight.contiguous() if ctx.affine else x.new_empty(0)
bias = bias.contiguous() if ctx.affine else x.new_empty(0)
mean, var = running_mean.contiguous(), running_var.contiguous()
ctx.mark_dirty(x)
# BN forward + activation
forward_cpu(x, mean, var, weight, bias, ctx.affine,ctx.eps)
_act_forward(ctx, x)
# Output
ctx.var = var
ctx.save_for_backward(x, var, weight, bias)
ctx.mark_non_differentiable(running_mean, running_var)
return x, running_mean, running_var
inplace_abn = InPlaceABN.apply
inplace_abn_sync = InPlaceABNSync.apply
__all__ = ["inplace_abn", "inplace_abn_sync", "ACT_RELU", "ACT_LEAKY_RELU", "ACT_ELU", "ACT_NONE"]