前文已经介绍了:
本文主要介绍 强化学习框架:OpenRLHF源码解读,模型训练。因为在 OpenRLHF 中整个模型训练过程代码比较多因此分成多次进行解读,此部分主要介绍一些训练前的初始化配置。因为RL由DPO、GRPO、PPO等几种类别,因此本文主要介绍PPO范式训练。在OpenRLHF训练框架中,主要还会应用到DeepSpeed以及vLLM,因此在介绍PPO训练之前需要回顾一下:1、DeepSpeed的配置;2、vLLM配置。
在之前Blog已经对DeepSpeed以及vLLM原理进行了解释,因此只需要介绍在OpenRLHF如何去对这两部分进行配置
参数参考脚本:https://github.com/OpenRLHF/OpenRLHF/blob/main/examples/scripts/train_ppo_llama_ray.sh 中的设置
From:https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/vllm_engine.py
def create_vllm_engines(
num_engines: int, # 推理引擎数量
tensor_parallel_size: int, # 张量并行大小
pretrain: str,
seed: int,
full_determinism: bool,
enable_prefix_caching: bool,
enforce_eager: bool,
max_model_len: int,
num_total_actors: int,
shared_pg=None,
gpu_memory_utilization=None,
vllm_enable_sleep=False,):
...
# 1、资源调度配置。配置参数设置为:num_engines= tensor_parallel_size= 2
use_hybrid_engine = shared_pg is not None
num_gpus = int(tensor_parallel_size == 1)
if use_hybrid_engine and tensor_parallel_size == 1:
num_gpus = 0.2
if not use_hybrid_engine:
bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_engines * tensor_parallel_size)]
shared_pg = placement_group(bundles, strategy="PACK")
ray.get(shared_pg.ready())
...
# 2、构建每一个vLLM(=2)
for i in range(num_engins):
...
scheduling_strategy = PlacementGroupSchedulingStrategy(...) # 调度策略
...
vllm_engines.append(
LLMRayActor.options(
num_cpus=num_gpus,
num_gpus=num_gpus,
scheduling_strategy=scheduling_strategy,
).remote(...)
)
1、资源调度配置:第一种Hybrid模式(多个引擎共同占用GPU);第二种标准模式(每个引擎都单独占用一个GPU和CPU);
2、构建vLLM:首先是建立资源调度策略,以及使用vLLM。有必要了解一下就是在OpenRLHF中使用的是 ray 分布式架构进行训练。简单了解一下在这个里面他是怎么做的。通过 ray封装了一个vLLM推理架构( LLMRayActor
)
补充-1:ray简单使用,代码:🔗
Ray核心概念:1、任务(Task):无状态的并行函数调用。
2、Actor:有状态的计算单元,适合需要持久状态的场景(如模型推理)。3、远程调用(remote):通过 .remote() 异步调度任务或 Actor 方法。比如说上面代码,初始化我需要的节点@ray.remote class PrintActor:
在vLLM中可能就需要对资源进行分配:
PrintActor.options(...).remote(...)
其中remote
就是每个“进程”需要输出的任务参数,而options
则是资源分配策略,比如GPU(num_gpus
)/CPU(num_cpus
)数量。在后面获取进程结果可以直接通过:ray.init() print_engines = create_print_engines(4) results = [engine.execture_print.remote(i) for i, engine in enumerate(print_engines)] print(ray.get(results))
TODO: 此部分没有写完
完全了解PPO训练范式之前需要了解一下在OpenRLHF中如何定义 PPO训练器的。
experience_maker.py
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ppo_utils/experience_maker.py
改代码中首先定义一个基础类 BaseExperienceMaker
用于初始化,这个类中主要作用为:1、定义一个 tokenizer
;2、一些初始化。关键是 RemoteExperienceMaker
定义:
class RemoteExperienceMaker(BaseExperienceMaker):
def __init__(...):
...
# (1)定义生成器,通过LLM生成新的文本内容
@torch.no_grad()
def generate_samples(self, all_prompts: List[str], all_labels, **generate_kwargs):
#(1.1)使用hugginface生成 _generate_with_hf(...)
#(1.2)使用vLLM进行生成 _generate_vllm(...)
@torch.no_grad()
def _generate_with_hf(...):
...
def _generate_vllm(...):
....
(1)定义生成器:通过两种方式(huggingface以及vLLM)
@torch.no_grad()
def _generate_with_hf(self, all_prompts: List[str], all_labels, **generate_kwargs) -> List[Samples]:
...
# train_ppo_ray.py n_samples_per_prompt=1 对每一条 prompt 生成内容数量
all_prompts = sum([[prompt] * args.n_samples_per_prompt for prompt in all_prompts], [])
all_labels = sum([[label] * args.n_samples_per_prompt for label in all_labels], [])
samples_list = []
for i in range(0, len(all_prompts), args.micro_rollout_batch_size):
# train_ppo_ray.py micro_rollout_batch_size=8
prompts = all_prompts[i : i + args.micro_rollout_batch_size]
labels = all_labels[i : i + args.micro_rollout_batch_size]
inputs = self.tokenize_fn(prompts, self.prompt_max_len, device="cuda")
sequences, attention_mask, action_mask = self.actor.generate(**inputs, **generate_kwargs)
samples = Samples(...) # 将生成内容所有信息存储,比如说sequences这些关键信息
samples_list.append(samples)
return samples_list
使用vLLM方式进行生成需要注意的是,正如最开始上面介绍的,我的所有的vLLM都是通过ray进行处理了,因此这个就会比较复杂
def _generate_vllm(self, all_prompts: List[str], all_labels, **kwargs):
...
#(1)首先获取所有的设备数量,并且确定 vllm 引擎数量
rank = torch.distributed.get_rank() // self.strategy.ring_attn_size
world_size = torch.distributed.get_world_size() // self.strategy.ring_attn_size
if len(self.vllm_engines) <= world_size:
llms = [self.vllm_engines[rank % len(self.vllm_engines)]]
else:
llms = self.vllm_engines[rank::world_size]
...
#(2)直接通过 from vllm import SamplingParams 来设置生成器策略比如说 top_p等
sampling_params = SamplingParams(...)
#(3)将prompt、labels进行重复采用,这里和hf生成处理相同,并且将prompt通过tokenizer进行处理
all_prompts = sum([[prompt] * args.n_samples_per_prompt for prompt in all_prompts], [])
all_labels = sum(...)
all_prompt_token_ids = self.tokenize_fn(all_prompts, self.prompt_max_len, padding=False)["input_ids"]
#(4)直接将所有的prompt输入到 vllm中进行生成
refs = []
batch_size = (len(all_prompt_token_ids) + len(llms) - 1) // len(llms)
for i, llm in enumerate(llms):
prompt_token_ids = all_prompt_token_ids[i * batch_size : (i + 1) * batch_size]
refs.append(
llm.add_requests.remote(rank, sampling_params=sampling_params, prompt_token_ids=prompt_token_ids)
)
ray.get(refs)
#(5)所有设备之间进行同步
...
#(6)获取vLLM生成内容
all_output_refs = []
for i, llm in enumerate(llms):
all_output_refs.append(llm.get_responses.remote(rank))
all_outputs = sum(ray.get(all_output_refs), [])
#(7)处理vLLM输出
samples_list = []
for i in range(0, len(all_outputs), args.micro_rollout_batch_size):
outputs = all_outputs[i : i + self.strategy.args.micro_rollout_batch_size]
prompts = all_prompts[i : i + self.strategy.args.micro_rollout_batch_size]
labels = all_labels[i : i + self.strategy.args.micro_rollout_batch_size]
#(8)处理数据
if not self.packing_samples:
# 不对数据进行打包输出
# | [PAD] [PAD] token token token | token token [EOS] [PAD] |
# | token token token token token | token token [EOS] [PAD] |
# | [PAD] [PAD] [PAD] token token | token token token [EOS] |
# |<---------- prompt ----------->|<-------- answer ------->|
...
samples_list.append(...)
else:
# 对数据进行打包输出
# | token token token | token token [EOS] | token token token token token | token token [EOS] | token token | token token token [EOS] |
# |<--- prompt ----->|<---- answer ----->|<---------- prompt ----------->|<----- answer ---->|<- prompt -->|<-------- answer ------->|
...
samples_list.append(...)
return samples_list
(4)llm.add_requests.remote(...)
、llm.get_responses.remote(...)
其中 add_requests
执行操作:直接通过vLLM生成文本:self.llm.generate(prompts=requests, sampling_params=sampling_params)
而在 get_responses
则是直接获取vLLM所生成的内容。
(7-8)在处理输出过程中由于数据可能会将短文本进行拼接(packing_samples
)但是不同短文本之间对应问题是不同的,因此就需要将输出进行整理。处理过程比较简单:
不对数据进行打包输出:只需要就算 outputs
(是被切分了每次处理的一块) 中最大长度然后按照这个最大长度进行填补即可(左填充方式:在最左侧添加 [PAD]标记)
如果将数据进行打包输出:只需要将prompt和输出拼接起来即可
ppo_actor.py
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/ppo_actor.py
这里主要是定义了一个trainer类,用来执行数据加载、模型处理、记录loss等操作
class ActorPPOTrainer(BasePPOTrainer):
def __init__(...):
...
# (1)初始化记录器,比如说wandb、tensorboard,这部分比较简单不做介绍
...
self.experience_maker = RemoteExperienceMaker(...) #TODO: 这部分需要特殊去看一下
# (2)因为要使用ray就需要设置分布式训练设置
backend = getattr(self.strategy.args, "vllm_sync_backend", "nccl") # 判断是否使用 nccl 进行通信
...
# (3)ray 分布式节点设置
if self.vllm_engines is not None and not self.use_cuda_ipc and torch.distributed.get_rank() == 0: # 如果不使用 nccl 进行通信
# 初始化设置:获取主节点,并且随便绑定一个端口
master_address = ray._private.services.get_node_ip_address()
with socket.socket() as sock:
sock.bind(("", 0))
master_port = sock.getsockname()[1]
vllm_num_engines, vllm_tensor_parallel_size = (
self.strategy.args.vllm_num_engines,
self.strategy.args.vllm_tensor_parallel_size,
)
# 获取规模:计算vllm引擎以及几张卡进行张量并行
world_size = vllm_num_engines * vllm_tensor_parallel_size + 1
use_ray = getattr(self.strategy.args, "vllm_sync_with_ray", False)
group_name = "openrlhf"
refs = [
engine.init_process_group.remote(
master_address,
master_port,
i * vllm_tensor_parallel_size + 1,
world_size,
group_name,
backend=backend,
use_ray=use_ray,
)
for i, engine in enumerate(self.vllm_engines)
]
# (4) 通信之间进行同步
...
ray.get(refs)
torch.distributed.barrier()
def fit(
...
prompts_dataloader,
pretrain_dataloader,
consumed_samples=0,
num_update_steps_per_episodes=1,):
# 主要进行数据加载/模型计算/模型存储等操作
...
for episode in range(start_episode, args.num_episodes):
...
for rand_prompts, labels in self.prompts_dataloader:
for i, experience in enumerate(
self.experience_maker.make_experience_list(rand_prompts, labels, **self.generate_kwargs)
):
(3)使用 ray 分布式节点设置,初始化设置主节点/端口号/分组信息等,通过 refs
来存储初始化的 vLLM,代码中初始化操作为:
def init_process_group(self, master_address, master_port, rank_offset, world_size, group_name, backend, use_ray):
return self.llm.collective_rpc(
"init_process_group",
args=(master_address, master_port, rank_offset, world_size, group_name, backend, use_ray),
)
其中 self.llm=vllm.LLM(*args, **kwargs)
https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/launcher.py
在launcher中主要也是定义各类分布式系统方式(如何将各类模型分不到设别上),首先来说定义两个基类:
DistributedTorchRayActor
:这个类是一个基于 Ray 的分布式 Actor,负责设置分布式训练或推理环境,它初始化分布式环境的配置,包括节点地址、端口、进程排名等。
BasePPORole(DistributedTorchRayActor)
:扩展上面所定义的分布式环境配置,并提供了批量推理的功能。
下面就是对各类模型:rewardmodel等进行分布式进行处理
ReferenceModelRayActor
和 RewardModelRayActor
@ray.remote(num_gpus=1)
class ReferenceModelRayActor(BasePPORole):
def init_model_from_pretrained(self, strategy: DeepspeedStrategy, pretrain):
self._setup_distributed(strategy)
model = Actor(...)
# 1)这里也就是对于一些模型进行初始化操作
...
def forward(...):
...
# 2)这里也就是直接将文本输入到模型中进行计算
@ray.remote(num_gpus=1)
class RewardModelRayActor(BasePPORole):
def init_model_from_pretrained(self, strategy: DeepspeedStrategy, pretrain):
self._setup_distributed(strategy)
model = get_llm_for_sequence_regression(...)
# 3) 初始化模型操作
...
def forward(...):
...
# 4)这里也就是直接将文本输入到模型中进行计算
(1、2、3、4)、模型初始化操作,这里没什么好解释的直接通过继承的类(BasePPORole
)中对 Deepspeed
进行初始化。模型计算也是比较常规的 forward
计算范式。对于 Actor
操作:📃。
PPORayActorGroup
:创建、初始化和管理多个分布式 Actor,协调它们的模型加载、训练、推理和保存操作class PPORayActorGroup:
def __init(
...,
pg: PlacementGroup = None,
...):
...
self._initiate_actors(pg, num_gpus_per_actor)
def _initiate_actors(
self,
pg, # 通过ray创建资源组
num_gpus_per_actor):
...
self._actor_handlers = [master_actor]
def async_fit_actor_model(
self,
critic_model_group: "PPORayActorGroup",
initial_model_group: "PPORayActorGroup",
reward_model_groups: List["PPORayActorGroup"],
remote_rm_urls: List[str] = None,
reward_fn: Callable[[List[torch.Tensor]], torch.Tensor] = None,
vllm_engines: List = None,):
...
critic_actors = critic_model_group._actor_handlers if critic_model_group else None
initial_actors = initial_model_group._actor_handlers if initial_model_group else None
def async_init_model_from_pretrained(...):
# 初始化模型
return [actor.init_model_from_pretrained.remote(*args, **kwargs) for actor in self._actor_handlers]
def async_save_model(self):
# 保存模型
return [actor.save_model.remote() for actor in self._actor_handlers]
def async_run_method(self, method_name, *args, **kwargs):
refs = []
for actor in self._actor_handlers:
method = getattr(actor, method_name)
refs.append(method.remote(*args, **kwargs))
return refs
(1)、在初始化( async_init_model_from_pretrained
)以及保存( async_save_model
)中里面所使用的 actor
就是上面提到的 ReferenceModelRayActor
https://github.com/Dao-AILab/flash-attention/issues/432#issuecomment-1698610752