From 68dea62b5dae82e6ea15f6499e639bea3d18a3df Mon Sep 17 00:00:00 2001
From: Zhong Hui
Date: Fri, 18 Mar 2022 19:46:02 +0800
Subject: [PATCH] init verison for paddlenlp trainer.

---
 .../ernie-1.0/finetune/finetune.py            |    1 -
 .../finetune/sequence_classification.py       |   87 +-
 .../ernie-1.0/finetune/trainer_args.py        |  997 +++++++++++++
 .../ernie-1.0/finetune/trainer_base.py        | 1289 +++++++++++++++--
 .../ernie-1.0/finetune/trainer_callback.py    |  660 +++++++++
 .../ernie-1.0/finetune/trainer_utils.py       |  202 +++
 paddlenlp/transformers/model_utils.py         |    6 +
 7 files changed, 3125 insertions(+), 117 deletions(-)
 create mode 100644 examples/language_model/ernie-1.0/finetune/trainer_args.py
 create mode 100644 examples/language_model/ernie-1.0/finetune/trainer_callback.py
 create mode 100644 examples/language_model/ernie-1.0/finetune/trainer_utils.py

diff --git a/examples/language_model/ernie-1.0/finetune/finetune.py b/examples/language_model/ernie-1.0/finetune/finetune.py
index 0a6db96097da..70ae65b32964 100644
--- a/examples/language_model/ernie-1.0/finetune/finetune.py
+++ b/examples/language_model/ernie-1.0/finetune/finetune.py
@@ -280,7 +280,6 @@ def do_train(args):
         tokenizer,
         args,
         test_ds=all_ds["test"])
-
     trainer.train()
     trainer.eval()
 
diff --git a/examples/language_model/ernie-1.0/finetune/sequence_classification.py b/examples/language_model/ernie-1.0/finetune/sequence_classification.py
index 324399299eee..cdbf887cc1e0 100644
--- a/examples/language_model/ernie-1.0/finetune/sequence_classification.py
+++ b/examples/language_model/ernie-1.0/finetune/sequence_classification.py
@@ -23,10 +23,10 @@
 import numpy as np
 
 import paddlenlp
-from paddlenlp.data import Stack, Tuple, Pad, Dict
+from paddlenlp.data import Stack, Tuple, Pad
 from paddlenlp.utils.log import logger
 
-from trainer_base import TrainerBase
+from trainer_base import TrainerBase, Trainer
 
 
@@ -46,7 +46,12 @@ def convert_example(example, tokenizer, max_seq_length=512, is_test=False):
     if is_test:
         return input_ids, token_type_ids
     label = np.array([example["label"]], dtype="int64")
-    return input_ids, token_type_ids, label
+    # return input_ids, token_type_ids, label
+    return {
+        "input_ids": input_ids,
+        "token_type_ids": token_type_ids,
+        "labels": label
+    }
 
 
 def seq_trans_fn(example, tokenizer, args):
@@ -130,6 +135,39 @@ def clue_batchify_fn(tokenizer, args):
     return batchify_fn
 
 
+class Dict(object):
+    def __init__(self, fn):
+        assert isinstance(fn, (dict)), 'Input pattern not understood. The input of Dict must be a dict with key of input column name and value of collate_fn ' \
+            'Received fn=%s' % (str(fn))
+
+        self._fn = fn
+
+        for col_name, ele_fn in self._fn.items():
+            assert callable(
+                ele_fn
+            ), 'Batchify functions must be callable! 
type(fn[%d]) = %s' % ( + col_name, str(type(ele_fn))) + + def __call__(self, data): + + ret = {} + for col_name, ele_fn in self._fn.items(): + result = ele_fn([ele[col_name] for ele in data]) + ret[col_name] = result + + return ret + + +def clue_batchify_fn_dict(tokenizer, args): + batchify_fn = lambda samples, fn=Dict({ + 'input_ids': Pad(axis=0, pad_val=tokenizer.pad_token_id), # input + "token_type_ids": Pad(axis=0, pad_val=tokenizer.pad_token_type_id), # segment + "labels": Stack(dtype="int64" if args.label_list else "float32") # label + }): fn(samples) + + return batchify_fn + + @paddle.no_grad() def evaluate(model, criterion, metric, data_loader, mode="dev"): """ @@ -278,7 +316,7 @@ def train(self): best_dev_acc, corr_test_acc)) -class SeqTrainer(ClueTrainer): +class SeqTrainer2(ClueTrainer): def dataloader_inner(self): trans_fn = partial( seq_trans_fn, tokenizer=self.tokenizer, args=self.args) @@ -290,3 +328,44 @@ def dataloader_inner(self): self.dev_ds, "dev", self.args.batch_size, batchify_fn, trans_fn) self.test_dl = self.create_dataloader( self.test_ds, "dev", self.args.batch_size, batchify_fn, trans_fn) + + +class SeqTrainer(Trainer): + def __init__(self, train_ds, dev_ds, model, tokenizer, args, *arg, + **kwargs): + + trans_fn = partial(seq_trans_fn, tokenizer=tokenizer, args=args) + batchify_fn = clue_batchify_fn_dict(tokenizer, args) + + train_ds = train_ds.map(trans_fn) + dev_ds = dev_ds.map(trans_fn) + + loss_fct = paddle.nn.loss.CrossEntropyLoss( + ) if train_ds.label_list else paddle.nn.loss.MSELoss() + + def compute_metrics(p): + preds = p.predictions[0] if isinstance(p.predictions, + tuple) else p.predictions + probs = F.softmax(preds, axis=1) + metric = Accuracy() + metric.reset() + result = metric.compute(preds, p.label_ids) + metric.update(result) + accu = metric.accumulate() + metric.reset() + return {"eval_accuracy": accu} + + # return { + # "accuracy": (preds == p.label_ids).astype(np.float32).mean() + # .item() + # } + + super().__init__( + model, + loss_fct, + args, + batchify_fn, + train_ds, + dev_ds, + tokenizer, + compute_metrics=compute_metrics) diff --git a/examples/language_model/ernie-1.0/finetune/trainer_args.py b/examples/language_model/ernie-1.0/finetune/trainer_args.py new file mode 100644 index 000000000000..e0ad30e6361a --- /dev/null +++ b/examples/language_model/ernie-1.0/finetune/trainer_args.py @@ -0,0 +1,997 @@ +# Copyright 2020 The HuggingFace Team. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import json +import math +import os +import warnings +from dataclasses import asdict, dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Dict, List, Optional + +from utils import logging + + +class ExplicitEnum(Enum): + """ + Enum with more explicit error message for missing values. 
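The `Dict` helper added to `sequence_classification.py` above deliberately differs from `paddlenlp.data.Dict`: it is built from a mapping of column name to collate callable and returns each batch as a dict instead of a tuple, which is the shape the new `Trainer` expects. A minimal sketch of the intended behaviour; the toy examples below are made up for illustration and are not part of the patch:

    import numpy as np
    from paddlenlp.data import Pad, Stack

    # one collate callable per input column, as in clue_batchify_fn_dict above
    collate = {
        "input_ids": Pad(axis=0, pad_val=0),   # pad variable-length token ids
        "labels": Stack(dtype="int64"),        # stack fixed-size label arrays
    }

    # toy examples, for illustration only
    examples = [
        {"input_ids": [1, 2, 3], "labels": np.array([0], dtype="int64")},
        {"input_ids": [4, 5], "labels": np.array([1], dtype="int64")},
    ]

    # what Dict.__call__ does: batch each column independently, keep the keys
    batch = {name: fn([ex[name] for ex in examples])
             for name, fn in collate.items()}
    # batch["input_ids"] -> shape (2, 3), batch["labels"] -> shape (2, 1)

Keying the batch by column name is what lets the trainer look up `labels` (see `label_names` later in the patch) instead of relying on positional tuple outputs.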
+ """ + + @classmethod + def _missing_(cls, value): + raise ValueError( + f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}" + ) + + +class IntervalStrategy(ExplicitEnum): + NO = "no" + STEPS = "steps" + EPOCH = "epoch" + + +class EvaluationStrategy(ExplicitEnum): + NO = "no" + STEPS = "steps" + EPOCH = "epoch" + + +# logger = logging.get_logger(__name__) +log_levels = logging.get_log_levels_dict().copy() +trainer_log_levels = dict(**log_levels, passive=-1) + + +def default_logdir() -> str: + """ + Same default as PyTorch + """ + import socket + from datetime import datetime + + current_time = datetime.now().strftime("%b%d_%H-%M-%S") + return os.path.join("runs", current_time + "_" + socket.gethostname()) + + +@dataclass +class TrainingArguments: + """ + TrainingArguments is the subset of the arguments we use in our example scripts **which relate to the training loop + itself**. + + Using [`HfArgumentParser`] we can turn this class into + [argparse](https://docs.python.org/3/library/argparse#module-argparse) arguments that can be specified on the + command line. + + Parameters: + output_dir (`str`): + The output directory where the model predictions and checkpoints will be written. + overwrite_output_dir (`bool`, *optional*, defaults to `False`): + If `True`, overwrite the content of the output directory. Use this to continue training if `output_dir` + points to a checkpoint directory. + do_train (`bool`, *optional*, defaults to `False`): + Whether to run training or not. This argument is not directly used by [`Trainer`], it's intended to be used + by your training/evaluation scripts instead. See the [example + scripts](https://github.com/huggingface/transformers/tree/master/examples) for more details. + do_eval (`bool`, *optional*): + Whether to run evaluation on the validation set or not. Will be set to `True` if `evaluation_strategy` is + different from `"no"`. This argument is not directly used by [`Trainer`], it's intended to be used by your + training/evaluation scripts instead. See the [example + scripts](https://github.com/huggingface/transformers/tree/master/examples) for more details. + do_predict (`bool`, *optional*, defaults to `False`): + Whether to run predictions on the test set or not. This argument is not directly used by [`Trainer`], it's + intended to be used by your training/evaluation scripts instead. See the [example + scripts](https://github.com/huggingface/transformers/tree/master/examples) for more details. + evaluation_strategy (`str` or [`~trainer_utils.IntervalStrategy`], *optional*, defaults to `"no"`): + The evaluation strategy to adopt during training. Possible values are: + + - `"no"`: No evaluation is done during training. + - `"steps"`: Evaluation is done (and logged) every `eval_steps`. + - `"epoch"`: Evaluation is done at the end of each epoch. + + prediction_loss_only (`bool`, *optional*, defaults to `False`): + When performing evaluation and generating predictions, only returns the loss. + per_device_train_batch_size (`int`, *optional*, defaults to 8): + The batch size per GPU/TPU core/CPU for training. + per_device_eval_batch_size (`int`, *optional*, defaults to 8): + The batch size per GPU/TPU core/CPU for evaluation. + gradient_accumulation_steps (`int`, *optional*, defaults to 1): + Number of updates steps to accumulate the gradients for, before performing a backward/update pass. + + + + When using gradient accumulation, one step is counted as one step with backward pass. 
Therefore, logging, + evaluation, save will be conducted every `gradient_accumulation_steps * xxx_step` training examples. + + + + eval_accumulation_steps (`int`, *optional*): + Number of predictions steps to accumulate the output tensors for, before moving the results to the CPU. If + left unset, the whole predictions are accumulated on GPU/TPU before being moved to the CPU (faster but + requires more memory). + learning_rate (`float`, *optional*, defaults to 5e-5): + The initial learning rate for [`AdamW`] optimizer. + weight_decay (`float`, *optional*, defaults to 0): + The weight decay to apply (if not zero) to all layers except all bias and LayerNorm weights in [`AdamW`] + optimizer. + adam_beta1 (`float`, *optional*, defaults to 0.9): + The beta1 hyperparameter for the [`AdamW`] optimizer. + adam_beta2 (`float`, *optional*, defaults to 0.999): + The beta2 hyperparameter for the [`AdamW`] optimizer. + adam_epsilon (`float`, *optional*, defaults to 1e-8): + The epsilon hyperparameter for the [`AdamW`] optimizer. + max_grad_norm (`float`, *optional*, defaults to 1.0): + Maximum gradient norm (for gradient clipping). + num_train_epochs(`float`, *optional*, defaults to 3.0): + Total number of training epochs to perform (if not an integer, will perform the decimal part percents of + the last epoch before stopping training). + max_steps (`int`, *optional*, defaults to -1): + If set to a positive number, the total number of training steps to perform. Overrides `num_train_epochs`. + In case of using a finite iterable dataset the training may stop before reaching the set number of steps + when all data is exhausted + lr_scheduler_type (`str` or [`SchedulerType`], *optional*, defaults to `"linear"`): + The scheduler type to use. See the documentation of [`SchedulerType`] for all possible values. + warmup_ratio (`float`, *optional*, defaults to 0.0): + Ratio of total training steps used for a linear warmup from 0 to `learning_rate`. + warmup_steps (`int`, *optional*, defaults to 0): + Number of steps used for a linear warmup from 0 to `learning_rate`. Overrides any effect of `warmup_ratio`. + log_level (`str`, *optional*, defaults to `passive`): + Logger log level to use on the main process. Possible choices are the log levels as strings: 'debug', + 'info', 'warning', 'error' and 'critical', plus a 'passive' level which doesn't set anything and lets the + application set the level. + log_level_replica (`str`, *optional*, defaults to `passive`): + Logger log level to use on replicas. Same choices as `log_level`" + log_on_each_node (`bool`, *optional*, defaults to `True`): + In multinode distributed training, whether to log using `log_level` once per node, or only on the main + node. + logging_dir (`str`, *optional*): + [TensorBoard](https://www.tensorflow.org/tensorboard) log directory. Will default to + *output_dir/runs/**CURRENT_DATETIME_HOSTNAME***. + logging_strategy (`str` or [`~trainer_utils.IntervalStrategy`], *optional*, defaults to `"steps"`): + The logging strategy to adopt during training. Possible values are: + + - `"no"`: No logging is done during training. + - `"epoch"`: Logging is done at the end of each epoch. + - `"steps"`: Logging is done every `logging_steps`. + + logging_first_step (`bool`, *optional*, defaults to `False`): + Whether to log and evaluate the first `global_step` or not. + logging_steps (`int`, *optional*, defaults to 500): + Number of update steps between two logs if `logging_strategy="steps"`. 
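Taken together, the batch-size, accumulation and warmup arguments above determine the optimizer-step bookkeeping. A rough, illustrative calculation; the helper and its numbers below are made up and are not code from this patch:

    import math

    def training_schedule(num_examples, per_device_train_batch_size, n_devices,
                          gradient_accumulation_steps, num_train_epochs,
                          warmup_steps, warmup_ratio):
        # illustrative helper, not part of the patch
        # one optimizer step consumes this many examples
        global_batch = (per_device_train_batch_size * n_devices *
                        gradient_accumulation_steps)
        steps_per_epoch = num_examples // global_batch
        total_steps = steps_per_epoch * num_train_epochs
        # warmup_steps, when non-zero, overrides warmup_ratio
        # (cf. get_warmup_steps further down in this file)
        warmup = (warmup_steps if warmup_steps > 0
                  else math.ceil(total_steps * warmup_ratio))
        return total_steps, warmup

    # 10,000 examples, batch 8 on 2 devices, accumulation 4, 3 epochs, 10% warmup
    print(training_schedule(10_000, 8, 2, 4, 3, 0, 0.1))  # (468, 47)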
+ logging_nan_inf_filter (`bool`, *optional*, defaults to `True`): + Whether to filter `nan` and `inf` losses for logging. If set to `True` the loss of every step that is `nan` + or `inf` is filtered and the average loss of the current logging window is taken instead. + + + + `logging_nan_inf_filter` only influences the logging of loss values, it does not change the behavior the + gradient is computed or applied to the model. + + + + save_strategy (`str` or [`~trainer_utils.IntervalStrategy`], *optional*, defaults to `"steps"`): + The checkpoint save strategy to adopt during training. Possible values are: + + - `"no"`: No save is done during training. + - `"epoch"`: Save is done at the end of each epoch. + - `"steps"`: Save is done every `save_steps`. + save_steps (`int`, *optional*, defaults to 500): + Number of updates steps before two checkpoint saves if `save_strategy="steps"`. + save_total_limit (`int`, *optional*): + If a value is passed, will limit the total amount of checkpoints. Deletes the older checkpoints in + `output_dir`. + save_on_each_node (`bool`, *optional*, defaults to `False`): + When doing multi-node distributed training, whether to save models and checkpoints on each node, or only on + the main one. + + This should not be activated when the different nodes use the same storage as the files will be saved with + the same names for each node. + no_cuda (`bool`, *optional*, defaults to `False`): + Whether to not use CUDA even when it is available or not. + seed (`int`, *optional*, defaults to 42): + Random seed that will be set at the beginning of training. To ensure reproducibility across runs, use the + [`~Trainer.model_init`] function to instantiate the model if it has some randomly initialized parameters. + bf16 (`bool`, *optional*, defaults to `False`): + Whether to use bf16 16-bit (mixed) precision training instead of 32-bit training. Requires Ampere or higher + NVIDIA architecture. This is an experimental API and it may change. + fp16 (`bool`, *optional*, defaults to `False`): + Whether to use fp16 16-bit (mixed) precision training instead of 32-bit training. + fp16_opt_level (`str`, *optional*, defaults to 'O1'): + For `fp16` training, Apex AMP optimization level selected in ['O0', 'O1', 'O2', and 'O3']. See details on + the [Apex documentation](https://nvidia.github.io/apex/amp). + fp16_backend (`str`, *optional*, defaults to `"auto"`): + This argument is deprecated. Use `half_precision_backend` instead. + half_precision_backend (`str`, *optional*, defaults to `"auto"`): + The backend to use for mixed precision training. Must be one of `"auto"`, `"amp"` or `"apex"`. `"auto"` + will use AMP or APEX depending on the PyTorch version detected, while the other choices will force the + requested backend. + bf16_full_eval (`bool`, *optional*, defaults to `False`): + Whether to use full bfloat16 evaluation instead of 32-bit. This will be faster and save memory but can harm + metric values. This is an experimental API and it may change. + fp16_full_eval (`bool`, *optional*, defaults to `False`): + Whether to use full float16 evaluation instead of 32-bit. This will be faster and save memory but can harm + metric values. + tf32 (`bool`, *optional*): + Whether to enable tf32 mode, available in Ampere and newer GPU architectures. This is an experimental API + and it may change. + local_rank (`int`, *optional*, defaults to -1): + Rank of the process during distributed training. + xpu_backend (`str`, *optional*): + The backend to use for xpu distributed training. 
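`save_total_limit` implies a pruning step at save time. The patch itself only defines `PREFIX_CHECKPOINT_DIR = "training"` in `trainer_base.py`; the sketch below assumes checkpoints are written as `training-<step>` folders and shows the rotation such a limit would require. The helper is illustrative, not taken from the diff:

    import os
    import re
    import shutil

    def rotate_checkpoints(output_dir, save_total_limit=3, prefix="training"):
        # assumes checkpoint folders are named "<prefix>-<step>"
        pattern = re.compile(r"^%s-(\d+)$" % prefix)
        steps = []
        for name in os.listdir(output_dir):
            match = pattern.match(name)
            if match is not None and os.path.isdir(os.path.join(output_dir, name)):
                steps.append((int(match.group(1)), name))
        steps.sort()  # oldest first
        # keep only the newest `save_total_limit` checkpoints
        for _, stale in steps[:-save_total_limit]:
            shutil.rmtree(os.path.join(output_dir, stale))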
Must be one of `"mpi"` or `"ccl"`. + tpu_num_cores (`int`, *optional*): + When training on TPU, the number of TPU cores (automatically passed by launcher script). + dataloader_drop_last (`bool`, *optional*, defaults to `False`): + Whether to drop the last incomplete batch (if the length of the dataset is not divisible by the batch size) + or not. + eval_steps (`int`, *optional*): + Number of update steps between two evaluations if `evaluation_strategy="steps"`. Will default to the same + value as `logging_steps` if not set. + dataloader_num_workers (`int`, *optional*, defaults to 0): + Number of subprocesses to use for data loading (PyTorch only). 0 means that the data will be loaded in the + main process. + past_index (`int`, *optional*, defaults to -1): + Some models like [TransformerXL](../model_doc/transformerxl) or [XLNet](../model_doc/xlnet) can make use of + the past hidden states for their predictions. If this argument is set to a positive int, the `Trainer` will + use the corresponding output (usually index 2) as the past state and feed it to the model at the next + training step under the keyword argument `mems`. + run_name (`str`, *optional*): + A descriptor for the run. Typically used for [wandb](https://www.wandb.com/) and + [mlflow](https://www.mlflow.org/) logging. + disable_tqdm (`bool`, *optional*): + Whether or not to disable the tqdm progress bars and table of metrics produced by + [`~notebook.NotebookTrainingTracker`] in Jupyter Notebooks. Will default to `True` if the logging level is + set to warn or lower (default), `False` otherwise. + remove_unused_columns (`bool`, *optional*, defaults to `True`): + If using `datasets.Dataset` datasets, whether or not to automatically remove the columns unused by the + model forward method. + + (Note that this behavior is not implemented for [`TFTrainer`] yet.) + label_names (`List[str]`, *optional*): + The list of keys in your dictionary of inputs that correspond to the labels. + + Will eventually default to `["labels"]` except if the model used is one of the `XxxForQuestionAnswering` in + which case it will default to `["start_positions", "end_positions"]`. + load_best_model_at_end (`bool`, *optional*, defaults to `False`): + Whether or not to load the best model found during training at the end of training. + + + + When set to `True`, the parameters `save_strategy` needs to be the same as `eval_strategy`, and in the case + it is "steps", `save_steps` must be a round multiple of `eval_steps`. + + + + metric_for_best_model (`str`, *optional*): + Use in conjunction with `load_best_model_at_end` to specify the metric to use to compare two different + models. Must be the name of a metric returned by the evaluation with or without the prefix `"eval_"`. Will + default to `"loss"` if unspecified and `load_best_model_at_end=True` (to use the evaluation loss). + + If you set this value, `greater_is_better` will default to `True`. Don't forget to set it to `False` if + your metric is better when lower. + greater_is_better (`bool`, *optional*): + Use in conjunction with `load_best_model_at_end` and `metric_for_best_model` to specify if better models + should have a greater metric or not. Will default to: + + - `True` if `metric_for_best_model` is set to a value that isn't `"loss"` or `"eval_loss"`. + - `False` if `metric_for_best_model` is not set, or set to `"loss"` or `"eval_loss"`. 
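The interaction between `metric_for_best_model` and `greater_is_better` boils down to a single comparison at each evaluation. A hedged sketch of that check; the real bookkeeping lives in the trainer state and callback code and is not reproduced here:

    import operator

    def is_new_best(metrics, best_metric, metric_for_best_model, greater_is_better):
        # sketch of the comparison, not the trainer's actual code
        key = metric_for_best_model
        if not key.startswith("eval_"):
            key = "eval_" + key      # the prefix is optional, as noted above
        value = metrics[key]
        better = operator.gt if greater_is_better else operator.lt
        return best_metric is None or better(value, best_metric)

    print(is_new_best({"eval_accuracy": 0.91}, 0.89, "accuracy", True))   # True
    print(is_new_best({"eval_loss": 0.35}, 0.32, "loss", False))          # False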
+ ignore_data_skip (`bool`, *optional*, defaults to `False`): + When resuming training, whether or not to skip the epochs and batches to get the data loading at the same + stage as in the previous training. If set to `True`, the training will begin faster (as that skipping step + can take a long time) but will not yield the same results as the interrupted training would have. + sharded_ddp (`bool`, `str` or list of [`~trainer_utils.ShardedDDPOption`], *optional*, defaults to `False`): + Use Sharded DDP training from [FairScale](https://github.com/facebookresearch/fairscale) (in distributed + training only). This is an experimental feature. + + A list of options along the following: + + - `"simple"`: to use first instance of sharded DDP released by fairscale (`ShardedDDP`) similar to ZeRO-2. + - `"zero_dp_2"`: to use the second instance of sharded DPP released by fairscale (`FullyShardedDDP`) in + Zero-2 mode (with `reshard_after_forward=False`). + - `"zero_dp_3"`: to use the second instance of sharded DPP released by fairscale (`FullyShardedDDP`) in + Zero-3 mode (with `reshard_after_forward=True`). + - `"offload"`: to add ZeRO-offload (only compatible with `"zero_dp_2"` and `"zero_dp_3"`). + + If a string is passed, it will be split on space. If a bool is passed, it will be converted to an empty + list for `False` and `["simple"]` for `True`. + deepspeed (`str` or `dict`, *optional*): + Use [Deepspeed](https://github.com/microsoft/deepspeed). This is an experimental feature and its API may + evolve in the future. The value is either the location of DeepSpeed json config file (e.g., + `ds_config.json`) or an already loaded json file as a `dict`" + label_smoothing_factor (`float`, *optional*, defaults to 0.0): + The label smoothing factor to use. Zero means no label smoothing, otherwise the underlying onehot-encoded + labels are changed from 0s and 1s to `label_smoothing_factor/num_labels` and `1 - label_smoothing_factor + + label_smoothing_factor/num_labels` respectively. + debug (`str` or list of [`~debug_utils.DebugOption`], *optional*, defaults to `""`): + Enable one or more debug features. This is an experimental feature. + + Possible options are: + + - `"underflow_overflow"`: detects overflow in model's input/outputs and reports the last frames that led to + the event + - `"tpu_metrics_debug"`: print debug metrics on TPU + + The options should be separated by whitespaces. + optim (`str` or [`training_args.OptimizerNames`], *optional*, defaults to `"adamw_hf"`): + The optimizer to use: adamw_hf, adamw_torch, adamw_apex_fused, or adafactor. + adafactor (`bool`, *optional*, defaults to `False`): + This argument is deprecated. Use `--optim adafactor` instead. + group_by_length (`bool`, *optional*, defaults to `False`): + Whether or not to group together samples of roughly the same length in the training dataset (to minimize + padding applied and be more efficient). Only useful if applying dynamic padding. + length_column_name (`str`, *optional*, defaults to `"length"`): + Column name for precomputed lengths. If the column exists, grouping by length will use these values rather + than computing them on train startup. Ignored unless `group_by_length` is `True` and the dataset is an + instance of `Dataset`. + report_to (`str` or `List[str]`, *optional*, defaults to `"all"`): + The list of integrations to report the results and logs to. Supported platforms are `"azure_ml"`, + `"comet_ml"`, `"mlflow"`, `"tensorboard"` and `"wandb"`. 
Use `"all"` to report to all integrations + installed, `"none"` for no integrations. + ddp_find_unused_parameters (`bool`, *optional*): + When using distributed training, the value of the flag `find_unused_parameters` passed to + `DistributedDataParallel`. Will default to `False` if gradient checkpointing is used, `True` otherwise. + ddp_bucket_cap_mb (`int`, *optional*): + When using distributed training, the value of the flag `bucket_cap_mb` passed to `DistributedDataParallel`. + dataloader_pin_memory (`bool`, *optional*, defaults to `True`): + Whether you want to pin memory in data loaders or not. Will default to `True`. + skip_memory_metrics (`bool`, *optional*, defaults to `True`): + Whether to skip adding of memory profiler reports to metrics. This is skipped by default because it slows + down the training and evaluation speed. + push_to_hub (`bool`, *optional*, defaults to `False`): + Whether or not to push the model to the Hub every time the model is saved. If this is activated, + `output_dir` will begin a git directory synced with the the repo (determined by `hub_model_id`) and the + content will be pushed each time a save is triggered (depending on your `save_strategy`). Calling + [`~Trainer.save_model`] will also trigger a push. + + + + If `output_dir` exists, it needs to be a local clone of the repository to which the [`Trainer`] will be + pushed. + + + + resume_from_checkpoint (`str`, *optional*): + The path to a folder with a valid checkpoint for your model. This argument is not directly used by + [`Trainer`], it's intended to be used by your training/evaluation scripts instead. See the [example + scripts](https://github.com/huggingface/transformers/tree/master/examples) for more details. + hub_model_id (`str`, *optional*): + The name of the repository to keep in sync with the local *output_dir*. It can be a simple model ID in + which case the model will be pushed in your namespace. Otherwise it should be the whole repository name, + for instance `"user_name/model"`, which allows you to push to an organization you are a member of with + `"organization_name/model"`. Will default to `user_name/output_dir_name` with *output_dir_name* being the + name of `output_dir`. + + Will default to to the name of `output_dir`. + hub_strategy (`str` or [`~trainer_utils.HubStrategy`], *optional*, defaults to `"every_save"`): + Defines the scope of what is pushed to the Hub and when. Possible values are: + + - `"end"`: push the model, its configuration, the tokenizer (if passed along to the [`Trainer`]) and a + draft of a model card when the [`~Trainer.save_model`] method is called. + - `"every_save"`: push the model, its configuration, the tokenizer (if passed along to the [`Trainer`]) and + a draft of a model card each time there is a model save. The pushes are asynchronous to not block + training, and in case the save are very frequent, a new push is only attempted if the previous one is + finished. A last push is made with the final model at the end of training. + - `"checkpoint"`: like `"every_save"` but the latest checkpoint is also pushed in a subfolder named + last-checkpoint, allowing you to resume training easily with + `trainer.train(resume_from_checkpoint="last-checkpoint")`. + - `"all_checkpoints"`: like `"checkpoint"` but all checkpoints are pushed like they appear in the output + folder (so you will get one checkpoint folder per folder in your final repository) + + hub_token (`str`, *optional*): + The token to use to push the model to the Hub. 
Will default to the token in the cache folder obtained with + `huggingface-cli login`. + gradient_checkpointing (`bool`, *optional*, defaults to `False`): + If True, use gradient checkpointing to save memory at the expense of slower backward pass. + """ + + output_dir: str = field( + metadata={ + "help": + "The output directory where the model predictions and checkpoints will be written." + }, ) + overwrite_output_dir: bool = field( + default=False, + metadata={ + "help": + ("Overwrite the content of the output directory. " + "Use this to continue training if output_dir points to a checkpoint directory." + ) + }, ) + + do_train: bool = field( + default=False, metadata={"help": "Whether to run training."}) + do_eval: bool = field( + default=False, + metadata={"help": "Whether to run eval on the dev set."}) + do_predict: bool = field( + default=False, + metadata={"help": "Whether to run predictions on the test set."}) + evaluation_strategy: IntervalStrategy = field( + default="no", + metadata={"help": "The evaluation strategy to use."}, ) + prediction_loss_only: bool = field( + default=False, + metadata={ + "help": + "When performing evaluation and predictions, only returns the loss." + }, ) + + per_device_train_batch_size: int = field( + default=8, + metadata={"help": "Batch size per GPU/TPU core/CPU for training."}) + per_device_eval_batch_size: int = field( + default=8, + metadata={"help": "Batch size per GPU/TPU core/CPU for evaluation."}) + + per_gpu_train_batch_size: Optional[int] = field( + default=None, + metadata={ + "help": + "Deprecated, the use of `--per_device_train_batch_size` is preferred. " + "Batch size per GPU/TPU core/CPU for training." + }, ) + per_gpu_eval_batch_size: Optional[int] = field( + default=None, + metadata={ + "help": + "Deprecated, the use of `--per_device_eval_batch_size` is preferred. " + "Batch size per GPU/TPU core/CPU for evaluation." + }, ) + + gradient_accumulation_steps: int = field( + default=1, + metadata={ + "help": + "Number of updates steps to accumulate before performing a backward/update pass." + }, ) + eval_accumulation_steps: Optional[int] = field( + default=None, + metadata={ + "help": + "Number of predictions steps to accumulate before moving the tensors to the CPU." + }, ) + + learning_rate: float = field( + default=5e-5, + metadata={"help": "The initial learning rate for AdamW."}) + weight_decay: float = field( + default=0.0, + metadata={"help": "Weight decay for AdamW if we apply some."}) + adam_beta1: float = field( + default=0.9, metadata={"help": "Beta1 for AdamW optimizer"}) + adam_beta2: float = field( + default=0.999, metadata={"help": "Beta2 for AdamW optimizer"}) + adam_epsilon: float = field( + default=1e-8, metadata={"help": "Epsilon for AdamW optimizer."}) + max_grad_norm: float = field( + default=1.0, metadata={"help": "Max gradient norm."}) + + num_train_epochs: float = field( + default=3.0, + metadata={"help": "Total number of training epochs to perform."}) + max_steps: int = field( + default=-1, + metadata={ + "help": + "If > 0: set total number of training steps to perform. Override num_train_epochs." + }, ) + lr_scheduler_type: str = field( + default="linear", + metadata={"help": "The scheduler type to use."}, ) + warmup_ratio: float = field( + default=0.0, + metadata={ + "help": "Linear warmup over warmup_ratio fraction of total steps." 
+ }) + warmup_steps: int = field( + default=0, metadata={"help": "Linear warmup over warmup_steps."}) + + log_level: Optional[str] = field( + default="passive", + metadata={ + "help": + "Logger log level to use on the main node. Possible choices are the log levels as strings: 'debug', 'info', 'warning', 'error' and 'critical', plus a 'passive' level which doesn't set anything and lets the application set the level. Defaults to 'passive'.", + "choices": trainer_log_levels.keys(), + }, ) + log_level_replica: Optional[str] = field( + default="passive", + metadata={ + "help": + "Logger log level to use on replica nodes. Same choices and defaults as ``log_level``", + "choices": trainer_log_levels.keys(), + }, ) + log_on_each_node: bool = field( + default=True, + metadata={ + "help": + "When doing a multinode distributed training, whether to log once per node or just once on the main node." + }, ) + logging_dir: Optional[str] = field( + default=None, metadata={"help": "Tensorboard log dir."}) + logging_strategy: IntervalStrategy = field( + default="steps", + metadata={"help": "The logging strategy to use."}, ) + logging_first_step: bool = field( + default=False, metadata={"help": "Log the first global_step"}) + logging_steps: int = field( + default=500, metadata={"help": "Log every X updates steps."}) + + save_strategy: IntervalStrategy = field( + default="steps", + metadata={"help": "The checkpoint save strategy to use."}, ) + save_steps: int = field( + default=500, + metadata={"help": "Save checkpoint every X updates steps."}) + save_total_limit: Optional[int] = field( + default=None, + metadata={ + "help": + ("Limit the total amount of checkpoints. " + "Deletes the older checkpoints in the output_dir. Default is unlimited checkpoints" + ) + }, ) + save_on_each_node: bool = field( + default=False, + metadata={ + "help": + "When doing multi-node distributed training, whether to save models and checkpoints on each node, or only on the main one" + }, ) + no_cuda: bool = field( + default=False, + metadata={"help": "Do not use CUDA even when it is available"}) + seed: int = field( + default=42, + metadata={ + "help": "Random seed that will be set at the beginning of training." + }) + + fp16: bool = field( + default=False, + metadata={ + "help": "Whether to use fp16 (mixed) precision instead of 32-bit" + }, ) + fp16_opt_level: str = field( + default="O1", + metadata={ + "help": + ("For fp16: Apex AMP optimization level selected in ['O0', 'O1', 'O2', and 'O3']. " + "See details at https://nvidia.github.io/apex/amp.html") + }, ) + local_rank: int = field( + default=-1, metadata={"help": "For distributed training: local_rank"}) + + debug: str = field( + default="", + metadata={ + "help": "Whether or not to enable debug mode. Current options: " + "`underflow_overflow` (Detect underflow and overflow in activations and weights), " + "`tpu_metrics_debug` (print debug metrics on TPU)." + }, ) + + dataloader_drop_last: bool = field( + default=False, + metadata={ + "help": + "Drop the last incomplete batch if it is not divisible by the batch size." + }) + eval_steps: int = field( + default=None, metadata={"help": "Run an evaluation every X steps."}) + dataloader_num_workers: int = field( + default=0, + metadata={ + "help": + "Number of subprocesses to use for data loading (PyTorch only). 0 means that the data will be loaded in the main process." + }, ) + + past_index: int = field( + default=-1, + metadata={ + "help": + "If >=0, uses the corresponding part of the output as the past state for next step." 
+ }, ) + + run_name: Optional[str] = field( + default=None, + metadata={ + "help": + "An optional descriptor for the run. Notably used for wandb logging." + }) + + label_names: Optional[List[str]] = field( + default=None, + metadata={ + "help": + "The list of keys in your dictionary of inputs that correspond to the labels." + }) + + load_best_model_at_end: Optional[bool] = field( + default=False, + metadata={ + "help": + "Whether or not to load the best model found during training at the end of training." + }, ) + metric_for_best_model: Optional[str] = field( + default=None, + metadata={ + "help": "The metric to use to compare two different models." + }) + greater_is_better: Optional[bool] = field( + default=None, + metadata={ + "help": + "Whether the `metric_for_best_model` should be maximized or not." + }) + ignore_data_skip: bool = field( + default=False, + metadata={ + "help": + "When resuming training, whether or not to skip the first epochs and batches to get to the same training data." + }, ) + optim: str = field( + default="adamw", + metadata={"help": "The optimizer to use."}, ) + report_to: Optional[List[str]] = field( + default=None, + metadata={ + "help": + "The list of integrations to report the results and logs to." + }) + + dataloader_pin_memory: bool = field( + default=True, + metadata={"help": "Whether or not to pin memory for DataLoader."}) + skip_memory_metrics: bool = field( + default=True, + metadata={ + "help": + "Whether or not to skip adding of memory profiler reports to metrics." + }) + + resume_from_checkpoint: Optional[str] = field( + default=None, + metadata={ + "help": + "The path to a folder with a valid checkpoint for your model." + }, ) + + gradient_checkpointing: bool = field( + default=False, + metadata={ + "help": + "If True, use gradient checkpointing to save memory at the expense of slower backward pass." + }, ) + _n_gpu: int = field(init=False, repr=False, default=-1) + + def __post_init__(self): + # Handle --use_env option in torch.distributed.launch (local_rank not passed as an arg then). + # This needs to happen before any call to self.device or self.n_gpu. + env_local_rank = int(os.environ.get("LOCAL_RANK", -1)) + if env_local_rank != -1 and env_local_rank != self.local_rank: + self.local_rank = env_local_rank + + # convert to int + self.log_level = trainer_log_levels[self.log_level] + self.log_level_replica = trainer_log_levels[self.log_level_replica] + + # expand paths, if not os.makedirs("~/bar") will make directory + # in the current directory instead of the actual home + # see https://github.com/huggingface/transformers/issues/10628 + if self.output_dir is not None: + self.output_dir = os.path.expanduser(self.output_dir) + if self.logging_dir is None and self.output_dir is not None: + self.logging_dir = os.path.join(self.output_dir, default_logdir()) + if self.logging_dir is not None: + self.logging_dir = os.path.expanduser(self.logging_dir) + + if self.disable_tqdm is None: + self.disable_tqdm = logger.getEffectiveLevel() > logging.WARN + + if isinstance(self.evaluation_strategy, EvaluationStrategy): + warnings.warn( + "using `EvaluationStrategy` for `evaluation_strategy` is deprecated and will be removed in version 5 of 🤗 Transformers. Use `IntervalStrategy` instead", + FutureWarning, ) + # Go back to the underlying string or we won't be able to instantiate `IntervalStrategy` on it. 
+ self.evaluation_strategy = self.evaluation_strategy.value + + self.evaluation_strategy = IntervalStrategy(self.evaluation_strategy) + self.logging_strategy = IntervalStrategy(self.logging_strategy) + self.save_strategy = IntervalStrategy(self.save_strategy) + + self.lr_scheduler_type = SchedulerType(self.lr_scheduler_type) + if self.do_eval is False and self.evaluation_strategy != IntervalStrategy.NO: + self.do_eval = True + + # eval_steps has to be defined and non-zero, fallbacks to logging_steps if the latter is non-zero + if self.evaluation_strategy == IntervalStrategy.STEPS and ( + self.eval_steps is None or self.eval_steps == 0): + if self.logging_steps > 0: + logger.info( + f"using `logging_steps` to initialize `eval_steps` to {self.logging_steps}" + ) + self.eval_steps = self.logging_steps + else: + raise ValueError( + f"evaluation strategy {self.evaluation_strategy} requires either non-zero --eval_steps or --logging_steps" + ) + + # logging_steps must be non-zero for logging_strategy that is other than 'no' + if self.logging_strategy == IntervalStrategy.STEPS and self.logging_steps == 0: + raise ValueError( + f"logging strategy {self.logging_strategy} requires non-zero --logging_steps" + ) + + # Sanity checks for load_best_model_at_end: we require save and eval strategies to be compatible. + if self.load_best_model_at_end: + if self.evaluation_strategy != self.save_strategy: + raise ValueError( + "--load_best_model_at_end requires the save and eval strategy to match, but found\n- Evaluation " + f"strategy: {self.evaluation_strategy}\n- Save strategy: {self.save_strategy}" + ) + if self.evaluation_strategy == IntervalStrategy.STEPS and self.save_steps % self.eval_steps != 0: + raise ValueError( + "--load_best_model_at_end requires the saving steps to be a round multiple of the evaluation " + f"steps, but found {self.save_steps}, which is not a round multiple of {self.eval_steps}." + ) + + if self.load_best_model_at_end and self.metric_for_best_model is None: + self.metric_for_best_model = "loss" + if self.greater_is_better is None and self.metric_for_best_model is not None: + self.greater_is_better = self.metric_for_best_model not in [ + "loss", "eval_loss" + ] + if self.run_name is None: + self.run_name = self.output_dir + + self.optim = OptimizerNames(self.optim) + + if self.warmup_ratio < 0 or self.warmup_ratio > 1: + raise ValueError("warmup_ratio must lie in range [0,1]") + elif self.warmup_ratio > 0 and self.warmup_steps > 0: + logger.info( + "Both warmup_ratio and warmup_steps given, warmup_steps will override any effect of warmup_ratio during training" + ) + + if isinstance(self.debug, str): + self.debug = [DebugOption(s) for s in self.debug.split()] + + def __str__(self): + self_as_dict = asdict(self) + + # Remove deprecated arguments. That code should be removed once + # those deprecated arguments are removed from TrainingArguments. (TODO: v5) + del self_as_dict["per_gpu_train_batch_size"] + del self_as_dict["per_gpu_eval_batch_size"] + + self_as_dict = { + k: f"<{k.upper()}>" if k.endswith("_token") else v + for k, v in self_as_dict.items() + } + + attrs_as_str = [f"{k}={v},\n" for k, v in sorted(self_as_dict.items())] + return f"{self.__class__.__name__}(\n{''.join(attrs_as_str)})" + + __repr__ = __str__ + + @property + def train_batch_size(self) -> int: + """ + The actual batch size for training (may differ from `per_gpu_train_batch_size` in distributed training). 
+ """ + if self.per_gpu_train_batch_size: + logger.warning( + "Using deprecated `--per_gpu_train_batch_size` argument which will be removed in a future " + "version. Using `--per_device_train_batch_size` is preferred.") + per_device_batch_size = self.per_gpu_train_batch_size or self.per_device_train_batch_size + train_batch_size = per_device_batch_size * max(1, self.n_gpu) + return train_batch_size + + @property + def eval_batch_size(self) -> int: + """ + The actual batch size for evaluation (may differ from `per_gpu_eval_batch_size` in distributed training). + """ + if self.per_gpu_eval_batch_size: + logger.warning( + "Using deprecated `--per_gpu_eval_batch_size` argument which will be removed in a future " + "version. Using `--per_device_eval_batch_size` is preferred.") + per_device_batch_size = self.per_gpu_eval_batch_size or self.per_device_eval_batch_size + eval_batch_size = per_device_batch_size * max(1, self.n_gpu) + return eval_batch_size + + @property + def n_gpu(self): + """ + The number of GPUs used by this process. + + Note: + This will only be greater than one when you have multiple GPUs available but are not using distributed + training. For distributed training, it will always be 1. + """ + # Make sure `self._n_gpu` is properly setup. + # _ = self._setup_devices + return self._n_gpu + + @property + def world_size(self): + """ + The number of processes used in parallel. + """ + if self.local_rank != -1: + return torch.distributed.get_world_size() + return 1 + + @property + def process_index(self): + """ + The index of the current process used. + """ + if self.local_rank != -1: + return torch.distributed.get_rank() + return 0 + + @property + def local_process_index(self): + """ + The index of the local process used. + """ + if self.local_rank != -1: + return self.local_rank + return 0 + + @property + def should_log(self): + """ + Whether or not the current process should produce log. + """ + if self.log_on_each_node: + return self.local_process_index == 0 + else: + return self.process_index == 0 + + @property + def should_save(self): + """ + Whether or not the current process should write to disk, e.g., to save models and checkpoints. + """ + if self.save_on_each_node: + return self.local_process_index == 0 + else: + if is_sagemaker_mp_enabled(): + return smp.rank() == 0 + else: + return self.process_index == 0 + + def get_process_log_level(self): + """ + Returns the log level to be used depending on whether this process is the main process of node 0, main process + of node non-0, or a non-main process. + + For the main process the log level defaults to `logging.INFO` unless overridden by `log_level` argument. + + For the replica processes the log level defaults to `logging.WARNING` unless overridden by `log_level_replica` + argument. + + The choice between the main and replica process settings is made according to the return value of `should_log`. + """ + + log_level_main_node = logging.INFO if self.log_level == -1 else self.log_level + log_level_replica_node = logging.WARNING if self.log_level_replica == -1 else self.log_level_replica + return log_level_main_node if self.should_log else log_level_replica_node + + @contextlib.contextmanager + def main_process_first(self, local=True, desc="work"): + """ + A context manager for torch distributed environment where on needs to do something on the main process, while + blocking replicas, and when it's finished releasing the replicas. 
+ + One such use is for `datasets`'s `map` feature which to be efficient should be run once on the main process, + which upon completion saves a cached version of results and which then automatically gets loaded by the + replicas. + + Args: + local (`bool`, *optional*, defaults to `True`): + if `True` first means process of rank 0 of each node if `False` first means process of rank 0 of node + rank 0 In multi-node environment with a shared filesystem you most likely will want to use + `local=False` so that only the main process of the first node will do the processing. If however, the + filesystem is not shared, then the main process of each node will need to do the processing, which is + the default behavior. + desc (`str`, *optional*, defaults to `"work"`): + a work description to be used in debug logs + + """ + if is_torch_available() and self.world_size > 1: + if local: + is_main_process = self.local_process_index == 0 + main_process_desc = "main local process" + else: + is_main_process = self.process_index == 0 + main_process_desc = "main process" + + try: + if not is_main_process: + # tell all replicas to wait + logger.debug( + f"{self.process_index}: waiting for the {main_process_desc} to perform {desc}" + ) + if is_torch_tpu_available(): + xm.rendezvous(desc) + elif is_sagemaker_dp_enabled(): + sm_dist.barrier() + else: + torch.distributed.barrier() + yield + finally: + if is_main_process: + # the wait is over + logger.debug( + f"{self.process_index}: {main_process_desc} completed {desc}, releasing all replicas" + ) + if is_torch_tpu_available(): + xm.rendezvous(desc) + elif is_sagemaker_dp_enabled(): + sm_dist.barrier() + else: + torch.distributed.barrier() + else: + yield + + def get_warmup_steps(self, num_training_steps: int): + """ + Get number of steps used for a linear warmup. + """ + warmup_steps = (self.warmup_steps if self.warmup_steps > 0 else + math.ceil(num_training_steps * self.warmup_ratio)) + return warmup_steps + + def to_dict(self): + """ + Serializes this instance while replace `Enum` by their values (for JSON serialization support). It obfuscates + the token values by removing their value. + """ + d = asdict(self) + for k, v in d.items(): + if isinstance(v, Enum): + d[k] = v.value + if isinstance(v, list) and len(v) > 0 and isinstance(v[0], Enum): + d[k] = [x.value for x in v] + if k.endswith("_token"): + d[k] = f"<{k.upper()}>" + return d + + def to_json_string(self): + """ + Serializes this instance to a JSON string. + """ + return json.dumps(self.to_dict(), indent=2) + + def to_sanitized_dict(self) -> Dict[str, Any]: + """ + Sanitized serialization to use with TensorBoard’s hparams + """ + d = self.to_dict() + d = { + ** d, ** { + "train_batch_size": self.train_batch_size, + "eval_batch_size": self.eval_batch_size + } + } + + valid_types = [bool, int, float, str] + if is_torch_available(): + valid_types.append(torch.Tensor) + + return { + k: v if type(v) in valid_types else str(v) + for k, v in d.items() + } diff --git a/examples/language_model/ernie-1.0/finetune/trainer_base.py b/examples/language_model/ernie-1.0/finetune/trainer_base.py index 6b1790063aa4..914481545458 100644 --- a/examples/language_model/ernie-1.0/finetune/trainer_base.py +++ b/examples/language_model/ernie-1.0/finetune/trainer_base.py @@ -12,128 +12,258 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import collections +import contextlib +import inspect +import math +import os +import random +import re +import shutil +import sys +import time +import warnings +from collections.abc import Mapping +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Union + +from tqdm.auto import tqdm + import paddle import paddle.nn as nn import paddle.nn.functional as F from paddlenlp.transformers import LinearDecayWithWarmup from paddlenlp.utils.log import logger -from paddle.io import DataLoader +from paddle.io import DataLoader, DistributedBatchSampler +import numpy as np -class TrainerBase(object): - """ - """ +from trainer_args import TrainingArguments +# from trainer_callback import TrainerState, TrainerControl - def create_dataloader(self, - dataset, - mode='train', - batch_size=16, - batchify_fn=None, - trans_fn=None, - batched=False): - """ - """ - if trans_fn: - dataset = dataset.map(trans_fn, batched=batched) +from trainer_utils import ( + IntervalStrategy, + EvaluationStrategy, + EvalPrediction, + PredictionOutput, + EvalLoopOutput, + speed_metrics, ) - shuffle = True if mode == 'train' else False - if mode == 'train': - batch_sampler = paddle.io.DistributedBatchSampler( - dataset, batch_size=batch_size, shuffle=shuffle) - else: - batch_sampler = paddle.io.BatchSampler( - dataset, batch_size=batch_size, shuffle=shuffle) +from trainer_callback import ( + CallbackHandler, + DefaultFlowCallback, + PrinterCallback, + ProgressCallback, + TrainerCallback, + TrainerControl, + TrainerState, ) - return paddle.io.DataLoader( - dataset=dataset, - batch_sampler=batch_sampler, - collate_fn=batchify_fn, - num_workers=0, - return_list=True) +DEFAULT_CALLBACKS = [DefaultFlowCallback] - def train(self, *args, **kwargs): - """ - """ - pass +from utils import logging - def eval(self, *args, **kwargs): - """ - """ - pass +from paddlenlp.transformers.model_utils import PretrainedModel, unwrap_model +from paddlenlp.transformers.tokenizer_utils import PretrainedTokenizer - def prepare_train_config(self): - """ - """ - if self.args.max_steps > 0: - self.args.num_training_steps = self.args.max_steps - self.args.num_train_epochs = math.ceil( - self.args.num_training_steps / len(self.train_dl)) +# logger = logging.get_logger(__name__) - else: - self.args.num_training_steps = len( - self.train_dl) * self.args.num_train_epochs - self.args.num_train_epochs = self.args.num_train_epochs +from paddle.io import Dataset - if self.args.num_training_steps // self.args.valid_steps < self.args.minimum_valid_times: - exp_step = self.args.num_training_steps / self.args.minimum_valid_times - exp_step = max(int(exp_step - exp_step % 10), 10) - logger.info("Set eval step to %d" % exp_step) - self.args.valid_steps = exp_step - warmup = self.args.warmup_steps if self.args.warmup_steps > 0 else self.args.warmup_proportion +class DataCollator: + pass - self.lr_scheduler = LinearDecayWithWarmup( - self.args.learning_rate, self.args.num_training_steps, warmup) - # Generate parameter names needed to perform weight decay. - # All bias and LayerNorm parameters are excluded. 
- decay_params = [ - p.name for n, p in self.model.named_parameters() - if not any(nd in n for nd in ["bias", "norm"]) - ] +class DataCollatorWithPadding: + def __init__(self, *args, **kwargs): + pass - self.optimizer = paddle.optimizer.AdamW( - learning_rate=self.lr_scheduler, - beta1=0.9, - beta2=0.999, - epsilon=self.args.adam_epsilon, - parameters=self.model.parameters(), - weight_decay=self.args.weight_decay, - apply_decay_param_fun=lambda x: x in decay_params, - grad_clip=nn.ClipGradByGlobalNorm(self.args.max_grad_norm)) - def print_config(self): - """ - """ - logger.info('{:^40}'.format("Configuration Arguments")) - logger.info('{:20}:{}'.format("paddle commit id", - paddle.version.commit)) - for arg in vars(self.args): - logger.info('{:20}:{}'.format(arg, getattr(self.args, arg))) +def paddle_pad_and_concatenate(tensor1, tensor2, padding_index=-100): + """Concatenates `tensor1` and `tensor2` on first axis, applying padding on the second if necessary.""" + if len(tensor1.shape) == 1 or tensor1.shape[1] == tensor2.shape[1]: + return paddle.concat((tensor1, tensor2), axis=0) + + raise ValueError("pass") + # Let's figure out the new shape + new_shape = (tensor1.shape[0] + tensor2.shape[0], max( + tensor1.shape[1], tensor2.shape[1])) + tensor1.shape[2:] + + # Now let's fill the result tensor + result = tensor1.new_full(new_shape, padding_index) + result[:tensor1.shape[0], :tensor1.shape[1]] = tensor1 + result[tensor1.shape[0]:, :tensor2.shape[1]] = tensor2 + return result + + +def nested_concat(tensors, new_tensors, padding_index=-100): + """ + Concat the `new_tensors` to `tensors` on the first dim and pad them on the second if needed. Works for tensors or + nested list/tuples of tensors. + """ + assert type(tensors) == type( + new_tensors + ), f"Expected `tensors` and `new_tensors` to have the same type but found {type(tensors)} and {type(new_tensors)}." + if isinstance(tensors, (list, tuple)): + return type(tensors)(nested_concat( + t, n, padding_index=padding_index) + for t, n in zip(tensors, new_tensors)) + elif isinstance(tensors, paddle.Tensor): + return paddle_pad_and_concatenate( + tensors, new_tensors, padding_index=padding_index) + elif isinstance(tensors, np.ndarray): + return numpy_pad_and_concatenate( + tensors, new_tensors, padding_index=padding_index) + else: + raise TypeError( + f"Unsupported type for concatenation: got {type(tensors)}") + + +def nested_detach(tensors): + "Detach `tensors` (even if it's a nested list/tuple of tensors)." + if isinstance(tensors, (list, tuple)): + return type(tensors)(nested_detach(t) for t in tensors) + return tensors.detach() + + +# Name of the files used for checkpointing +TRAINING_ARGS_NAME = "training_args.bin" +TRAINER_STATE_NAME = "trainer_state.json" +OPTIMIZER_NAME = "optimizer.pdparams" +SCHEDULER_NAME = "scheduler.pdparams" +SCALER_NAME = "scaler.pdparams" + +PREFIX_CHECKPOINT_DIR = "training" + + +def set_seed(seed): + # Use the same data seed(for data shuffle) for all procs to guarantee data + # consistency after sharding. + random.seed(seed) + np.random.seed(seed) + # Maybe different op seeds(for dropout) for different procs is better. By: + # `paddle.seed(args.seed + paddle.distributed.get_rank())` + paddle.seed(seed) class Trainer: """ + Trainer is a simple but feature-complete training and eval loop for PyTorch, optimized for 🤗 Transformers. + + Args: + model ([`PretrainedModel`] or `paddle.nn.Module`, *optional*): + The model to train, evaluate or use for predictions. If not provided, a `model_init` must be passed. 
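For a concrete picture of how these pieces fit together, the sketch below mirrors the call made by `SeqTrainer` in `sequence_classification.py` earlier in this patch; the positional order follows the `__init__` signature further down, and all names come from that file:

    trainer = Trainer(
        model,                            # a PretrainedModel or paddle.nn.Layer
        loss_fct,                         # criterion: CrossEntropyLoss or MSELoss
        args,                             # the training arguments object
        batchify_fn,                      # dict-returning collate fn (data_collator)
        train_ds,                         # train_dataset
        dev_ds,                           # eval_dataset
        tokenizer,
        compute_metrics=compute_metrics)  # EvalPrediction -> dict of metrics
    trainer.train()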
+ + + + [`Trainer`] is optimized to work with the [`PretrainedModel`] provided by the library. You can still use + your own models defined as `paddle.nn.Module` as long as they work the same way as the 🤗 Transformers + models. + + + + args ([`TrainingArguments`], *optional*): + The arguments to tweak for training. Will default to a basic instance of [`TrainingArguments`] with the + `output_dir` set to a directory named *tmp_trainer* in the current directory if not provided. + data_collator (`DataCollator`, *optional*): + The function to use to form a batch from a list of elements of `train_dataset` or `eval_dataset`. Will + default to [`default_data_collator`] if no `tokenizer` is provided, an instance of + [`DataCollatorWithPadding`] otherwise. + train_dataset (`paddle.utils.data.Dataset` or `paddle.utils.data.IterableDataset`, *optional*): + The dataset to use for training. If it is an `datasets.Dataset`, columns not accepted by the + `model.forward()` method are automatically removed. + + Note that if it's a `paddle.utils.data.IterableDataset` with some randomization and you are training in a + distributed fashion, your iterable dataset should either use a internal attribute `generator` that is a + `paddle.Generator` for the randomization that must be identical on all processes (and the Trainer will + manually set the seed of this `generator` at each epoch) or have a `set_epoch()` method that internally + sets the seed of the RNGs used. + eval_dataset (`paddle.utils.data.Dataset`, *optional*): + The dataset to use for evaluation. If it is an `datasets.Dataset`, columns not accepted by the + `model.forward()` method are automatically removed. + tokenizer ([`PretrainedTokenizer`], *optional*): + The tokenizer used to preprocess the data. If provided, will be used to automatically pad the inputs the + maximum length when batching inputs, and it will be saved along the model to make it easier to rerun an + interrupted training or reuse the fine-tuned model. + compute_metrics (`Callable[[EvalPrediction], Dict]`, *optional*): + The function that will be used to compute metrics at evaluation. Must take a [`EvalPrediction`] and return + a dictionary string to metric values. + optimizers (`Tuple[paddle.optimizer.Optimizer, paddle.optimizer.lr.LRScheduler]`, *optional*): A tuple + containing the optimizer and the scheduler to use. Will default to an instance of [`AdamW`] on your model + and a scheduler given by [`get_linear_schedule_with_warmup`] controlled by `args`. + + Important attributes: + + - **model** -- Always points to the core model. If using a transformers model, it will be a [`PretrainedModel`] + subclass. + - **model_wrapped** -- Always points to the most external model in case one or more other modules wrap the + original model. This is the model that should be used for the forward pass. For example, under `DeepSpeed`, + the inner model is wrapped in `DeepSpeed` and then again in `paddle.nn.DistributedDataParallel`. If the inner + model hasn't been wrapped, then `self.model_wrapped` is the same as `self.model`. + - **is_model_parallel** -- Whether or not a model has been switched to a model parallel mode (different from + data parallelism, this means some of the model layers are split on different GPUs). + - **place_model_on_device** -- Whether or not to automatically place the model on the device - it will be set + to `False` if model parallel or deepspeed is used, or if the default + `TrainingArguments.place_model_on_device` is overridden to return `False` . 
+ - **is_in_train** -- Whether or not a model is currently running `train` (e.g. when `evaluate` is called while + in `train`) + """ def __init__( self, - model: Union[PreTrainedModel, nn.Layer]=None, + model: Union[PretrainedModel, nn.Layer]=None, + criterion: Union[nn.Layer]=None, args: TrainingArguments=None, data_collator: Optional[DataCollator]=None, train_dataset: Optional[Dataset]=None, eval_dataset: Optional[Dataset]=None, - tokenizer: Optional[PreTrainedTokenizerBase]=None, + tokenizer: Optional[PretrainedTokenizer]=None, compute_metrics: Optional[Callable[[EvalPrediction], Dict]]=None, - optimizers: Tuple[paddle.optim.Optimizer, paddle.optim.lr_scheduler. - LambdaLR]=(None, None), ): + optimizers: Tuple[paddle.optimizer.Optimizer, + paddle.optimizer.lr.LRScheduler]=(None, None), ): + logger.info("init!!!!") + if args is None: output_dir = "tmp_trainer" logger.info( f"No `TrainingArguments` passed, using `output_dir={output_dir}`." ) args = TrainingArguments(output_dir=output_dir) + args.world_size = 1 + args.fp16 = args.use_amp + args.do_grad_scaling = args.use_amp + self.do_grad_scaling = args.do_grad_scaling + args.train_batch_size = args.batch_size + args.eval_batch_size = args.batch_size + + args.dataloader_drop_last = True + args.dataloader_num_workers = 0 + args.dataloader_pin_memory = True + args.n_gpu = 1 + args.lr_scheduler_type = "linear" + args.adam_beta1 = 0.9 + args.adam_beta2 = 0.999 + args.optim = "OptimizerNames.ADAMW" + args.past_index = -1 + args.per_device_train_batch_size = args.batch_size + args.per_device_eval_batch_size = args.batch_size + args.logging_first_step = True + args.logging_strategy = IntervalStrategy.STEPS + args.evaluation_strategy = IntervalStrategy.STEPS + args.save_strategy = IntervalStrategy.STEPS + args.eval_steps = 500 + args.save_steps = 500 + args.label_names = None + args.prediction_loss_only = False + args.output_dir = "./out" + args.should_save = True + args.local_rank = int(os.getenv("PADDLE_RANK_IN_NODE", 0)) + args.save_total_limit = 3 + args.metric_for_best_model = "accuracy" + args.greater_is_better = True + self.args = args # Seed must be set before instantiating the model when using model set_seed(self.args.seed) @@ -141,8 +271,12 @@ def __init__( raise RuntimeError( "`Trainer` requires either a `model` or `model_init` argument") + if self.args.should_save: + os.makedirs(self.args.output_dir, exist_ok=True) + default_collator = default_data_collator if tokenizer is None else DataCollatorWithPadding( tokenizer) + self.data_collator = data_collator if data_collator is not None else default_collator self.train_dataset = train_dataset self.eval_dataset = eval_dataset @@ -150,10 +284,20 @@ def __init__( self.model_wrapped = model self.model = model + self.criterion = criterion self.compute_metrics = compute_metrics self.optimizer, self.lr_scheduler = optimizers + self.state = TrainerState() + self.control = TrainerControl() + callbacks = DEFAULT_CALLBACKS + self.callback_handler = CallbackHandler(callbacks, self.model, + self.tokenizer, self.optimizer, + self.lr_scheduler) + + self.add_callback(ProgressCallback) + if args.max_steps > 0: logger.info( "max_steps is given, it will override any value given in num_train_epochs" @@ -166,28 +310,129 @@ def __init__( ) if args.fp16: - logger.info(f"Using half precision backend") + logger.info(f"Using half precision") + + default_label_names = (["start_positions", "end_positions"] if + "QusetionAnswering" in type(self.model).__name__ + else ["labels"]) + self.label_names = 
default_label_names if self.args.label_names is None else self.args.label_names + + def add_callback(self, callback): + """ + Add a callback to the current list of [`~transformer.TrainerCallback`]. + + Args: + callback (`type` or [`~transformer.TrainerCallback`]): + A [`~transformer.TrainerCallback`] class or an instance of a [`~transformer.TrainerCallback`]. In the + first case, will instantiate a member of that class. + """ + self.callback_handler.add_callback(callback) def train( self, resume_from_checkpoint: Optional[Union[str, bool]]=None, - trial: Union["optuna.Trial", Dict[str, Any]]=None, ignore_keys_for_eval: Optional[List[str]]=None, **kwargs, ): + print("training!!!!") + logger.info("training!!!!") train_dataloader = self.get_train_dataloader() model = self._wrap_model(self.model_wrapped) - self.create_optimizer_and_scheduler(num_training_steps=max_steps) - for epoch in range(epochs_trained, num_train_epochs): + self.state = TrainerState() + + if self.args.max_steps > 0: + self.args.num_training_steps = self.args.max_steps + self.args.num_train_epochs = math.ceil( + self.args.num_training_steps / len(train_dataloader)) + + else: + self.args.num_training_steps = len( + train_dataloader) * self.args.num_train_epochs + self.args.num_train_epochs = self.args.num_train_epochs + + if self.args.num_training_steps // self.args.valid_steps < self.args.minimum_valid_times: + exp_step = self.args.num_training_steps / self.args.minimum_valid_times + exp_step = max(int(exp_step - exp_step % 10), 10) + logger.info("Set eval step to %d" % exp_step) + self.args.valid_steps = exp_step + + args = self.args + + self.create_optimizer_and_scheduler( + num_training_steps=args.num_training_steps) + + num_examples = len(self.train_dataset) + total_train_batch_size = self.args.per_device_train_batch_size * paddle.distributed.get_world_size( + ) + + logger.info("***** Running training *****") + logger.info(f" Num examples = {num_examples}") + logger.info(f" Num Epochs = {self.args.num_train_epochs}") + logger.info( + f" Instantaneous batch size per device = {self.args.per_device_train_batch_size}" + ) + logger.info( + f" Total train batch size (w. 
parallel, distributed & accumulation) = {total_train_batch_size}" + ) + logger.info(f" Gradient Accumulation steps = {1}") + logger.info( + f" Total optimization steps = {self.args.num_training_steps}") + + self.state.epoch = 0 + self.state.max_steps = int(self.args.num_training_steps) + self.state.num_train_epochs = int(self.args.num_train_epochs) + self.state.is_local_process_zero = 0 + self.state.is_world_process_zero = 0 + + start_time = time.time() + epochs_trained = 0 + steps_trained_in_current_epoch = 0 + steps_trained_progress_bar = None + + self.training_bar = tqdm(total=self.state.max_steps) + + epoch_iterator = train_dataloader + steps_in_epoch = len(epoch_iterator) + + self.callback_handler.model = self.model + self.callback_handler.optimizer = self.optimizer + self.callback_handler.lr_scheduler = self.lr_scheduler + self.callback_handler.train_dataloader = train_dataloader + + self.control = self.callback_handler.on_train_begin(args, self.state, + self.control) + + tr_loss = paddle.to_tensor(0.0) + self._total_loss_scalar = 0.0 + self._globalstep_last_logged = self.state.global_step + + for epoch in range(epochs_trained, args.num_train_epochs): step = -1 + + self.control = self.callback_handler.on_epoch_begin( + args, self.state, self.control) + for step, inputs in enumerate(epoch_iterator): + # print(inputs) + # print("=="*20) tr_loss_step = self.training_step(model, inputs) - self.scaler.step(self.optimizer) - self.scaler.update() - self.optimizer.step() + # self.scaler.step(self.optimizer) + # self.scaler.update() + tr_loss += tr_loss_step + self.training_bar.update(1) + self.optimizer.step() self.lr_scheduler.step() - model.zero_grad() + self.optimizer.clear_grad() + + self.state.global_step += 1 + self.state.epoch = epoch + (step + 1) / steps_in_epoch + + self.control = self.callback_handler.on_step_end( + args, self.state, self.control) + + self._maybe_log_save_evaluate(tr_loss, model, epoch, + ignore_keys_for_eval) def training_step( self, model: nn.Layer, @@ -202,71 +447,723 @@ def training_step( return loss.detach() + def _get_train_sampler(self) -> Optional[paddle.io.Sampler]: + if not isinstance(self.train_dataset, collections.abc.Sized): + return None + + if self.args.world_size <= 1: + # return RandomSampler(self.train_dataset) + return DistributedBatchSampler( + self.train_dataset, + # num_replicas=self.args.world_size, + # rank=self.args.process_index, + batch_size=self.args.batch_size, + shuffle=True, + # seed=self.args.seed, + ) + else: + return DistributedBatchSampler( + self.train_dataset, + # num_replicas=self.args.world_size, + # rank=self.args.process_index, + # seed=self.args.seed, + ) + + def _maybe_log_save_evaluate(self, tr_loss, model, epoch, + ignore_keys_for_eval): + if self.control.should_log: + + logs: Dict[str, float] = {} + + # all_gather + mean() to get average loss over all processes + # tr_loss_scalar = self._nested_gather(tr_loss).mean().item() + tr_loss_scalar = tr_loss.mean().item() + + # reset tr_loss to zero + tr_loss -= tr_loss + + logs["loss"] = round(tr_loss_scalar / ( + self.state.global_step - self._globalstep_last_logged), 4) + logs["learning_rate"] = self._get_learning_rate() + + self._total_loss_scalar += tr_loss_scalar + self._globalstep_last_logged = self.state.global_step + + self.log(logs) + + metrics = None + if self.control.should_evaluate: + metrics = self.evaluate(ignore_keys=ignore_keys_for_eval) + + if self.control.should_save: + self._save_checkpoint(model, metrics=metrics) + self.control = 
self.callback_handler.on_save(self.args, self.state, + self.control) + + def _get_learning_rate(self): + return self.optimizer.get_lr() + def get_train_dataloader(self): - pass + """ + Returns the training [`~paddle.io.DataLoader`]. + + Will use no sampler if `self.train_dataset` does not implement `__len__`, a random sampler (adapted to + distributed training if necessary) otherwise. + + Subclass and override this method if you want to inject some custom behavior. + """ + if self.train_dataset is None: + raise ValueError("Trainer: training requires a train_dataset.") + + train_dataset = self.train_dataset + + train_sampler = self._get_train_sampler() + + return DataLoader( + train_dataset, + # batch_size=self.args.train_batch_size, + batch_sampler=train_sampler, + collate_fn=self.data_collator, + # drop_last=self.args.dataloader_drop_last, + num_workers=self.args.dataloader_num_workers, + # pin_memory=self.args.dataloader_pin_memory, + ) def _get_eval_sampler(self, eval_dataset: Dataset): - pass + if self.args.world_size <= 1: + return DistributedBatchSampler( + eval_dataset, + # num_replicas=self.args.world_size, + # rank=self.args.process_index, + batch_size=self.args.per_device_eval_batch_size, + shuffle=False, + # seed=self.args.seed, + ) + else: + return DistributedBatchSampler( + eval_dataset, + batch_size=self.args.per_device_eval_batch_size, + shuffle=False) def get_eval_dataloader(self, eval_dataset: Optional[Dataset]=None) -> DataLoader: - pass + """ + Returns the evaluation [`~paddle.io.DataLoader`]. + + Subclass and override this method if you want to inject some custom behavior. + + Args: + eval_dataset (`paddle.io.Dataset`, *optional*): + If provided, will override `self.eval_dataset`. If it is an `datasets.Dataset`, columns not accepted by + the `model.forward()` method are automatically removed. It must implement `__len__`. + """ + if eval_dataset is None and self.eval_dataset is None: + raise ValueError("Trainer: evaluation requires an eval_dataset.") + eval_dataset = eval_dataset if eval_dataset is not None else self.eval_dataset + + eval_sampler = self._get_eval_sampler(eval_dataset) + + return DataLoader( + eval_dataset, + # batch_size=self.args.train_batch_size, + batch_sampler=eval_sampler, + collate_fn=self.data_collator, + # drop_last=self.args.dataloader_drop_last, + num_workers=self.args.dataloader_num_workers, + # pin_memory=self.args.dataloader_pin_memory, + ) def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader: - pass + """ + Returns the test [`~paddle.io.DataLoader`]. + + Subclass and override this method if you want to inject some custom behavior. + + Args: + test_dataset (`paddle.io.Dataset`, *optional*): + The test dataset to use. If it is an `datasets.Dataset`, columns not accepted by the `model.forward()` + method are automatically removed. It must implement `__len__`. + """ + + test_sampler = self._get_eval_sampler(test_dataset) + + # We use the same batch_size as for eval. + return DataLoader( + test_dataset, + sampler=test_sampler, + batch_size=self.args.eval_batch_size, + collate_fn=self.data_collator, + drop_last=self.args.dataloader_drop_last, + pin_memory=self.args.dataloader_pin_memory, ) def create_optimizer_and_scheduler(self, num_training_steps: int): - pass + """ + Setup the optimizer and the learning rate scheduler. - def create_optimizer(self): - pass + We provide a reasonable default that works well. 
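(Editor's aside, a sketch of the default that `create_optimizer` below ends up building; the hyper-parameter values shown here are illustrative stand-ins for the ones taken from `TrainingArguments`.)

```python
# Exclude bias and LayerNorm parameters from weight decay, as in create_optimizer below.
decay_parameters = [
    p.name for n, p in model.named_parameters()
    if not any(nd in n for nd in ["bias", "norm"])
]
optimizer = paddle.optimizer.AdamW(
    learning_rate=lr_scheduler,          # e.g. a LinearDecayWithWarmup schedule
    beta1=0.9,
    beta2=0.999,
    epsilon=1e-8,
    parameters=model.parameters(),
    weight_decay=0.01,
    apply_decay_param_fun=lambda x: x in decay_parameters,
    grad_clip=paddle.nn.ClipGradByGlobalNorm(1.0))
```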
If you want to use something else, you can pass a tuple in the + Trainer's init through `optimizers`, or subclass and override this method (or `create_optimizer` and/or + `create_scheduler`) in a subclass. + """ + self.create_scheduler(num_training_steps=num_training_steps) + self.create_optimizer(self.lr_scheduler) + + def create_optimizer(self, lr_scheduler=None): + """ + Setup the optimizer. + + We provide a reasonable default that works well. If you want to use something else, you can pass a tuple in the + Trainer's init through `optimizers`, or subclass and override this method in a subclass. + """ + if self.optimizer is None: + decay_parameters = [ + p.name for n, p in self.model.named_parameters() + if not any(nd in n for nd in ["bias", "norm"]) + ] + apply_decay_param_fun = lambda x: x in decay_parameters + + optimizer_cls, optimizer_kwargs = Trainer.get_optimizer_cls_and_kwargs( + self.args) + + self.optimizer = optimizer_cls( + learning_rate=self.lr_scheduler + if lr_scheduler is None else lr_scheduler, + apply_decay_param_fun=apply_decay_param_fun, + parameters=self.model.parameters(), + weight_decay=self.args.weight_decay, + grad_clip=nn.ClipGradByGlobalNorm(self.args.max_grad_norm), + **optimizer_kwargs) + + return self.optimizer @staticmethod def get_optimizer_cls_and_kwargs( args: TrainingArguments) -> Tuple[Any, Any]: - pass + """ + Returns the optimizer class and optimizer parameters based on the training arguments. + + Args: + args (`paddlenlp.training_args.TrainingArguments`): + The training arguments for the training session. + + """ + # optimizer_kwargs = {"lr": args.learning_rate} + optimizer_kwargs = {} + adam_kwargs = { + "beta1": args.adam_beta1, + "beta2": args.adam_beta2, + "epsilon": args.adam_epsilon, + } + if args.optim == "OptimizerNames.ADAMW": + from paddle.optimizer import AdamW + + optimizer_cls = AdamW + optimizer_kwargs.update(adam_kwargs) + else: + raise ValueError( + f"Trainer cannot instantiate unsupported optimizer: {args.optim}" + ) + return optimizer_cls, optimizer_kwargs def create_scheduler(self, num_training_steps: int, - optimizer: paddle.optim.Optimizer=None): - pass + optimizer: paddle.optimizer.Optimizer=None): + """ + Setup the scheduler. The optimizer of the trainer must have been set up either before this method is called or + passed as an argument. + + Args: + num_training_steps (int): The number of training steps to do. + """ + + def get_scheduler(lr_scheduler_type, learning_rate, num_warmup_steps, + num_training_steps): + # TODO @ZHUI support others + return LinearDecayWithWarmup(learning_rate, num_training_steps, + num_warmup_steps) + + warmup = self.args.warmup_steps if self.args.warmup_steps > 0 else self.args.warmup_proportion + + if self.lr_scheduler is None: + self.lr_scheduler = get_scheduler( + self.args.lr_scheduler_type, + learning_rate=self.args.learning_rate, + num_warmup_steps=warmup, + num_training_steps=num_training_steps, ) + + return self.lr_scheduler def _wrap_model(self, model, training=True): - pass + # train/eval could be run multiple-times - if already wrapped, don't re-wrap it again + if unwrap_model(model) is not model: + return model + + if self.args.n_gpu > 1: + model = nn.DistributedDataParallel(model) + + # Note: in paddle.distributed mode, there's no point in wrapping the model + # inside a DistributedDataParallel as we'll be under `no_grad` anyways. 
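        # Editor's note (sketch, not part of the patch): the `unwrap_model(model)
        # is not model` check at the top of this method relies on the helper added
        # to paddlenlp.transformers.model_utils in this patch, which peels a
        # `paddle.DataParallel` wrapper off the underlying layer:
        #
        #     dp_model = paddle.DataParallel(model)    # wrapped for data parallelism
        #     assert unwrap_model(dp_model) is model   # `_layers` gives the raw model back
        #     assert unwrap_model(model) is model      # no-op on an unwrapped layer
        #
        # `nn.DistributedDataParallel` a few lines above looks like a torch-ism;
        # the paddle API for this wrapping is `paddle.DataParallel`.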
+ if not training: + return model + + return model def _prepare_input( self, data: Union[paddle.Tensor, Any]) -> Union[paddle.Tensor, Any]: - pass + """ + Prepares one `data` before feeding it to the model, be it a tensor or a nested list/dictionary of tensors. + """ + if isinstance(data, Mapping): + return type(data)( + {k: self._prepare_input(v) + for k, v in data.items()}) + elif isinstance(data, (tuple, list)): + return type(data)(self._prepare_input(v) for v in data) + elif isinstance(data, paddle.Tensor): + kwargs = dict(device=self.args.device) + # update data type for pure fp16 + return data + # return data.to(**kwargs) + return data def _prepare_inputs(self, inputs: Dict[str, Union[paddle.Tensor, Any]] ) -> Dict[str, Union[paddle.Tensor, Any]]: - pass + """ + Prepare `inputs` before feeding them to the model, converting them to tensors if they are not already and + handling potential state. + """ + inputs = self._prepare_input(inputs) + if self.args.past_index >= 0 and self._past is not None: + inputs["mems"] = self._past + + return inputs def autocast_smart_context_manager(self): - pass + """ + A helper wrapper that creates an appropriate context manager for `autocast` while feeding it the desired + arguments, depending on the situation. + """ + if self.args.use_amp: + ctx_manager = autocast() + else: + ctx_manager = contextlib.nullcontext() if sys.version_info >= ( + 3, 7) else contextlib.suppress() + + return ctx_manager + + def compute_loss(self, model, inputs, return_outputs=False): + """ + How the loss is computed by Trainer. By default, all models return the loss in the first element. + Subclass and override for custom behavior. + """ + if self.criterion is not None: + labels = inputs.pop("labels") + else: + labels = None + + # print(inputs) + + outputs = model(**inputs) + + # outputs = model(*inputs) + + if self.criterion is not None: + # print(outputs) + loss = self.criterion(outputs, labels) + outputs = (loss, outputs) + # Save past state if it exists + # TODO: this needs to be fixed and made cleaner later. + if self.args.past_index >= 0: + self._past = outputs[self.args.past_index] + + # We don't use .loss here since the model may return tuples instead of ModelOutput. + loss = outputs["loss"] if isinstance(outputs, dict) else outputs[0] + + return (loss, outputs) if return_outputs else loss def training_step( self, model: nn.Layer, inputs: Dict[str, Union[paddle.Tensor, Any]]) -> paddle.Tensor: - pass + """ + Perform a training step on a batch of inputs. - def save_model(self, - output_dir: Optional[str]=None, - _internal_call: bool=False): - pass + Subclass and override to inject custom behavior. + + Args: + model (`nn.Module`): + The model to train. + inputs (`Dict[str, Union[paddle.Tensor, Any]]`): + The inputs and targets of the model. + + The dictionary will be unpacked before being fed to the model. Most models expect the targets under the + argument `labels`. Check your model's documentation for all accepted arguments. + + Return: + `paddle.Tensor`: The tensor with training loss on this batch. 
+ """ + model.train() + inputs = self._prepare_inputs(inputs) + + with self.autocast_smart_context_manager(): + loss = self.compute_loss(model, inputs) + + if self.args.n_gpu > 1: + loss = loss.mean( + ) # mean() to average on multi-gpu parallel training + + # if self.args.gradient_accumulation_steps > 1: + # # deepspeed handles loss scaling by gradient_accumulation_steps in its `backward` + # loss = loss / self.args.gradient_accumulation_steps + + if self.do_grad_scaling: + self.scaler.scale(loss).backward() + else: + loss.backward() + # print(loss) + return loss.detach() + + def save_model(self, output_dir: Optional[str]=None): + """ + Will save the model, so you can reload it using `from_pretrained()`. + + Will only save from the main process. + """ + + if output_dir is None: + output_dir = self.args.output_dir + + if self.args.should_save: + self._save(output_dir) + + def _save_checkpoint(self, model, metrics=None): + # In all cases, including ddp/dp/deepspeed, self.model is always a reference to the model we + # want to save except FullyShardedDDP. + # assert unwrap_model(model) is self.model, "internal model should be a reference to self.model" + + # Save model checkpoint + checkpoint_folder = f"{PREFIX_CHECKPOINT_DIR}-{self.state.global_step}" + + run_dir = self.args.output_dir + + output_dir = os.path.join(run_dir, checkpoint_folder) + + self.save_model(output_dir) + + if self.args.should_save: + # deepspeed.save_checkpoint above saves model/optim/sched + paddle.save(self.optimizer.state_dict(), + os.path.join(output_dir, OPTIMIZER_NAME)) + with warnings.catch_warnings(record=True) as caught_warnings: + paddle.save(self.lr_scheduler.state_dict(), + os.path.join(output_dir, SCHEDULER_NAME)) + if self.do_grad_scaling: + paddle.save(self.scaler.state_dict(), + os.path.join(output_dir, SCALER_NAME)) + + # Determine the new best metric / best model checkpoint + if metrics is not None and self.args.metric_for_best_model is not None: + metric_to_check = self.args.metric_for_best_model + if not metric_to_check.startswith("eval_"): + metric_to_check = f"eval_{metric_to_check}" + metric_value = metrics[metric_to_check] + + operator = np.greater if self.args.greater_is_better else np.less + if (self.state.best_metric is None or + self.state.best_model_checkpoint is None or + operator(metric_value, self.state.best_metric)): + self.state.best_metric = metric_value + self.state.best_model_checkpoint = output_dir + + # Save the Trainer state + if self.args.should_save: + self.state.save_to_json( + os.path.join(output_dir, TRAINER_STATE_NAME)) + + # Save RNG state in non-distributed training + rng_states = { + "python": random.getstate(), + "numpy": np.random.get_state(), + } + + # A process can arrive here before the process 0 has a chance to save the model, in which case output_dir may + # not yet exist. + os.makedirs(output_dir, exist_ok=True) + local_rank = self.args.local_rank + + if local_rank == -1: + paddle.save(rng_states, os.path.join(output_dir, "rng_state.pth")) + else: + paddle.save(rng_states, + os.path.join(output_dir, f"rng_state_{local_rank}.pth")) + + # Maybe delete some older checkpoints. 
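        # Editor's sketch (illustrative, not part of the patch): checkpoints are
        # folders named f"{PREFIX_CHECKPOINT_DIR}-{global_step}", e.g. training-500,
        # training-1000, training-1500, training-2000. `_sorted_checkpoints` orders
        # them oldest-first:
        #
        #     >>> self._sorted_checkpoints(output_dir=run_dir)
        #     ['out/training-500', 'out/training-1000', 'out/training-1500', 'out/training-2000']
        #
        # With `save_total_limit = 3`, `_rotate_checkpoints` below then deletes the
        # oldest folder (training-500), unless it is the recorded
        # `best_model_checkpoint`, which the sort moves out of the deletion window.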
+ if self.args.should_save: + self._rotate_checkpoints(use_mtime=True, output_dir=run_dir) + + def _sorted_checkpoints(self, + output_dir=None, + checkpoint_prefix=PREFIX_CHECKPOINT_DIR, + use_mtime=False) -> List[str]: + ordering_and_checkpoint_path = [] + + glob_checkpoints = [ + str(x) for x in Path(output_dir).glob(f"{checkpoint_prefix}-*") + ] + + for path in glob_checkpoints: + if use_mtime: + ordering_and_checkpoint_path.append( + (os.path.getmtime(path), path)) + else: + regex_match = re.match(f".*{checkpoint_prefix}-([0-9]+)", path) + if regex_match is not None and regex_match.groups() is not None: + ordering_and_checkpoint_path.append( + (int(regex_match.groups()[0]), path)) + + checkpoints_sorted = sorted(ordering_and_checkpoint_path) + checkpoints_sorted = [ + checkpoint[1] for checkpoint in checkpoints_sorted + ] + # Make sure we don't delete the best model. + if self.state.best_model_checkpoint is not None: + best_model_index = checkpoints_sorted.index( + str(Path(self.state.best_model_checkpoint))) + for i in range(best_model_index, len(checkpoints_sorted) - 2): + checkpoints_sorted[i], checkpoints_sorted[ + i + 1] = checkpoints_sorted[i + 1], checkpoints_sorted[i] + return checkpoints_sorted + + def _rotate_checkpoints(self, use_mtime=False, output_dir=None) -> None: + if self.args.save_total_limit is None or self.args.save_total_limit <= 0: + return + + # Check if we should delete older checkpoint(s) + checkpoints_sorted = self._sorted_checkpoints( + use_mtime=use_mtime, output_dir=output_dir) + if len(checkpoints_sorted) <= self.args.save_total_limit: + return + + # If save_total_limit=1 with load_best_model_at_end=True, we could end up deleting the last checkpoint, which + # we don't do to allow resuming. + save_total_limit = self.args.save_total_limit + if (self.state.best_model_checkpoint is not None and + self.args.save_total_limit == 1 and + checkpoints_sorted[-1] != self.state.best_model_checkpoint): + save_total_limit = 2 + + number_of_checkpoints_to_delete = max( + 0, len(checkpoints_sorted) - save_total_limit) + checkpoints_to_be_deleted = checkpoints_sorted[: + number_of_checkpoints_to_delete] + for checkpoint in checkpoints_to_be_deleted: + logger.info( + f"Deleting older checkpoint [{checkpoint}] due to args.save_total_limit" + ) + shutil.rmtree(checkpoint) def _save(self, output_dir: Optional[str]=None, state_dict=None): - pass + # If we are executing this function, we are the process zero, so we don't check for that. + output_dir = output_dir if output_dir is not None else self.args.output_dir + os.makedirs(output_dir, exist_ok=True) + logger.info(f"Saving model checkpoint to {output_dir}") + # Save a trained model and configuration using `save_pretrained()`. + # They can then be reloaded using `from_pretrained()` + if not isinstance(self.model, PretrainedModel): + if isinstance(unwrap_model(self.model), PretrainedModel): + if state_dict is None: + state_dict = self.model.state_dict() + unwrap_model(self.model).save_pretrained( + output_dir, state_dict=state_dict) + else: + logger.info( + "Trainer.model is not a `PretrainedModel`, only saving its state dict." 
+ ) + if state_dict is None: + state_dict = self.model.state_dict() + paddle.save(state_dict, os.path.join(output_dir, WEIGHTS_NAME)) + else: + self.model.save_pretrained(output_dir) + if self.tokenizer is not None: + self.tokenizer.save_pretrained(output_dir) + + # Good practice: save your training arguments together with the trained model + paddle.save(self.args, os.path.join(output_dir, TRAINING_ARGS_NAME)) def _load_optimizer_and_scheduler(self, checkpoint): - pass + """If optimizer and scheduler states exist, load them.""" + if checkpoint is None: + return + + if os.path.isfile(os.path.join( + checkpoint, OPTIMIZER_NAME)) and os.path.isfile( + os.path.join(checkpoint, SCHEDULER_NAME)): + # Load in optimizer and scheduler states + map_location = self.args.device + self.optimizer.load_state_dict( + paddle.load( + os.path.join(checkpoint, OPTIMIZER_NAME), + map_location=map_location)) + with warnings.catch_warnings(record=True) as caught_warnings: + self.lr_scheduler.load_state_dict( + paddle.load(os.path.join(checkpoint, SCHEDULER_NAME))) + reissue_pt_warnings(caught_warnings) + if self.do_grad_scaling and os.path.isfile( + os.path.join(checkpoint, SCALER_NAME)): + self.scaler.load_state_dict( + paddle.load(os.path.join(checkpoint, SCALER_NAME))) + + def log(self, logs: Dict[str, float]) -> None: + """ + Log `logs` on the various objects watching training. + + Subclass and override this method to inject custom behavior. + + Args: + logs (`Dict[str, float]`): + The values to log. + """ + if self.state.epoch is not None: + logs["epoch"] = round(self.state.epoch, 2) + + output = { ** logs, ** {"step": self.state.global_step}} + self.state.log_history.append(output) + self.control = self.callback_handler.on_log(self.args, self.state, + self.control, logs) def evaluate( self, eval_dataset: Optional[Dataset]=None, ignore_keys: Optional[List[str]]=None, metric_key_prefix: str="eval", ) -> Dict[str, float]: - pass + """ + Run evaluation and returns metrics. + + The calling script will be responsible for providing a method to compute metrics, as they are task-dependent + (pass it to the init `compute_metrics` argument). + + You can also subclass and override this method to inject custom behavior. + + Args: + eval_dataset (`Dataset`, *optional*): + Pass a dataset if you wish to override `self.eval_dataset`. If it is an `datasets.Dataset`, columns not + accepted by the `model.forward()` method are automatically removed. It must implement the `__len__` + method. + ignore_keys (`Lst[str]`, *optional*): + A list of keys in the output of your model (if it is a dictionary) that should be ignored when + gathering predictions. + metric_key_prefix (`str`, *optional*, defaults to `"eval"`): + An optional prefix to be used as the metrics key prefix. For example the metrics "bleu" will be named + "eval_bleu" if the prefix is "eval" (default) + + Returns: + A dictionary containing the evaluation loss and the potential metrics computed from the predictions. The + dictionary also contains the epoch number which comes from the training state. 
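For concreteness (editor's illustration, made-up numbers): with the default `metric_key_prefix="eval"` and a `compute_metrics` that reports accuracy, the returned dictionary looks roughly like

```python
{
    "eval_loss": 0.4321,               # mean loss over the evaluation set
    "eval_accuracy": 0.8765,           # whatever keys compute_metrics returned
    "eval_runtime": 12.34,             # added by speed_metrics
    "eval_samples_per_second": 810.5,
    "eval_steps_per_second": 25.4,
    "epoch": 2.0,                      # appended by log() from the trainer state
}
```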
+ """ + eval_dataloader = self.get_eval_dataloader(eval_dataset) + start_time = time.time() + + output = self.evaluation_loop( + eval_dataloader, + description="Evaluation", + # No point gathering the predictions if there are no metrics, otherwise we defer to + # self.args.prediction_loss_only + prediction_loss_only=True if self.compute_metrics is None else None, + ignore_keys=ignore_keys, + metric_key_prefix=metric_key_prefix, ) + + total_batch_size = self.args.eval_batch_size * self.args.world_size + output.metrics.update( + speed_metrics( + metric_key_prefix, + start_time, + num_samples=output.num_samples, + num_steps=math.ceil(output.num_samples / total_batch_size), )) + + self.log(output.metrics) + + self.control = self.callback_handler.on_evaluate( + self.args, self.state, self.control, output.metrics) + + return output.metrics + + def evaluation_loop( + self, + dataloader: DataLoader, + description: str, + prediction_loss_only: Optional[bool]=None, + ignore_keys: Optional[List[str]]=None, + metric_key_prefix: str="eval", ) -> EvalLoopOutput: + """ + Prediction/evaluation loop, shared by `Trainer.evaluate()` and `Trainer.predict()`. + + Works both with or without labels. + """ + args = self.args + + prediction_loss_only = prediction_loss_only if prediction_loss_only is not None else args.prediction_loss_only + prediction_loss_only = False + + model = self._wrap_model(self.model, training=False) + + batch_size = dataloader.batch_size + num_samples = self.num_examples(dataloader) + logger.info(f"***** Running {description} *****") + logger.info(f" Num examples = {num_samples}") + logger.info(f" Batch size = {batch_size}") + + model.eval() + + self.callback_handler.eval_dataloader = dataloader + # Do this before wrapping. + eval_dataset = dataloader.dataset + + if args.past_index >= 0: + self._past = None + + # Initialize containers + # losses/preds/labels on GPU/TPU (accumulated for eval_accumulation_steps) + losses_host = None + preds_host = None + labels_host = None + # losses/preds/labels on CPU (final containers) + all_losses = None + all_preds = None + all_labels = None + # Will be useful when we have an iterable dataset so don't know its length. + + observed_num_examples = 0 + # Main evaluation loop + losses = [] + for step, inputs in enumerate(dataloader): + # Update the observed num examples + # Prediction step + loss, logits, labels = self.prediction_step( + model, inputs, prediction_loss_only, ignore_keys=ignore_keys) + losses.append(loss.numpy()) + + all_preds = logits if all_preds is None else nested_concat( + all_preds, logits, padding_index=-100) + all_labels = labels if all_labels is None else nested_concat( + all_labels, labels, padding_index=-100) + + model.train() + + # Metrics! + if self.compute_metrics is not None and all_preds is not None and all_labels is not None: + metrics = self.compute_metrics( + EvalPrediction( + predictions=all_preds, label_ids=all_labels)) + else: + metrics = {} + + metrics["eval_loss"] = float(np.mean(losses)) + + print(metrics) + + return EvalLoopOutput( + predictions=all_preds, + label_ids=all_labels, + metrics=metrics, + num_samples=num_samples) def predict(self, test_dataset: Dataset, @@ -282,8 +1179,176 @@ def prediction_step( ignore_keys: Optional[List[str]]=None, ) -> Tuple[Optional[ paddle.Tensor], Optional[paddle.Tensor], Optional[ paddle.Tensor]]: + """ + Perform an evaluation step on `model` using `inputs`. + + Subclass and override to inject custom behavior. + + Args: + model (`nn.Module`): + The model to evaluate. 
+ inputs (`Dict[str, Union[torch.Tensor, Any]]`): + The inputs and targets of the model. + + The dictionary will be unpacked before being fed to the model. Most models expect the targets under the + argument `labels`. Check your model's documentation for all accepted arguments. + prediction_loss_only (`bool`): + Whether or not to return the loss only. + ignore_keys (`Lst[str]`, *optional*): + A list of keys in the output of your model (if it is a dictionary) that should be ignored when + gathering predictions. + + Return: + Tuple[Optional[torch.Tensor], Optional[torch.Tensor], Optional[torch.Tensor]]: A tuple with the loss, + logits and labels (each being optional). + """ + has_labels = all(inputs.get(k) is not None for k in self.label_names) + inputs = self._prepare_inputs(inputs) + if ignore_keys is None: + if hasattr(self.model, "config"): + ignore_keys = getattr(self.model.config, + "keys_to_ignore_at_inference", []) + else: + ignore_keys = [] + + # labels may be popped when computing the loss (label smoothing for instance) so we grab them first. + if has_labels: + labels = nested_detach( + tuple(inputs.get(name) for name in self.label_names)) + if len(labels) == 1: + labels = labels[0] + else: + labels = None + + with paddle.no_grad(): + if has_labels: + with self.autocast_smart_context_manager(): + loss, outputs = self.compute_loss( + model, inputs, return_outputs=True) + loss = loss.mean().detach() + + if isinstance(outputs, dict): + logits = tuple(v for k, v in outputs.items() + if k not in ignore_keys + ["loss"]) + else: + logits = outputs[1:] + else: + loss = None + with self.autocast_smart_context_manager(): + outputs = model(**inputs) + if isinstance(outputs, dict): + logits = tuple(v for k, v in outputs.items() + if k not in ignore_keys) + else: + logits = outputs + # TODO: this needs to be fixed and made cleaner later. + if self.args.past_index >= 0: + self._past = outputs[self.args.past_index - 1] + + if prediction_loss_only: + return (loss, None, None) + + logits = nested_detach(logits) + if len(logits) == 1: + logits = logits[0] + + return (loss, logits, labels) + + def num_examples(self, dataloader: DataLoader) -> int: + """ + Helper to get number of samples in a [`~torch.utils.data.DataLoader`] by accessing its dataset. 
+ + Will raise an exception if the underlying dataset does not implement method `__len__` + """ + return len(dataloader.dataset) + + def create_dataloader(self, + dataset, + mode='train', + batch_size=16, + batchify_fn=None, + trans_fn=None, + batched=False): + """ + """ + if trans_fn: + dataset = dataset.map(trans_fn, batched=batched) + + shuffle = True if mode == 'train' else False + if mode == 'train': + batch_sampler = paddle.io.DistributedBatchSampler( + dataset, batch_size=batch_size, shuffle=shuffle) + else: + batch_sampler = paddle.io.BatchSampler( + dataset, batch_size=batch_size, shuffle=shuffle) + + return paddle.io.DataLoader( + dataset=dataset, + batch_sampler=batch_sampler, + collate_fn=batchify_fn, + num_workers=0, + return_list=True) + + def eval(self, *args, **kwargs): + """ + """ pass + def prepare_train_config(self): + """ + """ + if self.args.max_steps > 0: + self.args.num_training_steps = self.args.max_steps + self.args.num_train_epochs = math.ceil( + self.args.num_training_steps / len(self.train_dl)) + + else: + self.args.num_training_steps = len( + self.train_dl) * self.args.num_train_epochs + self.args.num_train_epochs = self.args.num_train_epochs + + if self.args.num_training_steps // self.args.valid_steps < self.args.minimum_valid_times: + exp_step = self.args.num_training_steps / self.args.minimum_valid_times + exp_step = max(int(exp_step - exp_step % 10), 10) + logger.info("Set eval step to %d" % exp_step) + self.args.valid_steps = exp_step + + warmup = self.args.warmup_steps if self.args.warmup_steps > 0 else self.args.warmup_proportion + + self.lr_scheduler = LinearDecayWithWarmup( + self.args.learning_rate, self.args.num_training_steps, warmup) + + # Generate parameter names needed to perform weight decay. + # All bias and LayerNorm parameters are excluded. + decay_params = [ + p.name for n, p in self.model.named_parameters() + if not any(nd in n for nd in ["bias", "norm"]) + ] + + self.optimizer = paddle.optimizer.AdamW( + learning_rate=self.lr_scheduler, + beta1=0.9, + beta2=0.999, + epsilon=self.args.adam_epsilon, + parameters=self.model.parameters(), + weight_decay=self.args.weight_decay, + apply_decay_param_fun=lambda x: x in decay_params, + grad_clip=nn.ClipGradByGlobalNorm(self.args.max_grad_norm)) + + def print_config(self): + """ + """ + logger.info('{:^40}'.format("Configuration Arguments")) + logger.info('{:20}:{}'.format("paddle commit id", + paddle.version.commit)) + for arg in vars(self.args): + logger.info('{:20}:{}'.format(arg, getattr(self.args, arg))) + + +class TrainerBase(object): + """ + """ + def create_dataloader(self, dataset, mode='train', diff --git a/examples/language_model/ernie-1.0/finetune/trainer_callback.py b/examples/language_model/ernie-1.0/finetune/trainer_callback.py new file mode 100644 index 000000000000..70f8cfd96eea --- /dev/null +++ b/examples/language_model/ernie-1.0/finetune/trainer_callback.py @@ -0,0 +1,660 @@ +# coding=utf-8 +# Copyright 2020-present the HuggingFace Inc. team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +""" +Callbacks to use with the Trainer class and customize the training loop. +""" +import dataclasses +import json +from dataclasses import dataclass +from typing import Dict, List, Optional, Union + +import numpy as np +from tqdm.auto import tqdm + +from trainer_utils import IntervalStrategy +from trainer_args import TrainingArguments +from utils import logging + +logger = logging.get_logger(__name__) + + +@dataclass +class TrainerState: + """ + A class containing the [`Trainer`] inner state that will be saved along the model and optimizer when checkpointing + and passed to the [`TrainerCallback`]. + + + + In all this class, one step is to be understood as one update step. When using gradient accumulation, one update + step may require several forward and backward passes: if you use `gradient_accumulation_steps=n`, then one update + step requires going through *n* batches. + + + + Args: + epoch (`float`, *optional*): + Only set during training, will represent the epoch the training is at (the decimal part being the + percentage of the current epoch completed). + global_step (`int`, *optional*, defaults to 0): + During training, represents the number of update steps completed. + max_steps (`int`, *optional*, defaults to 0): + The number of update steps to do during the current training. + total_flos (`float`, *optional*, defaults to 0): + The total number of floating operations done by the model since the beginning of training (stored as floats + to avoid overflow). + log_history (`List[Dict[str, float]]`, *optional*): + The list of logs done since the beginning of training. + best_metric (`float`, *optional*): + When tracking the best model, the value of the best metric encountered so far. + best_model_checkpoint (`str`, *optional*): + When tracking the best model, the value of the name of the checkpoint for the best model encountered so + far. + is_local_process_zero (`bool`, *optional*, defaults to `True`): + Whether or not this process is the local (e.g., on one machine if training in a distributed fashion on + several machines) main process. + is_world_process_zero (`bool`, *optional*, defaults to `True`): + Whether or not this process is the global main process (when training in a distributed fashion on several + machines, this is only going to be `True` for one process). + is_hyper_param_search (`bool`, *optional*, defaults to `False`): + Whether we are in the process of a hyper parameter search using Trainer.hyperparameter_search. This will + impact the way data will be logged in TensorBoard. 
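A small round-trip example (editor's addition, illustrative values): the state is serialized as plain JSON, so a checkpoint's `trainer_state.json` can be inspected or reloaded directly.

```python
state = TrainerState(epoch=0.62, global_step=500, max_steps=2000)
state.save_to_json("trainer_state.json")
restored = TrainerState.load_from_json("trainer_state.json")
assert restored.global_step == 500 and restored.max_steps == 2000
```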
+ """ + + epoch: Optional[float] = None + global_step: int = 0 + max_steps: int = 0 + num_train_epochs: int = 0 + total_flos: float = 0 + log_history: List[Dict[str, float]] = None + best_metric: Optional[float] = None + best_model_checkpoint: Optional[str] = None + is_local_process_zero: bool = True + is_world_process_zero: bool = True + is_hyper_param_search: bool = False + trial_name: str = None + trial_params: Dict[str, Union[str, float, int, bool]] = None + + def __post_init__(self): + if self.log_history is None: + self.log_history = [] + + def save_to_json(self, json_path: str): + """Save the content of this instance in JSON format inside `json_path`.""" + json_string = json.dumps( + dataclasses.asdict(self), indent=2, sort_keys=True) + "\n" + with open(json_path, "w", encoding="utf-8") as f: + f.write(json_string) + + @classmethod + def load_from_json(cls, json_path: str): + """Create an instance from the content of `json_path`.""" + with open(json_path, "r", encoding="utf-8") as f: + text = f.read() + return cls(**json.loads(text)) + + +@dataclass +class TrainerControl: + """ + A class that handles the [`Trainer`] control flow. This class is used by the [`TrainerCallback`] to activate some + switches in the training loop. + + Args: + should_training_stop (`bool`, *optional*, defaults to `False`): + Whether or not the training should be interrupted. + + If `True`, this variable will not be set back to `False`. The training will just stop. + should_epoch_stop (`bool`, *optional*, defaults to `False`): + Whether or not the current epoch should be interrupted. + + If `True`, this variable will be set back to `False` at the beginning of the next epoch. + should_save (`bool`, *optional*, defaults to `False`): + Whether or not the model should be saved at this step. + + If `True`, this variable will be set back to `False` at the beginning of the next step. + should_evaluate (`bool`, *optional*, defaults to `False`): + Whether or not the model should be evaluated at this step. + + If `True`, this variable will be set back to `False` at the beginning of the next step. + should_log (`bool`, *optional*, defaults to `False`): + Whether or not the logs should be reported at this step. + + If `True`, this variable will be set back to `False` at the beginning of the next step. + """ + + should_training_stop: bool = False + should_epoch_stop: bool = False + should_save: bool = False + should_evaluate: bool = False + should_log: bool = False + + def _new_training(self): + """Internal method that resets the variable for a new training.""" + self.should_training_stop = False + + def _new_epoch(self): + """Internal method that resets the variable for a new epoch.""" + self.should_epoch_stop = False + + def _new_step(self): + """Internal method that resets the variable for a new step.""" + self.should_save = False + self.should_evaluate = False + self.should_log = False + + +class TrainerCallback: + """ + A class for objects that will inspect the state of the training loop at some events and take some decisions. At + each of those events the following arguments are available: + + Args: + args ([`TrainingArguments`]): + The training arguments used to instantiate the [`Trainer`]. + state ([`TrainerState`]): + The current state of the [`Trainer`]. + control ([`TrainerControl`]): + The object that is returned to the [`Trainer`] and can be used to make some decisions. + model ([`PreTrainedModel`] or `torch.nn.Module`): + The model being trained. 
+ tokenizer ([`PreTrainedTokenizer`]): + The tokenizer used for encoding the data. + optimizer (`torch.optim.Optimizer`): + The optimizer used for the training steps. + lr_scheduler (`torch.optim.lr_scheduler.LambdaLR`): + The scheduler used for setting the learning rate. + train_dataloader (`torch.utils.data.DataLoader`, *optional*): + The current dataloader used for training. + eval_dataloader (`torch.utils.data.DataLoader`, *optional*): + The current dataloader used for training. + metrics (`Dict[str, float]`): + The metrics computed by the last evaluation phase. + + Those are only accessible in the event `on_evaluate`. + logs (`Dict[str, float]`): + The values to log. + + Those are only accessible in the event `on_log`. + + The `control` object is the only one that can be changed by the callback, in which case the event that changes it + should return the modified version. + + The argument `args`, `state` and `control` are positionals for all events, all the others are grouped in `kwargs`. + You can unpack the ones you need in the signature of the event using them. As an example, see the code of the + simple [`~transformer.PrinterCallback`]. + + Example: + + ```python + class PrinterCallback(TrainerCallback): + def on_log(self, args, state, control, logs=None, **kwargs): + _ = logs.pop("total_flos", None) + if state.is_local_process_zero: + print(logs) + ```""" + + def on_init_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the end of the initialization of the [`Trainer`]. + """ + pass + + def on_train_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the beginning of training. + """ + pass + + def on_train_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the end of training. + """ + pass + + def on_epoch_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the beginning of an epoch. + """ + pass + + def on_epoch_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the end of an epoch. + """ + pass + + def on_step_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the beginning of a training step. If using gradient accumulation, one training step might take + several inputs. + """ + pass + + def on_substep_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the end of an substep during gradient accumulation. + """ + pass + + def on_step_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called at the end of a training step. If using gradient accumulation, one training step might take + several inputs. + """ + pass + + def on_evaluate(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called after an evaluation phase. + """ + pass + + def on_save(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called after a checkpoint save. + """ + pass + + def on_log(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called after logging the last logs. 
+ """ + pass + + def on_prediction_step(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + """ + Event called after a prediction step. + """ + pass + + +class CallbackHandler(TrainerCallback): + """Internal class that just calls the list of callbacks in order.""" + + def __init__(self, callbacks, model, tokenizer, optimizer, lr_scheduler): + self.callbacks = [] + for cb in callbacks: + self.add_callback(cb) + self.model = model + self.tokenizer = tokenizer + self.optimizer = optimizer + self.lr_scheduler = lr_scheduler + self.train_dataloader = None + self.eval_dataloader = None + + if not any( + isinstance(cb, DefaultFlowCallback) for cb in self.callbacks): + logger.warning( + "The Trainer will not work properly if you don't have a `DefaultFlowCallback` in its callbacks. You\n" + + + "should add one before training with `trainer.add_callback(DefaultFlowCallback). The current list of" + + "callbacks is\n:" + self.callback_list) + + def add_callback(self, callback): + cb = callback() if isinstance(callback, type) else callback + cb_class = callback if isinstance(callback, + type) else callback.__class__ + if cb_class in [c.__class__ for c in self.callbacks]: + logger.warning( + f"You are adding a {cb_class} to the callbacks of this Trainer, but there is already one. The current" + + "list of callbacks is\n:" + self.callback_list) + self.callbacks.append(cb) + + def pop_callback(self, callback): + if isinstance(callback, type): + for cb in self.callbacks: + if isinstance(cb, callback): + self.callbacks.remove(cb) + return cb + else: + for cb in self.callbacks: + if cb == callback: + self.callbacks.remove(cb) + return cb + + def remove_callback(self, callback): + if isinstance(callback, type): + for cb in self.callbacks: + if isinstance(cb, callback): + self.callbacks.remove(cb) + return + else: + self.callbacks.remove(callback) + + @property + def callback_list(self): + return "\n".join(cb.__class__.__name__ for cb in self.callbacks) + + def on_init_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_init_end", args, state, control) + + def on_train_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + control.should_training_stop = False + return self.call_event("on_train_begin", args, state, control) + + def on_train_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_train_end", args, state, control) + + def on_epoch_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + control.should_epoch_stop = False + return self.call_event("on_epoch_begin", args, state, control) + + def on_epoch_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_epoch_end", args, state, control) + + def on_step_begin(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + control.should_log = False + control.should_evaluate = False + control.should_save = False + return self.call_event("on_step_begin", args, state, control) + + def on_substep_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_substep_end", args, state, control) + + def on_step_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_step_end", args, state, control) + + def 
on_evaluate(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + metrics): + control.should_evaluate = False + return self.call_event( + "on_evaluate", args, state, control, metrics=metrics) + + def on_save(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + control.should_save = False + return self.call_event("on_save", args, state, control) + + def on_log(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + logs): + control.should_log = False + return self.call_event("on_log", args, state, control, logs=logs) + + def on_prediction_step(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl): + return self.call_event("on_prediction_step", args, state, control) + + def call_event(self, event, args, state, control, **kwargs): + for callback in self.callbacks: + result = getattr(callback, event)( + args, + state, + control, + model=self.model, + tokenizer=self.tokenizer, + optimizer=self.optimizer, + lr_scheduler=self.lr_scheduler, + train_dataloader=self.train_dataloader, + eval_dataloader=self.eval_dataloader, + **kwargs, ) + # A Callback can skip the return of `control` if it doesn't change it. + if result is not None: + control = result + return control + + +class DefaultFlowCallback(TrainerCallback): + """ + A [`TrainerCallback`] that handles the default flow of the training loop for logs, evaluation and checkpoints. + """ + + def on_step_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + # Log + if state.global_step == 1 and args.logging_first_step: + control.should_log = True + if args.logging_strategy == IntervalStrategy.STEPS and state.global_step % args.logging_steps == 0: + control.should_log = True + + # Evaluate + if args.evaluation_strategy == IntervalStrategy.STEPS and state.global_step % args.eval_steps == 0: + control.should_evaluate = True + + # Save + if (args.save_strategy == IntervalStrategy.STEPS and + args.save_steps > 0 and + state.global_step % args.save_steps == 0): + control.should_save = True + + # End training + if state.global_step >= state.max_steps: + control.should_training_stop = True + + return control + + def on_epoch_end(self, + args: TrainingArguments, + state: TrainerState, + control: TrainerControl, + **kwargs): + # Log + if args.logging_strategy == IntervalStrategy.EPOCH: + control.should_log = True + + # Evaluate + if args.evaluation_strategy == IntervalStrategy.EPOCH: + control.should_evaluate = True + + # Save + if args.save_strategy == IntervalStrategy.EPOCH: + control.should_save = True + + return control + + +class ProgressCallback(TrainerCallback): + """ + A [`TrainerCallback`] that displays the progress of training or evaluation. 
+ """ + + def __init__(self): + self.training_bar = None + self.prediction_bar = None + + def on_train_begin(self, args, state, control, **kwargs): + if state.is_local_process_zero: + self.training_bar = tqdm(total=state.max_steps) + self.current_step = 0 + + def on_step_end(self, args, state, control, **kwargs): + if state.is_local_process_zero: + self.training_bar.update(state.global_step - self.current_step) + self.current_step = state.global_step + + def on_prediction_step(self, + args, + state, + control, + eval_dataloader=None, + **kwargs): + if state.is_local_process_zero and has_length(eval_dataloader.dataset): + if self.prediction_bar is None: + self.prediction_bar = tqdm( + total=len(eval_dataloader), leave=self.training_bar is None) + self.prediction_bar.update(1) + + def on_evaluate(self, args, state, control, **kwargs): + if state.is_local_process_zero: + if self.prediction_bar is not None: + self.prediction_bar.close() + self.prediction_bar = None + + def on_log(self, args, state, control, logs=None, **kwargs): + if state.is_local_process_zero and self.training_bar is not None: + _ = logs.pop("total_flos", None) + self.training_bar.write(str(logs)) + + def on_train_end(self, args, state, control, **kwargs): + if state.is_local_process_zero: + self.training_bar.close() + self.training_bar = None + + +class PrinterCallback(TrainerCallback): + """ + A bare [`TrainerCallback`] that just prints the logs. + """ + + def on_log(self, args, state, control, logs=None, **kwargs): + _ = logs.pop("total_flos", None) + if state.is_local_process_zero: + print(logs) + + +class EarlyStoppingCallback(TrainerCallback): + """ + A [`TrainerCallback`] that handles early stopping. + + Args: + early_stopping_patience (`int`): + Use with `metric_for_best_model` to stop training when the specified metric worsens for + `early_stopping_patience` evaluation calls. + early_stopping_threshold(`float`, *optional*): + Use with TrainingArguments `metric_for_best_model` and `early_stopping_patience` to denote how much the + specified metric must improve to satisfy early stopping conditions. ` + + This callback depends on [`TrainingArguments`] argument *load_best_model_at_end* functionality to set best_metric + in [`TrainerState`]. + """ + + def __init__(self, + early_stopping_patience: int=1, + early_stopping_threshold: Optional[float]=0.0): + self.early_stopping_patience = early_stopping_patience + self.early_stopping_threshold = early_stopping_threshold + # early_stopping_patience_counter denotes the number of times validation metrics failed to improve. 
+ self.early_stopping_patience_counter = 0 + + def check_metric_value(self, args, state, control, metric_value): + # best_metric is set by code for load_best_model + operator = np.greater if args.greater_is_better else np.less + if state.best_metric is None or ( + operator(metric_value, state.best_metric) and + abs(metric_value - state.best_metric) > + self.early_stopping_threshold): + self.early_stopping_patience_counter = 0 + else: + self.early_stopping_patience_counter += 1 + + def on_train_begin(self, args, state, control, **kwargs): + assert args.load_best_model_at_end, "EarlyStoppingCallback requires load_best_model_at_end = True" + assert ( + args.metric_for_best_model is not None + ), "EarlyStoppingCallback requires metric_for_best_model is defined" + assert ( + args.evaluation_strategy != IntervalStrategy.NO + ), "EarlyStoppingCallback requires IntervalStrategy of steps or epoch" + + def on_evaluate(self, args, state, control, metrics, **kwargs): + metric_to_check = args.metric_for_best_model + if not metric_to_check.startswith("eval_"): + metric_to_check = f"eval_{metric_to_check}" + metric_value = metrics.get(metric_to_check) + + if metric_value is None: + logger.warning( + f"early stopping required metric_for_best_model, but did not find {metric_to_check} so early stopping is disabled" + ) + return + + self.check_metric_value(args, state, control, metric_value) + if self.early_stopping_patience_counter >= self.early_stopping_patience: + control.should_training_stop = True diff --git a/examples/language_model/ernie-1.0/finetune/trainer_utils.py b/examples/language_model/ernie-1.0/finetune/trainer_utils.py new file mode 100644 index 000000000000..66da32be4794 --- /dev/null +++ b/examples/language_model/ernie-1.0/finetune/trainer_utils.py @@ -0,0 +1,202 @@ +# coding=utf-8 +# Copyright 2020-present the HuggingFace Inc. team. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Utilities for the Trainer and TFTrainer class. Should be independent from PyTorch and TensorFlow. +""" + +import copy +import functools +import gc +import inspect +import os +import random +import re +import threading +import time +from enum import Enum +from typing import Any, Dict, NamedTuple, Optional, Tuple, Union + +import numpy as np + + +class ExplicitEnum(Enum): + """ + Enum with more explicit error message for missing values. + """ + + @classmethod + def _missing_(cls, value): + raise ValueError( + f"{value} is not a valid {cls.__name__}, please select one of {list(cls._value2member_map_.keys())}" + ) + + +class EvalPrediction(NamedTuple): + """ + Evaluation output (always contains labels), to be used to compute metrics. + + Parameters: + predictions (`np.ndarray`): Predictions of the model. + label_ids (`np.ndarray`): Targets to be matched. 
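For example (editor's sketch), a `compute_metrics` function receives this object and can derive accuracy directly from it:

```python
import numpy as np

p = EvalPrediction(
    predictions=np.array([[0.1, 0.9], [0.8, 0.2]]),  # per-class scores
    label_ids=np.array([1, 0]))
accuracy = float((p.predictions.argmax(axis=-1) == p.label_ids).mean())
# accuracy == 1.0
```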
+    """
+
+    predictions: Union[np.ndarray, Tuple[np.ndarray]]
+    label_ids: Union[np.ndarray, Tuple[np.ndarray]]
+
+
+class EvalLoopOutput(NamedTuple):
+    predictions: Union[np.ndarray, Tuple[np.ndarray]]
+    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
+    metrics: Optional[Dict[str, float]]
+    num_samples: Optional[int]
+
+
+class PredictionOutput(NamedTuple):
+    predictions: Union[np.ndarray, Tuple[np.ndarray]]
+    label_ids: Optional[Union[np.ndarray, Tuple[np.ndarray]]]
+    metrics: Optional[Dict[str, float]]
+
+
+class TrainOutput(NamedTuple):
+    global_step: int
+    training_loss: float
+    metrics: Dict[str, float]
+
+
+PREFIX_CHECKPOINT_DIR = "checkpoint"
+_re_checkpoint = re.compile(r"^" + PREFIX_CHECKPOINT_DIR + r"\-(\d+)$")
+
+
+def get_last_checkpoint(folder):
+    content = os.listdir(folder)
+    checkpoints = [
+        path for path in content
+        if _re_checkpoint.search(path) is not None and os.path.isdir(
+            os.path.join(folder, path))
+    ]
+    if len(checkpoints) == 0:
+        return
+    return os.path.join(
+        folder,
+        max(checkpoints,
+            key=lambda x: int(_re_checkpoint.search(x).groups()[0])))
+
+
+class IntervalStrategy(ExplicitEnum):
+    NO = "no"
+    STEPS = "steps"
+    EPOCH = "epoch"
+
+
+class EvaluationStrategy(ExplicitEnum):
+    NO = "no"
+    STEPS = "steps"
+    EPOCH = "epoch"
+
+
+class BestRun(NamedTuple):
+    """
+    The best run found by a hyperparameter search (see [`~Trainer.hyperparameter_search`]).
+
+    Parameters:
+        run_id (`str`):
+            The id of the best run (if models were saved, the corresponding checkpoint will be in the folder ending
+            with run-{run_id}).
+        objective (`float`):
+            The objective that was obtained for this run.
+        hyperparameters (`Dict[str, Any]`):
+            The hyperparameters picked to get this run.
+    """
+
+    run_id: str
+    objective: float
+    hyperparameters: Dict[str, Any]
+
+
+def default_compute_objective(metrics: Dict[str, float]) -> float:
+    """
+    The default objective to maximize/minimize when doing a hyperparameter search. It is the evaluation loss if no
+    metrics are provided to the [`Trainer`], the sum of all metrics otherwise.
+
+    Args:
+        metrics (`Dict[str, float]`): The metrics returned by the evaluate method.
+
+    Return:
+        `float`: The objective to minimize or maximize
+    """
+    metrics = copy.deepcopy(metrics)
+    loss = metrics.pop("eval_loss", None)
+    _ = metrics.pop("epoch", None)
+    # Remove speed metrics
+    speed_metrics = [
+        m for m in metrics.keys()
+        if m.endswith("_runtime") or m.endswith("_per_second")
+    ]
+    for sm in speed_metrics:
+        _ = metrics.pop(sm, None)
+    return loss if len(metrics) == 0 else sum(metrics.values())
+
+
+def is_main_process(local_rank):
+    """
+    Whether or not the current process is the main local process, based on `local_rank`.
+    """
+
+    return local_rank in [-1, 0]
+
+
+def total_processes_number(local_rank):
+    """
+    Return the number of processes launched in parallel. Works with `paddle.distributed`.
+    """
+    if local_rank != -1:
+        import paddle
+
+        return paddle.distributed.get_world_size()
+    return 1
+
+
+def speed_metrics(split, start_time, num_samples=None, num_steps=None):
+    """
+    Measure and return speed performance metrics.
+
+    This function requires a time snapshot `start_time` before the operation to be measured starts and this function
+    should be run immediately after the operation to be measured has completed.
+
+    Args:
+
+    - split: name to prefix metric (like train, eval, test...)
+    - start_time: operation start time
+    - num_samples: number of samples processed
+    - num_steps: number of steps processed
+    """
+    runtime = time.time() - start_time
+    result = {f"{split}_runtime": round(runtime, 4)}
+    if num_samples is not None:
+        samples_per_second = num_samples / runtime
+        result[f"{split}_samples_per_second"] = round(samples_per_second, 3)
+    if num_steps is not None:
+        steps_per_second = num_steps / runtime
+        result[f"{split}_steps_per_second"] = round(steps_per_second, 3)
+    return result
+
+
+class SchedulerType(ExplicitEnum):
+    LINEAR = "linear"
+    COSINE = "cosine"
+    COSINE_WITH_RESTARTS = "cosine_with_restarts"
+    POLYNOMIAL = "polynomial"
+    CONSTANT = "constant"
+    CONSTANT_WITH_WARMUP = "constant_with_warmup"
diff --git a/paddlenlp/transformers/model_utils.py b/paddlenlp/transformers/model_utils.py
index b3cb7f9707bc..689a6f45a4f3 100644
--- a/paddlenlp/transformers/model_utils.py
+++ b/paddlenlp/transformers/model_utils.py
@@ -37,6 +37,12 @@
 ]
 
 
+def unwrap_model(model, *args, **kwargs):
+    raw_model = model._layers if isinstance(model,
+                                            paddle.DataParallel) else model
+    return raw_model
+
+
 def register_base_model(cls):
     """
     A decorator for `PretrainedModel` class. It first retrieves the parent class