
tau's Issues

Expose partitioning policy in `Pipe.from_tracing` as a programmable interface

Currently, we only partition the model based on the presence of IR.pipe_split (and its derivatives such as IR.annotate_split_points and IR.PipeSplitWrapper). However, someone might want to use the graph representation of the original code to automatically partition the model into stages: https://github.com/jamesr66a/PiPPy/blob/527af1fd8123d35bd81b9fe304a8d0ed29c9fd8d/pippy/IR.py#L376 We should expose an interface through which a user can supply a computation that partitions the graph.
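A minimal sketch of what such an interface could look like, assuming a hypothetical split_policy callback passed to Pipe.from_tracing that receives the traced fx.GraphModule and inserts pipe_split calls before splitting (the parameter name and callback shape are assumptions, not the existing API):

import torch.fx
from pippy.IR import Pipe, pipe_split

def split_every_n_call_modules(gm: torch.fx.GraphModule, n: int = 2) -> torch.fx.GraphModule:
    # Hypothetical policy: insert a pipe_split after every n-th call_module node.
    seen = 0
    for node in list(gm.graph.nodes):
        if node.op == 'call_module':
            seen += 1
            if seen % n == 0:
                with gm.graph.inserting_after(node):
                    gm.graph.call_function(pipe_split, args=())
    gm.recompile()
    return gm

# Hypothetical usage; `split_policy` does not exist on Pipe.from_tracing today:
# pipe = Pipe.from_tracing(model, split_policy=split_every_n_call_modules)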

Add unit tests for PipelineDriver.py

We have unit tests for IR.py in test/test_ir.py. We should add similar unit tests to exercise the functionality of PipelineDriver.py. We can use pytest-cov (run pytest --cov=pippy test/ in the repo root) to check how well we cover this file. As of the time of this writing, coverage looks like:

---------- coverage: platform linux, python 3.7.12-final-0 -----------
Name                      Stmts   Miss  Cover
---------------------------------------------
pippy/IR.py                 371      7    98%
pippy/PipelineDriver.py     394    330    16%
pippy/__init__.py             3      0   100%
pippy/version.py              2      2     0%
---------------------------------------------
TOTAL                       770    339    56%

RRef refcounting consistency issue

Repro:

$ test/launch_local_test_forward_backward.sh 
WARNING:torch.distributed.run:
*****************************************
Setting OMP_NUM_THREADS environment variable for each process to be 1 in default, to avoid your system being overloaded, please further tune the variable for optimal performance in your application as needed. 
*****************************************
REPLICATE config: False -> MultiUseParameterConfig.TRANSMIT
GraphModule(
  (submod_0): GraphModule()
  (submod_1): GraphModule()
  (submod_2): GraphModule()
  (_loss): MSELoss()
)



def forward(self, x, target):
    submod_0 = self.submod_0(x)
    getitem_2 = submod_0[2]
    getitem = submod_0[0]
    getitem_1 = submod_0[1]
    submod_1 = self.submod_1(getitem, getitem_2)
    getitem_4 = submod_1[1]
    getitem_3 = submod_1[0]
    submod_2 = self.submod_2(getitem_3, getitem_1, getitem_4)
    _loss = self._loss(submod_2, target)
    stage_backward = pippy_IR_stage_backward(stage_output = _loss, output_grads = None, input_values = [submod_2, target]);  target = None
    getitem_5 = stage_backward[0]
    getitem_6 = stage_backward[1];  stage_backward = None
    getitem_7 = getitem_5[0]
    getitem_8 = getitem_5[1];  getitem_5 = None
    stage_backward_1 = pippy_IR_stage_backward(stage_output = submod_2, output_grads = getitem_7, input_values = [getitem_3, getitem_1, getitem_4]);  submod_2 = getitem_7 = getitem_3 = getitem_1 = getitem_4 = None
    getitem_9 = stage_backward_1[0]
    getitem_10 = stage_backward_1[1];  stage_backward_1 = None
    getitem_11 = getitem_9[0]
    getitem_12 = getitem_9[1]
    getitem_13 = getitem_9[2];  getitem_9 = None
    stage_backward_2 = pippy_IR_stage_backward(stage_output = submod_1, output_grads = [getitem_11, getitem_13], input_values = [getitem, getitem_2]);  submod_1 = getitem_11 = getitem_13 = getitem = getitem_2 = None
    getitem_14 = stage_backward_2[0]
    getitem_15 = stage_backward_2[1];  stage_backward_2 = None
    getitem_16 = getitem_14[0]
    getitem_17 = getitem_14[1];  getitem_14 = None
    stage_backward_3 = pippy_IR_stage_backward(stage_output = submod_0, output_grads = [getitem_16, getitem_12, getitem_17], input_values = [x]);  submod_0 = getitem_16 = getitem_12 = getitem_17 = x = None
    getitem_18 = stage_backward_3[0]
    getitem_19 = stage_backward_3[1];  stage_backward_3 = None
    getitem_20 = getitem_18[0];  getitem_18 = None
    sync_barrier = pippy_IR_sync_barrier(_loss, [getitem_6, getitem_10, getitem_15, getitem_19]);  _loss = getitem_6 = getitem_10 = getitem_15 = getitem_19 = None
    return sync_barrier
    
/fsx/users/jamesreed/pipeline_for_real/pippy/PipelineDriver.py:498: UserWarning: Running pipeline with 3 stages on world_size of 10. Remaining ranks will be idle.
  warnings.warn(f'Running pipeline with {len(executor_descriptors)} stages on world_size of {self.world_size}. '
Traceback (most recent call last):
  File "/fsx/users/jamesreed/pipeline_for_real/test/local_test_forward_backward.py", line 117, in <module>
    out = pipe_driver.run((input, target), {}, chunks=CHUNKS, _debug_mask_minibatches = DEBUG_MASK_MINIBATCHES)
  File "/fsx/users/jamesreed/pipeline_for_real/pippy/PipelineDriver.py", line 691, in run
    last_nodes.append(interp.run_until(lambda n: n.op == 'output'))
  File "/fsx/users/jamesreed/pipeline_for_real/pippy/PipelineDriver.py", line 606, in run_until
    self.env[node] = self.run_node(node)
  File "/fsx/users/jamesreed/pytorch/torch/fx/interpreter.py", line 152, in run_node
    return getattr(self, n.op)(n.target, args, kwargs)
  File "/fsx/users/jamesreed/pipeline_for_real/pippy/PipelineDriver.py", line 573, in call_function
    return args[0].remote().__getitem__(args[1])
  File "/fsx/users/jamesreed/pytorch/torch/distributed/rpc/rref_proxy.py", line 41, in _invoke_rpc
    rref_fut.wait()
RuntimeError: RPCErr:1:RPC ran for more than set timeout (60000 ms) and will now be marked with an error

When I replace the __getitem__ call with this DEBUG_INDEX call, things work:

https://github.com/jamesr66a/PiPPy/blob/527af1fd8123d35bd81b9fe304a8d0ed29c9fd8d/pippy/PipelineDriver.py#L554

Is this because the __getitem__ call is done synchronously or something?
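For reference, one way to sidestep the nested synchronous call is to index on the RRef's owner via a plain helper function instead of going through the RRef proxy's __getitem__. This is only a sketch approximating the DEBUG_INDEX workaround referenced above, not the actual code:

import torch.distributed.rpc as rpc

def _index_value(rref, idx):
    # Runs on the owner of `rref`; indexes the local value directly, so the
    # caller does not block on a nested RPC issued from inside another RPC.
    return rref.local_value()[idx]

# In RemoteInterpreter.call_function, instead of
#     args[0].remote().__getitem__(args[1])
# one could issue (sketch):
#     rpc.remote(args[0].owner(), _index_value, args=(args[0], args[1]))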

Composition of PiPPy with DDP

Does this work out of the box by setting up the ProcessGroups in the correct way? If not, what do we have to do to make this work?
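A sketch of one possible setup, assuming each pipeline stage is replicated across data-parallel ranks and each replica group gets its own ProcessGroup for DDP's allreduce (the rank layout below is an assumption for illustration, not something PiPPy does today):

import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

# After dist.init_process_group(...): assume world_size == pp_size * dp_size,
# laid out so ranks holding the same pipeline stage form one replica group.
pp_size, dp_size = 4, 2
rank = dist.get_rank()

dp_groups = []
for stage in range(pp_size):
    ranks = [stage * dp_size + r for r in range(dp_size)]
    # new_group must be called by every rank, in the same order.
    dp_groups.append(dist.new_group(ranks=ranks))

my_stage = rank // dp_size
# `stage_module` would be the submodule this rank executes for its stage:
# ddp_stage = DDP(stage_module, process_group=dp_groups[my_stage])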

Design a more systematic way to close over the autograd traced program in runtime

Right now we use this HACK:

https://github.com/jamesr66a/PiPPy/blob/527af1fd8123d35bd81b9fe304a8d0ed29c9fd8d/pippy/PipelineDriver.py#L268

As it stands, we are wasting communication time/bandwidth sending the tensor values over RPC and then ignoring them, accessing the local values directly instead. We should figure out a better end-to-end design for keeping these values on the host so that they can eventually be used with their associated autograd trace.

NOTE: also, if torch.autograd allowed you to just pass in grad_fn rather than a tensor, we could save some space as well

Make PipeStageExecutor multi-threaded

This could be one approach to implementing asynchronously scheduled pipeline stages. Micro-batches within a stage are intrinsically unordered and can therefore (in theory) be executed in any order. If a pipeline stage has I/O-bound tasks, such as collectives, we can yield the executing micro-batch and admit another one (assuming we can post multiple collectives in the same process group). Using threads and relying on the GIL for serial admission is one approach that might work here.

We might also need to limit admission via registers/resources/etc.; the sketch below illustrates one way to bound the number of in-flight micro-batches.
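A minimal sketch of the threading approach, using a thread pool for execution and a semaphore to bound admission. The class and method names are illustrative, not the existing PipeStageExecutor API:

import threading
from concurrent.futures import ThreadPoolExecutor

class ThreadedStageExecutor:
    def __init__(self, stage_module, max_in_flight=4):
        self.stage_module = stage_module
        self.pool = ThreadPoolExecutor(max_workers=max_in_flight)
        # Bound admission so that yielding on I/O-bound work (e.g. collectives)
        # does not let an unbounded number of micro-batches pile up.
        self.admission = threading.Semaphore(max_in_flight)

    def submit_microbatch(self, *args):
        self.admission.acquire()
        future = self.pool.submit(self._run, *args)
        future.add_done_callback(lambda _: self.admission.release())
        return future

    def _run(self, *args):
        # Micro-batches within a stage are unordered, so any interleaving is
        # valid; the GIL serializes the Python-level bookkeeping for us.
        return self.stage_module(*args)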

Set up CI + testing infrastructure

We should have a battery of tests that runs on PRs/commits to ensure correctness and (ideally) catch performance regressions. We could potentially set this up via GitHub Actions. Some challenges here:

  • We should flesh out our test suite and test many different configurations (e.g. GPU vs. CPU, different topologies across different interconnects/network configurations, etc.)
  • We will need to allocate GPU VMs to properly test GPU execution. This isn't natively supported by GHA. AFAIU, the main PyTorch CI does this by calling out to AWS

[Bug] Pipe.from_tracing(transformers.T5Model()) crashes with 'GraphModule' object has no attribute 'decoder'

import inspect

import transformers.utils.fx as fx
from pippy.IR import MultiUseParameterConfig, Pipe
from transformers import *

model = T5Model(T5Config())
print(model)

input_names = model.dummy_inputs.keys()
sig = inspect.signature(model.forward)
concrete_args = {p.name: p.default for p in sig.parameters.values() if p.name not in input_names}

hf_tracer = fx.HFTracer()

model_pipe = Pipe.from_tracing(model, MultiUseParameterConfig.TRANSMIT, tracer=hf_tracer,
                               concrete_args=concrete_args)

T5Model(
  (shared): Embedding(32128, 512)
  (encoder): T5Stack(...)
  (decoder): T5Stack(...)
  ...
Traceback (most recent call last):
  File "/Users/pbelevich/PycharmProjects/PiPPy/test/hf_t5_test.py", line 15, in <module>
    model_pipe = Pipe.from_tracing(model, MultiUseParameterConfig.TRANSMIT, tracer=hf_tracer,
  File "/Users/pbelevich/PycharmProjects/PiPPy/pippy/IR.py", line 606, in from_tracing
    return Pipe._from_traced(mod, traced, multi_use_param_spec, loss_fn, **kwargs)
  File "/Users/pbelevich/PycharmProjects/PiPPy/pippy/IR.py", line 457, in _from_traced
    mod_itr = getattr(mod_itr, atom)
  File "/Users/pbelevich/miniconda3/envs/PiPPy/lib/python3.8/site-packages/torch/nn/modules/module.py", line 1186, in __getattr__
    raise AttributeError("'{}' object has no attribute '{}'".format(
AttributeError: 'GraphModule' object has no attribute 'decoder'

Maybe related to https://github.com/jamesr66a/PiPPy/issues/44

[Bug] PipelineDriverFillDrain._retrieve_output_values crashes with custom result types

HF models (e.g. GPT2Model) return custom types (e.g. BaseModelOutputWithPastAndCrossAttentions -> ModelOutput -> OrderedDict), and _retrieve_output_values fails while handling them (a sketch of one possible direction follows the traceback):

/home/pbelevich/local/PiPPy/pippy/PipelineDriver.py:393: UserWarning: Running pipeline with 13 stages on world_size of 20. Remaining ranks will be idle.
  warnings.warn(f'Running pipeline with {len(executor_descriptors)} stages on world_size of {self.world_size}. '
Traceback (most recent call last):
  File "/home/pbelevich/local/PiPPy/test/local_test_forward.py", line 55, in <module>
    out = pipe_driver.run(gpt2_input, chunks=5, _debug_mask_minibatches=True)
  File "/home/pbelevich/local/PiPPy/pippy/PipelineDriver.py", line 570, in run
    return self._retrieve_output_values(microbatch_interpreters, last_nodes, _debug_mask_minibatches, splits_per_arg)
  File "/home/pbelevich/local/PiPPy/pippy/PipelineDriver.py", line 605, in _retrieve_output_values
    sliced_outputs.append(result[start:end])
TypeError: unhashable type: 'slice'

see https://github.com/jamesr66a/PiPPy/pull/43 for the details
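One possible direction (a sketch of the general idea, not the fix from the PR) is to flatten the output with torch.utils._pytree so that slicing is applied only to tensor leaves and the original container is rebuilt afterwards. Note that custom container types such as HF's ModelOutput would need to be registered with pytree (e.g. via _register_pytree_node) before their leaves would be visible here:

import torch
from torch.utils._pytree import tree_flatten, tree_unflatten

def slice_output(result, start, end):
    # Flatten arbitrary nested containers (tuples, dicts, registered custom
    # types) into a flat list of leaves plus a spec describing the structure.
    leaves, spec = tree_flatten(result)
    sliced = [leaf[start:end] if isinstance(leaf, torch.Tensor) else leaf
              for leaf in leaves]
    return tree_unflatten(sliced, spec)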

Support multi-use parameters in leaf modules (whether tracing or sequential frontend)

Currently, only for the tracing frontend, we detect parameters that are used in more than one module and emit different code depending on the policy that the user specified (TRANSMIT or REPLICATE):

https://github.com/jamesr66a/PiPPy/blob/527af1fd8123d35bd81b9fe304a8d0ed29c9fd8d/pippy/IR.py#L465

We should generalize this to work on:

  1. Leaf modules in fx tracing
  2. Modules that share parameters in the sequential frontend

Concretely, these cases are already supported by default with the REPLICATE mode. To support the TRANSMIT mode, we could emit code at the end of the first stage that uses the parameter; this code would fetch the parameter value and transmit it to the subsequent use stages.
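For concreteness, here is a minimal example of case 2 (and, if the submodules are treated as fx leaves, of case 1): an embedding weight tied to an output projection, where the two uses would land in different stages. This is only an illustration of the scenario, not PiPPy code:

import torch

class TiedModel(torch.nn.Module):
    def __init__(self, vocab=100, hidden=32):
        super().__init__()
        self.embed = torch.nn.Embedding(vocab, hidden)           # first stage
        self.proj = torch.nn.Linear(hidden, vocab, bias=False)   # last stage
        # Tie the weights: the same Parameter is used by both leaf modules.
        self.proj.weight = self.embed.weight

    def forward(self, ids):
        return self.proj(torch.relu(self.embed(ids)))

# Under TRANSMIT, the stage owning embed.weight would need to send the
# parameter value to the stage running proj at the end of its forward.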

Error Handling

We should figure out how to gracefully handle the case where an exception is thrown in one of the pipeline stages.

Use Ray?

Is there any value in using Ray for the runtime?

[Bug] RemoteInterpreter.call_function doesn't handle _null_coalesce_accumulate

# Copyright (c) Meta Platforms, Inc. and affiliates
import argparse
import os
import socket

import torch
import torch.distributed.rpc as rpc
import torch.multiprocessing as mp

from pippy.IR import Pipe, pipe_split, TrivialLossWrapper, _null_coalesce_accumulate
from pippy.PipelineDriver import PipelineDriverFillDrain, PipelineDriver1F1B, PipelineDriverBase
from pippy.microbatch import TensorChunkSpec, CustomReducer

PROFILING_ENABLED = True
CHECK_NUMERIC_EQUIVALENCE = True

schedules = {
    'FillDrain': PipelineDriverFillDrain,
    '1F1B': PipelineDriver1F1B,
}

torch.fx.Tracer.proxy_buffer_attributes = True


def run_master(args):
    all_ranks = list(range(1, args.world_size))  # exclude master rank = 0
    chunks = len(all_ranks)
    batches = 1
    bs = 4 * chunks
    hid_dim = 50

    class Code(torch.nn.Module):
        def __init__(self):
            super().__init__()
            self.linear = torch.nn.Linear(hid_dim, hid_dim)

        def forward(self, x):
            x = self.linear(x)
            pipe_split()
            y = torch.relu(x)
            pipe_split()
            z = torch.sigmoid(x)
            pipe_split()
            return y + z

    c = Code()
    c.train()
    mse_loss = torch.nn.MSELoss()
    wrapper = TrivialLossWrapper(c, mse_loss)
    accum_pipe = Pipe.from_tracing(wrapper)
    assert 4 == len(list(accum_pipe.split_gm.children()))
    assert any(n.target == _null_coalesce_accumulate for n in accum_pipe.split_gm.graph.nodes)
    input = torch.randn(bs, hid_dim)
    target = torch.randn(bs, hid_dim)
    accum_pipe(input, target)

    args_chunk_spec = (TensorChunkSpec(0), TensorChunkSpec(0))
    kwargs_chunk_spec = {}
    output_chunk_spec = CustomReducer(torch.tensor(0.0), lambda a, b: a + b)
    pipe_driver: PipelineDriverBase = schedules[args.schedule](accum_pipe, args_chunk_spec, kwargs_chunk_spec,
                                                               output_chunk_spec, args.world_size - 1,
                                                               all_ranks=all_ranks, _debug_mask_minibatches=True)

    for i in range(batches):
        pipe_driver.run(chunks, input, target)


def run_worker(rank, world_size, args):
    print(f"rank = {rank} host/pid = {socket.gethostname()}/{os.getpid()}")
    os.environ['MASTER_ADDR'] = args.master_addr
    os.environ['MASTER_PORT'] = args.master_port
    if args.rank == -1:  # run via mp.spawn
        # each worker will see its GPU as `cuda:0`
        try:
            import subprocess
            device_count = int(subprocess.getoutput('nvidia-smi --list-gpus | wc -l'))
            os.environ['CUDA_VISIBLE_DEVICES'] = str(rank % device_count)
        except ValueError:
            pass
    assert not torch.cuda.is_available() or torch.cuda.device_count() == 1, \
        "Do not use torch.cuda.* before setting CUDA_VISIBLE_DEVICES"
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, _transports=["uv"])  # uv for AWS EFA instances
    if args.use_cuda and torch.cuda.is_available():
        for i in range(world_size):
            options.set_device_map(f"worker{i}", {0: 0})
    rpc.init_rpc(
        f"worker{rank}",
        rank=rank,
        world_size=world_size,
        rpc_backend_options=options
    )
    if rank == 0:
        run_master(args)
    rpc.shutdown()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--world_size', type=int, default=int(os.getenv("WORLD_SIZE", 5)))
    parser.add_argument('--rank', type=int, default=int(os.getenv("RANK", -1)))
    parser.add_argument('--master_addr', type=str, default=os.getenv('MASTER_ADDR', 'localhost'))
    parser.add_argument('--master_port', type=str, default=os.getenv('MASTER_PORT', '29500'))
    parser.add_argument('-s', '--schedule', type=str, default=list(schedules.keys())[0], choices=schedules.keys())
    parser.add_argument('--replicate', type=int, default=int(os.getenv("REPLICATE", '0')))
    parser.add_argument('--use_cuda', type=int, default=1)
    args = parser.parse_args()

    if args.rank == -1:
        mp.spawn(run_worker, args=(args.world_size, args,), nprocs=args.world_size, join=True)
    elif args.rank < args.world_size:
        run_worker(args.rank, args.world_size, args)
    else:
        print("I'm unused, exiting")

Traceback (most recent call last):
  File "/Users/pbelevich/miniconda3/envs/PiPPy/lib/python3.9/site-packages/torch/multiprocessing/spawn.py", line 69, in _wrap
    fn(i, *args)
  File "/Users/pbelevich/PycharmProjects/PiPPy/examples/slurm/hf/t5/test_null_coalesce_accumulate.py", line 93, in run_worker
    run_master(args)
  File "/Users/pbelevich/PycharmProjects/PiPPy/examples/slurm/hf/t5/test_null_coalesce_accumulate.py", line 65, in run_master
    pipe_driver.run(chunks, input, target)
  File "/Users/pbelevich/PycharmProjects/PiPPy/pippy/PipelineDriver.py", line 871, in run
    last_nodes.append(interp.run_until(lambda n: n.op == 'output'))
  File "/Users/pbelevich/PycharmProjects/PiPPy/pippy/PipelineDriver.py", line 811, in run_until
    self.env[node] = super().run_node(node)
  File "/Users/pbelevich/miniconda3/envs/PiPPy/lib/python3.9/site-packages/torch/fx/interpreter.py", line 152, in run_node
    return getattr(self, n.op)(n.target, args, kwargs)
  File "/Users/pbelevich/PycharmProjects/PiPPy/pippy/PipelineDriver.py", line 800, in call_function
    raise AssertionError(f'Unknown operator {torch.typename(target)}')
AssertionError: Unknown operator pippy.IR._null_coalesce_accumulate

Support custom `torch.fx` tracers in `Pipe.from_tracing`

Some users may want to make use of a custom torch.fx tracer when pipelining their model. An example is Hugging Face Transformers with its custom tracer:

https://github.com/huggingface/transformers/blob/f65fe3663a6c62975a9c04654703252644c9a652/src/transformers/utils/fx.py#L233

We should generalize the interface of Pipe.from_tracing to allow users to pass in a custom tracer: https://github.com/jamesr66a/PiPPy/blob/527af1fd8123d35bd81b9fe304a8d0ed29c9fd8d/pippy/IR.py#L382
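A sketch of what passing a custom tracer could look like, here with an illustrative tracer that keeps nn.MultiheadAttention as a leaf module (the tracer= keyword is the interface being requested, mirroring the HF usage in the T5 bug report above):

import torch
import torch.fx
from pippy.IR import Pipe, MultiUseParameterConfig

class LeafTracer(torch.fx.Tracer):
    # Example custom tracer: do not trace into nn.MultiheadAttention.
    def is_leaf_module(self, m, module_qualified_name):
        if isinstance(m, torch.nn.MultiheadAttention):
            return True
        return super().is_leaf_module(m, module_qualified_name)

# Proposed usage (sketch):
# pipe = Pipe.from_tracing(model, MultiUseParameterConfig.TRANSMIT, tracer=LeafTracer())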

Design interaction between PipelineDriver and DistributedOptimizer

https://github.com/jamesr66a/PiPPy/blob/5f4c6cd4676d6135dec9ee86341286416afd296f/test/local_test_forward_backward.py#L107

Our test currently just checks that PipelineDriver gets the right gradient values; we're not actually applying an update step yet. I think this should just work out of the box with DistributedOptimizer (https://pytorch.org/docs/stable/rpc.html#torch.distributed.optim.DistributedOptimizer), but we should make sure.
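A sketch of how this might be wired up, following the documented DistributedOptimizer pattern. The remote_parameters() accessor is hypothetical (the driver would need to expose RRefs to the parameters held by the remote stage executors), input/target/CHUNKS are the values from the linked test, and whether DistributedOptimizer's distributed-autograd context composes with PiPPy's own stage_backward is exactly what needs verifying:

import torch
import torch.distributed.autograd as dist_autograd
from torch.distributed.optim import DistributedOptimizer

param_rrefs = pipe_driver.remote_parameters()  # hypothetical accessor

opt = DistributedOptimizer(torch.optim.SGD, param_rrefs, lr=0.01)

with dist_autograd.context() as context_id:
    out = pipe_driver.run((input, target), {}, chunks=CHUNKS)
    # step() applies gradients recorded under this context on the remote workers.
    opt.step(context_id)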

Implement get_attr normalization

This is reproducible with HF transformers 4.16.2!

Traced HF BertModel.forward looks like this:

def forward(self, input_ids):
    ...
    _tensor_constant0 = self._tensor_constant0
    ...
    _tensor_constant0_1 = self._tensor_constant0
    ...
    _tensor_constant0_2 = self._tensor_constant0
    ...
    _tensor_constant0_3 = self._tensor_constant0
    ...
    _tensor_constant0_4 = self._tensor_constant0
    ...
    _tensor_constant0_5 = self._tensor_constant0
    ...
    _tensor_constant0_6 = self._tensor_constant0
    ...
    _tensor_constant0_7 = self._tensor_constant0
    ...
    _tensor_constant0_8 = self._tensor_constant0
    ...
    _tensor_constant0_9 = self._tensor_constant0
    ...
    _tensor_constant0_10 = self._tensor_constant0
    ...
    _tensor_constant0_11 = self._tensor_constant0
    ...

And bert_pipe = Pipe.from_traced(bert, bert_traced, ...) fails with

AttributeError: 'GraphModule' object has no attribute '_tensor_constant0'

It's caused by deleting the attribute in line 442

see https://github.com/jamesr66a/PiPPy/pull/43 for the details

bert_traced.txt
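One way to normalize this (a sketch of the general idea, not the actual fix in the PR) is to deduplicate get_attr nodes so that every use of a given attribute points at a single node before attributes are moved or deleted during splitting:

import torch.fx

def dedup_get_attrs(gm: torch.fx.GraphModule) -> torch.fx.GraphModule:
    first_for_target = {}
    for node in list(gm.graph.nodes):
        if node.op != 'get_attr':
            continue
        if node.target in first_for_target:
            # Redirect all uses of the duplicate to the first get_attr node,
            # then erase the duplicate.
            node.replace_all_uses_with(first_for_target[node.target])
            gm.graph.erase_node(node)
        else:
            first_for_target[node.target] = node
    gm.recompile()
    return gm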
