diff --git a/Dockerfile b/Dockerfile
index 70739ef..1e35ee0 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM nvcr.io/nvidia/pytorch:23.01-py3
+FROM nvcr.io/nvidia/pytorch:23.03-py3
 
 ARG USER=1000
 ARG USERNAME=user
diff --git a/requirements.txt b/requirements.txt
index 68006a1..5144b94 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,6 +3,8 @@
 bitsandbytes
 safetensors
 deepspeed==0.7.7
 -e ./transformers
+flash-attn
+einops
 # TODO: Analysis only
 py-markdown-table
diff --git a/scripts/run_all_benchmark_breakdown.sh b/scripts/run_all_benchmark_breakdown.sh
new file mode 100755
index 0000000..818ddc4
--- /dev/null
+++ b/scripts/run_all_benchmark_breakdown.sh
@@ -0,0 +1,20 @@
+
+# Santacoder
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 1 2040 5 0 v2_
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 32 2040 5 0 v2_
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 256 2040 5 0 v2_
+
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 1 2040 11 1 v2_
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 32 2040 11 1 v2_
+./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 256 2040 11 1 v2_
+
+# Large model
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 1 8190 11 0 v2_
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 8 8190 11 0 v2_
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 32 8190 11 0 v2_
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 256 8190 11 0 v2_  # OOM?
+
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 1 8190 29 1 v2_ 1
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 8 8190 29 1 v2_ 1
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 32 8190 29 1 v2_ 1
+./scripts/run_benchmark_breakdown.sh large_model ./data/large-model 256 8190 29 1 v2_ 1  # OOM?
diff --git a/scripts/run_benchmark_breakdown.sh b/scripts/run_benchmark_breakdown.sh
new file mode 100755
index 0000000..5781a13
--- /dev/null
+++ b/scripts/run_benchmark_breakdown.sh
@@ -0,0 +1,76 @@
+
+# Santacoder prefill.
+# ./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 32 2040 5 0
+# Santacoder decode (fewer data points because slower)
+# ./scripts/run_benchmark_breakdown.sh santacoder bigcode/gpt_bigcode-santacoder 32 2040 11 1
+MODEL_NAME=${1:-"santacoder"}
+MODEL_PATH=${2:-"bigcode/gpt_bigcode-santacoder"}
+BATCH_SIZE=${3:-32}
+MAX_NEW_TOKENS=${4:-2040}
+# Prime number to see key length padding effect.
+TOKEN_STEP=${5:-5}
+STEP_ID=${6:-""}
+FILE_PREFIX=${7:-""}
+CYCLES=${8:-10}
+
+SAVE_DIR=data/benchmarks/v2
+#BATCH_SIZES="1 2 4 8 16 24 32 48 64 96 128 160 224 256"
+RUN="python3 src/main.py --max_log_outputs=0 --dtype=float16 --device=cuda --custom_generate --breakdown_latency --ignore_oom"
+
+
+RUNTIME=("" "pre_allocate_kv_cache=True" "pre_allocate_kv_cache=True inference_runner=3")
+RUNTIME_NAMES=("base" "pre_allocate" "graph")
+
+ATTN=( \
+    "attention_implementation=0" \
+    "attention_implementation=1" \
+    "attention_implementation=1 --pad_generated_tokens=0.5" \
+    "attention_implementation=2" \
+    "attention_implementation=0 fused_softmax=False" \
+    "attention_implementation=0 fused_softmax=True" \
+    "attention_implementation=3" \
+    "attention_implementation=4" \
+    "attention_implementation=5" \
+    )
+ATTN_NAME=( \
+    "default" \
+    "flash" \
+    "flash_unpad_50" \
+    "torch" \
+    "no_jit" \
+    "jit" \
+    "torchflash" \
+    "torchmem" \
+    "torchcpp" \
+    )
+
+
+STEP=("--no_prefill" "--no_cache")
+STEP_NAME=("decode" "prefill")
+
+COMMON="--pretrained_model=$MODEL_PATH --tokenizer=$MODEL_PATH --cycles=$CYCLES --max_input_length=1 --max_new_tokens=$MAX_NEW_TOKENS --key_length_step=$TOKEN_STEP --batch_size=$BATCH_SIZE predict_last_token=True"
+
+run () { # run(step, runtime, attn)
+    FILE_NAME="$SAVE_DIR"/"$MODEL_NAME"_bs_"$BATCH_SIZE"_tok_"$MAX_NEW_TOKENS"_step_"$TOKEN_STEP"_"${STEP_NAME[$1]}"/"$FILE_PREFIX""${RUNTIME_NAMES[$2]}"_"${ATTN_NAME[$3]}".json
+    if [ -f "$FILE_NAME" ];
+    then
+        echo "Skipping existing $FILE_NAME"
+    else
+        $RUN $COMMON ${RUNTIME[$2]} ${ATTN[$3]} ${STEP[$1]} --save="$FILE_NAME"
+    fi
+}
+
+if [ "${STEP_ID}" -eq "0" ]
+then
+    # Decode (default attn only)
+    for runtime in {0..2}
+    do
+        run 0 $runtime 0
+    done
+else
+    # Prefill (all runtimes are the same)
+    for attn in {0..2}
+    do
+        run 1 0 $attn
+    done
+fi
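For readers tracing where the numbers end up: each run writes one JSON file into a directory whose name encodes the benchmark setting, and src/parse_breakdown_results.py later splits that directory name back apart. A small sketch of the naming scheme; `build_result_path` is a hypothetical helper, only the pattern is taken from the FILE_NAME line in the script above.

```python
# Sketch of the result-file layout produced by run_benchmark_breakdown.sh.
from pathlib import Path


def build_result_path(
    save_dir: str,        # SAVE_DIR
    model_name: str,      # MODEL_NAME
    batch_size: int,      # BATCH_SIZE
    max_new_tokens: int,  # MAX_NEW_TOKENS
    token_step: int,      # TOKEN_STEP
    step_name: str,       # STEP_NAME entry: "decode" or "prefill"
    file_prefix: str,     # FILE_PREFIX, e.g. "v2_"
    runtime_name: str,    # RUNTIME_NAMES entry: "base", "pre_allocate" or "graph"
    attn_name: str,       # ATTN_NAME entry: "default", "flash", ...
) -> Path:
    dirname = f"{model_name}_bs_{batch_size}_tok_{max_new_tokens}_step_{token_step}_{step_name}"
    return Path(save_dir) / dirname / f"{file_prefix}{runtime_name}_{attn_name}.json"


# The file written by `run 0 1 0` in the santacoder decode setting:
print(build_result_path("data/benchmarks/v2", "santacoder", 32, 2040, 5, "decode", "v2_", "pre_allocate", "default"))
# data/benchmarks/v2/santacoder_bs_32_tok_2040_step_5_decode/v2_pre_allocate_default.json
```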
parser.add_argument("--max_input_length", "-i", default=-1, type=int) + parser.add_argument("--sample_dir", "-d") + parser.add_argument("--input_pad_ratio", "--pad", default=0, type=float) + parser.add_argument("--pad_generated_tokens", "--pad_g", default=0, type=float) + parser.add_argument("--input_seed", "--seed", default=0, type=int) + parser.add_argument("--max_new_tokens", "-g", default=100, type=int) # Cleanup parser.add_argument("--clear_every_run", action="store_true") @@ -47,10 +56,11 @@ def get_arg_parser() -> ArgumentParser: # Profiling and logging parser.add_argument("--max_log_outputs", type=int) - parser.add_argument("--profile", action="store_true") - parser.add_argument("--profile_cycles", type=int) - parser.add_argument("--full_trace", action="store_true") - parser.add_argument("--show_op_names", action="store_true") + parser.add_argument("--breakdown_latency", "--bl", action="store_true") + parser.add_argument("--profile", "-p", action="store_true") + parser.add_argument("--profile_cycles", "--pc", type=int) + parser.add_argument("--full_trace", "--pt", action="store_true") + parser.add_argument("--show_op_names", "--pn", action="store_true") parser.add_argument("--save", type=Path) return parser @@ -61,8 +71,6 @@ def main(argv: Optional[List[str]] = None) -> None: parser = get_arg_parser() args = parser.parse_args(argv) config_args = parse_config_args(args.config_args) - generate_kwargs = {"max_new_tokens": args.max_new_tokens, "do_sample": False} - inputs = get_dummy_batch(args.batch_size, args.max_input_length) separate_profile = args.profile and args.profile_cycles is not None warmup = args.profile if args.warmup is None else args.warmup if separate_profile: @@ -89,6 +97,14 @@ def main(argv: Optional[List[str]] = None) -> None: fast_init=args.fast_init, trust_remote_code=args.trust_remote_code, ) + inputs = get_input_batch( + args.batch_size, + args.max_input_length, + pipeline.tokenizer, + args.input_pad_ratio, + args.input_seed, + args.sample_dir, + ) all_metrics = [] @@ -104,7 +120,7 @@ def main(argv: Optional[List[str]] = None) -> None: profiler = contextlib.nullcontext() benchmark_metrics = { - **generate_kwargs, + "max_new_tokens": args.max_new_tokens, "Model parameters": pipeline.get_num_parameters(), "Cycles (warmup)": args.skip + warmup, "Cycles (benchmark)": args.cycles, @@ -121,10 +137,27 @@ def main(argv: Optional[List[str]] = None) -> None: t1 = time.perf_counter() with profiler as p: for step in range(args.skip + warmup + args.cycles): + log_rank_n( + ( + f"*** Running generation step {step} " + f"({'skip' if step str: return f"{1000 * t:.2f} ms" +def format_ms_dict(t_dict: Dict[str, float]) -> Dict[str, str]: + return {key: format_ms(value) for key, value in t_dict.items()} + + def format_mib(m: float) -> str: return f"{m/2**20:.0f} MiB" @@ -24,7 +28,9 @@ def format_mib(m: float) -> str: class Metrics: LATENCY_E2E = "Latency (end to end)" LATENCY_TOKEN = "Latency (tokenization)" - LATENCY_MODEL = "Latency (model)" + LATENCY_MODEL = "Latency (generate)" + LATENCY_GENERATE_START = "Latency (prepare for generation)" + LATENCY_GENERATE_BREAKDOWN = "Latency (generate breakdown)" LATENCY_DECODE = "Latency (decode)" LATENCY_MAX = "Latency (max)" LATENCY_MIN = "Latency (min)" @@ -59,6 +65,8 @@ class Metrics: LATENCY_E2E: format_ms, LATENCY_TOKEN: format_ms, LATENCY_MODEL: format_ms, + LATENCY_GENERATE_START: format_ms, + LATENCY_GENERATE_BREAKDOWN: format_ms_dict, LATENCY_DECODE: format_ms, LATENCY_MAX: format_ms, LATENCY_MIN: format_ms, diff --git 
diff --git a/src/parse_breakdown_results.py b/src/parse_breakdown_results.py
new file mode 100644
index 0000000..4c281cf
--- /dev/null
+++ b/src/parse_breakdown_results.py
@@ -0,0 +1,81 @@
+import json
+from argparse import ArgumentParser
+from pathlib import Path
+from typing import List, Optional
+
+
+def get_arg_parser() -> ArgumentParser:
+    parser = ArgumentParser()
+    parser.add_argument("input_dir", type=Path)
+    parser.add_argument("--title")
+    parser.add_argument("--size", nargs=2, type=float)
+    parser.add_argument("--save_dir", "--save", type=Path)
+    return parser
+
+
+def read_data(input_file: Path):
+    try:
+        with input_file.open("r") as f:
+            data = json.load(f)
+        data = {**data["config"], **data["results"]}
+    except (ValueError, OSError) as e:
+        raise ValueError(f"Cannot parse file {input_file} ({e})")
+    data["Setting"] = input_file.stem
+    return data
+
+
+def plot(data, title=None, size=None):
+    import matplotlib.pyplot as plt
+
+    fig = plt.figure(figsize=size)
+    ax = fig.add_subplot()
+
+    cmap = plt.get_cmap("tab20").colors
+    cmap = cmap[::2] + cmap[1::2]
+
+    for i, dat in enumerate(data):
+        latency_data = dat["Latency (generate breakdown)"]
+        ax.plot(
+            [int(k) for k in latency_data.keys()],
+            [v * 1000 for v in latency_data.values()],
+            label=dat["Setting"],
+            linewidth=1,
+            color=cmap[i],
+        )  # , linestyle=":")#, markersize=1, marker="o")
+
+    ax.set_title(title)
+    ax.set_xlabel("Sequence length")
+    ax.set_ylabel("Latency (ms)")
+    ax.legend()
+    return fig
+
+
+def main(argv: Optional[List[str]] = None) -> None:
+    parser = get_arg_parser()
+    args = parser.parse_args(argv)
+    data = [read_data(input_file) for input_file in args.input_dir.iterdir()]
+
+    if len(data) == 0:
+        raise RuntimeError(f"No data to show.")
+
+    title = args.title
+    dirname = args.input_dir.stem
+    if title is None:
+        try:
+            name, _, bs, _, _, _, _, step = dirname.rsplit("_", 7)
+            title = f"{name} {step}, bs = {bs}"
+        except ValueError:
+            title = dirname
+
+    fig = plot(data, title, args.size)
+    fig.show()
+    if args.save_dir:
+        save_path = (args.save_dir / dirname).with_suffix(".jpg")
+        fig.savefig(save_path)
+        print(f"Figure saved to {save_path}")
+
+    input("Press enter to continue")
+
+
+if __name__ == "__main__":
+    main()
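The parser assumes each file under input_dir has the structure written by the benchmark's --save option. The exact keys are not shown in this diff; what read_data() needs is a "config" dict, a "results" dict, and the "Latency (generate breakdown)" mapping. A made-up example file, for reference only:

```python
# Sketch of a results file that read_data()/plot() above can consume.
# All values (and the "config" contents) are made up; only the shape matters.
import json
from pathlib import Path

example = {
    "config": {"max_new_tokens": 2040},
    "results": {"Latency (generate breakdown)": {"1": 0.011, "12": 0.012, "23": 0.014}},
}

path = Path("data/benchmarks/v2/santacoder_bs_32_tok_2040_step_5_decode/v2_base_default.json")
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(example))

# Then: python3 src/parse_breakdown_results.py data/benchmarks/v2/santacoder_bs_32_tok_2040_step_5_decode
```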
configuration: {self.config}") @@ -67,27 +72,27 @@ def __init__( self.model = self._load_pretrained(pretrained_model) self.model.eval() - t3 = time.perf_counter() + t3 = self._get_time() self.global_metrics[Metrics.INIT_TOKEN] = t1 - t0 self.global_metrics[Metrics.INIT_CONFIG] = t2 - t1 self.global_metrics[Metrics.INIT_TOTAL] = t3 - t0 def _create_model(self) -> PreTrainedModel: - t0 = time.perf_counter() + t0 = self._get_time() log_rank_n("*** Creating model", logger.info) with fast_init(self.device) if self.fast_init else contextlib.nullcontext(): torch_dtype = torch.float16 if self.is_int8 else self.dtype model = AutoModelForCausalLM.from_config( config=self.config, torch_dtype=torch_dtype, trust_remote_code=self.trust_remote_code ) - t1 = time.perf_counter() + t1 = self._get_time() log_rank_n("*** Moving to device", logger.info) model.to(self.device) - t2 = time.perf_counter() + t2 = self._get_time() log_rank_n("*** Initializing weights", logger.info) # Initialization is ~1000x faster on GPU. model.init_weights() - t3 = time.perf_counter() + t3 = self._get_time() self.global_metrics[Metrics.INIT_CREATE] = t1 - t0 self.global_metrics[Metrics.INIT_DEVICE] = t2 - t1 self.global_metrics[Metrics.INIT_WEIGHTS] = t3 - t2 @@ -101,14 +106,14 @@ def _reload_model(self): self.model = self._load_pretrained("tmp") def _save_pretrained(self, pretrained_model: str): - t0 = time.perf_counter() + t0 = self._get_time() log_rank_n(f"*** Saving model to {pretrained_model}", logger.info) - t1 = time.perf_counter() + t1 = self._get_time() self.global_metrics[Metrics.INIT_SAVE] = t1 - t0 self.model.save_pretrained(pretrained_model) def _load_pretrained(self, pretrained_model: str) -> PreTrainedModel: - t0 = time.perf_counter() + t0 = self._get_time() log_rank_n(f"*** Loading model from {pretrained_model}", logger.info) kwargs = {"load_in_8bit": True, "device_map": "auto"} if self.is_int8 else {"torch_dtype": self.dtype} with fast_init(self.device) if self.fast_init else contextlib.nullcontext(): @@ -120,12 +125,12 @@ def _load_pretrained(self, pretrained_model: str) -> PreTrainedModel: trust_remote_code=self.trust_remote_code, **kwargs, ) - t1 = time.perf_counter() + t1 = self._get_time() self.global_metrics["load pretrained model"] = t1 - t0 if not self.is_int8: log_rank_n("*** Moving to device", logger.info) model = model.to(self.device) - t2 = time.perf_counter() + t2 = self._get_time() self.global_metrics[Metrics.INIT_DEVICE] = t2 - t1 return model @@ -171,26 +176,175 @@ def _get_config( return config - def __call__(self, text: List[str], **generate_kwargs) -> Tuple[List[str], Dict[str, Any]]: - t0 = time.perf_counter() - inputs = self.tokenizer(text, return_tensors="pt", padding=True) - + def _get_time(self, synchronize=False): + if synchronize: + torch.cuda.synchronize() + return time.perf_counter() + + def _allocate_mock_cache(self, past_key_length: int, batch_size: int): + if isinstance(self.config, GPTBigCodeConfig): + if self.config.pre_allocate_kv_cache: + past_key_values = [past_key_length] * self.config.n_layer + for block in self.model.transformer.h: + block.attn.get_kv_cache( + batch_size, past_key_length, dtype=self.dtype, device=self.device + ).normal_() + else: + kv_dim = self.config.n_embd // self.config.n_head if self.config.multi_query else self.config.n_embd + past_key_values = [ + torch.randn([batch_size, past_key_length, 2 * kv_dim], dtype=self.dtype, device=self.device) + for _ in range(self.config.n_layer) + ] + else: + past_key_values = [ + [ + torch.randn( + [batch_size, 
@@ -171,26 +176,175 @@ def _get_config(
         return config
 
-    def __call__(self, text: List[str], **generate_kwargs) -> Tuple[List[str], Dict[str, Any]]:
-        t0 = time.perf_counter()
-        inputs = self.tokenizer(text, return_tensors="pt", padding=True)
-
+    def _get_time(self, synchronize=False):
+        if synchronize:
+            torch.cuda.synchronize()
+        return time.perf_counter()
+
+    def _allocate_mock_cache(self, past_key_length: int, batch_size: int):
+        if isinstance(self.config, GPTBigCodeConfig):
+            if self.config.pre_allocate_kv_cache:
+                past_key_values = [past_key_length] * self.config.n_layer
+                for block in self.model.transformer.h:
+                    block.attn.get_kv_cache(
+                        batch_size, past_key_length, dtype=self.dtype, device=self.device
+                    ).normal_()
+            else:
+                kv_dim = self.config.n_embd // self.config.n_head if self.config.multi_query else self.config.n_embd
+                past_key_values = [
+                    torch.randn([batch_size, past_key_length, 2 * kv_dim], dtype=self.dtype, device=self.device)
+                    for _ in range(self.config.n_layer)
+                ]
+        else:
+            past_key_values = [
+                [
+                    torch.randn(
+                        [batch_size, past_key_length, self.config.n_embd], dtype=self.dtype, device=self.device
+                    )
+                    for _ in range(2)
+                ]
+                for _ in range(self.config.n_layer)
+            ]
+        return past_key_values
+
+    def _generate_custom(
+        self,
+        inputs: Dict,
+        max_new_tokens: int,
+        use_cache: bool = True,
+        do_prefill: bool = True,
+        breakdown_latency: bool = False,
+        key_length_step: int = 1,
+        ignore_oom: bool = False,
+        pad_generated_tokens: float = 0,
+    ):
+        t0 = self._get_time(breakdown_latency)
+        batch_size, input_length = inputs["input_ids"].shape
+        output_length = input_length + max_new_tokens
+        input_ids = torch.empty([batch_size, output_length], dtype=torch.int64, device=self.device)
+        input_ids[:, :input_length].copy_(inputs["input_ids"])
+        if key_length_step > 1:
+            input_ids[:, input_length:].fill_(self.tokenizer.pad_token_id)
+
+        attention_mask = torch.empty([batch_size, output_length], dtype=torch.bool, device=self.device)
+        attention_mask[:, :input_length].copy_(inputs["attention_mask"])
+        if pad_generated_tokens > 0:
+            attention_mask[:, input_length:].copy_(
+                torch.empty_like(attention_mask[:, input_length:], dtype=torch.float32).uniform_()
+                > pad_generated_tokens
+            )
+        else:
+            attention_mask[:, input_length:].fill_(True)
+
+        position_ids = attention_mask.long().cumsum(-1, dtype=torch.int64) - 1
+        # TODO: Useless?
+        position_ids[:, :input_length].masked_fill_(attention_mask[:, :input_length] == 0, 1)
+
+        t1 = self._get_time(breakdown_latency)
+        last_time = t1
+        past_key_length = 0
+        past_key_values = None
+        generate_times = {}
+        for key_length in range(input_length, output_length, key_length_step):
+            try:
+                if (
+                    use_cache
+                    and (past_key_values is None and not do_prefill)
+                    or (past_key_values is not None and key_length_step > 1)
+                ):
+                    past_key_length = key_length - 1
+                    past_key_values = self._allocate_mock_cache(past_key_length, batch_size)
+                    # Exclude cache creation from timing
+                    last_time = self._get_time(breakdown_latency)
+                outputs = self.model(
+                    input_ids=input_ids[:, past_key_length:key_length],
+                    past_key_values=past_key_values,
+                    attention_mask=attention_mask[:, :key_length],
+                    position_ids=position_ids[:, past_key_length:key_length],
+                    return_dict=True,
+                    use_cache=use_cache,
+                )
+                if use_cache:
+                    past_key_values = outputs.past_key_values
+                    past_key_length = key_length
+                next_tokens = torch.argmax(outputs.logits[:, -1, :], dim=-1)
+                input_ids[:, key_length] = next_tokens
+                t2 = self._get_time(breakdown_latency)
+                generate_times[key_length] = t2 - last_time
+                last_time = t2
+            except torch.cuda.OutOfMemoryError:
+                if ignore_oom:
+                    logger.warning(f"Out of memory at key length {key_length}")
+                    break
+                else:
+                    raise
+
+        metrics = {}
+        if breakdown_latency:
+            metrics[Metrics.LATENCY_GENERATE_START] = t1 - t0
+            metrics[Metrics.LATENCY_GENERATE_BREAKDOWN] = generate_times
+
+        return input_ids, metrics
+
+    def _generate_hf(self, inputs: Dict, max_new_tokens: int, use_cache: bool):
         inputs = {key: value.to(self.device) if torch.is_tensor(value) else value for key, value in inputs.items()}
+        output = self.model.generate(
+            **inputs,
+            return_dict_in_generate=True,
+            max_new_tokens=max_new_tokens,
+            do_sample=False,
+            pad_token_id=self.tokenizer.pad_token_id,
+            use_cache=use_cache,
+        )
+        return output.sequences
 
-        t1 = time.perf_counter()
-        with torch.inference_mode():
-            output = self.model.generate(**inputs, return_dict_in_generate=True, **generate_kwargs)
-        t2 = time.perf_counter()
+    def __call__(
+        self,
+        text: List[str],
+        max_new_tokens: int,
+        custom_generate: bool = False,
+        use_cache: bool = True,
+        do_prefill: bool = True,
+        breakdown_latency=False,
+        key_length_step: int = 1,
+        ignore_oom: bool = False,
+        pad_generated_tokens: float = 0,
+    ) -> Tuple[List[str], Dict[str, Any]]:
+        t0 = self._get_time()
+        inputs = self.tokenizer(text, return_tensors="pt", padding=True)
 
-        output_tokens = output.sequences
+        t1 = self._get_time()
+        with torch.inference_mode():
+            if custom_generate:
+                assert do_prefill or use_cache
+                output_tokens, generate_metrics = self._generate_custom(
+                    inputs,
+                    max_new_tokens,
+                    use_cache,
+                    do_prefill,
+                    breakdown_latency,
+                    key_length_step,
+                    ignore_oom,
+                    pad_generated_tokens,
+                )
+            else:
+                assert do_prefill
+                assert not breakdown_latency
+                assert not ignore_oom
+                assert key_length_step == 1
+                assert pad_generated_tokens == 0
+                output_tokens = self._generate_hf(inputs, max_new_tokens, use_cache)
+                generate_metrics = {}
+        t2 = self._get_time(True)
 
         batch_size, input_length = inputs["input_ids"].shape
         output_length = output_tokens.size(1)
         output_text = self.tokenizer.batch_decode(output_tokens.cpu(), skip_special_tokens=True)
-        t3 = time.perf_counter()
+        t3 = self._get_time()
 
         metrics = {
+            **generate_metrics,
             Metrics.BATCH_SIZE: batch_size,
             Metrics.INPUT_LENGTH: input_length,
             Metrics.OUTPUT_LENGTH: output_length,
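The point of key_length_step and the mock cache above: with a step larger than 1, only every step-th key length is actually run through the model, and _allocate_mock_cache fabricates a randomly filled past of length key_length - 1 so each timed forward pass still sees a realistic cache. A quick sketch of the resulting sampling pattern, assuming the santacoder decode setting:

```python
# Which key lengths get timed by _generate_custom for the santacoder decode run
# (max_input_length=1, max_new_tokens=2040, key_length_step=11).
input_length, max_new_tokens, step = 1, 2040, 11
output_length = input_length + max_new_tokens

measured = list(range(input_length, output_length, step))
print(measured[:4], "...", measured[-2:])  # [1, 12, 23, 34] ... [2025, 2036]
print(f"{len(measured)} timed forward passes instead of {max_new_tokens}")  # 186 instead of 2040
```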
@@ -218,14 +372,25 @@ def aggregate_metrics(self, metrics: List[Dict[str, Any]]):
                 Metrics.TOKENS_BATCH,
                 Metrics.LATENCY_TOKEN,
                 Metrics.LATENCY_MODEL,
+                Metrics.LATENCY_GENERATE_START,
+                Metrics.LATENCY_GENERATE_BREAKDOWN,
                 Metrics.LATENCY_DECODE,
                 Metrics.LATENCY_E2E,
             )
         }
+
+        breakdown = all_metrics.pop(Metrics.LATENCY_GENERATE_BREAKDOWN, [])
+
         mean_metrics = {key: np.mean(value).item() for key, value in all_metrics.items() if len(value) > 0}
         throughput = mean_metrics[Metrics.TOKENS_BATCH] / mean_metrics[Metrics.LATENCY_E2E]
         model_throughput = mean_metrics[Metrics.TOKENS_BATCH] / mean_metrics[Metrics.LATENCY_MODEL]
 
+        if len(breakdown) > 0:
+            mean_metrics[Metrics.LATENCY_GENERATE_BREAKDOWN] = {
+                str(key): np.mean([values[key] for values in breakdown if key in values]).item()
+                for key in breakdown[0]
+            }
+
         return {
             **self.global_metrics,
             **mean_metrics,
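For context, this is how the benchmark loop is expected to call the updated __call__. The argument names come from the signature above; the concrete values are illustrative, and the pipeline construction is elided.

```python
# Sketch: calling the updated Pipeline.__call__ the way the decode benchmark does.
from typing import Any, Dict, List


def run_decode_breakdown(pipeline, inputs: List[str]) -> Dict[Any, float]:
    output_text, metrics = pipeline(
        inputs,                  # List[str], e.g. from get_input_batch(...)
        max_new_tokens=2040,
        custom_generate=True,    # route through _generate_custom
        use_cache=True,
        do_prefill=False,        # decode-only: the past comes from the mock KV cache
        breakdown_latency=True,  # adds the Metrics.LATENCY_GENERATE_* entries
        key_length_step=11,
        ignore_oom=True,
    )
    return metrics["Latency (generate breakdown)"]  # per-key-length latency in seconds
```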
+ """ + if padding_ratio == 0: + return batch_size * [max_input_length] + assert batch_size >= 2 + total_tokens = batch_size * max_input_length + pad_tokens = round(padding_ratio * total_tokens) + input_tokens = total_tokens - pad_tokens + # First length is deterministic + required_tokens = input_tokens - max_input_length + average_length = required_tokens / (batch_size - 1) + smin = 1 + smax = round(2 * average_length - smin) + if smax > max_input_length: + smax = max_input_length + smin = round(2 * average_length - smax) + assert smax >= smin >= 1, "Cannot obtain desired padding ratio" + print("AA", batch_size, max_input_length, padding_ratio, smin, smax) + assert abs(smax + smin - 2 * average_length) < 1 + for i in range(100): + lengths = random_state.randint(smin, smax, batch_size - 2) + remaining = required_tokens - lengths.sum() + if 1 <= remaining <= max_input_length: + lengths = [max_input_length, *lengths.tolist(), remaining] + random_state.shuffle(lengths) + assert sum(lengths) == input_tokens + return lengths + raise RuntimeError("Failed to get desired padding ratio") + + +def get_inputs_from_tokens(tokens, length, tokenizer): + for _ in range(10): + assert len(tokens) == length + inputs = tokenizer.decode(tokens) + # We often get more tokens than we started with, less in som rare cases. + tokens = tokenizer(inputs)["input_ids"] + if len(tokens) == length: + return inputs + tokens = tokens[:length] + max(length - len(tokens), 0) * [tokens[-1]] + raise RuntimeError("Failed to generate stable input sequences") + + +def get_random_inputs(length, tokenizer, random_state): + return get_inputs_from_tokens(random_state.randint(0, tokenizer.vocab_size, length).tolist(), length, tokenizer) + + +def get_inputs_from_files(files: List[Path], lengths, tokenizer, random_state): + file_tokens = [tokenizer(f.open().read())["input_ids"] for f in files] + max_len = max(len(t) for t in file_tokens) + batch_size = len(lengths) + inputs = [] + while len(inputs) < batch_size: + length = lengths[len(inputs)] + if length > max_len: + # No file works, pick at random instead. + inputs.append(get_random_inputs(length, tokenizer, random_state)) + else: + tokens = file_tokens[random_state.randint(len(file_tokens))] + if length > len(tokens): + # Try another file. 
+def get_random_inputs(length, tokenizer, random_state):
+    return get_inputs_from_tokens(random_state.randint(0, tokenizer.vocab_size, length).tolist(), length, tokenizer)
+
+
+def get_inputs_from_files(files: List[Path], lengths, tokenizer, random_state):
+    file_tokens = [tokenizer(f.open().read())["input_ids"] for f in files]
+    max_len = max(len(t) for t in file_tokens)
+    batch_size = len(lengths)
+    inputs = []
+    while len(inputs) < batch_size:
+        length = lengths[len(inputs)]
+        if length > max_len:
+            # No file works, pick at random instead.
+            inputs.append(get_random_inputs(length, tokenizer, random_state))
+        else:
+            tokens = file_tokens[random_state.randint(len(file_tokens))]
+            if length > len(tokens):
+                # Try another file.
+                continue
+            start_index = random_state.randint(len(tokens) - length)
+            inputs.append(get_inputs_from_tokens(tokens[start_index : start_index + length], length, tokenizer))
+    return inputs
+
+
+def get_input_batch(
+    batch_size: int,
+    max_input_length: int = -1,
+    tokenizer=None,
+    padding_ratio: float = 0,
+    seed: int = 0,
+    sample_dir: Optional[Union[Path, List[Path]]] = None,
+) -> List[str]:
     if max_input_length == -1:
-        input_sentences = copy.deepcopy(dummy_input_sentences)
+        inputs = copy.deepcopy(dummy_inputs)
+        if batch_size > len(inputs):
+            inputs *= math.ceil(batch_size / len(inputs))
+        return inputs[:batch_size]
     else:
-        input_sentences = batch_size * [" Hello" * max_input_length]
-
-    if batch_size > len(input_sentences):
-        input_sentences *= math.ceil(batch_size / len(input_sentences))
-    input_sentences = input_sentences[:batch_size]
-
-    return input_sentences
+        random_state = np.random.RandomState(seed)
+        lengths = get_input_lengths(batch_size, max_input_length, padding_ratio, random_state)
+        if isinstance(sample_dir, Path):
+            if sample_dir.is_dir():
+                sample_dir = [f for f in sample_dir.iterdir() if f.is_file() and f.suffix == ".py"]
+            elif sample_dir.is_file():
+                sample_dir = [sample_dir]
+            else:
+                raise FileNotFoundError(sample_dir)
+        if sample_dir is None:
+            return get_random_inputs(lengths, tokenizer, random_state)
+        else:
+            assert isinstance(sample_dir, List)
+            assert len(sample_dir) > 0
+            return get_inputs_from_files(sample_dir, lengths, tokenizer, random_state)
diff --git a/transformers b/transformers
index 9c3c548..a2efad2 160000
--- a/transformers
+++ b/transformers
@@ -1 +1 @@
-Subproject commit 9c3c5484d831484f96e2bcd2961cfac100e52d0b
+Subproject commit a2efad2c96e6da982f102eea53918c7b8431da80
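Finally, a small usage sketch for the new length sampler in src/utils.py; the concrete numbers are illustrative. It checks the two properties the docstring promises: one sequence gets the full length, and the overall padding ratio is honored.

```python
# Sketch: sampling input lengths with the new helper in src/utils.py.
import numpy as np

from src.utils import get_input_lengths

# Batch of 32, max length 512, 25% of the token budget spent on padding.
lengths = get_input_lengths(32, 512, 0.25, np.random.RandomState(0))

assert len(lengths) == 32
assert max(lengths) == 512                     # at least one sequence has the full length
assert sum(lengths) == round(0.75 * 32 * 512)  # real tokens match the requested padding ratio
print(sorted(lengths))
```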