
Training Pipeline

Detailed technical guide to NeuroShard's decentralized training system.

Overview

NeuroShard uses a hybrid training approach combining:

  1. Pipeline Parallelism: Layers distributed across nodes
  2. DiLoCo Protocol: Infrequent synchronization
  3. Robust Aggregation: Byzantine-tolerant gradient averaging

Genesis Dataset

Training data is drawn from a cryptographically verified manifest of high-quality datasets.

Available Sources

Dataset | Description | Size
FineWeb | High-quality web text | 15TB
RedPajama | Open reproduction of LLaMA data | 5TB
The Pile | Diverse text corpus | 825GB
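How the manifest is checked isn't shown above; below is a minimal sketch under the assumption that it maps shard IDs to SHA-256 digests in a JSON file (both the format and the digest choice are assumptions, not the actual NeuroShard schema):

python
import hashlib
import json

def verify_shard(shard_path, shard_id, manifest_path):
    """Check a downloaded shard's hash against its manifest entry."""
    with open(manifest_path) as f:
        manifest = json.load(f)  # assumed shape: {"42": "ab3f...", ...}

    expected = manifest[str(shard_id)]

    sha = hashlib.sha256()
    with open(shard_path, "rb") as f:
        # Stream the file in 1MB chunks to keep memory usage flat
        for chunk in iter(lambda: f.read(1 << 20), b""):
            sha.update(chunk)

    return sha.hexdigest() == expected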

Deterministic Sharding

Each Driver is assigned shards based on its node ID:

python
shard_id = hash(node_id) % total_shards

This enables:

  • Reproducibility: Same node always gets same data
  • Verification: Peers can check a Driver's work (see the sketch after this list)
  • Load Distribution: Data spread across network
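Because the assignment is a pure function of the node ID, any peer can recompute it. A small illustration (TOTAL_SHARDS and the node ID are placeholders; a production version would need a process-stable hash, which Python's built-in hash() is not by default):

python
TOTAL_SHARDS = 1024  # placeholder, not the real network constant

def shard_for(node_id):
    # Deterministic mapping: the same node ID always yields the same shard
    return hash(node_id) % TOTAL_SHARDS

driver_shard = shard_for("node-7f3a")

# A verifying peer re-derives the assignment to confirm the Driver
# trained on the shard it was actually supposed to use.
assert shard_for("node-7f3a") == driver_shard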

Data Flow

python
# Driver node
class GenesisDataLoader:
    def __init__(self, node_id, tokenizer):
        self.shard_id = hash(node_id) % TOTAL_SHARDS
        self.tokenizer = tokenizer

    def get_batch(self, batch_size=4):
        # Load batch_size samples from the assigned shard
        texts = self.load_shard_data(batch_size)

        # Tokenize into a [batch_size, seq_len] token tensor
        tokens = self.tokenizer.encode(texts)

        # Create next-token prediction pairs by shifting one position
        input_ids = tokens[:, :-1]
        labels = tokens[:, 1:]

        return input_ids, labels

Forward Pass

Local Forward (Single Node)

When a node holds all layers:

python
def forward_local(input_ids):
    # Embed tokens
    hidden = model.embed(input_ids)
    
    # Forward through all layers
    for layer in model.layers:
        hidden, _ = layer(hidden)
    
    # Compute logits
    logits = model.compute_logits(hidden)
    
    return logits

Distributed Forward (Multiple Nodes)

When layers are spread across nodes:

python
def forward_distributed(input_ids, labels, session_id):
    # Driver: Embed and forward Layer 0
    hidden = model.embed(input_ids)
    hidden, _ = model.layers[0](hidden)
    
    # Send to next node in pipeline (labels travel with the activations)
    next_node = get_next_hop(layer_id=1)
    send_activations(next_node, hidden, labels, session_id)

Each node receives activations, processes its layers, and forwards to the next:

python
def handle_pipeline_forward(activations, labels, session_id, sender_url):
    # Forward through my layers
    hidden = model.forward_my_layers(activations)
    
    if model.has_lm_head:
        # I'm the Validator - compute loss
        logits = model.compute_logits(hidden)
        loss = cross_entropy(logits, labels)
        
        # Initiate backward pass
        backward(loss, session_id, sender_url)
    else:
        # Forward to next node
        next_node = get_next_hop(my_last_layer + 1)
        send_activations(next_node, hidden, labels, session_id)

Backward Pass

Gradient Computation

The backward pass flows in reverse:

python
def backward_pass(loss, session_id):
    # Validator computes gradients
    optimizer.zero_grad()
    loss.backward()
    
    # Clip gradients
    clip_grad_norm_(model.parameters(), max_norm=1.0)
    
    # Step optimizer
    optimizer.step()
    
    # Send the gradient w.r.t. the activations received in the forward
    # pass back to the previous node in the pipeline
    ctx = session_context[session_id]
    if ctx.sender_url:
        send_gradients(ctx.sender_url, ctx.input_activations.grad, session_id)

Gradient Routing

Each node:

  1. Receives gradient from next node
  2. Backward through local layers
  3. Steps local optimizer
  4. Sends gradient to previous node
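A minimal sketch of that handler on an intermediate node, reusing names from the forward-pass snippets (the session_context cache of forward activations is an assumption):

python
def handle_pipeline_backward(grad_output, session_id):
    # 1. Look up the activations cached during the forward pass
    #    (input activations must have been saved with requires_grad=True)
    ctx = session_context[session_id]

    # 2. Backward through my layers: propagate the incoming gradient
    optimizer.zero_grad()
    ctx.output_activations.backward(grad_output)

    # 3. Step the local optimizer for this node's layers
    clip_grad_norm_(model.parameters(), max_norm=1.0)
    optimizer.step()

    # 4. Send the gradient w.r.t. my input back to the previous node
    if ctx.sender_url:
        send_gradients(ctx.sender_url, ctx.input_activations.grad, session_id)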

DiLoCo Protocol

DiLoCo (Distributed Low-Communication training) reduces communication by roughly 500x: instead of synchronizing every step, nodes train locally and only exchange updates after each inner loop.

Inner Loop (Local Training)

Each node trains independently for N steps:

python
class DiLoCoTrainer:
    def __init__(self, model, optimizer, inner_steps=500):
        self.model = model
        self.optimizer = optimizer
        self.inner_steps = inner_steps
        self.initial_weights = {}
        self.step_count = 0
    
    def start_inner_loop(self):
        # Snapshot the weights at the start of the inner loop
        for name, param in self.model.named_parameters():
            self.initial_weights[name] = param.data.clone()
        self.step_count = 0
    
    def inner_step(self, loss):
        loss.backward()
        self.optimizer.step()
        self.optimizer.zero_grad()
        self.step_count += 1

Pseudo-Gradient Computation

After N inner steps, compute the pseudo-gradient:

python
def compute_pseudo_gradient(self):
    pseudo_grads = {}
    for name, param in self.model.named_parameters():
        # Pseudo-gradient: difference between the snapshot taken at the
        # start of the inner loop and the current weights
        delta = self.initial_weights[name] - param.data
        pseudo_grads[name] = delta
    return pseudo_grads

Outer Loop (Synchronization)

Sync with peers and apply outer optimizer:

python
async def outer_step(self):
    # Compute local pseudo-gradient
    pseudo_grads = self.compute_pseudo_gradient()
    
    # Gossip to peers
    aggregated = await gossip_gradients(pseudo_grads)
    
    # Apply outer optimizer (Nesterov momentum)
    self.outer_optimizer.step(self.model, aggregated)
    
    # Start new inner loop
    self.start_inner_loop()

Outer Optimizer

Uses Nesterov momentum for better convergence:

python
class OuterOptimizer:
    def __init__(self, lr=0.7, momentum=0.9):
        self.lr = lr
        self.momentum = momentum
        self.velocity = {}
    
    def step(self, model, pseudo_gradients):
        for name, param in model.named_parameters():
            delta = pseudo_gradients[name]
            
            # Initialize velocity
            if name not in self.velocity:
                self.velocity[name] = torch.zeros_like(delta)
            
            v = self.velocity[name]
            
            # Nesterov update
            v.mul_(self.momentum).add_(delta)
            update = self.lr * (self.momentum * v + delta)
            
            param.data.add_(update)

Robust Aggregation

Protects against malicious gradient contributions.

Trimmed Mean

Remove top/bottom 10% before averaging:

python
def trimmed_mean(contributions, trim_fraction=0.1):
    stacked = torch.stack(contributions)  # [n, ...]
    n = len(contributions)
    trim_count = int(n * trim_fraction)
    
    # With few contributors there may be nothing to trim
    if trim_count == 0:
        return stacked.mean(dim=0)
    
    # Sort elementwise along the contributor dimension, then drop the extremes
    sorted_tensors = stacked.sort(dim=0)[0]
    trimmed = sorted_tensors[trim_count:-trim_count]
    
    return trimmed.mean(dim=0)

Krum

Select gradient closest to majority:

python
def krum(contributions, num_byzantine=1):
    n = len(contributions)
    
    # Compute pairwise distances
    distances = compute_pairwise_distances(contributions)
    
    # For each gradient, sum distances to n-f-2 closest
    scores = []
    for i in range(n):
        sorted_dists = sorted(distances[i])
        score = sum(sorted_dists[1:n-num_byzantine-1])
        scores.append(score)
    
    # Select gradient with minimum score
    best_idx = min(range(n), key=lambda i: scores[i])
    return contributions[best_idx]

Gradient Validation

Before accepting a gradient:

python
def validate_gradient(submitted, reference):
    # Check 1: Cosine similarity (direction)
    cosine_sim = F.cosine_similarity(submitted.flatten(), reference.flatten(), dim=0)
    if cosine_sim < 0.3:
        return False, "Direction mismatch"
    
    # Check 2: Magnitude ratio
    ratio = submitted.norm() / reference.norm()
    if ratio > 10 or ratio < 0.1:
        return False, "Magnitude implausible"
    
    # Check 3: Variance ratio
    var_ratio = submitted.var() / reference.var()
    if var_ratio > 100:
        return False, "Variance too high"
    
    return True, "Valid"

Training Configuration

Default Settings

Parameter | Default | Description
inner_steps | 500 | Steps before sync
inner_lr | 1e-4 | Inner optimizer learning rate
outer_lr | 0.7 | Outer optimizer learning rate
outer_momentum | 0.9 | Nesterov momentum
max_grad_norm | 1.0 | Gradient clipping
trim_fraction | 0.1 | Trimmed mean fraction
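Gathered into a single config object, these defaults might look like the following (the TrainingConfig dataclass is illustrative, not an actual NeuroShard API):

python
from dataclasses import dataclass

@dataclass
class TrainingConfig:
    inner_steps: int = 500        # local steps between DiLoCo outer syncs
    inner_lr: float = 1e-4        # inner optimizer learning rate
    outer_lr: float = 0.7         # outer optimizer learning rate
    outer_momentum: float = 0.9   # Nesterov momentum
    max_grad_norm: float = 1.0    # gradient clipping threshold
    trim_fraction: float = 0.1    # trimmed mean fraction (per side)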

Memory-Adaptive Batch Size

python
def calculate_batch_size(available_memory_mb, num_layers):
    # Model memory (weights + grads + optimizer states), in bytes
    model_memory = model_params * 16
    
    # Activation memory per sample, in bytes
    activation_memory = seq_len * hidden_dim * num_layers * 8
    
    # Budget ~30% of available memory for activations (convert MB -> bytes)
    usable_memory = available_memory_mb * 0.3 * 1024 * 1024
    max_batch = int(usable_memory // activation_memory)
    
    return max(1, min(max_batch, 8))

Checkpointing

Checkpoints are saved automatically to ~/.neuroshard/checkpoints/:

python
def save_checkpoint(self):
    checkpoint = {
        "layer_ids": self.my_layer_ids,
        "architecture": self.architecture.to_dict(),
        "layers": {layer_id: layer.state_dict() for layer_id, layer in self.layers.items()},
        "optimizer": self.optimizer.state_dict(),
        "diloco": self.diloco_trainer.state_dict(),
        "training_rounds": self.total_training_rounds,
        "current_loss": self.current_loss,
        "timestamp": time.time()
    }
    # Uses wallet_id (derived from token) for stable naming across restarts
    path = os.path.expanduser(f"~/.neuroshard/checkpoints/dynamic_node_{self.wallet_id}.pt")
    torch.save(checkpoint, path)

Checkpoint Naming

Checkpoints use wallet_id (derived from your token) instead of node_id. This ensures:

  • ✅ Same checkpoint found across restarts (even if machine ID changes)
  • ✅ Multiple nodes with same token share checkpoint identity
  • ✅ Earnings tied to wallet, training tied to same checkpoint
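A sketch of how the wallet-derived path could be resolved and reused at startup (the exact derivation of wallet_id from the token is an assumption; only the file naming mirrors the save_checkpoint snippet above):

python
import hashlib
import os
import torch

def checkpoint_path(token):
    # Assumed derivation: wallet_id as a short, stable digest of the token
    wallet_id = hashlib.sha256(token.encode()).hexdigest()[:16]
    return os.path.expanduser(
        f"~/.neuroshard/checkpoints/dynamic_node_{wallet_id}.pt"
    )

def try_resume(token):
    path = checkpoint_path(token)
    if os.path.exists(path):
        # Same token -> same path, so the node resumes where it left off
        return torch.load(path, map_location="cpu")
    return None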

Checkpoint Frequency

  • Every 10 training steps (frequent saves)
  • After each DiLoCo outer sync (every 500 steps)
  • On graceful shutdown (Ctrl+C or API shutdown)
  • DiLoCo state included for resuming inner loop position

Architecture Stability

To prevent checkpoint incompatibility due to memory fluctuations, the architecture calculation rounds memory to 500MB tiers:

6568MB → 6500MB tier → Same architecture every time
6622MB → 6500MB tier → Same architecture every time

This ensures restarts don't invalidate checkpoints due to small memory variations.
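The tiering itself is a one-line floor operation (the function name is illustrative):

python
def memory_tier_mb(available_mb, tier_size_mb=500):
    # Round down to the nearest 500MB so the derived architecture is
    # identical across restarts despite small memory fluctuations
    return (available_mb // tier_size_mb) * tier_size_mb

assert memory_tier_mb(6568) == 6500
assert memory_tier_mb(6622) == 6500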

Verifying Training is Working

Use the global training tracker to verify the distributed training is actually improving the model:

bash
# Quick verification
curl http://localhost:8000/api/training/verify

# Detailed global status
curl http://localhost:8000/api/training/global

Global LLM Status Dashboard

Visit neuroshard.com and click Training in the navigation to see the live Global LLM Status:

Metric | What it shows
Training Nodes | Number of nodes contributing compute
Global Loss | Exponential moving average loss across all nodes
Total Steps | Combined training iterations across the network
Data Shards | Unique dataset shards being trained on
Model Convergence | % of nodes with identical model weights

The dashboard also shows:

  • DiLoCo Protocol Status: Inner steps progress and outer sync count
  • Active Training Nodes: List of nodes currently training
  • Loss Trend: Whether loss is improving, stable, or needs attention

Key Metrics to Monitor

Metric | What it means | Healthy Value
training_verified | Model is actually improving | true
hash_agreement_rate | % of nodes with same weights | 100%
loss_trend | Direction of loss | "improving"
sync_success_rate | Gradient syncs working | > 50%

Hash Agreement

If hash_agreement_rate drops below 100%, nodes have diverged. This means gradient synchronization isn't working properly.
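A small polling script can watch these fields and flag divergence early (the endpoint comes from the verification section above; the response field names are assumed to match the table):

python
import time
import requests

def watch_training(base_url="http://localhost:8000", interval_s=60):
    while True:
        status = requests.get(f"{base_url}/api/training/verify", timeout=10).json()

        if not status.get("training_verified", False):
            print("WARNING: training not verified (loss may not be improving)")
        if status.get("hash_agreement_rate", 1.0) < 1.0:
            print("WARNING: nodes have diverged - check gradient sync")
        if status.get("loss_trend") != "improving":
            print(f"loss_trend = {status.get('loss_trend')}")

        time.sleep(interval_s)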

Single Node Training

With only 1 training node, all metrics reflect that node's local training. DiLoCo outer syncs will show 0 until the 500th step, at which point the node will attempt to sync with peers. If no peers are available, it applies its own pseudo-gradients locally.
