Feeding the Beast: Building Data Pipelines for an 8B Parameter Model

September 12, 2024 · 18 min read · machine-learning, data-engineering, pipelines, llm, infrastructure

The Tiger Team

The company decided to build its first large-scale internal language model. Not a toy. Not a proof of concept. An 8 billion parameter model that would run in production, trained on proprietary data to handle domain-specific tasks that general-purpose models couldn't.

They assembled a tiger team: six ML scientists who'd worked on transformer architectures, four systems engineers who'd built distributed training infrastructure, and three data engineers—including me—whose job was to figure out how to actually get data into this thing.

The ML scientists had the glamorous work: model architecture, attention mechanisms, training dynamics. We had the unsexy but critical problem: where do you get hundreds of billions of tokens of high-quality training data, and how do you process it fast enough to keep GPU clusters busy?

An internal model made sense for a few reasons. External models couldn't see proprietary data. Latency requirements ruled out API calls for certain use cases. And there's strategic value in owning your own models rather than renting capability from competitors.

The scale was sobering. Training an 8B model requires roughly 160 billion tokens minimum—that's the Chinchilla-optimal ratio. Our target was 200 billion tokens to give the model some cushion. Each token is roughly 4 bytes on average, so we needed to process, clean, deduplicate, and tokenize about 800GB of raw text. The actual storage requirements were higher because we kept metadata, quality scores, and multiple processing stages.

Data engineering turned out to be 80% of the problem.

The Data Challenge

Volume: Enormous Data

Training an 8B parameter model requires data at a scale that breaks typical data engineering intuitions.

The math:

Training target: 200 billion tokens
Average token size: ~4 bytes
Raw text needed: ~800GB

But that's just the final training data. With metadata, lineage tracking,
quality scores, deduplication indices, and multiple processing stages:

Total storage footprint: ~15TB
Peak processing throughput: 50GB/hour

We needed both hot storage for active processing and cold storage for archives and rollback. The hot tier lived on NVMe SSDs for speed; the cold tier on object storage for cost. Moving data between tiers became its own optimization problem.

Velocity: The Data Never Stops

This wasn't a one-time ETL job. The model would be continuously trained as new data became available—internal documentation gets updated, new code gets written, new customer interactions happen. We needed streaming ingestion that could handle bursts while maintaining quality.

Peak ingestion rate: 100,000 documents per hour during bulk loads, settling to a steady state of about 5,000 per hour during normal operation. Each document could be anything from a 50-word internal wiki page to a 10,000-line code file.

Variety: Not All Data Is Created Equal

We pulled from:

  • Internal documentation systems (Confluence-style wikis, technical specs, runbooks)
  • Code repositories (millions of files across hundreds of repos)
  • Customer interaction logs (properly anonymized—more on this nightmare later)
  • Technical specifications and API documentation
  • Support tickets and incident reports
  • Internal communications (again, heavily processed for privacy)

Each source had its own:

  • Format: JSON, XML, plaintext, Markdown, structured logs, HTML that looked like it was written in 1998
  • Access patterns: Batch APIs that rate-limited you aggressively, streaming connections that died randomly, database exports that took 6 hours
  • Quality characteristics: Some sources were gold (well-written technical docs); others were garbage (auto-generated reports full of boilerplate)
  • Privacy requirements: Ranging from "public-facing docs" to "contains PII and must be scrubbed with extreme prejudice"

Veracity: Garbage In, Garbage Model

The ML scientists kept saying: "The model is only as good as the data."

The data was messy. Obviously. But the specific ways it was messy were instructive:

  • Duplicate content: The same document copied into 47 different wikis, sometimes with slight modifications
  • Template pollution: Auto-generated reports where 95% of the text was boilerplate template and 5% was actual information
  • Mixed quality: A brilliant technical deep-dive followed by someone's stream-of-consciousness meeting notes
  • Encoding chaos: UTF-8, UTF-16, Latin-1, and things that claimed to be UTF-8 but were lying
  • Invisible contamination: Test data mixed with production data, synthetic examples that looked real

Pipeline Architecture

The Three-Stage Pipeline

We settled on a three-stage architecture after several iterations. The first version tried to do too much in a single pass and became impossible to debug.

Stage 1: Ingestion & Standardization

Every data source got its own ingestion connector. The connector's job was simple: pull data from the source system and convert it to our internal schema. No filtering, no quality judgments—just normalization.

from dataclasses import dataclass
from datetime import datetime

@dataclass
class Document:
    source_id: str           # Where did this come from?
    content: str             # Raw text content
    content_type: str        # "code", "documentation", "conversation", etc.
    language: str            # "en", "python", "java", etc.
    timestamp: datetime      # When was this created/modified?
    metadata: dict           # Source-specific metadata
    lineage: LineageInfo     # Full provenance chain

Connectors had to handle:

  • Pagination (some APIs returned 10,000 results per page; others returned 50)
  • Rate limiting (exponential backoff with jitter)
  • Checkpointing (resume after failures without reprocessing everything)
  • Schema evolution (sources changed their APIs without warning)
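The "exponential backoff with jitter" item above can be sketched as the standard "full jitter" strategy; the base and cap values here are illustrative, not our production settings:

```python
import random

def backoff_delay(attempt: int, base: float = 0.5, cap: float = 60.0) -> float:
    """Full-jitter exponential backoff: sleep a random amount in
    [0, min(cap, base * 2**attempt)]."""
    ceiling = min(cap, base * (2 ** attempt))
    return random.uniform(0, ceiling)
```

The widening window spreads retries out over time; the jitter keeps many failed workers from retrying in lockstep and hammering the source again.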

Stage 2: Quality Filtering & Enrichment

This is where documents got scored, filtered, and enriched with additional metadata.

Quality signals:

  • Length filtering: Too short (< 50 tokens) was usually garbage; too long (> 50,000 tokens) was usually log dumps
  • Language detection: We trained on English primarily; other languages got flagged
  • Perplexity scoring: Documents that were incomprehensible to a smaller model were probably garbage
  • Toxicity filtering: Using an off-the-shelf classifier to catch obviously problematic content
  • Deduplication: Both exact (hash-based) and near-duplicate (MinHash)
  • Signal density: Ratio of information to boilerplate

Enrichment added:

  • Token count estimates
  • Quality scores (0-1 scale across multiple dimensions)
  • Topic classification
  • Detected entities (for PII scrubbing)
  • Structural metadata (headers, code blocks, etc.)

Stage 3: Tokenization & Packaging

Finally, clean documents got tokenized and packaged into the format the training system expected.

Data Sources → Ingestion → Quality Filter → Enrichment → Tokenization → Training Store
                  ↓            ↓             ↓              ↓
              Monitoring   Sampling      Validation    Verification

Each stage was independently scalable. When ingestion was the bottleneck, we added ingestion workers. When tokenization fell behind, we scaled that. The stages communicated through message queues so we could buffer bursts.

Building the Pipeline

Ingestion Layer

The ingestion layer was conceptually simple but operationally annoying. Every source system had its own quirks.

class ResourceIngestionPipeline:
    def __init__(self, source_config: SourceConfig):
        self.source = source_config
        self.rate_limiter = AdaptiveRateLimiter(
            initial_rate=source_config.suggested_rate,
            backoff_factor=2.0,
            max_rate=source_config.max_rate
        )
        self.checkpoint_manager = CheckpointManager(
            storage=S3CheckpointStorage(source_config.checkpoint_bucket),
            checkpoint_interval=timedelta(minutes=5)
        )

    async def ingest(self) -> AsyncIterator[Document]:
        """Stream data from source system"""
        last_checkpoint = await self.checkpoint_manager.load()
        cursor = last_checkpoint.cursor if last_checkpoint else None

        while True:
            await self.rate_limiter.acquire()
            try:
                batch = await self.source.fetch_batch(cursor=cursor, limit=100)
                if not batch.documents:
                    break

                for doc in batch.documents:
                    yield self.transform(doc)

                cursor = batch.next_cursor
                # CheckpointManager throttles writes to its configured interval
                await self.checkpoint_manager.save(Checkpoint(cursor=cursor))

            except RateLimitError:
                self.rate_limiter.backoff()
                continue
            except SourceUnavailableError:
                await asyncio.sleep(60)
                continue

    def transform(self, raw_data: SourceDocument) -> Document:
        """Normalize into standard format"""
        return Document(
            source_id=f"{self.source.name}:{raw_data.id}",
            content=self._extract_text(raw_data),
            content_type=self._classify_content(raw_data),
            language=detect_language(raw_data.content),
            timestamp=raw_data.modified_at,
            metadata=self._extract_metadata(raw_data),
            lineage=LineageInfo(
                source=self.source.name,
                ingested_at=datetime.utcnow(),
                version=self.source.schema_version
            )
        )

The adaptive rate limiter was key. Different source systems had different limits, and those limits weren't always documented. We learned them empirically by watching for 429 responses and backing off.
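A minimal version of that learn-by-observation behavior looks like additive-increase/multiplicative-decrease: back off hard on a 429, creep back up on success. The class name and factors below are assumptions for illustration, not the production AdaptiveRateLimiter:

```python
class EmpiricalRateLimiter:
    """AIMD rate control: halve the rate on a 429, probe upward on success."""

    def __init__(self, initial_rate: float, max_rate: float,
                 backoff_factor: float = 2.0, recovery_step: float = 1.0):
        self.rate = initial_rate          # requests per second
        self.max_rate = max_rate
        self.backoff_factor = backoff_factor
        self.recovery_step = recovery_step

    def on_response(self, status: int) -> None:
        if status == 429:                 # server told us to slow down
            self.rate = max(1.0, self.rate / self.backoff_factor)
        else:                             # success: probe for more headroom
            self.rate = min(self.max_rate, self.rate + self.recovery_step)

    def delay(self) -> float:
        """Seconds to wait before the next request at the current rate."""
        return 1.0 / self.rate
```

Over time the rate converges to just under whatever the source actually tolerates, documented or not.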

Quality Filtering

The ML scientists provided heuristics, but we needed to apply them at 50GB/hour.

Quality Dimensions:

class QualityScorer:
    def score(self, doc: Document) -> QualityScore:
        scores = {
            'length': self._score_length(doc),
            'language': self._score_language(doc),
            'perplexity': self._score_perplexity(doc),
            'duplication': self._score_duplication(doc),
            'signal_density': self._score_signal_density(doc),
            'toxicity': self._score_toxicity(doc),
        }

        # Weighted combination - weights tuned on labeled examples
        weights = {
            'length': 0.1,
            'language': 0.15,
            'perplexity': 0.25,
            'duplication': 0.2,
            'signal_density': 0.2,
            'toxicity': 0.1,
        }

        final_score = sum(scores[k] * weights[k] for k in scores)

        return QualityScore(
            overall=final_score,
            components=scores,
            threshold_met=final_score > 0.6
        )

Length filtering was simple but effective: documents under 50 tokens were almost always garbage (navigation elements, error messages, empty templates). Documents over 50,000 tokens were usually log dumps or data exports that had accidentally ended up in documentation systems.

Perplexity scoring used a small (125M parameter) language model. High perplexity meant the text was either nonsense, in a language the model didn't understand, or highly technical jargon. We used it as a signal, not a hard filter—some valuable technical content has high perplexity because it uses specialized terminology.
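Perplexity itself is just the exponentiated average negative log-likelihood per token. Given per-token log-probabilities from the scoring model, the computation is model-independent (a sketch):

```python
import math

def perplexity(token_logprobs: list[float]) -> float:
    """exp(mean negative log-likelihood). Lower means the scoring
    model found the text more predictable."""
    if not token_logprobs:
        raise ValueError("need at least one token")
    nll = -sum(token_logprobs) / len(token_logprobs)
    return math.exp(nll)
```

If the model assigns every token probability 0.25, perplexity is exactly 4: the model is effectively choosing among four equally likely options at each step.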

Deduplication at Scale

Training on duplicate data wastes compute and can bias the model. We needed both exact and near-duplicate detection.

Exact Deduplication:

Hash-based, using SHA-256 on normalized content. "Normalized" meant: lowercase, remove whitespace, remove punctuation. This caught copy-paste duplicates even if someone added a space or changed capitalization.

import hashlib
import re

def exact_dedup_hash(content: str) -> str:
    normalized = content.lower()
    normalized = re.sub(r'\s+', '', normalized)
    normalized = re.sub(r'[^\w]', '', normalized)
    return hashlib.sha256(normalized.encode()).hexdigest()

Near-Deduplication:

MinHash with Locality-Sensitive Hashing (LSH). This caught documents that were 80%+ similar—like templates with different values filled in, or the same doc with minor edits.

class MinHashDeduplicator:
    def __init__(self, num_perm: int = 128, threshold: float = 0.8):
        self.num_perm = num_perm
        self.threshold = threshold
        self.lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)

    def is_duplicate(self, doc_id: str, content: str) -> Tuple[bool, Optional[str]]:
        minhash = self._compute_minhash(content)

        # Check for near-duplicates
        similar = self.lsh.query(minhash)
        if similar:
            return True, similar[0]  # Return the ID of the similar doc

        # Not a duplicate, add to index
        self.lsh.insert(doc_id, minhash)
        return False, None

    def _compute_minhash(self, content: str) -> MinHash:
        minhash = MinHash(num_perm=self.num_perm)
        # Shingle the content (3-grams of words)
        words = content.split()
        for i in range(len(words) - 2):
            shingle = ' '.join(words[i:i+3])
            minhash.update(shingle.encode('utf-8'))
        return minhash

The computational cost was significant. MinHash over 200 billion tokens required careful optimization: we sharded the LSH index by content type and processed in parallel. Even so, deduplication was one of the slower stages.

Tokenization Pipeline

Converting text to tokens for training.

class TokenizationPipeline:
    def __init__(self, tokenizer_path: str):
        self.tokenizer = Tokenizer.from_file(tokenizer_path)
        self.max_seq_length = 4096

    def tokenize_document(self, doc: Document) -> TokenizedDocument:
        tokens = self.tokenizer.encode(doc.content)

        return TokenizedDocument(
            source_id=doc.source_id,
            tokens=tokens.ids,
            attention_mask=[1] * len(tokens.ids),
            metadata=doc.metadata,
            token_count=len(tokens.ids)
        )

    def pack_sequences(self, docs: List[TokenizedDocument]) -> List[TrainingExample]:
        """Pack multiple documents into fixed-length sequences for efficiency"""
        examples = []
        current_tokens = []
        current_mask = []

        for doc in docs:
            # Add document separator tokens around each packed document
            doc_tokens = [self.tokenizer.bos_token_id] + doc.tokens + [self.tokenizer.eos_token_id]
            # Guard: truncate any single document longer than the context window
            doc_tokens = doc_tokens[:self.max_seq_length]

            if len(current_tokens) + len(doc_tokens) > self.max_seq_length:
                # Pad and emit current sequence
                if current_tokens:
                    examples.append(self._pad_and_create_example(current_tokens, current_mask))
                current_tokens = doc_tokens
                current_mask = [1] * len(doc_tokens)
            else:
                current_tokens.extend(doc_tokens)
                current_mask.extend([1] * len(doc_tokens))

        # Don't forget the last batch
        if current_tokens:
            examples.append(self._pad_and_create_example(current_tokens, current_mask))

        return examples

Sequence packing was crucial for training efficiency. Rather than padding every document to the max sequence length (with mostly short documents, up to 80% of compute went to padding), we packed multiple documents into single sequences separated by special tokens. This increased effective throughput by 3x.
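The padding waste is easy to quantify: when each document is padded individually, the wasted fraction is one minus the ratio of real tokens to allocated slots. A back-of-envelope sketch:

```python
def padding_waste(doc_lengths: list[int], seq_length: int) -> float:
    """Fraction of token slots wasted when each document is padded
    to seq_length on its own (long documents assumed truncated)."""
    total_slots = len(doc_lengths) * seq_length
    used = sum(min(length, seq_length) for length in doc_lengths)
    return 1 - used / total_slots
```

With a 4096-token context and documents averaging around 800 tokens, roughly 80% of slots are padding, which is where the headline efficiency gain comes from.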

Performance Optimization

Throughput Bottlenecks

Initial pipeline throughput: 8GB/hour. Target: 50GB/hour. We had work to do.

Bottleneck 1: Network I/O to Source Systems

Some source systems were slow. Really slow. The internal wiki API, for instance, had 500ms of latency per request and returned only 20 documents at a time. At that rate, pulling our target corpus would take months.

Fix: Parallel connections (up to the rate limit), request batching where supported, and local caching of recently-seen documents to avoid redundant fetches.

Bottleneck 2: CPU-bound Deduplication

MinHash computation is CPU-intensive. On a single core, we could process about 1,000 documents per minute.

Fix: Parallelization across cores, sharding the LSH index by content type, and using a faster MinHash implementation (datasketch with the C extension).

Bottleneck 3: Disk I/O for Checkpointing

We checkpointed every 5 minutes to enable recovery from failures. But writing checkpoint state to disk was blocking ingestion.

Fix: Async checkpointing to object storage with local buffering. Checkpoints happened in the background without blocking the main pipeline.
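The shape of that fix: the hot path records the latest state and returns immediately, while a background task flushes to durable storage. A minimal asyncio sketch, where `save_fn` stands in for the object-storage write:

```python
import asyncio

class AsyncCheckpointer:
    """record() is non-blocking; a background task flushes the latest
    state via save_fn (a stand-in for the object-storage write)."""

    def __init__(self, save_fn, interval: float = 0.01):
        self._save_fn = save_fn
        self._interval = interval
        self._latest = None
        self._dirty = asyncio.Event()
        self._task = None

    def record(self, state) -> None:
        self._latest = state      # overwrite: only the newest state matters
        self._dirty.set()         # wake the flusher; returns immediately

    async def run(self) -> None:
        self._task = asyncio.create_task(self._flush_loop())

    async def _flush_loop(self) -> None:
        while True:
            await self._dirty.wait()
            self._dirty.clear()
            await self._save_fn(self._latest)
            await asyncio.sleep(self._interval)

    async def close(self) -> None:
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
```

Because `record` only overwrites the latest state, bursts of updates coalesce into a single write instead of queueing up behind the storage layer.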

Parallelization Strategy

We used a staged pipeline with queues between stages:

[Ingestion Workers] → [Queue 1] → [Quality Workers] → [Queue 2] → [Tokenization Workers]
       (8 workers)       (Redis)      (16 workers)       (Redis)       (8 workers)

Each stage scaled independently. When the queue between stages backed up, we knew which stage was the bottleneck. Auto-scaling based on queue depth kept things balanced.
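The scaling decision itself can be a one-liner: size the pool so the current backlog drains within a target window, clamped to sane bounds. A sketch with assumed parameter names:

```python
def desired_workers(queue_depth: int, per_worker_rate: int,
                    min_workers: int = 1, max_workers: int = 32,
                    drain_target_secs: int = 300) -> int:
    """Workers needed to drain queue_depth documents within
    drain_target_secs, given per_worker_rate docs/sec per worker."""
    needed = -(-queue_depth // (per_worker_rate * drain_target_secs))  # ceil div
    return max(min_workers, min(max_workers, needed))
```

The clamp matters: without `max_workers`, a transient spike would try to scale past what the downstream stage (or the budget) can absorb.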

Caching and Incremental Processing

Re-processing the entire corpus on every run was wasteful. We implemented incremental processing:

  1. Content-addressable storage: Documents stored by hash of content
  2. Dependency tracking: Track which processing stages each document had completed
  3. Delta processing: On each run, only process documents that are new or changed

This reduced daily processing load by 90% after the initial corpus load.
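The core of the scheme fits in a few lines: key each document by a hash of its content and track which stages that content has completed, so an unchanged document is skipped on the next run. A simplified in-memory sketch (production used a persistent metadata store):

```python
import hashlib

class IncrementalStore:
    """Content-addressable stage tracking: identical content maps to the
    same key, so reprocessing is skipped; changed content gets a new key."""

    def __init__(self):
        self._completed: dict[str, set[str]] = {}  # content hash -> stages done

    @staticmethod
    def key(content: str) -> str:
        return hashlib.sha256(content.encode("utf-8")).hexdigest()

    def needs(self, content: str, stage: str) -> bool:
        return stage not in self._completed.get(self.key(content), set())

    def mark_done(self, content: str, stage: str) -> None:
        self._completed.setdefault(self.key(content), set()).add(stage)
```

Note the useful side effect: editing a document changes its hash, which automatically invalidates every downstream stage for that document and nothing else.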

Monitoring and Observability

Key Metrics

Pipeline Health:

  • Ingestion rate (documents/sec per source)
  • Processing latency (p50, p95, p99 end-to-end)
  • Error rate by stage and error type
  • Queue depths between stages
  • Backlog age (oldest unprocessed document)

Data Quality:

  • Documents filtered by quality dimension (daily)
  • Deduplication rate (what % are duplicates?)
  • Token distribution statistics (length, vocabulary coverage)
  • Quality score distributions over time (are we regressing?)

Operational:

  • Cost per million tokens processed
  • Storage utilization
  • Worker utilization (are we over/under-provisioned?)

Alerting

  • Error rate > 1% for 5 minutes → page on-call
  • Queue depth > 100,000 for 10 minutes → alert
  • Processing latency p99 > 1 hour → alert
  • Quality score mean drops by > 5% day-over-day → alert
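The day-over-day quality check in the last rule reduces to a relative-drop comparison (a sketch; the 5% threshold is the one from the alert above):

```python
def quality_regressed(yesterday_mean: float, today_mean: float,
                      threshold: float = 0.05) -> bool:
    """True when the mean quality score drops more than threshold
    (relative) versus the previous day."""
    if yesterday_mean <= 0:
        return False  # no baseline to compare against
    return (yesterday_mean - today_mean) / yesterday_mean > threshold
```

Using a relative rather than absolute drop keeps the alert meaningful whether the baseline mean sits at 0.5 or 0.8.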

The quality regression alert caught several issues: a source system changed its export format, causing our parser to produce garbage; a new data source had unexpected character encoding; and once, someone accidentally included test fixtures in a production export.

The Integration Dance

The Feedback Loop

ML scientists would train on a data snapshot, evaluate model performance, and provide feedback:

"The model is weak on code completion—can we increase the code ratio?" "We're seeing hallucinations about product X—are we training on outdated docs?" "The quality filter is too aggressive on technical jargon—domain-specific terms are getting flagged as low-quality."

This feedback loop was continuous. We maintained versioned dataset snapshots so we could track which data produced which model behavior. When the model performed poorly on a task, we could trace back to the training data and understand why.

The iteration cycle:

  1. Train model on data snapshot v1
  2. Evaluate model, identify weaknesses
  3. Adjust pipeline parameters (quality thresholds, content mix ratios, filtering rules)
  4. Generate data snapshot v2
  5. Train again, compare

We went through about 15 major iterations before hitting acceptable quality.

Experiment Tracking

Every dataset had:

  • A version ID
  • Pipeline configuration that produced it
  • Quality metrics at time of generation
  • List of source systems and their versions
  • Full lineage for reproducibility

This let us answer: "What was different about the data that produced the good model vs. the bad model?" Usually the answer was something mundane like "we accidentally included log files that time."

Challenges and Gotchas

1. The Moving Target

Requirements evolved constantly as the team learned more about model behavior.

Week 3: "We need more code data." Week 5: "Actually, too much code is making the model worse at natural language. Reduce code ratio." Week 8: "Can we filter out auto-generated code? The model is learning to produce boilerplate."

The pipeline needed to be flexible enough to handle these pivots without a full rewrite. Our content-addressable storage and incremental processing made this manageable—changing the content mix ratio didn't require re-ingesting everything.

2. Privacy and Compliance

Training data included customer interaction logs. Even anonymized, this required careful handling.

We implemented:

  • PII detection and redaction (names, emails, phone numbers, addresses)
  • Entity replacement (swap detected entities with placeholders)
  • Differential privacy on aggregate statistics
  • Access controls on raw data (only the pipeline had access; humans saw only processed data)
  • Audit logging (who accessed what, when)
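The entity-replacement step can be sketched with typed placeholders. The patterns below are illustrative only; the production detector combined trained NER models with rules, not just regexes:

```python
import re

# Illustrative patterns -- real PII detection needs far more than regexes.
PII_PATTERNS = {
    "EMAIL": re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+"),
    "PHONE": re.compile(r"\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b"),
    "SSN":   re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

def redact(text: str) -> str:
    """Replace each detected entity with a typed placeholder."""
    for label, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{label}]", text)
    return text
```

Typed placeholders (rather than deleting matches outright) preserve sentence structure, so the model still learns that "call [PHONE]" is a thing people write without ever seeing a real number.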

The privacy review added 6 weeks to the timeline. Worth it—one slip and we'd have been in serious trouble.

3. Data Lineage

When a model produces unexpected output, you need to trace it back to the training data. "Why does the model think our product does X?" usually meant someone wrote inaccurate documentation that ended up in training data.

We stored full lineage: every training example could be traced back through tokenization → quality scoring → source document → original system. This took significant storage but saved debugging time.

4. Scale Testing

We couldn't afford to test at full scale (200B tokens) before production. Instead:

  • Unit tests on pipeline logic
  • Integration tests with 1% sample data
  • Load tests with synthetic data to verify throughput
  • Progressive rollout: process 10%, 25%, 50%, 100% with monitoring

Several bugs only appeared at scale: integer overflow in our token counter (we switched to int64), memory leaks in the deduplication index (we added periodic compaction), and hot partition issues in our metadata store.

5. Cost Management

Processing 15TB of data isn't free.

Major costs:

  • Compute for processing: ~$15,000
  • Storage for intermediate stages: ~$5,000/month
  • Egress from source systems: ~$2,000
  • GPU time for perplexity scoring: ~$8,000

We optimized by:

  • Spot instances for batch processing (60% savings)
  • Tiered storage (hot/warm/cold)
  • Early filtering (don't process what you'll discard)
  • Caching expensive operations (perplexity scores)

What Worked

1. Modular Architecture

Each pipeline stage could be developed, tested, and optimized independently. When perplexity scoring was slow, we optimized it without touching ingestion. When we needed a new source connector, we added it without changing downstream stages.

2. Incremental Processing

Processing only what changed reduced daily compute by 90%. It also made debugging easier—when something broke, we only needed to examine recent data.

3. Quality Over Quantity

The ML scientists initially wanted "all the data." We pushed back: more data isn't better if it's low quality. A curated 150B token dataset outperformed a noisy 300B token dataset in our evaluations.

4. Strong Monitoring

Every time something went wrong, we found out from monitoring before users did. Alerting on quality metric regressions caught issues that would have taken weeks to manifest in model evaluations.

What Didn't Work

1. Over-Engineering Early

We spent three weeks building a sophisticated content classification system before we understood what classifications mattered. Most of that work was thrown out. Should have started simpler and added complexity based on actual needs.

2. Underestimating Coordination Overhead

A 15-person team requires significant coordination. We spent more time in meetings discussing data requirements than actually building pipeline features. In retrospect, a smaller team with clearer ownership would have moved faster.

3. Assuming Source Data Quality

We assumed internal documentation would be high quality. It wasn't. Auto-generated reports, outdated wikis, duplicate content across systems—the cleanup work was twice what we estimated.

The Results

After 4 months:

  • Pipeline throughput: 50GB/hour sustained
  • Total data processed: ~800GB raw → ~200B tokens clean
  • Pipeline uptime: 99.2% (excluding planned maintenance)
  • Model training: Successfully fed training for 6 weeks continuous
  • Final model: Outperformed general-purpose models on domain-specific tasks by 15-20%

The model shipped to internal users. Feedback was positive—it understood our products, our terminology, our processes in ways that external models couldn't.

Lessons Learned

1. Data Engineering Is the Bottleneck

Model architecture gets the attention, but data pipelines determine success. Our ML scientists could have designed a better architecture, but it wouldn't have mattered without quality data to train on.

2. Privacy Can't Be Bolted On

We designed privacy controls from day one, and it was still hard. Retrofitting privacy onto an existing pipeline would have been a nightmare.

3. Invest in Observability

We spent 20% of development time on monitoring and alerting. It paid back immediately—every production issue was caught by monitoring first.

4. Test at Scale (Or As Close As You Can)

Problems that don't appear with 1GB of data will definitely appear with 1TB. Our integration tests caught some issues, but scale-dependent bugs still slipped through.

5. Collaboration Is Key

The best technical solution doesn't matter if it doesn't meet the ML team's needs. We embedded a data engineer with the ML scientists for the duration of the project. That person became the translator between "we need more diverse data" and "we need to increase the sampling rate for underrepresented content types."

Key Takeaways

  • Data pipelines for foundation models are a unique challenge: scale, quality, and velocity all matter simultaneously
  • Privacy and compliance requirements must be designed in from the start
  • Quality filtering is as important as volume—garbage in, garbage model
  • Deduplication at scale requires specialized algorithms and infrastructure
  • Monitoring and observability are critical for maintaining pipeline health
  • Close collaboration between data engineers and ML scientists is essential
  • The pipeline is never "done"—continuous improvement based on model performance feedback

Details have been abstracted to protect proprietary information. All examples are simplified for illustration.