强化学习框架:OpenRLHF源码解读,模型训练-1

HuangJie 于 2025-04-24 在 changsha 发布 ⏳ 预计阅读 5 分钟 浏览量

前文已经介绍了:

本文主要介绍 强化学习框架:OpenRLHF源码解读,模型训练。因为在 OpenRLHF 中整个模型训练过程代码比较多因此分成多次进行解读,此部分主要介绍一些训练前的初始化配置。因为RL由DPO、GRPO、PPO等几种类别,因此本文主要介绍PPO范式训练。在OpenRLHF训练框架中,主要还会应用到DeepSpeed以及vLLM,因此在介绍PPO训练之前需要回顾一下:1、DeepSpeed的配置2、vLLM配置

在之前Blog已经对DeepSpeed以及vLLM原理进行了解释,因此只需要介绍在OpenRLHF如何去对这两部分进行配置

参数参考脚本:https://github.com/OpenRLHF/OpenRLHF/blob/main/examples/scripts/train_ppo_llama_ray.sh 中的设置

From:https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/vllm_engine.py

def create_vllm_engines(
    num_engines: int, # 推理引擎数量
    tensor_parallel_size: int, # 张量并行大小
    pretrain: str,
    seed: int,
    full_determinism: bool,
    enable_prefix_caching: bool,
    enforce_eager: bool,
    max_model_len: int,
    num_total_actors: int,
    shared_pg=None,
    gpu_memory_utilization=None,
    vllm_enable_sleep=False,):
    ...
    # 1、资源调度配置。配置参数设置为:num_engines= tensor_parallel_size= 2
    use_hybrid_engine = shared_pg is not None
    num_gpus = int(tensor_parallel_size == 1)
    if use_hybrid_engine and tensor_parallel_size == 1:
        num_gpus = 0.2

    if not use_hybrid_engine:
        bundles = [{"GPU": 1, "CPU": 1} for _ in range(num_engines * tensor_parallel_size)]
        shared_pg = placement_group(bundles, strategy="PACK")
        ray.get(shared_pg.ready())
    ...
    # 2、构建每一个vLLM(=2)
    for i in range(num_engins):
        ...
        scheduling_strategy = PlacementGroupSchedulingStrategy(...) # 调度策略
        ...
        vllm_engines.append(
            LLMRayActor.options(
            num_cpus=num_gpus,
            num_gpus=num_gpus,
            scheduling_strategy=scheduling_strategy,
        ).remote(...)
        )

1、资源调度配置:第一种Hybrid模式(多个引擎共同占用GPU);第二种标准模式(每个引擎都单独占用一个GPU和CPU);
2、构建vLLM:首先是建立资源调度策略,以及使用vLLM。有必要了解一下就是在OpenRLHF中使用的是 ray 分布式架构进行训练。简单了解一下在这个里面他是怎么做的。通过 ray封装了一个vLLM推理架构( LLMRayActor

补充-1:ray简单使用,代码:🔗
Ray核心概念:1、任务(Task):无状态的并行函数调用。
2、Actor:有状态的计算单元,适合需要持久状态的场景(如模型推理)。3、远程调用(remote):通过 .remote() 异步调度任务或 Actor 方法。比如说上面代码,初始化我需要的节点

@ray.remote
class PrintActor:

在vLLM中可能就需要对资源进行分配:PrintActor.options(...).remote(...)其中 remote就是每个“进程”需要输出的任务参数,而 options 则是资源分配策略,比如GPU(num_gpus)/CPU(num_cpus)数量。在后面获取进程结果可以直接通过:

ray.init()
print_engines = create_print_engines(4)
results = [engine.execture_print.remote(i) for i, engine in enumerate(print_engines)]
print(ray.get(results))

PPO训练范式

TODO: 此部分没有写完
完全了解PPO训练范式之前需要了解一下在OpenRLHF中如何定义 PPO训练器的。

experience_maker.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ppo_utils/experience_maker.py

改代码中首先定义一个基础类 BaseExperienceMaker 用于初始化,这个类中主要作用为:1、定义一个 tokenizer;2、一些初始化。关键是 RemoteExperienceMaker定义:

class RemoteExperienceMaker(BaseExperienceMaker):
    def __init__(...):
        ...
    # (1)定义生成器,通过LLM生成新的文本内容
    @torch.no_grad()
    def generate_samples(self, all_prompts: List[str], all_labels, **generate_kwargs):
        #(1.1)使用hugginface生成  _generate_with_hf(...)
        #(1.2)使用vLLM进行生成     _generate_vllm(...)
    @torch.no_grad()
    def _generate_with_hf(...):
        ...
    def _generate_vllm(...):
        ....

    

(1)定义生成器:通过两种方式(huggingface以及vLLM)

@torch.no_grad()
def _generate_with_hf(self, all_prompts: List[str], all_labels, **generate_kwargs) -> List[Samples]:
    ...    
    # train_ppo_ray.py n_samples_per_prompt=1 对每一条 prompt 生成内容数量
    all_prompts = sum([[prompt] * args.n_samples_per_prompt for prompt in all_prompts], [])
    all_labels = sum([[label] * args.n_samples_per_prompt for label in all_labels], [])
    samples_list = []
    for i in range(0, len(all_prompts), args.micro_rollout_batch_size): 
        # train_ppo_ray.py micro_rollout_batch_size=8
        prompts = all_prompts[i : i + args.micro_rollout_batch_size]
        labels = all_labels[i : i + args.micro_rollout_batch_size]
        inputs = self.tokenize_fn(prompts, self.prompt_max_len, device="cuda")
        sequences, attention_mask, action_mask = self.actor.generate(**inputs, **generate_kwargs)
        samples = Samples(...) # 将生成内容所有信息存储,比如说sequences这些关键信息
        samples_list.append(samples)
    return samples_list

使用vLLM方式进行生成需要注意的是,正如最开始上面介绍的,我的所有的vLLM都是通过ray进行处理了,因此这个就会比较复杂

def _generate_vllm(self, all_prompts: List[str], all_labels, **kwargs):
    ...
    #(1)首先获取所有的设备数量,并且确定 vllm 引擎数量
    rank = torch.distributed.get_rank() // self.strategy.ring_attn_size
    world_size = torch.distributed.get_world_size() // self.strategy.ring_attn_size
    if len(self.vllm_engines) <= world_size:
        llms = [self.vllm_engines[rank % len(self.vllm_engines)]]
    else:
        llms = self.vllm_engines[rank::world_size]
    ...
    #(2)直接通过 from vllm import SamplingParams 来设置生成器策略比如说 top_p等
    sampling_params = SamplingParams(...)
    #(3)将prompt、labels进行重复采用,这里和hf生成处理相同,并且将prompt通过tokenizer进行处理
    all_prompts = sum([[prompt] * args.n_samples_per_prompt for prompt in all_prompts], [])
    all_labels = sum(...)
    all_prompt_token_ids = self.tokenize_fn(all_prompts, self.prompt_max_len, padding=False)["input_ids"]
    #(4)直接将所有的prompt输入到 vllm中进行生成
    refs = []
    batch_size = (len(all_prompt_token_ids) + len(llms) - 1) // len(llms)
    for i, llm in enumerate(llms):
        prompt_token_ids = all_prompt_token_ids[i * batch_size : (i + 1) * batch_size]
        refs.append(
            llm.add_requests.remote(rank, sampling_params=sampling_params, prompt_token_ids=prompt_token_ids)
        )
    ray.get(refs)
    #(5)所有设备之间进行同步
    ...
    #(6)获取vLLM生成内容
    all_output_refs = []
    for i, llm in enumerate(llms):
        all_output_refs.append(llm.get_responses.remote(rank))
    all_outputs = sum(ray.get(all_output_refs), [])
    #(7)处理vLLM输出
    samples_list = []
    for i in range(0, len(all_outputs), args.micro_rollout_batch_size):
        outputs = all_outputs[i : i + self.strategy.args.micro_rollout_batch_size]
        prompts = all_prompts[i : i + self.strategy.args.micro_rollout_batch_size]
        labels = all_labels[i : i + self.strategy.args.micro_rollout_batch_size]
        #(8)处理数据
        if not self.packing_samples:
            # 不对数据进行打包输出
            # | [PAD] [PAD] token token token | token token [EOS] [PAD] |
            # | token token token token token | token token [EOS] [PAD] |
            # | [PAD] [PAD] [PAD] token token | token token token [EOS] |
            # |<---------- prompt ----------->|<-------- answer ------->|
            ...
            samples_list.append(...)
        else:
            # 对数据进行打包输出
            # | token token token | token token [EOS] | token token token token token | token token [EOS] | token token | token token token [EOS] |
            # |<---  prompt ----->|<---- answer ----->|<---------- prompt ----------->|<----- answer ---->|<- prompt -->|<-------- answer ------->|
            ...
            samples_list.append(...)
    return samples_list

(4)llm.add_requests.remote(...)llm.get_responses.remote(...) 其中 add_requests 执行操作:直接通过vLLM生成文本:self.llm.generate(prompts=requests, sampling_params=sampling_params) 而在 get_responses则是直接获取vLLM所生成的内容。
(7-8)在处理输出过程中由于数据可能会将短文本进行拼接(packing_samples)但是不同短文本之间对应问题是不同的,因此就需要将输出进行整理。处理过程比较简单:
不对数据进行打包输出:只需要就算 outputs(是被切分了每次处理的一块) 中最大长度然后按照这个最大长度进行填补即可(左填充方式:在最左侧添加 [PAD]标记)
如果将数据进行打包输出:只需要将prompt和输出拼接起来即可

ppo_actor.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/ppo_actor.py

这里主要是定义了一个trainer类,用来执行数据加载、模型处理、记录loss等操作

class ActorPPOTrainer(BasePPOTrainer):
    def __init__(...):
        ...
        # (1)初始化记录器,比如说wandb、tensorboard,这部分比较简单不做介绍
        ...
        self.experience_maker = RemoteExperienceMaker(...) #TODO: 这部分需要特殊去看一下
        # (2)因为要使用ray就需要设置分布式训练设置
        backend = getattr(self.strategy.args, "vllm_sync_backend", "nccl") # 判断是否使用 nccl 进行通信
        ...
        # (3)ray 分布式节点设置
        if self.vllm_engines is not None and not self.use_cuda_ipc and torch.distributed.get_rank() == 0: # 如果不使用 nccl 进行通信
            # 初始化设置:获取主节点,并且随便绑定一个端口
            master_address = ray._private.services.get_node_ip_address()
            with socket.socket() as sock:
                sock.bind(("", 0))
                master_port = sock.getsockname()[1]

            vllm_num_engines, vllm_tensor_parallel_size = (
                self.strategy.args.vllm_num_engines,
                self.strategy.args.vllm_tensor_parallel_size,
            )
            # 获取规模:计算vllm引擎以及几张卡进行张量并行
            world_size = vllm_num_engines * vllm_tensor_parallel_size + 1

            use_ray = getattr(self.strategy.args, "vllm_sync_with_ray", False)
            group_name = "openrlhf"
            refs = [
                engine.init_process_group.remote(
                    master_address,
                    master_port,
                    i * vllm_tensor_parallel_size + 1,
                    world_size,
                    group_name,
                    backend=backend,
                    use_ray=use_ray,
                )
                for i, engine in enumerate(self.vllm_engines)
            ]
            # (4) 通信之间进行同步
            ...
            ray.get(refs)
        torch.distributed.barrier()
    def fit(
        ...
        prompts_dataloader,
        pretrain_dataloader,
        consumed_samples=0,
        num_update_steps_per_episodes=1,):
        # 主要进行数据加载/模型计算/模型存储等操作
        ...
        for episode in range(start_episode, args.num_episodes):
            ...
            for rand_prompts, labels in self.prompts_dataloader:
                for i, experience in enumerate(
                    self.experience_maker.make_experience_list(rand_prompts, labels, **self.generate_kwargs)
                ):


(3)使用 ray 分布式节点设置,初始化设置主节点/端口号/分组信息等,通过 refs 来存储初始化的 vLLM,代码中初始化操作为:

def init_process_group(self, master_address, master_port, rank_offset, world_size, group_name, backend, use_ray):
    return self.llm.collective_rpc(
        "init_process_group",
        args=(master_address, master_port, rank_offset, world_size, group_name, backend, use_ray),
    )

其中 self.llm=vllm.LLM(*args, **kwargs)

launcher.py

https://github.com/OpenRLHF/OpenRLHF/blob/main/openrlhf/trainer/ray/launcher.py

在launcher中主要也是定义各类分布式系统方式(如何将各类模型分不到设别上),首先来说定义两个基类:
DistributedTorchRayActor:这个类是一个基于 Ray 的分布式 Actor,负责设置分布式训练或推理环境,它初始化分布式环境的配置,包括节点地址、端口、进程排名等。
BasePPORole(DistributedTorchRayActor):扩展上面所定义的分布式环境配置,并提供了批量推理的功能。
下面就是对各类模型:rewardmodel等进行分布式进行处理

@ray.remote(num_gpus=1)
class ReferenceModelRayActor(BasePPORole):
    def init_model_from_pretrained(self, strategy: DeepspeedStrategy, pretrain):
        self._setup_distributed(strategy)
        model = Actor(...)
        # 1)这里也就是对于一些模型进行初始化操作
        ...
    def forward(...):
        ...
        # 2)这里也就是直接将文本输入到模型中进行计算

@ray.remote(num_gpus=1)
class RewardModelRayActor(BasePPORole):
    def init_model_from_pretrained(self, strategy: DeepspeedStrategy, pretrain):
        self._setup_distributed(strategy)
        model = get_llm_for_sequence_regression(...)
        # 3) 初始化模型操作
        ...
    def forward(...):
        ...
        # 4)这里也就是直接将文本输入到模型中进行计算

(1、2、3、4)、模型初始化操作,这里没什么好解释的直接通过继承的类(BasePPORole)中对 Deepspeed 进行初始化。模型计算也是比较常规的 forward 计算范式。对于 Actor 操作:📃

class PPORayActorGroup:
    def __init(
        ...,
        pg: PlacementGroup = None,
        ...):
        ...
        self._initiate_actors(pg, num_gpus_per_actor)
    def _initiate_actors(
        self, 
        pg, # 通过ray创建资源组
        num_gpus_per_actor):
        ...
        self._actor_handlers = [master_actor]

    def async_fit_actor_model(
        self,
        critic_model_group: "PPORayActorGroup",
        initial_model_group: "PPORayActorGroup",
        reward_model_groups: List["PPORayActorGroup"],
        remote_rm_urls: List[str] = None,
        reward_fn: Callable[[List[torch.Tensor]], torch.Tensor] = None,
        vllm_engines: List = None,):
        ...
        critic_actors = critic_model_group._actor_handlers if critic_model_group else None
        initial_actors = initial_model_group._actor_handlers if initial_model_group else None



    def async_init_model_from_pretrained(...):
        # 初始化模型
        return [actor.init_model_from_pretrained.remote(*args, **kwargs) for actor in self._actor_handlers]

    def async_save_model(self):
        # 保存模型
        return [actor.save_model.remote() for actor in self._actor_handlers]

    def async_run_method(self, method_name, *args, **kwargs):
        refs = []
        for actor in self._actor_handlers:
            method = getattr(actor, method_name)
            refs.append(method.remote(*args, **kwargs))
        return refs

(1)、在初始化( async_init_model_from_pretrained )以及保存( async_save_model )中里面所使用的 actor 就是上面提到的 ReferenceModelRayActor

记录

https://github.com/Dao-AILab/flash-attention/issues/432#issuecomment-1698610752

Footer Image
-->