diff --git a/benchmarks/benchmark_forward.py b/benchmarks/benchmark_forward.py
index bf547ec..5ad678c 100755
--- a/benchmarks/benchmark_forward.py
+++ b/benchmarks/benchmark_forward.py
@@ -18,7 +18,7 @@ def main():
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument("--model", type=str, required=True, help="Model")
     parser.add_argument("--initial_peers", type=str, nargs="+", default=PUBLIC_INITIAL_PEERS, help="Initial peers")
-    parser.add_argument("--torch_dtype", type=str, default="bfloat16", help="Torch dtype")
+    parser.add_argument("--torch_dtype", type=str, default="float32", help="Torch dtype")
     parser.add_argument("--n_processes", type=str, default=1, help="Number of concurrent processes")
     parser.add_argument("--seq_len", type=int, default=128, help="Sequence length")
     parser.add_argument("--n_steps", type=int, default=100, help="Number of benchmark steps")
@@ -31,15 +31,19 @@ def main():
     else:
         args.n_processes = int(args.n_processes)
 
-    processes = [mp.Process(target=benchmark_forward, args=(i, args)) for i in range(args.n_processes)]
+    pipe_recv, pipe_send = mp.Pipe(duplex=False)
+    processes = [mp.Process(target=benchmark_forward, args=(i, args, pipe_send)) for i in range(args.n_processes)]
     for proc in processes:
         proc.start()
     for proc in processes:
         proc.join()
 
+    speed = np.mean([pipe_recv.recv() for _ in range(args.n_processes)])
+    logger.info(f"Final result: {speed=:.2f}")
+
 
 @torch.inference_mode()
-def benchmark_forward(process_idx, args):
+def benchmark_forward(process_idx, args, result_pipe):
     model = AutoDistributedModel.from_pretrained(
         args.model,
         initial_peers=args.initial_peers,
@@ -64,7 +68,7 @@ def benchmark_forward(process_idx, args):
             speed = input_ids.numel() / np.mean(step_times)
             logger.info(f"{process_idx=} {step=} {speed=:.2f}")
 
-    logger.info(f"Final result: {process_idx=} {speed=:.2f}")
+    result_pipe.send(speed)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/benchmark_inference.py b/benchmarks/benchmark_inference.py
index e894bb1..202dc6d 100755
--- a/benchmarks/benchmark_inference.py
+++ b/benchmarks/benchmark_inference.py
@@ -19,7 +19,7 @@ def main():
     parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
     parser.add_argument("--model", type=str, required=True, help="Model")
     parser.add_argument("--initial_peers", type=str, nargs="+", default=PUBLIC_INITIAL_PEERS, help="Initial peers")
-    parser.add_argument("--torch_dtype", type=str, default="bfloat16", help="Torch dtype")
+    parser.add_argument("--torch_dtype", type=str, default="float32", help="Torch dtype")
     parser.add_argument("--n_processes", type=str, default=1, help="Number of concurrent processes")
     parser.add_argument("--seq_len", type=int, default=2048, help="Sequence length")
     parser.add_argument("--warmup_steps", type=int, default=1, help="Number of warmup steps")
@@ -30,15 +30,19 @@ def main():
     else:
         args.n_processes = int(args.n_processes)
 
-    processes = [mp.Process(target=benchmark_inference, args=(i, args)) for i in range(args.n_processes)]
+    pipe_recv, pipe_send = mp.Pipe(duplex=False)
+    processes = [mp.Process(target=benchmark_inference, args=(i, args, pipe_send)) for i in range(args.n_processes)]
     for proc in processes:
         proc.start()
     for proc in processes:
         proc.join()
 
+    speed = np.mean([pipe_recv.recv() for _ in range(args.n_processes)])
+    logger.info(f"Final result: {speed=:.2f}")
+
 
 @torch.inference_mode()
-def benchmark_inference(process_idx, args):
+def benchmark_inference(process_idx, args, result_pipe):
     tokenizer = AutoTokenizer.from_pretrained(args.model, use_fast=False)
     # Using use_fast=False since LlamaTokenizerFast takes a long time to start, and we decode 1 token at a time anyway
@@ -61,7 +65,7 @@ def benchmark_inference(process_idx, args):
             speed = 1 / np.mean(step_times)
             logger.info(f"{process_idx=} {step=} {speed=:.2f}")
 
-    logger.info(f"Final result: {process_idx=} {speed=:.2f}")
+    result_pipe.send(speed)
 
 
 if __name__ == "__main__":
diff --git a/benchmarks/benchmark_training.py b/benchmarks/benchmark_training.py
index 85061a3..f542907 100755
--- a/benchmarks/benchmark_training.py
+++ b/benchmarks/benchmark_training.py
@@ -20,7 +20,7 @@ def main():
     parser.add_argument("--device", type=str, default="cpu", help="Torch device hosting the client")
     parser.add_argument("--task", type=str, default="cls", help="Training task type")
     parser.add_argument("--initial_peers", type=str, nargs="+", default=PUBLIC_INITIAL_PEERS, help="Initial peers")
-    parser.add_argument("--torch_dtype", type=str, default="bfloat16", help="Torch dtype")
+    parser.add_argument("--torch_dtype", type=str, default="float32", help="Torch dtype")
     parser.add_argument("--n_processes", type=str, default=1, help="Number of concurrent processes")
     parser.add_argument("--seq_len", type=int, default=128, help="Sequence length")
     parser.add_argument("--pre_seq_len", type=int, default=16, help="Number of trainable tokens")
@@ -36,14 +36,18 @@ def main():
     else:
         args.n_processes = int(args.n_processes)
 
-    processes = [mp.Process(target=benchmark_training, args=(i, args)) for i in range(args.n_processes)]
+    pipe_recv, pipe_send = mp.Pipe(duplex=False)
+    processes = [mp.Process(target=benchmark_training, args=(i, args, pipe_send)) for i in range(args.n_processes)]
     for proc in processes:
         proc.start()
     for proc in processes:
         proc.join()
 
+    fwd_speed, bwd_speed = np.mean([pipe_recv.recv() for _ in range(args.n_processes)], axis=0)
+    logger.info(f"Final result: {fwd_speed=:.2f} {bwd_speed=:.2f}")
 
-def benchmark_training(process_idx, args):
+
+def benchmark_training(process_idx, args, result_pipe):
     if args.task == "cls":
         model = AutoDistributedModelForSequenceClassification.from_pretrained(
             args.model,
@@ -96,7 +100,7 @@ def benchmark_training(process_idx, args):
         bwd_speed = input_ids.numel() / np.mean(bwd_times)
         logger.info(f"{process_idx=} Fwd speed: {fwd_speed:.2f} | Bwd speed: {bwd_speed:.2f}")
 
-    logger.info(f"Final result: {process_idx=} {fwd_speed=:.2f} | {bwd_speed=:.2f}")
+    result_pipe.send((fwd_speed, bwd_speed))
 
 
 if __name__ == "__main__":
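Side note (not part of the patch): below is a minimal, self-contained sketch of the aggregation pattern all three benchmarks switch to, where each worker process sends its measured speed through a one-way multiprocessing pipe and the parent averages the values after join(). The worker body and its placeholder speed value are illustrative assumptions, not code from the repository.

import multiprocessing as mp

import numpy as np


def worker(process_idx, result_pipe):
    # Placeholder for the measured tokens/sec computed by a real benchmark worker.
    speed = 42.0 + process_idx
    result_pipe.send(speed)  # exactly one message per process


if __name__ == "__main__":
    n_processes = 4
    pipe_recv, pipe_send = mp.Pipe(duplex=False)  # parent reads, all workers write
    processes = [mp.Process(target=worker, args=(i, pipe_send)) for i in range(n_processes)]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
    # Every worker sent exactly one message, so n_processes receives cannot block.
    print(f"mean speed: {np.mean([pipe_recv.recv() for _ in range(n_processes)]):.2f}")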