
paddlepaddle / paddlefleetx

422 stars · 23 watchers · 157 forks · 651.83 MB

PaddlePaddle's large model development suite, providing end-to-end development toolchains for large language models, cross-modal large models, biocomputing large models, and other domains.

License: Apache License 2.0

Python 87.91% C++ 1.21% Shell 9.80% Makefile 0.01% Dockerfile 0.03% Cuda 1.04%
fleet-api paddlepaddle benchmark distributed-algorithm large-scale model-parallelism data-parallelism pipeline-parallelism cloud paddlecloud

paddlefleetx's Introduction


Introduction

PaddleFleetX is a large model suite built on the PaddlePaddle deep learning framework. It aims to provide high-performance, flexible, and easy-to-use end-to-end large model capabilities, with optimizations across all six stages: development, training, fine-tuning, compression, inference, and deployment.

(Figure: the PaddlePaddle large model suite)

Features

Large model development: unified dynamic/static development mode, with flexible configuration of 4D hybrid parallelism

(Figure: large model development)

Building on PaddlePaddle's unified dynamic/static development mode, the suite is developed entirely with dynamic graphs; the Generate API automatically performs operator fusion, delivering static-graph-level performance while keeping dynamic-graph debuggability. The unified Trainer makes it easy to configure 4D hybrid parallelism, and it can be used for both pre-training and fine-tuning.
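
As a rough illustration of what such a configuration looks like, the sketch below sets up 4D hybrid parallelism through PaddlePaddle's collective fleet API; the parallel degrees are illustrative placeholders (their product must match the number of launched GPUs), and the script would be started with `python -m paddle.distributed.launch`.

```python
# Minimal sketch of a 4D hybrid parallel configuration (illustrative degrees,
# not a recommended setting); launch with `python -m paddle.distributed.launch`.
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.hybrid_configs = {
    "dp_degree": 2,        # data parallelism
    "mp_degree": 2,        # tensor (model) parallelism
    "pp_degree": 2,        # pipeline parallelism
    "sharding_degree": 1,  # ZeRO-style parameter sharding
}
fleet.init(is_collective=True, strategy=strategy)

# The hybrid communicate group exposes the process groups each rank belongs to.
hcg = fleet.get_hybrid_communicate_group()
print(hcg.get_data_parallel_world_size(), hcg.get_model_parallel_world_size())
```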

Large model training: unlocking raw compute potential and comprehensively improving distributed efficiency

For large model training, PaddlePaddle optimizes the entire pipeline, from data loading, the mixed-precision strategy, and the high-performance operator library to automatic parallel-strategy search and pipeline scheduling, helping speed up Wenxin (ERNIE) large model training by 3x.

(Figure: PaddlePaddle support for large model training)
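
One of the ingredients listed above, mixed precision, can be sketched with Paddle's dynamic-graph AMP API; the model, data, and loss-scaling value below are illustrative stand-ins rather than the suite's actual training loop.

```python
# Hedged sketch of mixed-precision training with paddle.amp (illustrative model).
import paddle

model = paddle.nn.Linear(1024, 1024)
optimizer = paddle.optimizer.AdamW(parameters=model.parameters())
scaler = paddle.amp.GradScaler(init_loss_scaling=2.0 ** 15)

x = paddle.randn([8, 1024])
with paddle.amp.auto_cast():        # run eligible ops in float16
    loss = model(x).mean()
scaled = scaler.scale(loss)         # scale the loss to avoid fp16 gradient underflow
scaled.backward()
scaler.minimize(optimizer, scaled)  # unscale gradients, step, and update the scale
optimizer.clear_grad()
```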

Large model fine-tuning: leading performance with mainstream fine-tuning algorithms

Mainstream fine-tuning algorithms are provided, including SFT, Prefix-Tuning, and LoRA, effectively lowering the resource barrier for large model training. The unified Trainer reuses the pre-training acceleration techniques in the fine-tuning stage, and variable-length data flow optimization greatly improves fine-tuning performance.

(Figure: large model fine-tuning)
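
To make the LoRA idea above concrete, here is a minimal self-contained sketch in Paddle, not PaddleNLP's implementation: the pretrained weight stays frozen and only a low-rank update is trained. The rank, scaling, and dimensions are assumptions.

```python
# Toy LoRA linear layer: y = W x + (alpha/r) * (x A) B, with W frozen (illustrative).
import paddle
import paddle.nn as nn

class LoRALinear(nn.Layer):
    def __init__(self, in_dim, out_dim, r=8, alpha=16):
        super().__init__()
        self.base = nn.Linear(in_dim, out_dim)
        self.base.weight.stop_gradient = True   # freeze the pretrained weight
        self.base.bias.stop_gradient = True
        self.lora_a = self.create_parameter([in_dim, r])   # trainable, small
        self.lora_b = self.create_parameter(
            [r, out_dim], default_initializer=nn.initializer.Constant(0.0))
        self.scaling = alpha / r                # update starts at zero because B = 0

    def forward(self, x):
        return self.base(x) + (x @ self.lora_a @ self.lora_b) * self.scaling

layer = LoRALinear(1024, 1024, r=8)
out = layer(paddle.randn([4, 1024]))            # only lora_a / lora_b receive gradients
```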

Large model compression: self-developed quantization algorithm achieving lossless quantization

PaddlePaddle's self-developed Shift-SmoothQuant algorithm produces a smoother activation distribution than SmoothQuant, effectively improving the accuracy of quantized models and the stability of their generated outputs. With PaddleSlim's large model compression tools, we achieve lossless quantization of mainstream open-source large models on the C-Eval and NL2SQL datasets. For more technical details and usage instructions, see PaddleSlim.

(Figure: model compression)
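
The smoothing step that Shift-SmoothQuant builds on can be sketched numerically: a per-channel scale migrates activation outliers into the weights while the matrix product stays unchanged. The NumPy toy below only illustrates that idea (shapes, alpha, and the outlier pattern are assumptions); it is not the PaddleSlim implementation.

```python
# SmoothQuant-style smoothing: Y = X W = (X / s) (diag(s) W); the rescaled
# activations have fewer outliers and quantize with less error (toy example).
import numpy as np

rng = np.random.default_rng(0)
outlier = np.array([10.0 if c % 8 == 0 else 1.0 for c in range(64)])
X = rng.normal(size=(16, 64)) * outlier          # activations with outlier channels
W = rng.normal(size=(64, 32))                    # weight matrix

alpha = 0.5                                      # migration strength
s = np.abs(X).max(axis=0) ** alpha / np.abs(W).max(axis=1) ** (1 - alpha)

X_smooth = X / s                                 # flatter activation distribution
W_smooth = W * s[:, None]                        # weights absorb the scale

assert np.allclose(X @ W, X_smooth @ W_smooth)   # the product is unchanged
print(np.abs(X).max() / np.abs(X).mean(),        # outlier ratio before smoothing
      np.abs(X_smooth).max() / np.abs(X_smooth).mean())  # and after
```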

Large model inference: matching the optimal quantized inference scheme to the characteristics of large model workloads

Paddle Inference distinguishes the different compute characteristics of the prompt (prefill) stage and the token generation stage of large models: it provides static quantization for general scenarios, and mixed quantization plus low-bit inference schemes for memory-bandwidth-bound scenarios.

(Figure: PaddlePaddle support for large model inference)

(Figure: inference engine)
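
A back-of-the-envelope sketch of why the two stages call for different schemes: prefill pushes the whole prompt through each weight matrix, while each generation step is essentially a matrix-vector product that re-reads every weight for a single token, so its arithmetic intensity is far lower and memory bandwidth dominates. The hidden size and prompt length below are illustrative assumptions.

```python
# Rough arithmetic-intensity estimate (FLOPs per byte moved) for one fp16 weight
# matrix during prefill vs. a single decode step; all numbers are illustrative.
hidden, prompt_len, bytes_per_elem = 4096, 1024, 2
weight_bytes = hidden * hidden * bytes_per_elem

def arithmetic_intensity(tokens):
    flops = 2 * tokens * hidden * hidden                                 # GEMM FLOPs
    bytes_moved = weight_bytes + 2 * tokens * hidden * bytes_per_elem    # weights + in/out activations
    return flops / bytes_moved

print("prefill:", round(arithmetic_intensity(prompt_len)))  # high -> compute-bound
print("decode :", round(arithmetic_intensity(1)))           # ~1  -> memory-bandwidth-bound
```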

Large model deployment: sensing load in real time and dynamically inserting requests to maximize hardware utilization

Because the decoding stage of large model generation is time-consuming and different queries produce outputs of different lengths, we implemented dynamic insertion in the FastDeploy serving framework together with the inference engine to maximize serving throughput: it senses the serving load in real time and dynamically inserts user requests, maximizing inference hardware utilization.

(Figure: large model serving deployment)
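
The scheduling idea can be illustrated with a toy simulation, using hypothetical request lengths and batch size with no relation to FastDeploy's actual scheduler: whenever a sequence in the running batch finishes, a waiting request is inserted into the freed slot at the next decode step instead of waiting for the whole batch to drain.

```python
# Toy "dynamic insertion" (continuous batching) loop: free slots are refilled
# from the waiting queue on every decode step (illustrative lengths and batch size).
from collections import deque

waiting = deque({"id": i, "remaining": n} for i, n in enumerate([3, 5, 2, 4, 6]))
max_batch, running, step = 2, [], 0

while waiting or running:
    while waiting and len(running) < max_batch:    # the dynamic-insertion part
        running.append(waiting.popleft())
    step += 1
    for req in running:
        req["remaining"] -= 1                      # one decode step per running request
    done = [r["id"] for r in running if r["remaining"] == 0]
    running = [r for r in running if r["remaining"] > 0]
    if done:
        print(f"step {step}: requests {done} finished; slots refilled next step")
```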

PaddleFleetX Application Examples

Large Language Models

Building on PaddleFleetX's core capabilities, we provide rich end-to-end development and application examples for large language models in PaddleNLP; for detailed usage instructions, see the PaddleNLP large language model documentation.

Cross-Modal Large Models

Beyond large language models, PaddleFleetX also supports the development and training of cross-modal large models, such as multimodal pre-training and text-to-image diffusion models, covering image, text, video, and audio modalities; for detailed usage instructions, see PaddleMIX.

Biocomputing Large Models

In the biocomputing field, building on PaddlePaddle's 4D parallelism and high-performance optimizations, we provide a number of industry-leading biocomputing pre-trained models in PaddleHelix; for detailed usage instructions, see PaddleHelix.

Citation

@misc{paddlefleetx,
    title={PaddleFleetX: An Easy-to-use and High-Performance One-stop Tool for Deep Learning},
    author={PaddleFleetX Contributors},
    howpublished = {\url{https://github.com/PaddlePaddle/PaddleFleetX}},
    year={2022}
}

License

PaddleFleetX is released under the Apache 2.0 license.

paddlefleetx's People

Contributors

carryyu, dependabot[bot], feixliu, firestonelib, forfishes, fuyinno4, ghostscreaming, guoxiawang, haohongxiang, heavengate, hermitsun, kuizhiqing, liujie0926, rachelxu7, richardwoosjtu, ronny1996, shawnnew, sljlp, sneaxiy, superxiang, sylartianii, ustckay, vivienfanghuagood, wawltor, wuhuachaocoding, xreki, xu98bin, zeyuchen, zhaoyinglia, zhwesky2010


paddlefleetx's Issues

Fleet PS on PaddleCloud core-dumps on stop

Environment: paddle 1.5.1 with fleet, 1 PS, 2 trainers, using dataset.

The PS and worker0 stop successfully, but worker1 core-dumps:
trainer failed, exit_code=134
pure virtual method called
terminate called without an active exception
pure virtual method called
terminate called recursively
./thirdparty/paddle_cpu/bin/python: line 11: 2727 Aborted (core dumped) $SCRIPTPATH/python "$@"
*error messages

Will Fleet's source code be open-sourced?

Could Fleet open up its distributed parameter server code? We would like to develop some ops on top of the parameter server.

Can't load images.

Hi, I want to run the following command:
CUDA_VISIBLE_DEVICES=0 python -m paddle.distributed.launch --selected_gpus=0 train_with_fleet.py --batch_size=128 --model=ResNet50 --total_images=50000 --data_dir=/data/PaddlePaddle/imagenet/
but I get 'NoneType' object has no attribute 'shape', which means that probably images couldn't be found.

Inside the imagenet directory I have train_list.txt and a train dir, and train_list.txt points to the images inside the train directory. This works just fine for the PaddlePaddle/models/ repository when I run ResNet50 from there, but here it doesn't work. At first I got an error that train.txt doesn't exist, so I created it by renaming my previous train_list.txt, but it still doesn't work.

Could you clarify what the structure of the dataset directory should look like?

load_program resnet example reports an error

The code being run is as follows:

import paddle.fluid as fluid
import time
import os
import numpy as np
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
from paddle.fluid.incubate.fleet.base import role_maker
from save_load import Model

def build_strategy():
    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.num_iteration_per_drop_scope = 30
    dist_strategy = DistributedStrategy()
    dist_strategy.mode = "collective"
    dist_strategy.collective_mode = "grad_allreduce"
    dist_strategy.nccl_comm_num = 1
    dist_strategy.exec_strategy = exec_strategy
    return dist_strategy


def gen_data():
    return {"image": np.random.random(size=(32,3,224,224)).astype('float32'),
            "label": np.random.randint(10, size=(32, 1)).astype('int64')}


program_path = 'resnet'
test_model = Model()
test_model.load_model(program_path)
#print(test_model.main_program)
with fluid.program_guard(test_model.main_program, test_model.startup_program):
    with fluid.unique_name.guard(test_model.generator):
        optimizer = fluid.optimizer.Momentum(learning_rate=test_model.lr,
                                             momentum=0.9,
                                             parameter_list = test_model.parameter_list,
                                             regularization=fluid.regularizer.L2Decay(0.0001))
        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
        fleet.init(role)
        dist_strategy = build_strategy()
        dist_optimizer = fleet.distributed_optimizer(optimizer, strategy=dist_strategy)
        _, param_grads = dist_optimizer.minimize(test_model.loss,startup_program=test_model.startup_program, parameter_list=test_model.parameter_list)

gpu_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(gpu_id)
exe = fluid.Executor(place)
exe.run(test_model.startup_program)
step = 1001
total_time = 0
for i in range(step):
    start_time = time.time()
    cost_val = exe.run(
        program=fleet.main_program,
        feed=gen_data(),
        fetch_list=[test_model.loss.name])
    end_time = time.time()
    total_time += (end_time - start_time)

Running on a single machine with two GPUs works fine, but with four GPUs on a single machine it reports an NCCL error:
(screenshot of the NCCL error)

Why not compile the main program in the _try_to_compile() function in collective mode?

def _try_to_compile(self, startup_program, main_program):
        node_num = self._node_num()
        assert node_num >= 1, "nccl2 node_num must >= 1, now:{}" % node_num

        exec_strategy = self._strategy.exec_strategy

        if node_num <= 1:
            if self._strategy.nccl_comm_num > 1:
                logging.warn("set nccl_comm_num=1 since you only have 1 node.")
            self._strategy.nccl_comm_num = 1

            if self._strategy.use_hierarchical_allreduce:
                logging.warn(
                    "set use_hierarchical_allreduce=False since you only have 1 node."
                )
            self._strategy.use_hierarchical_allreduce = False

        sync_allreduce = os.getenv("FLAGS_sync_nccl_allreduce")
        if sync_allreduce is None or sync_allreduce == "1":
            exec_strategy.num_threads = self._strategy.nccl_comm_num + 1
            if self._strategy.use_hierarchical_allreduce:
                exec_strategy.num_threads = 2 * self._strategy.nccl_comm_num + 1
            if exec_strategy.num_threads > 4:
                logging.warn(
                    "if you use use_hierarchical_allreduce or "
                    "with multi nccl comm, please export FLAGS_sync_nccl_allreduce = 0"
                )

        # NOTE. open sync_batch_norm will hang when use multi num_threads
        sync_batch_norm = self._strategy.sync_batch_norm
        if sync_batch_norm is not None and sync_batch_norm is True:
            self._strategy.nccl_comm_num = 1
            self._strategy.use_hierarchical_allreduce = False
            exec_strategy.num_threads = 1
            logging.warn(
                "use sync_batch_norm will hang when set num_threads > 1, so "
                "set num_threads=1, nccl_comm_num=1, use_hierarchical_allreduce=False."
            )

        if self.print_config:
            print("node_num:", node_num, "num_threads:",
                  exec_strategy.num_threads, "use_hierarchical_allreduce:",
                  self._strategy.use_hierarchical_allreduce, "nccl_comm_num:",
                  self._strategy.nccl_comm_num, "FLAGS_sync_nccl_allreduce:",
                  sync_allreduce)

        self._transpile(startup_program, main_program)

        if self._strategy.mode == "collective":
            return main_program

        self._strategy.num_trainers = fleet.worker_num()
        self._strategy.trainer_id = fleet.worker_index()
        self._strategy.trainers_endpoints = fleet.worker_endpoints()
        self._strategy.enable_backward_optimizer_op_deps = True

        self._compiled_program = compiler.CompiledProgram(main_program)

        self._compiled_program.with_data_parallel(
            loss_name=self._loss.name,
            build_strategy=self._strategy,
            exec_strategy=self._strategy.exec_strategy,
            share_vars_from=None)

        return self._compiled_program

Can't see the benefits in speed of distributed training

The general purpose of distributed training is to speed things up; however, I don't see any speedup in my runs.
I train NeXtVLAD following https://github.com/PaddlePaddle/models/tree/develop/PaddleCV/PaddleVideo.
In the beginning, I used single-node multi-card training (8 cards in total); the training process is as follows:
(screenshot)
(it costs about 20s per step)
Then, I used multi-node multi-card training (16 cards in total); the training process is as follows:
(screenshot)
(it costs about 40s per step)
To double-check this problem, I trained the model with only one card, and it looks like this:
(screenshot)
(it costs even less time per step)
Here is how I read data in the training:
(screenshot)
And I used Fleet to organize the distributed training:
(screenshot)
I need help. Thx. BR.

Image-Text Matching example

A distributed training example of image-text matching is needed; in particular, it should show how to run validation every few epochs during training.

Using the fleet lightning interface to train ImageNet on a single machine with a single GPU, performance does not meet expectations

Reproduction code:

import fleet_lightning as lighting
import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.collective import fleet, DistributedStrategy
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
import time
# lightning help users to focus more on learning to train a large scale model
# if you want to learn how to write a model, lightning is not for you
# focus more on engineering staff in fleet-lightning
configs = lighting.parse_train_configs()

model = lighting.applications.Resnet50()
loader = lighting.imagenet_dataset_from_filelist(
    "/ssd2/lilong/ImageNet/val.txt",model.inputs)

optimizer = fluid.optimizer.Momentum(
    learning_rate=configs.lr,
    momentum=configs.momentum,
    parameter_list=model.parameter_list(),
    regularization=fluid.regularizer.L2Decay(0.0001))
optimizer.minimize(model.loss,
                   parameter_list=model.parameter_list())

place = fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
total_time = 0
for i, data in enumerate(loader()):
    start_time = time.time()
    cost_val = exe.run(fluid.default_main_program(),
                       feed=data,
                       fetch_list=[model.loss.name])
    end_time = time.time()
    total_time += (end_time - start_time)
    print(" step%d cost = %f, total time cost = %f" %
          ( i, cost_val[0], total_time))

Dataset: 50,000 images; batch size: 32
Total training time: 287.22 s
Throughput: 174.08 imgs/s

Multi-card evaluation during multi-card training

Does Fleet now support multi-card evaluation while doing multi-card training?
I remember the allreduce-related code used to be quite cumbersome; is there a newer approach or a concise code example now?
Thanks.
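
For reference, one common pattern with the newer collective API is to all-reduce per-rank metric counts after each rank evaluates its own shard of the eval data; the sketch below is only illustrative (hypothetical counts, launched via `python -m paddle.distributed.launch`), not an official Fleet example.

```python
# Hedged sketch: aggregate per-rank evaluation counts with an all_reduce (SUM),
# using the paddle.distributed dygraph API; the counts here are placeholders.
import paddle
import paddle.distributed as dist

dist.init_parallel_env()

# each rank evaluates its own shard of the eval set and keeps local counts
local_correct = paddle.to_tensor([123.0])
local_total = paddle.to_tensor([512.0])

dist.all_reduce(local_correct)   # default reduce op is SUM across all ranks
dist.all_reduce(local_total)

if dist.get_rank() == 0:
    print("global accuracy:", float(local_correct / local_total))
```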

The exe.run method reports an error: paddle.fluid.core_avx.EnforceNotMet: Invoke operator create_double_buffer_reader error.

Paddle version: 1.5.0-cpu
CNN classification task, using a pyreader for data input; the batches are assembled by myself. The code and error are as follows:
`# -*- coding: utf-8 -*-
import sys
sys.path.append("../")
import collections
import paddle.fluid as fluid
import paddle.fluid.param_attr as attr
from nlpt.model.base_model import Config
from nlpt.encoder.base_encoder import ernie_encoder
from nlpt.encoder.base_encoder import custom_encoder_tensor
from nlpt.encoder.base_encoder import language_model_encoder
from nlpt.encoder.base_encoder import pairwise_matching_encoder
from nlpt.model.classify_model import ClassifyModel
from nlpt.model.custom_model import CustomModel
from nlpt.model.lm_model import LanguageModel
from nlpt.model.matching_model import MatchingModel
from nlpt.model.sequence_label import SequenceLabelModel

class ModelConfig(Config):
    def __init__(self):
        Config.__init__(self)
        self.use_cuda = False         # set to True for GPU training, False for CPU training
        self.do_train = True          # set to True when training a model
        self.do_val = False           # set to True to evaluate on the dev set during training
        self.do_test = False          # set to True to evaluate on the test set during training
        self.do_predict = False       # set to True to run prediction directly
        self.batch_size = 13          # batch size
        self.learning_rate = 2e-5     # learning rate
        self.save_steps = 1000        # save the model every this many batches
        self.weight_decay = 0         # weight decay
        self.validation_steps = 100   # when do_val/do_test is True, evaluate every this many batches
        self.epoch = 5                # number of training epochs
        self.skip_steps = 10          # print a training log every this many batches
        self.num_labels = 2           # number of classes in the classification task
        self.max_seq_len = 128        # maximum input text length
        self.is_ernie = False         # True if the task is ERNIE-based, otherwise False
        self.evaluate = "acc,auc,recall"  # evaluation metrics, comma-separated; supported: acc, auc, recall, precision, f1
        # set init_checkpoint when doing prediction or warm-starting; it corresponds to the model saved to ./output/checkpoints during training
        # self.init_checkpoint = "./init_model/"
        self.is_local = True

def create_model(pyreader_name, is_inference=False):
    py_reader, tensors = custom_encoder_tensor(cfg, pyreader_name)
    text_a = tensors[0]
    label = tensors[1]
    text_a_mask = tensors[2]
    text_a_lens = tensors[3]

    dict_dim = 33261
    emb_dim = 128
    hid_dim = 128
    hid_dim2 = 96
    win_size = 3

unpad_data = fluid.layers.sequence_unpad(text_a, length=text_a_lens)

emb = fluid.layers.embedding(input=unpad_data, size=[dict_dim, emb_dim])

conv = fluid.nets.sequence_conv_pool(
    input=emb,
    num_filters=hid_dim,
    filter_size=win_size,
    act="tanh",
    pool_type="max")

# full connect layer
fc_1 = fluid.layers.fc(input=[conv], size=hid_dim2)

# softmax layer
prediction = fluid.layers.fc(input=[fc_1], size=cfg.num_labels, act="softmax")

if is_inference:
    feed_targets_name = [text_a.name, text_a_lens.name]
    return feed_targets_name, prediction

cost = fluid.layers.cross_entropy(input=prediction, label=label)
avg_cost = fluid.layers.mean(x=cost)

graph_vars = collections.OrderedDict()
graph_vars["loss"] = avg_cost
graph_vars["classify_infer"] = prediction
graph_vars["label"] = label

return py_reader, graph_vars

if __name__ == '__main__':
    cfg = ModelConfig()
    _, graph_vars = create_model('init')
    if "classify_infer" in graph_vars:
        model = ClassifyModel(cfg, create_model)
    elif "match_pos_score" in graph_vars:
        model = MatchingModel(cfg, create_model)
    elif "sequence_label_infer" in graph_vars:
        model = SequenceLabelModel(cfg, create_model)
    elif "lm_input" in graph_vars:
        model = LanguageModel(cfg, create_model)
    else:
        model = CustomModel(cfg, create_model)
    if cfg.do_train:
        model.train()
    if cfg.do_predict:
        model.predict()`

model.py

`import logging
import multiprocessing
import os
import time

from paddle import fluid

import nlpt.model.global_key_manager as global_key_manager
from nlpt.utils.optimization import optimization
from nlpt.utils import log
from nlpt.utils.init import init_checkpoint, init_parameters
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.transpiler.distribute_transpiler import DistributeTranspilerConfig
import paddle.fluid.incubate.fleet.base.role_maker as role_maker

class Config(object):

# common params
def __init__(self):
    self.label_map_config = None
    self.do_lower_case = False
    self.use_cuda = True
    self.in_tokens = False
    self.random_seed = 1
    self.lr_scheduler = 'linear_warmup_decay'
    self.use_fp16 = False
    self.loss_scaling = 1.0
    self.weight_decay = 0
    self.init_parameters = None
    self.init_checkpoint = None
    self.use_fast_executor = False
    #self.ernie_config_path = "./thirdparty/config/ernie_config.json"
    self.vocab_path = os.path.join(os.path.dirname(__file__) + '/../config/vocab.txt')
    self.verbose = True
    self.is_local = os.getenv("PADDLE_IS_LOCAL", "0") == "1"
    self.evaluate = "acc,auc,recall"

    # when online should be set default
    self.num_iteration_per_drop_scope = 1
    self.is_ernie = True

    # default data_config_path
    self.data_config_path = "../nlpt/config/data_config.json"

    # default data path
    self.train_set = "../train_data/classify"
    self.test_set = "../test_data/classify"
    self.dev_set = "../thirdparty/classify/dev_data/"
    self.predict_set = "../thirdparty/classify/predict_data/"

    # default save model
    self.save_inference_model_path = "./output/inference_models"
    self.checkpoints = "./output/checkpoints"

class BaseModel(object):
"""The BaseModel class adds training & evaluation routines to a Network.
"""

def __init__(self, config, create_net):
    print('BaseModel init....')
    self.config = config
    if config.use_cuda:
        place = fluid.CUDAPlace(int(os.getenv('FLAGS_selected_gpus', '0')))
        self.dev_count = fluid.core.get_cuda_device_count()
    else:
        place = fluid.CPUPlace()
        self.dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))

    self.executor = fluid.Executor(place)
    self.startup_prog = fluid.Program()

    self.key_dict_manger = global_key_manager.key_dict_manager

    # multi nodes
    self.num_trainers = 1
    self.trainer_id = 0
    self.is_fleet = False

    self.create_net = create_net

    # Todo: replace by warmup_proportion
    self.warmup_steps = 0
    self.build_reader()
    logging.debug("finish build reader")

    if not self.config.use_cuda:
        self.init_fleet(config.is_local)

    self.build_program()
    logging.debug("finish build graph")

    logging.debug("PADDLE_IS_LOCAL:%d" % config.is_local)
    if self.config.use_cuda:
        self.prepare_nccl2_env(config.is_local)
        logging.debug("finish prepare nccl2 env")
        # run startup_prog after transpile for nccl2
        self.executor.run(self.startup_prog)
    else:
        self.prepare_fleet_2(config.is_local)
        logging.debug("finish prepare fleet env")


    self.load_pretrained_models()

    self.build_executor()
    logging.debug("finish build executor")

    # should be executed after self.build_reader() is called
    if self.reader.label_map:
        self.config.label_id2text = {id_label:text_label for text_label, id_label in self.reader.label_map.items()}

    self.print_config()

def print_config(self):
    print("*********************************** Task Config **************************************")
    for k, v in self.config.__dict__.items():
        print("{0}:{1}".format(k, v))
    print("**************************************************************************************")

# TODO:need override
def init_reader(self):
    print("init reader...")

def extend_graph_vars(self, create_net):
    """ add metrics for standard classify task
    """
    def wrapper(* config, **kwconfig):
        pyreader, graph_vars = create_net(*config, **kwconfig)
        for k, v in graph_vars.items():
            v.persistable = True

        return pyreader, graph_vars

    return wrapper

def build_reader(self):
    self.init_reader()
    if not self.reader:
        print("reader not init.")
        return

    if self.config.do_train:
        self.train_data_generator = self.reader.data_generator(
            data_path=self.config.train_set,
            batch_size=self.config.batch_size,
            epoch=self.config.epoch,
            shuffle=True,
            phase="train")

    if self.config.do_test:
        self.test_data_generator = self.reader.data_generator(
            data_path=self.config.test_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False)

    if self.config.do_val:
        self.dev_data_generator = self.reader.data_generator(
            data_path=self.config.dev_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False)

    if self.config.do_predict:
        self.predict_data_generator = self.reader.data_generator(
            data_path=self.config.predict_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False,
            phase="predict")

def build_program(self):
    self.define_train_program()
    self.define_test_program()
    self.define_infer_program()
    self.set_reader_provider()

def build_executor(self):
    if self.is_fleet:
        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = int(os.getenv("CPU_NUM"))
        build_strategy = fluid.BuildStrategy()
        build_strategy.async_mode = False

        if int(os.getenv("CPU_NUM")) > 1:
            build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

        self.train_exe = fluid.ParallelExecutor(
            use_cuda=self.config.use_cuda,
            loss_name=self.graph_vars["loss"].name,
            main_program=self.train_program,
            build_strategy=build_strategy,
            exec_strategy=exec_strategy)
    else:
        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_iteration_per_drop_scope = self.config.num_iteration_per_drop_scope
        if self.config.use_fast_executor:
            exec_strategy.use_experimental_executor = True

        self.train_exe = fluid.ParallelExecutor(
            use_cuda=self.config.use_cuda,
            loss_name=self.graph_vars["loss"].name,
            exec_strategy=exec_strategy,
            main_program=self.train_program,
            num_trainers=self.num_trainers,
            trainer_id=self.trainer_id)



# TODO: need override
def loss_optimizer(self):
    print("init loss_optimizer")
    if not self.config.use_cuda and not self.config.is_local:
        print("is fleet ....")
        self.optimizer = fluid.optimizer.Adam(learning_rate=self.config.learning_rate)
    else:
        optimizer, scheduled_lr = optimization(
            loss=self.graph_vars["loss"],
            warmup_steps=self.warmup_steps,
            num_train_steps=1000,
            learning_rate=self.config.learning_rate,
            train_program=self.train_program,
            startup_prog=self.startup_prog,
            weight_decay=self.config.weight_decay,
            scheduler=self.config.lr_scheduler,
            use_fp16=self.config.use_fp16,
            loss_scaling=self.config.loss_scaling)
        self.optimizer = optimizer






def define_train_program(self):
    if self.is_fleet:
        self.train_program = fleet.main_program
    else:
        self.train_program = fluid.Program()
    with fluid.program_guard(self.train_program, self.startup_prog):
        with fluid.unique_name.guard():
            create_net = self.extend_graph_vars(self.create_net)
            self.train_pyreader, graph_vars = create_net(pyreader_name="train_reader")
            self.graph_vars = self.check_graph_vars(graph_vars)
            self.loss_optimizer()


def define_test_program(self):
    self.test_program = fluid.Program()
    with fluid.program_guard(self.test_program, self.startup_prog):
        with fluid.unique_name.guard():
            create_net = self.extend_graph_vars(self.create_net)
            self.test_pyreader, graph_vars = create_net(pyreader_name="test_reader")
            self.graph_vars = self.check_graph_vars(graph_vars)
    self.test_program = self.test_program.clone(for_test=True)


def define_infer_program(self):
    self.infer_program = fluid.Program()
    with fluid.program_guard(self.infer_program, self.startup_prog):
        with fluid.unique_name.guard():
            self.feed_target_names, self.inference_output = self.create_net(pyreader_name="infer_reader", is_inference=True)
    self.infer_program = self.infer_program.clone(for_test=True)

def check_graph_vars(self, graph_vars):
    keys = list(graph_vars.keys())
    for k in keys:
        if not self.key_dict_manger.check_key_legitimacy(k):
            del graph_vars[k]

    print("after check ", graph_vars)
    return graph_vars

def load_pretrained_models(self):
    config = self.config
    exe = self.executor

    if config.do_train:
        if config.init_checkpoint and config.init_parameters:
            raise ValueError(
                "ERROR: config 'init_checkpoint' and 'init_parameters' "
                "both are set! Only one of them should be set. "
                "if you want warmstart checkpoint keep its learning_rate and moments, plese set 'init_checkpoint'. "
                "if you want warmstart checkpoint with only its parameters, and you want reset a new learning_rate "
                "by config, plese set 'init_parameters'")

        if config.init_checkpoint:
            init_checkpoint(
                exe,
                config.init_checkpoint,
                main_program=self.train_program,
                use_fp16=config.use_fp16)

        elif config.init_parameters:
            init_parameters(
                exe,
                config.init_parameters,
                main_program=self.train_program,
                use_fp16=config.use_fp16)

    elif config.do_val or config.do_test or config.do_predict:
        if config.init_checkpoint:
            init_checkpoint(
                exe,
                config.init_checkpoint,
                main_program=self.train_program,
                use_fp16=config.use_fp16)
        elif config.init_parameters:
            init_parameters(
                exe,
                config.init_parameters,
                main_program=self.train_program,
                use_fp16=config.use_fp16)
        else:
            raise ValueError("config 'init_checkpoint' or 'init_paramters' should be set if"
                             "only doing validation or testing or predict!")


# TODO: need to override
def set_reader_provider(self):
    print("set pyreader data provider.")
    # self.use_lod_tensor = True
    # self.train_pyreader.decorate_tensor_provider(self.train_data_generator)
    # self.test_pyreader.decorate_tensor_provider(self.test_data_generator)

def prepare_nccl2_env(self, is_local):
    if not is_local:
        port = os.getenv("PADDLE_PORT", "6174")
        trainers = os.getenv("PADDLE_TRAINERS")  # ip,ip...
        logging.debug("trainers form env:{}".format(trainers))

        
        trainer_endpoints = []
        for trainer_ip in trainers.split(","):
            trainer_endpoint = ":".join([trainer_ip, port])
            trainer_endpoints.append(trainer_endpoint)
        trainer_endpoints = ",".join(trainer_endpoints)
        logging.debug("trainers endpoints:{}".format(trainer_endpoints))

        #eplist = []
        #for ip in pserver_ips.split(","):
        #    eplist.append(':'.join([ip, port]))
        #pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
        num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))

        current_endpoint = os.getenv("POD_IP") + ":" + port
        logging.debug("current_endpoint: {}".format(current_endpoint))

        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        config = fluid.DistributeTranspilerConfig()
        config.mode = "nccl2"

        t = fluid.DistributeTranspiler(config=config)
        #t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
        t.transpile(trainer_id, trainers=trainer_endpoints, current_endpoint=current_endpoint, \
                    program=self.train_program, startup_program=self.startup_prog)

        self.num_trainers = num_trainers
        self.trainer_id = trainer_id
        logging.debug("nccl_num_trainers:{} nccl_trainer_id:{}".format(self.num_trainers, self.trainer_id))

def init_fleet(self, is_local):
    if not is_local:
        trainer_id = int(os.environ["PADDLE_TRAINER_ID"])
        print("trainer_id:", trainer_id)
        trainers = int(os.environ["PADDLE_TRAINERS"])
        print("trainers:", trainers)
        training_role = os.environ["PADDLE_TRAINING_ROLE"]
        training_role = role_maker.Role.WORKER if training_role == "TRAINER" else role_maker.Role.SERVER

        num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))
        self.num_trainers = num_trainers
        self.trainer_id = trainer_id
        ports = os.getenv("PADDLE_PSERVER_PORTS")
        print("ports:", ports)
        pserver_ip = os.getenv("PADDLE_PSERVER_IP", "")
        print("pserver_ip:", pserver_ip)
        pserver_endpoints = []
        for port in ports.split(","):
            pserver_endpoints.append(':'.join([pserver_ip, port]))

        role = role_maker.UserDefinedRoleMaker(current_id=trainer_id, role=training_role, worker_num=trainers,
                                               server_endpoints=pserver_endpoints)

        fleet.init(role_maker=role)

        self.startup_prog = fleet.startup_program


def prepare_fleet_2(self, is_local):
    if not is_local:
        strategy = DistributeTranspilerConfig()
        # strategy.sync_mode = bool(int(os.getenv("DISTRIBUTED_SYNC_MODE")))
        strategy.sync_mode = True
        optimizer = fleet.distributed_optimizer(self.optimizer, strategy)
        optimizer.minimize(self.graph_vars["loss"])

        print("minimize fleet2 ...")
        if fleet.is_server():
            # with open("pserver.proto.{}".format(fleet.server_endpoints()[fleet.server_index()]), "w") as f:
            #     f.write(str(fleet.main_program))
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            fleet.init_worker()
            self.executor.run(self.startup_prog)
            # train_loop(fleet.main_program, fleet.worker_index() == 0)
            # fleet.stop_worker()
        print("fleet_num_trainers:{} fleet_trainer_id:{}".format(self.num_trainers, self.trainer_id))
        self.is_fleet = True
    else:
        self.executor.run(self.startup_prog)



def prepare_fleet(self, is_local):
    if not is_local:
        pserver_endpoints = os.getenv("PADDLE_PSERVER_ENDPOINTS")
        pserver_endpoints = pserver_endpoints.split(",")

        role = role_maker.UserDefinedRoleMaker(
            current_id=int(os.getenv("CURRENT_ID")),
            role=role_maker.Role.WORKER if bool(int(os.getenv("IS_WORKER"))) else role_maker.Role.SERVER,
            worker_num=int(os.getenv("WORKER_NUM")),
            server_endpoints=pserver_endpoints
        )

        fleet.init(role_maker=role)
        strategy = DistributeTranspilerConfig()
        strategy.sync_mode = bool(int(os.getenv("DISTRIBUTED_SYNC_MODE")))
        optimizer = fleet.distributed_optimizer(self.optimizer, strategy)
        optimizer.minimize(self.graph_vars["loss"])

        print("minimize fleet ...")
        if fleet.is_server():
            with open("pserver.proto.{}".format(fleet.server_endpoints()[fleet.server_index()]), "w") as f:
                f.write(str(fleet.main_program))
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            fleet.init_worker()
            self.executor.run(fleet.startup_program)
            # train_loop(fleet.main_program, fleet.worker_index() == 0)
            # fleet.stop_worker()
    else:
        self.executor.run(self.startup_prog)



def train(self):
    print("start train....")
    self.train_pyreader.start()
    steps = 0
    time_begin = time.time()
    while True:
        try:
            steps += 1
            if steps % self.config.skip_steps != 0:
                self.train_exe.run(fetch_list=[])
            else:
                if self.config.verbose:
                    print("train pyreader queue size: %d, " % self.train_pyreader.queue.size())

                outputs, current_learning_rate = self.evaluate(self.train_exe,
                                                               self.train_program,
                                                               self.train_pyreader,
                                                               self.graph_vars,
                                                               "train",
                                                               steps)

                num_train_examples = self.reader.get_num_examples(self.config.train_set)
                current_example, current_epoch = self.reader.get_train_progress()
                time_end = time.time()
                used_time = time_end - time_begin

                log_info = "current_learning_rate: %f, " % current_learning_rate
                log_info += "epoch: %d, progress: %d/%d, step: %d, " % (
                current_epoch, current_example, num_train_examples, steps)
                log_info += "speed: %f steps/s" % (self.config.skip_steps / used_time)
                print(log_info)

                try:
                    if outputs:
                        import paddlecloud.visual_util as visualdl
                        x_dic = {"x_name": "step", "x_value": steps}
                        y_ls = []
                        for key, value in outputs.items():
                            y = {}
                            y["y_name"] = key
                            y["y_value"] = value
                            y_ls.append(y)

                        visualdl.show_fluid_trend(x_dic, y_ls, tag="train")
                except Exception:
                    print("import paddlecloud.visual_util failed")

                time_begin = time.time()

            if steps % self.config.save_steps == 0:
                save_checkpoint_path = os.path.join(self.config.checkpoints, "step_" + str(steps))
                fluid.io.save_persistables(self.executor, save_checkpoint_path, self.train_program)
                print("save checkpoinmts to %s" % save_checkpoint_path)
                save_inference_model_path = os.path.join(self.config.save_inference_model_path, "step_" + str(steps))
                fluid.io.save_inference_model(
                    save_inference_model_path,
                    self.feed_target_names,
                    [self.inference_output],
                    self.executor,
                    main_program=self.infer_program)
                print("save inference model to %s" % save_inference_model_path)

            if steps % self.config.validation_steps == 0:
                # evaluate dev set
                if self.config.do_val:
                    self.do_test_val("dev", steps)
                # evaluate test set
                if self.config.do_test:
                    self.do_test_val("test", steps)

        except fluid.core.EOFException:
            save_path = os.path.join(self.config.checkpoints, "step_" + str(steps))
            fluid.io.save_persistables(self.executor, save_path, self.train_program)
            save_inference_model_path = os.path.join(self.config.save_inference_model_path, "step_" + str(steps))
            fluid.io.save_inference_model(
                save_inference_model_path,
                self.feed_target_names,
                [self.inference_output],
                self.executor,
                main_program=self.infer_program)
            print("save inference model to %s" % save_inference_model_path)

            self.train_pyreader.reset()
            break

    # final eval on dev set
    if self.config.do_val:
        print("Final validation result:")
        self.do_test_val("dev", steps)

    # final eval on test set
    if self.config.do_test:
        print("Final test result:")
        self.do_test_val("test", steps)

def do_test_val(self, eval_phase, step):
    if eval_phase == "dev":
        data_generator = self.dev_data_generator
    elif eval_phase == "test":
        data_generator = self.test_data_generator
    elif eval_phase == "predict":
        data_generator = self.predict_data_generator
    else:
        raise ValueError("%s is illegal" % eval_phase)

    if self.use_lod_tensor:
        self.test_pyreader.decorate_paddle_reader(data_generator)
    else:
        self.test_pyreader.decorate_tensor_provider(data_generator)

    outputs, current_learning_rate = self.evaluate(self.executor,
                                                   self.test_program,
                                                   self.test_pyreader,
                                                   self.graph_vars,
                                                   eval_phase,
                                                   step)
    try:
        if outputs and len(outputs) != 0:
            import paddlecloud.visual_util as visualdl
            x_dic = {"x_name": "step", "x_value": step}
            y_ls = []
            for key, value in outputs.items():
                y = {}
                y["y_name"] = key
                y["y_value"] = value
                y_ls.append(y)

            visualdl.show_fluid_trend(x_dic, y_ls, tag=eval_phase)
    except Exception:
        print("import paddlecloud.visual_util failed")

# TODO: need override
def evaluate(self, exe, program, pyreader, graph_vars, eval_phase, step):
    print("evaluate in base model...")


def predict(self):
    # Todo:
    logging.debug("start do predict")
    self.do_test_val("predict", 0)

log.init_log("./log/test", level=logging.DEBUG)
`

Error message:
(screenshot)
(screenshot)

*** SIGTERM (@0x38400000db2) received by PID 5342 (TID 0x7efe568a6700) from PID 3506; stack trace: ***

Running the collective demo on the Kongming cluster produces the following error:

WARNING:root:set nccl_comm_num=1 since you only have 1 node.
server not ready, wait 3 sec to retry... not ready endpoints:['127.0.0.1:6171', '127.0.0.1:6172', '127.0.0.1:6173', '127.0.0.1:6174', '127.0.0.1:6175', '127.0.0.1:6176', '127.0.0.1:6177']
server not ready, wait 3 sec to retry... not ready endpoints:['127.0.0.1:6171', '127.0.0.1:6172', '127.0.0.1:6173', '127.0.0.1:6174', '127.0.0.1:6175', '127.0.0.1:6176', '127.0.0.1:6177']
server not ready, wait 3 sec to retry... not ready endpoints:['127.0.0.1:6171', '127.0.0.1:6172', '127.0.0.1:6173', '127.0.0.1:6174', '127.0.0.1:6175', '127.0.0.1:6176', '127.0.0.1:6177']
W1005 11:17:27.760031 5342 init.cc:212] *** Aborted at 1570245447 (unix time) try "date -d @1570245447" if you are using GNU date ***
W1005 11:17:27.761384 5342 init.cc:212] PC: @ 0x0 (unknown)
W1005 11:17:27.761484 5342 init.cc:212] *** SIGTERM (@0x38400000db2) received by PID 5342 (TID 0x7efe568a6700) from PID 3506; stack trace: ***
W1005 11:17:27.762459 5342 init.cc:212] @ 0x7efe5647d160 (unknown)
W1005 11:17:27.763466 5342 init.cc:212] @ 0x7efe55a95ed3 __GI_select
W1005 11:17:27.764262 5342 init.cc:212] @ 0x7efe4aa9b2ad time_sleep
W1005 11:17:27.764425 5342 init.cc:212] @ 0x4b8934 PyEval_EvalFrameEx
W1005 11:17:27.764565 5342 init.cc:212] @ 0x4b88af PyEval_EvalFrameEx
W1005 11:17:27.764710 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.764847 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.764991 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.765123 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.765264 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.765395 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.765534 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.765666 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.765805 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.765940 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.766070 5342 init.cc:212] @ 0x4b88af PyEval_EvalFrameEx
W1005 11:17:27.766199 5342 init.cc:212] @ 0x4b88af PyEval_EvalFrameEx
W1005 11:17:27.766336 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.766465 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.766604 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.766733 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.766877 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.767011 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.767155 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.767288 5342 init.cc:212] @ 0x4b83b7 PyEval_EvalFrameEx
W1005 11:17:27.767426 5342 init.cc:212] @ 0x4ba7c8 PyEval_EvalCodeEx
W1005 11:17:27.767554 5342 init.cc:212] @ 0x4ba8f2 PyEval_EvalCode
W1005 11:17:27.767681 5342 init.cc:212] @ 0x4e6e9d PyRun_FileExFlags
W1005 11:17:27.767832 5342 init.cc:212] @ 0x4e8721 PyRun_SimpleFileExFlags
W1005 11:17:27.767956 5342 init.cc:212] @ 0x415bbd Py_Main
W1005 11:17:27.768987 5342 init.cc:212] @ 0x7efe559d7bd5 __libc_start_main
W1005 11:17:27.769170 5342 init.cc:212] @ 0x414d51 (unknown)

Job address:
http://yq01-sys-hic-p40-0189.yq01.baidu.com:8388/v1/slurmjobs/137316/workspace
Could someone please help with this? Many thanks.

In fleet mode, how do I evaluate the model during training?

Latest paddle-cpu version.
Training on its own works fine and converges normally.
If I want to evaluate the model during training, how should I do it? I couldn't find it in the currently open-sourced code. When I build the test_program with fluid.unique_name.guard as shown below, I get an error.
(screenshot)

Error message:
(screenshot)

Code:
Network definition part:
(screenshot)

Training code:

`import logging
import multiprocessing
import os
import time

from paddle import fluid

import nlpt.model.global_key_manager as global_key_manager
from nlpt.utils.optimization import optimization
from nlpt.utils import log
from nlpt.utils.init import init_checkpoint, init_parameters
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
from paddle.fluid.incubate.fleet.base import role_maker

class Config(object):

# common params
def __init__(self):
    self.label_map_config = None
    self.do_lower_case = False
    self.use_cuda = True
    self.in_tokens = False
    self.random_seed = 1
    self.lr_scheduler = 'linear_warmup_decay'
    self.use_fp16 = False
    self.loss_scaling = 1.0
    self.weight_decay = 0
    self.init_parameters = None
    self.init_checkpoint = None
    self.use_fast_executor = False
    #self.ernie_config_path = "./thirdparty/config/ernie_config.json"
    self.vocab_path = os.path.join(os.path.dirname(__file__) + '/../config/vocab.txt')
    self.verbose = True
    self.is_local = os.getenv("PADDLE_IS_LOCAL", "0") == "1"
    self.evaluate = "acc,auc,recall"

    # when online should be set default
    self.num_iteration_per_drop_scope = 1
    self.is_ernie = True

    # default data_config_path
    self.data_config_path = "../nlpt/config/data_config.json"

    # default data path
    self.train_set = "../train_data/classify"
    self.test_set = "../test_data/classify"
    self.dev_set = "../thirdparty/dev_data"
    self.predict_set = "./thirdparty/classify/predict_data"

    # default save model
    self.save_inference_model_path = "./output/inference_models"
    self.checkpoints = "./output/checkpoints"

class BaseModel(object):
"""The BaseModel class adds training & evaluation routines to a Network.
"""

def __init__(self, config, create_net):
    print('BaseModel init....')
    self.config = config
    if config.use_cuda:
        place = fluid.CUDAPlace(int(os.getenv('FLAGS_selected_gpus', '0')))
        self.dev_count = fluid.core.get_cuda_device_count()
    else:
        place = fluid.CPUPlace()
        self.dev_count = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))

    self.executor = fluid.Executor(place)
    self.startup_prog = fluid.Program()

    self.key_dict_manger = global_key_manager.key_dict_manager

    # multi nodes
    self.num_trainers = 1
    self.trainer_id = 0
    self.is_fleet = False

    self.create_net = create_net

    # Todo: replace by warmup_proportion
    self.warmup_steps = 0

    logging.debug("PADDLE_IS_LOCAL:%d" % config.is_local)
    # init fleet if needed
    if not self.config.use_cuda:
        self.init_fleet_paddle_cloud(config.is_local)

    self.build_reader()
    logging.debug("finish build reader")

    self.build_program()
    logging.debug("finish build graph")

    if self.is_fleet:
        if fleet.is_worker():
            self.set_reader_provider()
    else:
        self.set_reader_provider()

    if self.config.use_cuda:
        self.prepare_nccl2_env(config.is_local)
        logging.debug("finish prepare nccl2 env")
        # run startup_prog after transpile for nccl2
        self.executor.run(self.startup_prog)
    else:
        self.prepare_fleet_paddle_cloud(config.is_local)
        logging.debug("finish prepare fleet env")


    self.load_pretrained_models()

    self.build_executor()
    logging.debug("finish build executor")

    # should be executed after self.build_reader() is called
    if self.reader.label_map:
        self.config.label_id2text = {id_label:text_label for text_label, id_label in self.reader.label_map.items()}

    self.print_config()

def print_config(self):
    print("*********************************** Task Config **************************************")
    for k, v in self.config.__dict__.items():
        print("{0}:{1}".format(k, v))
    print("**************************************************************************************")

# TODO:need override
def init_reader(self):
    print("init reader...")

def extend_graph_vars(self, create_net):
    """ add metrics for standard classify task
    """
    def wrapper(* config, **kwconfig):
        pyreader, graph_vars = create_net(*config, **kwconfig)
        for k, v in graph_vars.items():
            v.persistable = True

        return pyreader, graph_vars

    return wrapper

def build_reader(self):
    self.init_reader()
    if not self.reader:
        print("reader not init.")
        return
    if self.is_fleet and fleet.is_server():
        print("fleet server not need reader ...")
        return
    if self.config.do_train:
        self.train_data_generator = self.reader.data_generator(
            data_path=self.config.train_set,
            batch_size=self.config.batch_size,
            epoch=self.config.epoch,
            shuffle=True,
            phase="train")

    if self.config.do_test:
        self.test_data_generator = self.reader.data_generator(
            data_path=self.config.test_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False)

    if self.config.do_val:
        self.dev_data_generator = self.reader.data_generator(
            data_path=self.config.dev_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False)

    if self.config.do_predict:
        self.predict_data_generator = self.reader.data_generator(
            data_path=self.config.predict_set,
            batch_size=self.config.batch_size,
            epoch=1,
            shuffle=False,
            phase="predict")

def build_program(self):
    self.define_train_program()
    self.define_test_program()
    self.define_infer_program()


def build_executor(self):
    if self.is_fleet:
        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_threads = self.dev_count
        build_strategy = fluid.BuildStrategy()
        build_strategy.async_mode = False
        print("CPU_NUM = ", self.dev_count)
        if self.dev_count > 1:
            build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce

        self.train_exe = fluid.ParallelExecutor(
            use_cuda=self.config.use_cuda,
            loss_name=self.graph_vars["loss"].name,
            main_program=self.train_program,
            build_strategy=build_strategy,
            exec_strategy=exec_strategy)
    else:
        exec_strategy = fluid.ExecutionStrategy()
        exec_strategy.num_iteration_per_drop_scope = self.config.num_iteration_per_drop_scope
        if self.config.use_fast_executor:
            exec_strategy.use_experimental_executor = True

        self.train_exe = fluid.ParallelExecutor(
            use_cuda=self.config.use_cuda,
            loss_name=self.graph_vars["loss"].name,
            exec_strategy=exec_strategy,
            main_program=self.train_program,
            num_trainers=self.num_trainers,
            trainer_id=self.trainer_id)



# TODO: need override
def loss_optimizer(self):
    print("init loss_optimizer")
    if not self.config.use_cuda and not self.config.is_local:
        print("is fleet ....")
        optimizer = fluid.optimizer.Adam(learning_rate=self.config.learning_rate)
        self.optimizer = fleet.distributed_optimizer(optimizer)
        self.optimizer.minimize(self.graph_vars["loss"])
        print("minimize fleet paddle cloud...")
    else:
        # optimizer, scheduled_lr = optimization(
        #     loss=self.graph_vars["loss"],
        #     warmup_steps=self.warmup_steps,
        #     num_train_steps=1000,
        #     learning_rate=self.config.learning_rate,
        #     train_program=self.train_program,
        #     startup_prog=self.startup_prog,
        #     weight_decay=self.config.weight_decay,
        #     scheduler=self.config.lr_scheduler,
        #     use_fp16=self.config.use_fp16,
        #     loss_scaling=self.config.loss_scaling)
        # self.optimizer = optimizer
        self.optimizer = fluid.optimizer.Adam(learning_rate=self.config.learning_rate)
        self.optimizer.minimize(self.graph_vars["loss"])

def define_train_program(self):
    self.train_program = fluid.Program()
    with fluid.program_guard(self.train_program, self.startup_prog):
        with fluid.unique_name.guard():
            create_net = self.extend_graph_vars(self.create_net)
            self.train_pyreader, graph_vars = create_net(pyreader_name="train_reader")
            self.graph_vars = self.check_graph_vars(graph_vars)
            self.loss_optimizer()


def define_test_program(self):
    self.test_program = fluid.Program()
    with fluid.program_guard(self.test_program, self.startup_prog):
        with fluid.unique_name.guard():
            create_net = self.extend_graph_vars(self.create_net)
            self.test_pyreader, graph_vars = create_net(pyreader_name="test_reader")
            self.graph_vars = self.check_graph_vars(graph_vars)
    self.test_program = self.test_program.clone(for_test=True)

def define_infer_program(self):
    self.infer_program = fluid.Program()
    with fluid.program_guard(self.infer_program, self.startup_prog):
        with fluid.unique_name.guard():
            self.feed_target_names, self.inference_output = self.create_net(pyreader_name="infer_reader", is_inference=True)
    self.infer_program = self.infer_program.clone(for_test=True)

def check_graph_vars(self, graph_vars):
    keys = list(graph_vars.keys())
    for k in keys:
        if not self.key_dict_manger.check_key_legitimacy(k):
            del graph_vars[k]

    print("after check ", graph_vars)
    return graph_vars

def load_pretrained_models(self):
    config = self.config
    exe = self.executor

    if config.do_train:
        if config.init_checkpoint and config.init_parameters:
            raise ValueError(
                "ERROR: config 'init_checkpoint' and 'init_parameters' "
                "both are set! Only one of them should be set. "
                "if you want warmstart checkpoint keep its learning_rate and moments, plese set 'init_checkpoint'. "
                "if you want warmstart checkpoint with only its parameters, and you want reset a new learning_rate "
                "by config, plese set 'init_parameters'")

        if config.init_checkpoint:
            init_checkpoint(
                exe,
                config.init_checkpoint,
                main_program=self.train_program,
                use_fp16=config.use_fp16)

        elif config.init_parameters:
            init_parameters(
                exe,
                config.init_parameters,
                main_program=self.train_program,
                use_fp16=config.use_fp16)

    elif config.do_val or config.do_test or config.do_predict:
        if config.init_checkpoint:
            init_checkpoint(
                exe,
                config.init_checkpoint,
                main_program=self.train_program,
                use_fp16=config.use_fp16)
        elif config.init_parameters:
            init_parameters(
                exe,
                config.init_parameters,
                main_program=self.train_program,
                use_fp16=config.use_fp16)
        else:
            raise ValueError("config 'init_checkpoint' or 'init_paramters' should be set if"
                             "only doing validation or testing or predict!")


# TODO: need to override
def set_reader_provider(self):
    print("set pyreader data provider.")
    self.use_lod_tensor = True
    self.train_pyreader.decorate_tensor_provider(self.train_data_generator)
    if self.config.do_val or self.config.do_test or self.config.do_predict:
        self.test_pyreader.decorate_tensor_provider(self.test_data_generator)

def prepare_nccl2_env(self, is_local):
    if not is_local:
        port = os.getenv("PADDLE_PORT", "6174")
        trainers = os.getenv("PADDLE_TRAINERS")  # ip,ip...
        logging.debug("trainers form env:{}".format(trainers))

        
        trainer_endpoints = []
        for trainer_ip in trainers.split(","):
            trainer_endpoint = ":".join([trainer_ip, port])
            trainer_endpoints.append(trainer_endpoint)
        trainer_endpoints = ",".join(trainer_endpoints)
        logging.debug("trainers endpoints:{}".format(trainer_endpoints))

        #eplist = []
        #for ip in pserver_ips.split(","):
        #    eplist.append(':'.join([ip, port]))
        #pserver_endpoints = ",".join(eplist)  # ip:port,ip:port...
        num_trainers = int(os.getenv("PADDLE_TRAINERS_NUM", "0"))

        current_endpoint = os.getenv("POD_IP") + ":" + port
        logging.debug("current_endpoint: {}".format(current_endpoint))

        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))

        config = fluid.DistributeTranspilerConfig()
        config.mode = "nccl2"

        t = fluid.DistributeTranspiler(config=config)
        #t.transpile(trainer_id, pservers=pserver_endpoints, trainers=trainers)
        t.transpile(trainer_id, trainers=trainer_endpoints, current_endpoint=current_endpoint, \
                    program=self.train_program, startup_program=self.startup_prog)

        self.num_trainers = num_trainers
        self.trainer_id = trainer_id
        logging.debug("nccl_num_trainers:{} nccl_trainer_id:{}".format(self.num_trainers, self.trainer_id))



def init_fleet_paddle_cloud(self, is_local):
    if is_local:
        self.is_fleet = False
    else:
        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)
        self.is_fleet = True
        # self.startup_prog = fleet.startup_program
        print("init fleet paddle cloud...")


def prepare_fleet_paddle_cloud(self, is_local):
    if is_local:
        self.executor.run(self.startup_prog)
    else:
        if fleet.is_server():
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            fleet.init_worker()
            self.executor.run(self.startup_prog)


def train(self):
    if self.is_fleet and fleet.is_server():
        print("is fleet.server, over")
        return
    # print("worker_index%d start train...." % fleet.worker_index())
    self.train_pyreader.start()
    steps = 0
    time_begin = time.time()
    fetch_list = ['embedding_0.w_0', 'sequence_conv_0.w_0', 'fc_0.w_0']
    while True:
        try:
            steps += 1
            if steps % self.config.skip_steps != 0:
                var_emb, var_conv, var_fc = self.train_exe.run(fetch_list=fetch_list)
                print("var_emb is {}".format(var_emb))
                print("var_conv is {}".format(var_conv))
                print("var_fc is {}".format(var_fc))
                # print("all_parameters is {}".format(fluid.default_main_program().block(0).all_parameters()))
            else:
                if self.config.verbose:
                    print("train pyreader queue size: %d, " % (self.train_pyreader.queue.size()))

                outputs, current_learning_rate = self.evaluate(self.train_exe,
                                                               self.train_program,
                                                               self.train_pyreader,
                                                               self.graph_vars,
                                                               "train",
                                                               steps)

                num_train_examples = self.reader.get_num_examples(self.config.train_set)
                current_example, current_epoch = self.reader.get_train_progress()
                time_end = time.time()
                used_time = time_end - time_begin
                log_info = "current_learning_rate: %f, " % current_learning_rate
                log_info += "epoch: %d, progress: %d/%d, step: %d, " % (
                current_epoch, current_example, num_train_examples, steps)
                log_info += "speed: %f steps/s" % (self.config.skip_steps / used_time)
                print(log_info)

                try:
                    if outputs:
                        import paddlecloud.visual_util as visualdl
                        x_dic = {"x_name": "step", "x_value": steps}
                        y_ls = []
                        for key, value in outputs.items():
                            y = {}
                            y["y_name"] = key
                            y["y_value"] = value
                            y_ls.append(y)

                        visualdl.show_fluid_trend(x_dic, y_ls, tag="train")
                except Exception:
                    print("import paddlecloud.visual_util failed")

                time_begin = time.time()

            if steps % self.config.save_steps == 0:
                save_checkpoint_path = os.path.join(self.config.checkpoints, "step_" + str(steps))
                fluid.io.save_persistables(self.executor, save_checkpoint_path, self.train_program)
                # fleet.save_persistables(self.executor, save_checkpoint_path, self.train_program)
                print("save checkpoinmts to %s" % save_checkpoint_path)
                # save_inference_model_path = os.path.join(self.config.save_inference_model_path, "step_" + str(steps))
                # fluid.io.save_inference_model(
                #     save_inference_model_path,
                #     self.feed_target_names,
                #     [self.inference_output],
                #     self.executor,
                #     main_program=self.infer_program)
                # print("save inference model to %s" % save_inference_model_path)

            if steps % self.config.validation_steps == 0:
                # evaluate dev set
                if self.config.do_val:
                    self.do_test_val("dev", steps)
                # evaluate test set
                if self.config.do_test:
                    self.do_test_val("test", steps)

        except fluid.core.EOFException:
            save_path = os.path.join(self.config.checkpoints, "step_" + str(steps))
            fluid.io.save_persistables(self.executor, save_path, self.train_program)
            # fleet.save_persistables(self.executor, save_checkpoint_path, self.train_program)
            save_inference_model_path = os.path.join(self.config.save_inference_model_path, "step_" + str(steps))
            # fleet.save_inference_model(self.executor, save_inference_model_path, self.feed_target_names, [self.inference_output], main_program=self.infer_program)
            # fluid.io.save_inference_model(
            #     save_inference_model_path,
            #     self.feed_target_names,
            #     [self.inference_output],
            #     self.executor,
            #     main_program=self.infer_program)
            # print("save inference model to %s" % (save_inference_model_path))

            self.train_pyreader.reset()
            if self.is_fleet:
                fleet.stop_worker()
            break

    # final eval on dev set
    if self.config.do_val:
        print("Final validation result:")
        self.do_test_val("dev", steps)

    # final eval on test set
    if self.config.do_test:
        print("Final test result:")
        self.do_test_val("test", steps)

def do_test_val(self, eval_phase, step):
    if eval_phase == "dev":
        data_generator = self.dev_data_generator
    elif eval_phase == "test":
        data_generator = self.test_data_generator
    elif eval_phase == "predict":
        data_generator = self.predict_data_generator
    else:
        raise ValueError("%s is illegal" % eval_phase)

    if self.use_lod_tensor:
        self.test_pyreader.decorate_paddle_reader(data_generator)
    else:
        self.test_pyreader.decorate_tensor_provider(data_generator)

    outputs, current_learning_rate = self.evaluate(self.executor,
                                                   self.test_program,
                                                   self.test_pyreader,
                                                   self.graph_vars,
                                                   eval_phase,
                                                   step)
    try:
        if outputs and len(outputs) != 0:
            import paddlecloud.visual_util as visualdl
            x_dic = {"x_name": "step", "x_value": step}
            y_ls = []
            for key, value in outputs.items():
                y = {}
                y["y_name"] = key
                y["y_value"] = value
                y_ls.append(y)

            visualdl.show_fluid_trend(x_dic, y_ls, tag=eval_phase)
    except Exception:
        print("import paddlecloud.visual_util failed")

# TODO: need override
def evaluate(self, exe, program, pyreader, graph_vars, eval_phase, step):
    print("evaluate in base model...")


def predict(self):
    # Todo:
    logging.debug("start do predict")
    self.do_test_val("predict", 0)

log.init_log("./log/test", level=logging.DEBUG)
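For reference, the endpoint assembly that prepare_nccl2_env performs can be reproduced in isolation. The sketch below is not part of the original trainer; it only assumes the same environment variables that method reads (PADDLE_TRAINERS, PADDLE_PORT, POD_IP, PADDLE_TRAINER_ID), and the demo_env values are purely hypothetical.

import os

def build_nccl2_endpoints(env=os.environ):
    """Rebuild the trainer endpoint list the way prepare_nccl2_env does."""
    port = env.get("PADDLE_PORT", "6174")
    trainer_ips = env.get("PADDLE_TRAINERS", "")  # comma-separated IPs
    trainer_endpoints = ",".join(
        "{}:{}".format(ip, port) for ip in trainer_ips.split(",") if ip)
    current_endpoint = "{}:{}".format(env.get("POD_IP", ""), port)
    trainer_id = int(env.get("PADDLE_TRAINER_ID", "0"))
    return trainer_endpoints, current_endpoint, trainer_id

# Hypothetical example values, for illustration only.
demo_env = {"PADDLE_TRAINERS": "10.0.0.1,10.0.0.2", "PADDLE_PORT": "6174",
            "POD_IP": "10.0.0.1", "PADDLE_TRAINER_ID": "0"}
print(build_nccl2_endpoints(demo_env))
# ('10.0.0.1:6174,10.0.0.2:6174', '10.0.0.1:6174', 0)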

subprocess.CalledProcessError ... returned non-zero exit status 1

Running collective_operators on the Kongming cluster fails with the following error:

./python2_paddle150/bin/python -m paddle.distributed.launch --selected_gpus=0,1,2,3,4,5,6,7 --log_dir mylog ./train.py --model=ResNet50 --batch_size=8 --total_images=1281167 --data_dir=./ --class_dim=100 --image_shape=3,224,224 --model_save_dir=./output --lr=0.1 --num_epochs=90 --l2_decay=1e-4 --nccl_comm_num=2

WARNING: Logging before InitGoogleLogging() is written to STDERR
I1002 15:31:38.199759 1621 init.cc:67] Init commandline: dummy /home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/site-packages/paddle/distributed/launch.py --tryfromenv=check_nan_inf,benchmark,eager_delete_scope,initial_cpu_memory_in_mb,init_allocated_mem,free_idle_memory,paddle_num_threads,dist_threadpool_size,eager_delete_tensor_gb,fast_eager_deletion_mode,memory_fraction_of_eager_deletion,allocator_strategy,reader_queue_speed_test_mode,print_sub_graph_dir,pe_profile_fname,inner_op_parallelism,enable_parallel_graph,fuse_parameter_groups_size,multiple_of_cupti_buffer_size,fuse_parameter_memory_size,tracer_profile_fname,dygraph_debug,use_pinned_memory,cpu_deterministic,use_mkldnn,rpc_deadline,rpc_server_profile_path,enable_rpc_profiler,rpc_send_thread_num,rpc_get_thread_num,rpc_prefetch_thread_num,rpc_disable_reuse_port,communicator_independent_recv_thread,communicator_send_queue_size,communicator_min_send_grad_num_before_recv,communicator_thread_pool_size,communicator_max_merge_var_num,communicator_fake_rpc,communicator_send_wait_times,fraction_of_gpu_memory_to_use,initial_gpu_memory_in_mb,reallocate_gpu_memory_in_mb,cudnn_deterministic,enable_cublas_tensor_op_math,conv_workspace_size_limit,cudnn_exhaustive_search,selected_gpus,sync_nccl_allreduce,limit_of_tmp_allocation,times_excess_than_required_tmp_allocation,enable_inplace_whitelist,cudnn_batchnorm_spatial_persistent
Traceback (most recent call last):
  File "/home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/site-packages/paddle/distributed/launch.py", line 222, in <module>
    launch()
  File "/home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/site-packages/paddle/distributed/launch.py", line 218, in launch
    start_procs(args)
  File "/home/slurm/job/tmp/job-136870/python2_paddle150/lib/python2.7/site-packages/paddle/distributed/launch.py", line 211, in start_procs
    returncode=procs[i].returncode, cmd=cmds[i])
subprocess.CalledProcessError: Command '['/home/slurm/job/tmp/job-136870/python2_paddle150/bin/python', '-u', './train.py', '--model=ResNet50', '--batch_size=8', '--total_images=1281167', '--data_dir=./', '--class_dim=100', '--image_shape=3,224,224', '--model_save_dir=./output', '--lr=0.1', '--num_epochs=90', '--l2_decay=1e-4', '--nccl_comm_num=2']' returned non-zero exit status 1
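Note that the subprocess.CalledProcessError raised by paddle.distributed.launch only reports that one of the spawned train.py workers exited with a non-zero status; the worker's own traceback goes to the per-worker log files under the directory passed via --log_dir (mylog here). The following is a minimal sketch for pulling out the tail of each worker log; the workerlog.N file naming is assumed from this launcher's convention, so verify it against what actually appears in mylog/ on the cluster.

import glob
import os

def tail_worker_logs(log_dir="mylog", num_lines=50):
    """Print the last lines of every worker log written by paddle.distributed.launch."""
    for path in sorted(glob.glob(os.path.join(log_dir, "workerlog.*"))):
        print("===== {} =====".format(path))
        with open(path) as f:
            lines = f.readlines()
        print("".join(lines[-num_lines:]))

if __name__ == "__main__":
    tail_worker_logs()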

Compared with the original demo, run.sh was modified as follows:

#!/bin/bash

export LD_LIBRARY_PATH=/home/work/cuda-9.0/lib64:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=/home/work/cudnn/cudnn_v7/cuda/lib64:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$PWD/nccl_2.3.5/lib/:$LD_LIBRARY_PATH

export FLAGS_cudnn_exhaustive_search=0
export GLOG_v=1
export GLOG_logtostderr=1
export FLAGS_eager_delete_tensor_gb=0
export NCCL_DEBUG=INFO

# unset proxy
unset https_proxy http_proxy
set -xe

MODEL=ResNet50 #VGG16
MODEL_SAVE_PATH="./output"

# training params
NUM_EPOCHS=90
BATCH_SIZE=8
LR=0.1

# data params
# the path of the data set
DATA_PATH="./"
TOTAL_IMAGES=1281167
CLASS_DIM=100
IMAGE_SHAPE=3,224,224

# gpu params
NCCL_COMM_NUM=2

set -x
config="--selected_gpus=0,1,2,3,4,5,6,7 --log_dir mylog"
touch ./utils/__init__.py
./python2_paddle150/bin/python -m paddle.distributed.launch ${config} \
    ./train.py \
    --model=${MODEL} \
    --batch_size=${BATCH_SIZE} \
    --total_images=${TOTAL_IMAGES} \
    --data_dir=${DATA_PATH} \
    --class_dim=${CLASS_DIM} \
    --image_shape=${IMAGE_SHAPE} \
    --model_save_dir=${MODEL_SAVE_PATH} \
    --lr=${LR} \
    --num_epochs=${NUM_EPOCHS} \
    --l2_decay=1e-4 \
    --nccl_comm_num=2

This script is passed to the submit command as the job-script argument.
Could you please help investigate?
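One quick sanity check: paddle.distributed.launch configures each train.py worker through environment variables rather than extra command-line flags, so dumping those variables at the top of train.py shows whether the job-script wiring is correct. The sketch below is an assumption about the variable names this Paddle 1.5-era launcher sets (FLAGS_selected_gpus, PADDLE_TRAINER_ID, PADDLE_TRAINERS_NUM, PADDLE_CURRENT_ENDPOINT, PADDLE_TRAINER_ENDPOINTS); verify them against the installed launch.py.

import os

def dump_launch_env():
    """Debug aid: print the distributed-launch environment seen by this worker."""
    # These names are assumed from the launcher's convention; adjust if they differ.
    keys = ["FLAGS_selected_gpus", "PADDLE_TRAINER_ID", "PADDLE_TRAINERS_NUM",
            "PADDLE_CURRENT_ENDPOINT", "PADDLE_TRAINER_ENDPOINTS"]
    for key in keys:
        print("{} = {}".format(key, os.environ.get(key, "<unset>")))

# Call dump_launch_env() at the very start of train.py, before building the program.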
