import dataclasses
import datetime
import os

import datasets
import tokenizers
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim.lr_scheduler as lr_scheduler
import tqdm
from torch import Tensor
from torch.distributed.checkpoint import load, save
from torch.distributed.checkpoint.default_planner import DefaultLoadPlanner
from torch.distributed.fsdp import FSDPModule, fully_shard
from torch.distributed.tensor import Replicate, Shard
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    PrepareModuleInput,
    RowwiseParallel,
    SequenceParallel,
    loss_parallel,
    parallelize_module,
)
from torch.utils.data.distributed import DistributedSampler

# Set default to bfloat16
torch.set_default_dtype(torch.bfloat16)
print("NCCL version:", torch.cuda.nccl.version())

# Build the model
@dataclasses.dataclass
class LlamaConfig:
    """Define Llama model hyperparameters."""
    vocab_size: int = 50000  # Size of the tokenizer vocabulary
    max_position_embeddings: int = 2048  # Maximum sequence length
    hidden_size: int = 768  # Size of hidden layers
    intermediate_size: int = 4 * 768  # Size of the MLP's hidden layer
    num_hidden_layers: int = 12  # Number of transformer layers
    num_attention_heads: int = 12  # Number of attention heads
    num_key_value_heads: int = 3  # Number of key-value heads for GQA
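
# With 12 attention heads sharing 3 key-value heads, each KV head serves 4 query heads
# (grouped-query attention). num_attention_heads is assumed to be a multiple of
# num_key_value_heads, and hidden_size a multiple of num_attention_heads (the latter is
# asserted in LlamaAttention below).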


class RotaryPositionEncoding(nn.Module):
    """Rotary position encoding."""

    def __init__(self, dim: int, max_position_embeddings: int) -> None:
        """Initialize the RotaryPositionEncoding module.

        Args:
            dim: The hidden dimension of the input tensor to which RoPE is applied
            max_position_embeddings: The maximum sequence length of the input tensor
        """
        super().__init__()
        self.dim = dim
        self.max_position_embeddings = max_position_embeddings
        # compute a matrix of n * theta_i
        N = 10_000.0
        inv_freq = 1.0 / (N ** (torch.arange(0, dim, 2) / dim))
        inv_freq = torch.cat((inv_freq, inv_freq), dim=-1)
        position = torch.arange(max_position_embeddings)
        sinusoid_inp = torch.outer(position, inv_freq)
        # save cosine and sine matrices as buffers, not parameters
        self.register_buffer("cos", sinusoid_inp.cos())
        self.register_buffer("sin", sinusoid_inp.sin())

    def forward(self, x: Tensor) -> Tensor:
        """Apply RoPE to tensor x.

        Args:
            x: Input tensor of shape (batch_size, seq_length, num_heads, head_dim)

        Returns:
            Output tensor of shape (batch_size, seq_length, num_heads, head_dim)
        """
        batch_size, seq_len, num_heads, head_dim = x.shape
        device = x.device
        dtype = x.dtype
        # transform the cosine and sine matrices to 4D tensors with the same dtype as x
        cos = self.cos.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        sin = self.sin.to(device, dtype)[:seq_len].view(1, seq_len, 1, -1)
        # apply RoPE to x
        x1, x2 = x.chunk(2, dim=-1)
        rotated = torch.cat((-x2, x1), dim=-1)
        output = (x * cos) + (rotated * sin)
        return output
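
# Illustrative example of the rotate-half trick above: for head_dim = 4, a vector
# (a, b, c, d) splits into x1 = (a, b) and x2 = (c, d), so rotated = (-c, -d, a, b) and
# x*cos + rotated*sin applies a 2-D rotation to each (x_i, x_{i+dim/2}) pair, which is
# RoPE with the duplicated inv_freq buffer registered in __init__.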


class LlamaAttention(nn.Module):
    """Grouped-query attention with rotary embeddings."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.hidden_size = config.hidden_size
        self.num_heads = config.num_attention_heads
        self.head_dim = self.hidden_size // self.num_heads
        self.num_kv_heads = config.num_key_value_heads  # GQA: H_kv < H_q

        # hidden_size must be divisible by num_heads
        assert (self.head_dim * self.num_heads) == self.hidden_size

        # Linear layers for Q, K, V projections
        self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=False)
        self.k_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.v_proj = nn.Linear(self.hidden_size, self.num_kv_heads * self.head_dim, bias=False)
        self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=False)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        bs, seq_len, dim = hidden_states.size()

        # Project inputs to Q, K, V
        query_states = self.q_proj(hidden_states).view(bs, seq_len, self.num_heads, self.head_dim)
        key_states = self.k_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)
        value_states = self.v_proj(hidden_states).view(bs, seq_len, self.num_kv_heads, self.head_dim)

        # Apply rotary position embeddings
        query_states = rope(query_states)
        key_states = rope(key_states)

        # Transpose tensors from BSHD to BHSD layout for scaled_dot_product_attention
        query_states = query_states.transpose(1, 2)
        key_states = key_states.transpose(1, 2)
        value_states = value_states.transpose(1, 2)

        # Use PyTorch's optimized attention implementation
        # setting is_causal=True is incompatible with passing an explicit attention mask
        attn_output = F.scaled_dot_product_attention(
            query_states,
            key_states,
            value_states,
            attn_mask=attn_mask,
            dropout_p=0.0,
            enable_gqa=True,
        )

        # Transpose output tensor from BHSD back to BSHD, reshape to 3D, and then project the output
        attn_output = attn_output.transpose(1, 2).reshape(bs, seq_len, self.hidden_size)
        attn_output = self.o_proj(attn_output)
        return attn_output
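
# Note: enable_gqa=True lets scaled_dot_product_attention broadcast the num_kv_heads K/V
# heads across the num_heads query heads without materializing repeated K/V tensors; it
# assumes the query-head count is a multiple of the KV-head count (12 and 3 here).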


class LlamaMLP(nn.Module):
    """Feed-forward network with SwiGLU activation."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        # Two parallel projections for SwiGLU
        self.gate_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.up_proj = nn.Linear(config.hidden_size, config.intermediate_size, bias=False)
        self.act_fn = F.silu  # SwiGLU activation function
        # Project back to hidden size
        self.down_proj = nn.Linear(config.intermediate_size, config.hidden_size, bias=False)

    def forward(self, x: Tensor) -> Tensor:
        # SwiGLU activation: multiply gate and up-projected inputs
        gate = self.act_fn(self.gate_proj(x))
        up = self.up_proj(x)
        return self.down_proj(gate * up)
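
# In equation form, the block above computes down_proj(silu(gate_proj(x)) * up_proj(x)),
# i.e. the SwiGLU feed-forward used in Llama-style models, where * is elementwise.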


class LlamaDecoderLayer(nn.Module):
    """Single transformer layer of a Llama model."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.input_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.self_attn = LlamaAttention(config)
        self.post_attention_layernorm = nn.RMSNorm(config.hidden_size, eps=1e-5)
        self.mlp = LlamaMLP(config)

    def forward(self, hidden_states: Tensor, rope: RotaryPositionEncoding, attn_mask: Tensor) -> Tensor:
        # First residual block: Self-attention
        residual = hidden_states
        hidden_states = self.input_layernorm(hidden_states)
        attn_outputs = self.self_attn(hidden_states, rope=rope, attn_mask=attn_mask)
        hidden_states = attn_outputs + residual

        # Second residual block: MLP
        residual = hidden_states
        hidden_states = self.post_attention_layernorm(hidden_states)
        hidden_states = self.mlp(hidden_states) + residual
        return hidden_states


class LlamaModel(nn.Module):
    """The full Llama model without any pretraining heads."""

    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.rotary_emb = RotaryPositionEncoding(
            config.hidden_size // config.num_attention_heads,
            config.max_position_embeddings,
        )

        self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size)
        self.layers = nn.ModuleList([
            LlamaDecoderLayer(config) for _ in range(config.num_hidden_layers)
        ])
        self.norm = nn.RMSNorm(config.hidden_size, eps=1e-5)

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        # Convert input token IDs to embeddings
        hidden_states = self.embed_tokens(input_ids)
        # Process through all transformer layers, then the final norm layer
        for layer in self.layers:
            hidden_states = layer(hidden_states, rope=self.rotary_emb, attn_mask=attn_mask)
        hidden_states = self.norm(hidden_states)
        # Return the final hidden states
        return hidden_states


class LlamaForPretraining(nn.Module):
    def __init__(self, config: LlamaConfig) -> None:
        super().__init__()
        self.base_model = LlamaModel(config)
        self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False)

    def forward(self, input_ids: Tensor, attn_mask: Tensor) -> Tensor:
        hidden_states = self.base_model(input_ids, attn_mask)
        return self.lm_head(hidden_states)


def create_causal_mask(batch: Tensor, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a causal mask for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        dtype: Data type of the mask

    Returns:
        Causal mask of shape (seq_len, seq_len)
    """
    batch_size, seq_len = batch.shape
    mask = torch.full(
        (seq_len, seq_len), float("-inf"), device=batch.device, dtype=dtype
    ).triu(diagonal=1)
    return mask


def create_padding_mask(batch: Tensor, padding_token_id: int, dtype: torch.dtype = torch.float32) -> Tensor:
    """Create a padding mask for a batch of sequences for self-attention.

    Args:
        batch: Batch of sequences, shape (batch_size, seq_len)
        padding_token_id: ID of the padding token
        dtype: Data type of the mask

    Returns:
        Padding mask of shape (batch_size, 1, seq_len, seq_len)
    """
    padded = torch.zeros_like(batch, device=batch.device, dtype=dtype).masked_fill(
        batch == padding_token_id, float("-inf")
    )
    mask = padded[:, :, None] + padded[:, None, :]
    return mask[:, None, :, :]
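
# Both masks are additive: 0 where attention is allowed and -inf where it is blocked.
# The training loop below sums the causal mask of shape (seq_len, seq_len) with the
# padding mask of shape (batch, 1, seq_len, seq_len); broadcasting gives a
# (batch, 1, seq_len, seq_len) mask that scaled_dot_product_attention adds to the scores.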


# Dataset class to yield padded sequences of fixed length
class PretrainingDataset(torch.utils.data.Dataset):
    def __init__(self, dataset: datasets.Dataset, tokenizer: tokenizers.Tokenizer,
                 seq_length: int):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.seq_length = seq_length
        self.bot = tokenizer.token_to_id("[BOT]")
        self.eot = tokenizer.token_to_id("[EOT]")
        self.pad = tokenizer.token_to_id("[PAD]")

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, index: int) -> tuple[Tensor, Tensor]:
        """Get a sequence of token ids from the dataset. [BOT] and [EOT] tokens
        are added. Clipped and padded to the sequence length.
        """
        seq = self.dataset[index]["text"]
        tokens: list[int] = [self.bot] + self.tokenizer.encode(seq).ids + [self.eot]
        # pad to the target sequence length
        toklen = len(tokens)
        if toklen < self.seq_length + 1:
            pad_length = self.seq_length + 1 - toklen
            tokens += [self.pad] * pad_length
        # return the sequence
        x = torch.tensor(tokens[:self.seq_length], dtype=torch.int64)
        y = torch.tensor(tokens[1:self.seq_length + 1], dtype=torch.int64)
        return x, y
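
# Illustrative example (hypothetical token IDs): with seq_length=8 and a 5-token document,
# tokens becomes [BOT, t1, t2, t3, t4, t5, EOT, PAD, PAD]; x = tokens[:8] and
# y = tokens[1:9], so each position predicts the next token, and PAD targets are later
# ignored by the loss via ignore_index.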


def load_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    load(
        {"model": model, "optimizer": optimizer},
        checkpoint_id="checkpoint-dist",
        planner=DefaultLoadPlanner(allow_partial_load=True),  # ignore keys for RoPE buffers
    )
    scheduler.load_state_dict(
        torch.load("checkpoint-dist/lrscheduler.pt", map_location=device),
    )
    dist.barrier()


def save_checkpoint(model: nn.Module, optimizer: torch.optim.Optimizer, scheduler: lr_scheduler.SequentialLR) -> None:
    dist.barrier()
    save(
        {"model": model, "optimizer": optimizer},
        checkpoint_id="checkpoint-dist",
    )
    if dist.get_rank() == 0:
        torch.save(scheduler.state_dict(), "checkpoint-dist/lrscheduler.pt")
    dist.barrier()
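
# Note: torch.distributed.checkpoint writes the sharded (DTensor) model and optimizer
# state from every rank into the "checkpoint-dist" directory, so save()/load() must be
# called collectively on all ranks; only the LR scheduler state is a plain dict, which
# rank 0 saves on its own.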


# Load the tokenizer and dataset
tokenizer = tokenizers.Tokenizer.from_file("bpe_50K.json")
dataset = datasets.load_dataset("HuggingFaceFW/fineweb", "sample-10BT", split="train")

# Initialize the distributed environment
dist.init_process_group(backend="nccl", timeout=datetime.timedelta(seconds=60))
local_rank = int(os.environ["LOCAL_RANK"])
device = torch.device(f"cuda:{local_rank}")
rank = dist.get_rank()
world_size = dist.get_world_size()
print(f"World size {world_size}, rank {rank}, local rank {local_rank}. Using {device}")

# Initialize the mesh for tensor parallelism
n_tensor_parallel = 2
assert world_size % n_tensor_parallel == 0, "Expect world size to be divisible by the number of tensor-parallel GPUs"
mesh = dist.device_mesh.init_device_mesh(
    "cuda",
    (world_size // n_tensor_parallel, n_tensor_parallel),
    mesh_dim_names=("dp", "tp"),
)
print(f"({rank}) Mesh: {mesh}, DP size: {mesh['dp'].size()}, TP size: {mesh['tp'].size()}, "
      f"DP local rank: {mesh['dp'].get_local_rank()}, TP local rank: {mesh['tp'].get_local_rank()}")

# Create the pretraining model on the meta device, on all ranks
with torch.device("meta"):
    model_config = LlamaConfig()
    model = LlamaForPretraining(model_config)

# Set up tensor parallelism on each transformer block in the base model
tp_plan = {
    "input_layernorm": SequenceParallel(),
    "self_attn": PrepareModuleInput(
        input_layouts=Shard(dim=1),  # only one positional arg will be used
        desired_input_layouts=Replicate(),
    ),
    # Q/K projection outputs will be used with RoPE, need to be replicated
    # Q/K/V outputs will be used with GQA, also need to be replicated
    "self_attn.q_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.k_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.v_proj": ColwiseParallel(output_layouts=Replicate()),
    "self_attn.o_proj": RowwiseParallel(input_layouts=Replicate(), output_layouts=Shard(1)),
    "post_attention_layernorm": SequenceParallel(),
    "mlp": PrepareModuleInput(
        input_layouts=Shard(dim=1),
        desired_input_layouts=Replicate(),
    ),
    "mlp.gate_proj": ColwiseParallel(),
    "mlp.up_proj": ColwiseParallel(),
    "mlp.down_proj": RowwiseParallel(output_layouts=Shard(1)),
}
for layer in model.base_model.layers:
    parallelize_module(layer, mesh["tp"], tp_plan)
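
# Rough intuition for the plan above (a sketch, not the only valid layout): ColwiseParallel
# shards a linear layer over its output features and RowwiseParallel over its input
# features, so colwise gate_proj/up_proj feeding rowwise down_proj needs only one
# reduction per MLP (a reduce-scatter here, since the output is sharded on the sequence
# dim), while SequenceParallel keeps the norm layers sharded along the sequence dimension.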

# Set up tensor parallelism on the embedding and output norm layers in the base model
# and the prediction head in the top-level model
tp_plan = {
    "base_model.embed_tokens": RowwiseParallel(
        input_layouts=Replicate(),
        output_layouts=Shard(1),
    ),
    "base_model.norm": SequenceParallel(),
    "lm_head": ColwiseParallel(
        input_layouts=Shard(1),
        # output_layouts=Replicate(),  # only if not using loss parallel
        use_local_output=False,  # Keep DTensor output for loss parallel
    ),
}
parallelize_module(model, mesh["tp"], tp_plan)

# Convert the tensor-parallelized model to FSDP2; every component must be sharded
# across the "dp" dimension of the mesh
for layer in model.base_model.layers:
    fully_shard(layer, mesh=mesh["dp"])
fully_shard(model.base_model, mesh=mesh["dp"])
fully_shard(model, mesh=mesh["dp"])
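
# fully_shard is applied bottom-up (each decoder layer, then base_model, then the whole
# model) so FSDP2 creates one parameter group per layer and can overlap per-layer
# all-gathers with compute. The result is 2-D parallelism: data-parallel sharding over
# the "dp" mesh axis on top of tensor parallelism over the "tp" axis.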

def reset_all_weights(model: nn.Module) -> None:
    """Initialize all weights of the model after moving it off the meta device."""
    @torch.no_grad()
    def weight_reset(m: nn.Module):
        reset_parameters = getattr(m, "reset_parameters", None)
        if callable(reset_parameters):
            m.reset_parameters()

    # Applies fn recursively to the model itself and all of model.children()
    model.apply(fn=weight_reset)

torch.manual_seed(42)
model.to_empty(device=device)
reset_all_weights(model)
assert isinstance(model, FSDPModule), f"Expected FSDPModule, got {type(model)}"

# Training parameters
epochs = 3
learning_rate = 1e-3
batch_size = 64 // mesh["dp"].size()
seq_length = 512
num_warmup_steps = 1000
PAD_TOKEN_ID = tokenizer.token_to_id("[PAD]")
model.train()

# DataLoader, optimizer, scheduler, and loss function
# Sampler is required to shard the dataset across the data-parallel ranks
dataset = PretrainingDataset(dataset, tokenizer, seq_length)
sampler = DistributedSampler(
    dataset, shuffle=False, drop_last=True,
    num_replicas=mesh["dp"].size(),
    rank=mesh["dp"].get_local_rank(),
)
dataloader = torch.utils.data.DataLoader(
    dataset,
    sampler=sampler,
    batch_size=batch_size,
    pin_memory=True,  # optional
    shuffle=False,
    num_workers=2,
    prefetch_factor=2,
)
num_training_steps = len(dataloader) * epochs

optimizer = torch.optim.AdamW(
    model.parameters(), lr=learning_rate, betas=(0.9, 0.99), eps=1e-8, weight_decay=0.1,
)
warmup_scheduler = lr_scheduler.LinearLR(
    optimizer,
    start_factor=0.1, end_factor=1.0, total_iters=num_warmup_steps,
)
cosine_scheduler = lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=num_training_steps - num_warmup_steps,
    eta_min=0,
)
scheduler = lr_scheduler.SequentialLR(
    optimizer,
    schedulers=[warmup_scheduler, cosine_scheduler],
    milestones=[num_warmup_steps],
)
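
# The combined schedule ramps the LR linearly from 0.1 * learning_rate to learning_rate
# over the first num_warmup_steps optimizer steps, then decays it to 0 along a cosine
# curve over the remaining num_training_steps - num_warmup_steps steps.
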
loss_fn = nn.CrossEntropyLoss(ignore_index=PAD_TOKEN_ID)

# if the checkpoint-dist dir exists, load the checkpoint into the model and optimizer
if os.path.exists("checkpoint-dist"):
    load_checkpoint(model, optimizer, scheduler)

# start training
print(f"({rank}) Starting training")
for epoch in range(epochs):
    pbar = tqdm.tqdm(dataloader, desc=f"({rank}) Epoch {epoch+1}/{epochs}")
    for batch_id, batch in enumerate(pbar):
        if batch_id % 1000 == 0:
            save_checkpoint(model, optimizer, scheduler)
        # Explicit prefetching before sending any data to the model
        model.unshard()
        # Get the batched data, move it from CPU to GPU
        input_ids, target_ids = batch
        input_ids = input_ids.to(device)
        target_ids = target_ids.to(device)
        # create the attention mask: causal mask + padding mask
        attn_mask = (create_causal_mask(input_ids)
                     + create_padding_mask(input_ids, PAD_TOKEN_ID))
        # Extract the output from the model
        logits = model(input_ids, attn_mask)
        optimizer.zero_grad()
        with loss_parallel():
            # Compute loss: cross-entropy between logits and targets, ignoring padding tokens
            loss = loss_fn(logits.view(-1, logits.size(-1)), target_ids.view(-1))
            # Backward with loss on DTensor
            loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        pbar.set_postfix(loss=loss.item())
        pbar.update(1)
    pbar.close()

# Save the model
save_checkpoint(model, optimizer, scheduler)

# Clean up the distributed environment
dist.destroy_process_group()

