
inRiver ETL Pipeline - Phase 2 (Full Modernization)

Project: inRiver PIM to Aurora PostgreSQL ETL Pipeline
Date: 2026-02-26
Last Updated: 2026-03-03 (Split into Phase 2)
Status: Future - After Phase 1 Complete
Timeline: 4-6 weeks


Overview

Goal: Replace the Blower PHP/MySQL infrastructure with a modern dbt + PostgreSQL pipeline.

Strategic Context: Phase 1 (Python converter) gets inRiver working quickly. Phase 2 modernizes the entire stack and sets a precedent for replacing other Blower sources.

Prerequisites: Phase 1 operational and stable


Intent Analysis

User Request Summary

Migrate from the current Blower ETL system (nightly full load, PHP-generated Airflow DAGs) to the new inRiver PIM system (delta load, hourly updates). Load inRiver JSONL exports into an Aurora PostgreSQL database and use dbt transformations to maintain functional parity with current Blower database outputs.

Request Classification

  • Type: Migration + New ETL Pipeline Development
  • Scope: System-wide (new data pipeline with multiple components)
  • Complexity: Complex (multi-technology integration, data model transformation, multiple downstream systems)

Architectural Shift

  • From: Nightly full load batch processing
  • To: Hourly delta load incremental processing
  • Impact: Requires merge/upsert logic, state management, change data capture patterns

Medallion Architecture (Bronze-Silver-Gold + Export)

Bronze Layer (Raw Landing Zone):

  • Raw inRiver JSON files with audit metadata (load_date, file_name, business_unit, s3_key)
  • Storage: PostgreSQL JSONB column for efficient querying
  • Mutable: Continuously updated by delta loader (inserts, updates, soft deletes)
  • Carried forward from Phase 1 accumulator tables
  • Purpose: Current state of all received data, audit trail via event log

dbt Snapshot Layer (Point-in-Time Capture):

  • dbt snapshot of Bronze tables before each transformation run
  • SCD Type 2: Tracks historical changes with dbt_valid_from, dbt_valid_to columns
  • Append-only history: existing snapshot rows are never overwritten; dbt only closes a superseded version by setting dbt_valid_to
  • Decouples ingestion from transformation: Bronze can keep receiving deltas while dbt works off a consistent snapshot
  • Enables rerunning failed transformations against the same data
  • Purpose: Consistent, repeatable input for Silver transformations + full change history

Silver Layer (Normalization - REQUIRED):

  • Reads from snapshots, not directly from Bronze
  • Normalized entities with integer ID assignment
  • SHA-1 hashed value store for attribute values (deduplication)
  • Equivalent to Blower's Central schema
  • Tables: items, attributes, categories, media, attribute_sets, value_store
  • Each entity: id (integer) + blower_id (source reference)
  • Purpose: Normalize snapshot data, assign IDs, enable efficient joins

Gold Layer (Business-Ready):

  • 13 Blower revision tables with normalized integer IDs
  • Full item dataset maintained (not just deltas)
  • Uses Silver IDs via joins (Snapshot + Silver → Gold)
  • All tables have seen_status_id for delta tracking (added/deleted/modified/unchanged)
  • Equivalent to Blower's Revision schema
  • Purpose: Optimized for queries, maintains full state with change tracking

Export Layer (Downstream Consumption):

  • dbt models that transform Gold → CSV format
  • Converts internal IDs back to source IDs (blower_id)
  • Resolves foreign keys to names (e.g., type_id → type name)
  • Removes internal columns (seen_status_id)
  • Equivalent to Blower's Export classes
  • Purpose: Downstream systems consume CSV exports (not direct DB access)

Data Flow:

Bronze (mutable, live deltas)
  → dbt snapshot (append-only point-in-time capture, SCD Type 2)
    → Silver (normalization, ID assignment)
      → Gold (business-ready revision tables)
        → Export (CSV generation for downstream)

Target Schema

Blower Revision Schema (*_revision_XXX):

  • 13 normalized tables documented in docs/blower-target/
  • Core tables: items, attributes, categories, relationships, media
  • Multi-BU: Separate schema per business unit (bk_*, gm_*, ec_*, etc.)
  • EAV pattern for attributes with relationship tables
  • All tables include seen_status_id column for delta tracking

Blower Central Schema (Silver layer normalization):

  • Assigns integer IDs to all entities
  • value_store table with SHA-1 hashes for attribute values
  • Lookup tables: items, attributes, categories, media, attribute_sets
  • Each entity has id (integer) + blower_id (source reference)

Transformation Pattern (from Blower PIM source):

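```sql
-- Example: Snapshot + Silver → Gold (revision items table)
-- {revision} and {seen_status_id} are per-run template placeholders;
-- {seen_status_id} replaces Blower's MySQL variable @SEEN_ID (not valid in PostgreSQL)
INSERT INTO {revision}.items
SELECT
    item.id,           -- Silver integer id of the item
    item.name,
    it.id,             -- Silver id of the item type
    ag.id,             -- Silver id of the attribute set (NULL if none)
    {seen_status_id}
FROM bronze_snapshot.items b
    INNER JOIN silver.items item ON item.blower_id = b.blower_id
    INNER JOIN silver.item_types it ON it.name = b.item_type
    LEFT JOIN silver.attribute_sets ag ON ag.blower_id = b.attribute_set_blower_id
WHERE b.dbt_valid_to IS NULL;  -- current snapshot records only
```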

Functional Requirements

FR-1: Bronze Layer - Raw Data Ingestion

Priority: High
Description: Ingest inRiver JSONL export files from S3 to Bronze layer with audit metadata

Details:

  • Source: S3 bucket with inRiver exports (controlled by Watsco)
  • File types: Product, Item, ChannelNode, AttributeFile (JSON format)
  • Storage: PostgreSQL table with JSONB column + metadata columns
    • data (JSONB) - Raw inRiver JSON for SQL querying
    • load_date (timestamp) - When file was loaded
    • file_name (text) - Original S3 file name
    • business_unit (text) - BU identifier (BAK, CEFL, ECM, GEM)
    • s3_key (text) - S3 object key for reference
    • entity_type (text) - Product, ChannelNode, AttributeFile, etc.
  • Pattern: Mutable current-state tables updated by the delta loader (inserts, updates, soft deletes); the historical record of changes is kept by the dbt snapshots
  • Trigger: S3 event notifications on file upload
  • Processing: Hourly batch processing acceptable
  • Rationale: JSONB enables efficient SQL querying for Silver transformations
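
The S3 ingestion task (sized in TR-2 at roughly 50 lines of Python) mostly turns each delivered file into rows carrying the metadata columns listed above. A minimal sketch, assuming each S3 object is JSONL (one inRiver entity per line) and that the caller has already fetched the bytes; `build_bronze_rows` is a hypothetical helper, and both the S3 download and the database write (for example a parameterized INSERT binding `data` to the JSONB column) are left out.

```python
import json
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Optional


def build_bronze_rows(body: bytes, s3_key: str, business_unit: str,
                      entity_type: str,
                      load_date: Optional[datetime] = None) -> list[dict]:
    """Turn one JSONL export into Bronze rows (hypothetical helper).

    Each non-blank line becomes one row: the parsed entity goes in `data`
    (serialized back to JSON text, ready to bind to a JSONB parameter) and
    the FR-1 audit metadata columns are filled in alongside it.
    """
    load_date = load_date or datetime.now(timezone.utc)
    file_name = PurePosixPath(s3_key).name  # S3 keys use "/" separators
    rows = []
    for line_no, line in enumerate(body.decode("utf-8").splitlines(), start=1):
        if not line.strip():
            continue  # tolerate blank lines between records
        try:
            entity = json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"{s3_key}: invalid JSON on line {line_no}") from exc
        rows.append({
            "data": json.dumps(entity),
            "load_date": load_date,
            "file_name": file_name,
            "business_unit": business_unit,
            "s3_key": s3_key,
            "entity_type": entity_type,
        })
    return rows
```

Raising on a malformed line keeps a bad file out of Bronze entirely; if per-record quarantine is preferred (NFR-4), the except branch would collect the line instead of raising.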

FR-2: Silver Layer - Normalization and ID Assignment

Priority: High
Description: Normalize Bronze data and assign integer IDs to all entities

Details:

  • Input: dbt snapshots of the Bronze tables (raw JSON with audit metadata), not Bronze directly
  • Process: Parse JSON, extract entities, assign integer IDs
  • Tables:
    • items - id, blower_id (inRiver Product/Item ID), name, item_type
    • attributes - id, blower_id (inRiver Field ID), code, name, data_type
    • categories - id, blower_id (inRiver ChannelNode ID), name, parent_id
    • media - id, blower_id (inRiver Resource ID), filename, path
    • attribute_sets - id, blower_id (inRiver Structure ID), name
    • value_store - id, sha1_hash, value (deduplicated attribute values)
  • Pattern: Each entity gets integer id + source blower_id reference
  • Purpose: Enable efficient joins for Gold layer population
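
ID assignment only supports joins and parity if an entity keeps the same integer id from run to run. In dbt this would usually be an incremental model that inserts rows only for blower_id values not yet present (or a table with a PostgreSQL identity column); the Python below is a language-neutral sketch of that invariant, with `IdRegistry` as a hypothetical name.

```python
from dataclasses import dataclass, field


@dataclass
class IdRegistry:
    """Maps source blower_id values to stable integer ids (hypothetical helper).

    Existing mappings are never changed, so an entity keeps its integer id
    across runs; unseen blower_ids get the next id after the current maximum.
    """
    ids: dict[str, int] = field(default_factory=dict)

    def assign(self, blower_ids) -> dict[str, int]:
        next_id = max(self.ids.values(), default=0) + 1
        for blower_id in blower_ids:
            if blower_id not in self.ids:
                self.ids[blower_id] = next_id
                next_id += 1
        return self.ids
```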

FR-3: Gold Layer - Business-Ready Data

Priority: High
Description: Transform Bronze snapshot data and Silver IDs into a Gold layer matching the Blower revision schema

Details:

  • Target: 13 normalized Blower tables (items, attributes, categories, relationships, media)
  • Schema: *_revision_XXX per business unit (e.g., bk_revision_2191)
  • Tables:
    • items - Core product/SKU data with Silver IDs
    • attributes, attribute_sets, attribute_set_attributes, attribute_roles - Attribute metadata
    • item_attributes - Product attribute values (EAV pattern with value_store IDs)
    • categories, category_attributes, item_categories - Category hierarchy
    • item_associations, item_bundles, item_groups - Product relationships
    • item_media - Product images/assets
    • report - Data quality reports (in addition to the 13 data tables above)
  • Transformation: Join snapshot entities with Silver IDs to populate Gold
  • Full dataset: Gold maintains complete product catalog (not just deltas)
  • Update strategy: Bronze deltas trigger merge/upsert into Gold
  • Seen status: All tables have seen_status_id (seen/added/deleted/modified/unchanged)
  • Purpose: Complete, business-ready source for the Export layer; downstream systems consume the CSV exports generated from Gold rather than querying Gold directly

FR-4: dbt Transformations (Bronze → Snapshot → Silver → Gold → Export)

Priority: High
Description: Transform raw inRiver JSON through a five-stage pipeline using dbt

Stage 0: Bronze → dbt Snapshot (Point-in-Time Capture):

  • dbt snapshot captures current state of Bronze tables before each run
  • SCD Type 2 tracking: dbt_valid_from, dbt_valid_to, dbt_updated_at
  • Strategy: check strategy on all columns (detect any change) or timestamp on updated_at
  • All downstream stages read from snapshots, never directly from Bronze
  • Enables consistent, repeatable transformation runs
  • Failed runs can be safely rerun against the same snapshot
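
dbt generates the snapshot SQL itself from the snapshot config, so nothing like this is written by hand. The sketch below only illustrates what one run of a check strategy over all columns does, using hypothetical row dicts in place of the Bronze and snapshot tables.

```python
from datetime import datetime


def snapshot_check_all(snapshot: list[dict], source: dict[str, dict],
                       run_at: datetime) -> list[dict]:
    """Apply one SCD Type 2 snapshot run using a 'check all columns' rule.

    snapshot: rows with keys 'key', 'record', 'dbt_valid_from', 'dbt_valid_to'.
    source:   current Bronze state, keyed by the snapshot's unique key.
    Existing rows are only ever closed (dbt_valid_to set), never rewritten.
    """
    current = {row["key"]: row for row in snapshot if row["dbt_valid_to"] is None}
    for key, record in source.items():
        open_row = current.get(key)
        if open_row is not None and open_row["record"] == record:
            continue  # unchanged: keep the open version as-is
        if open_row is not None:
            open_row["dbt_valid_to"] = run_at  # close the superseded version
        snapshot.append({"key": key, "record": dict(record),
                         "dbt_valid_from": run_at, "dbt_valid_to": None})
    # Keys missing from `source` stay open here, matching dbt's default
    # behavior; Bronze soft-deletes rows, so a delete arrives as a changed
    # record rather than a missing key.
    return snapshot
```

Filtering on `dbt_valid_to IS NULL` selects exactly the open versions this produces, which is what the Silver models read.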

Stage 1: Snapshot → Silver (Normalization):

  • Parse inRiver JSONB entities from snapshot (not raw Bronze)
  • Use PostgreSQL JSONB operators for efficient extraction
  • Assign integer IDs to all entities
  • Create SHA-1 hashes for attribute values in value_store
  • Build lookup tables: items, attributes, categories, media, attribute_sets
  • Pattern: blower_id (source) → id (normalized integer)
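
A sketch of the value_store deduplication, assuming values are hashed as UTF-8 text and stored as hex digests. Blower's exact pre-hash normalization (trimming, number and date formatting) is not documented here and would need to be confirmed against the Blower source before parity testing. Inside PostgreSQL the same digest is available as `encode(digest(value, 'sha1'), 'hex')` from the pgcrypto extension, which a dbt model could use; `value_hash` and `intern_values` are hypothetical names.

```python
import hashlib


def value_hash(value: str) -> str:
    """SHA-1 hex digest of an attribute value, encoded as UTF-8."""
    return hashlib.sha1(value.encode("utf-8")).hexdigest()


def intern_values(values, store: dict[str, dict]) -> list[int]:
    """Return a value_store id for each value, adding unseen values.

    store maps sha1_hash -> {"id": int, "value": str}; identical values
    share one row, which is the deduplication value_store provides.
    """
    ids = []
    for value in values:
        digest = value_hash(value)
        row = store.get(digest)
        if row is None:
            row = {"id": len(store) + 1, "value": value}  # store only grows
            store[digest] = row
        ids.append(row["id"])
    return ids
```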

Stage 2: Silver → Gold (Business-Ready):

  • Join snapshot entities with Silver IDs
  • Populate 13 Blower revision tables
  • Map inRiver entities to Blower tables:
    • Item → items table
    • Field definitions → attributes, attribute_sets, attribute_roles
    • Field values → item_attributes (EAV with value_store IDs)
    • ChannelNode → categories, category_attributes, item_categories
    • Item links → item_associations, item_bundles, item_groups
    • Resources → item_media
  • Set seen_status_id = 'seen' for initial load
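
For the field-value mapping, each item's fields fan out into one EAV row per value. A minimal sketch, assuming the snapshot record exposes fields as a field-id → value dict and that Silver has already assigned attribute and value_store ids; skipping null fields and stringifying non-text values are assumptions to verify against Blower's output, and the literal 'seen' stands in for the real seen_status_id. `item_attribute_rows` is a hypothetical name.

```python
import hashlib

SEEN = "seen"  # placeholder for the initial-load seen_status_id


def item_attribute_rows(item_id: int, fields: dict, attribute_ids: dict,
                        value_ids: dict) -> list[tuple]:
    """Build EAV rows (item_id, attribute_id, value_id, seen_status) for one item.

    fields:        inRiver field id -> value, from the snapshot record
    attribute_ids: inRiver field id -> Silver attributes.id
    value_ids:     SHA-1 hex digest of the value -> Silver value_store.id
    """
    rows = []
    for field_id, value in sorted(fields.items()):
        if value is None:
            continue  # assumption: empty fields produce no EAV row
        digest = hashlib.sha1(str(value).encode("utf-8")).hexdigest()
        rows.append((item_id, attribute_ids[field_id], value_ids[digest], SEEN))
    return rows
```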

Stage 3: Delta Processing (Change Tracking):

  • Compare current Gold vs previous Gold
  • Update seen_status_id: added/deleted/modified/unchanged
  • Merge/upsert changed records
  • Hard delete removed records
  • Maintain full dataset in Gold
  • Snapshot history enables auditing what changed between runs
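
The Gold comparison can be sketched as a keyed diff. This assumes every Gold table has a stable key (the Silver integer id, or a composite key for link tables such as item_categories) and compares whole rows; in dbt the same result would come from a full outer join of the new and previous Gold state on that key. `classify_changes` is a hypothetical name.

```python
def classify_changes(previous: dict, current: dict) -> dict:
    """Compare previous vs current Gold rows keyed by their stable key.

    Returns key -> seen status. 'deleted' keys exist only in `previous`;
    the caller records that status (for delta exports) before the hard delete.
    """
    status = {}
    for key, row in current.items():
        if key not in previous:
            status[key] = "added"
        elif previous[key] != row:
            status[key] = "modified"
        else:
            status[key] = "unchanged"
    for key in previous.keys() - current.keys():
        status[key] = "deleted"
    return status
```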

Stage 4: Gold → Export (CSV Generation):

  • Transform Gold tables to CSV format for downstream systems
  • Convert internal IDs back to source IDs (join Silver for blower_id)
  • Resolve foreign keys to names (e.g., type_id → type name)
  • Remove internal columns (seen_status_id)
  • Generate CSV files matching Blower export format
  • Upload to S3 or local directory
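
For the items table, the ID and column steps above reduce to a per-row mapping. A minimal sketch with hypothetical column names (`type_id` in Gold; `id`, `name`, `type` as export headers); the real headers and column order must match Blower's export format exactly. `to_export_rows` is a hypothetical name.

```python
def to_export_rows(gold_rows, blower_ids: dict, type_names: dict) -> list[dict]:
    """Map Gold items rows to export rows.

    blower_ids: Silver integer id -> source blower_id
    type_names: type_id -> type name
    Internal columns such as seen_status_id are simply not copied.
    """
    out = []
    for row in gold_rows:
        out.append({
            "id": blower_ids[row["id"]],          # internal id -> source id
            "name": row["name"],
            "type": type_names[row["type_id"]],   # foreign key -> name
        })
    return out
```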

Success criteria: Functional parity with current Blower outputs

FR-5: Downstream System Outputs

Priority: High
Description: Generate CSV exports for multiple downstream systems

Details:

  • Export format: CSV files with headers (UTF-8 BOM)
  • Export types:
    • Full export: All data from Gold layer
    • Delta export: Only changed records (based on seen_status)
  • CSV structure: Denormalized, downstream-friendly format
    • Use source IDs (blower_id), not internal IDs
    • Resolve foreign keys to names (e.g., type_id → type name)
    • Exclude internal columns (seen_status_id)
  • Downstream systems:
    • E-commerce platforms: Magento, Shopware (consume CSV imports)
    • Search: Algolia indexes (consume CSV imports)
    • Other systems: Additional consumers with own ETL
  • Delivery: S3 upload or local file system
  • Equivalent to Blower's Export classes (Items, Attributes, Categories, etc.)

CSV Exporters (one per table):

  • items.csv, attributes.csv, attribute_sets.csv, attribute_set_attributes.csv, attribute_roles.csv
  • categories.csv, category_attributes.csv, item_categories.csv
  • item_attributes.csv, item_media.csv, item_associations.csv, item_bundles.csv, item_groups.csv
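
Writing the files themselves is a small step once export rows exist. A sketch using Python's `csv` module, assuming delta exports keep rows whose status is added, modified or deleted (so deleted rows have to be exported before the hard delete in Gold); the `utf-8-sig` codec is what emits the BOM. `write_export_csv` is a hypothetical name.

```python
import csv
import io
from typing import Optional


def write_export_csv(rows: list[dict], columns: list[str],
                     statuses: Optional[dict] = None, delta: bool = False) -> bytes:
    """Serialize export rows to CSV bytes with a header row and a UTF-8 BOM.

    For a delta export, keep only rows whose seen status is added, modified
    or deleted; `statuses` maps each row's "id" to that status.
    """
    if delta:
        statuses = statuses or {}
        changed = {"added", "modified", "deleted"}
        rows = [r for r in rows if statuses.get(r["id"]) in changed]
    buffer = io.StringIO(newline="")
    writer = csv.DictWriter(buffer, fieldnames=columns)
    writer.writeheader()
    writer.writerows(rows)
    # "utf-8-sig" prepends the BOM (EF BB BF) required by the export format
    return buffer.getvalue().encode("utf-8-sig")
```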

FR-6: ChannelNode Relationship Processing

Priority: Medium
Description: Process ChannelNode relationships excluding outbound product links

Details:

  • Exclusion: Request Aperture Labs to exclude OutboundLinks to Products from ChannelNode exports
  • Rationale: Scalability concern with large link arrays (categories with 1000+ products)
  • Alternative: Process InboundLinks from Products only (via Product assortment)
  • Mapping: Product assortment → item_categories table
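
Deriving category membership from the product side keeps every record small: a product carries only its own few category links, whereas a ChannelNode's outbound list grows with the category. The sketch below assumes a hypothetical JSON shape (`Id`, `InboundLinks`, `LinkType`, `SourceId` and the link type name are placeholders, not confirmed inRiver field names) and returns blower_id pairs that Silver would then translate into item_categories ids.

```python
def item_category_pairs(products, link_type: str = "ChannelNodeProduct") -> set:
    """Derive (product blower_id, category blower_id) pairs from Product records.

    Hypothetical JSON shape: each product has "Id" and an optional
    "InboundLinks" list whose entries carry "LinkType" and "SourceId"
    (the linking ChannelNode's id).
    """
    pairs = set()
    for product in products:
        for link in product.get("InboundLinks", []):
            if link.get("LinkType") == link_type:
                pairs.add((product["Id"], link["SourceId"]))
    return pairs
```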

FR-7: AttributeFile Processing

Priority: High
Description: Load daily AttributeFile to Silver layer for field definitions

Details:

  • Frequency: Daily system-wide delivery
  • Purpose: Field definitions and metadata for all entity types
  • Storage: Load to Silver attributes, attribute_sets tables
  • Usage: Referenced by Bronze → Silver → Gold transformations

FR-8: Delta Update Processing

Priority: High
Description: Process incremental updates with insert, update, delete operations

Details:

  • Insert: Add new entities to Silver, propagate to Gold with seen_status = 'added'
  • Update: Merge changes to existing entities, mark seen_status = 'modified'
  • Delete: Hard delete removed entities, mark seen_status = 'deleted' before removal
  • Unchanged: Mark seen_status = 'unchanged' for records with no changes
  • State management: Track processed files in Bronze to avoid duplicates
  • Comparison: Compare current Gold vs previous Gold to determine changes
  • Pattern: Equivalent to Blower's History class logic
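
Amazon S3 event notifications are delivered at least once, so duplicate events are expected and the loader should be idempotent. A sketch keyed on the object key plus its ETag, so a repeated event is skipped but a re-uploaded file with new content is loaded again; the ETag choice and `ProcessedFiles` are assumptions. In PostgreSQL this would be a tracking table with a unique constraint on those columns and `INSERT ... ON CONFLICT DO NOTHING`.

```python
from datetime import datetime, timezone


class ProcessedFiles:
    """Tracks which S3 objects have been loaded (hypothetical helper).

    Keyed on (s3_key, etag): a duplicate event for the same object is
    skipped, while a re-delivered object with new content is loaded again.
    """

    def __init__(self):
        self._seen: dict[tuple[str, str], datetime] = {}

    def should_load(self, s3_key: str, etag: str) -> bool:
        return (s3_key, etag) not in self._seen

    def mark_loaded(self, s3_key: str, etag: str) -> None:
        self._seen[(s3_key, etag)] = datetime.now(timezone.utc)
```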

Non-Functional Requirements

NFR-1: Performance

Priority: Medium
Description: Process updates within hourly window

Details:

  • Data volume: <100K products per business unit
  • Processing frequency: Hourly batch
  • Latency: Hourly updates acceptable for downstream systems
  • Scalability: Design for current volume, plan for growth

NFR-2: Data Quality

Priority: High
Description: Ensure data parity with current Blower outputs

Details:

  • Validation: Schema validation, business rule checks
  • Testing: Data parity validation - output tables must be identical to Blower
  • Example: Item 1234 with 5 attributes and 2 media should match exactly
  • Error handling: Standard validation, quarantine bad records, alerting
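
Parity for a case like item 1234 comes down to comparing Blower's export and the new export as multisets of rows, since row order is not meaningful. A sketch, assuming both CSVs share a header and that any UTF-8 BOM was already removed when decoding (for example with `utf-8-sig`); `parity_diff` is a hypothetical name.

```python
import csv
import io
from collections import Counter


def parity_diff(blower_csv: str, new_csv: str) -> tuple[Counter, Counter]:
    """Order-insensitive comparison of two CSV exports with the same header.

    Returns (missing, extra): rows present in Blower but not in the new
    output, and rows present only in the new output. Both empty means
    parity. Counter keeps duplicate rows significant.
    """
    def load(text: str):
        reader = csv.reader(io.StringIO(text, newline=""))
        header = next(reader)
        return header, Counter(tuple(row) for row in reader)

    old_header, old_rows = load(blower_csv)
    new_header, new_rows = load(new_csv)
    if old_header != new_header:
        raise ValueError(f"header mismatch: {old_header} != {new_header}")
    return old_rows - new_rows, new_rows - old_rows
```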

NFR-3: Monitoring and Observability

Priority: Medium (MVP), High (Phase 2)
Description: Monitor pipeline health and data quality

MVP Requirements:

  • Airflow task success/failure monitoring
  • Data quality checks: record counts, schema validation
  • Basic alerting on failures

Phase 2 Requirements:

  • Comprehensive observability: metrics, logs, traces
  • SLA monitoring
  • Data quality dashboards

NFR-4: Reliability

Priority: High
Description: Ensure reliable data processing with error recovery

Details:

  • Error handling: Log errors, quarantine bad records, continue processing
  • Retry logic: Automated retry for transient failures
  • Alerting: Notify on persistent failures
  • Recovery: Manual intervention for quarantined records
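
At the task level, Airflow already retries failed tasks through operator arguments such as `retries` and `retry_delay`. Inside a task, the same policy can be applied per record so one bad record does not fail the whole batch. A sketch, where `TransientError` is a hypothetical marker for retryable failures and any other exception is treated as bad data and quarantined; `process_batch` is also hypothetical.

```python
import time


class TransientError(Exception):
    """Hypothetical marker for retryable failures (e.g. a dropped connection)."""


def process_batch(records, handle, max_attempts: int = 3,
                  base_delay: float = 0.5, sleep=time.sleep):
    """Process records one by one: retry transient failures with exponential
    backoff, quarantine records that still fail, and keep going.

    Returns (processed_count, quarantine), where quarantine holds
    (record, error message) pairs for manual review.
    """
    processed, quarantine = 0, []
    for record in records:
        for attempt in range(1, max_attempts + 1):
            try:
                handle(record)
                processed += 1
                break
            except TransientError as exc:
                if attempt == max_attempts:
                    quarantine.append((record, f"gave up after {attempt} attempts: {exc}"))
                else:
                    sleep(base_delay * 2 ** (attempt - 1))
            except Exception as exc:  # bad data: retrying will not help
                quarantine.append((record, str(exc)))
                break
    return processed, quarantine
```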

NFR-5: Maintainability

Priority: High
Description: Code and infrastructure should be maintainable by team

Details:

  • Team: Mixed experience - familiar with Airflow/dbt but not this pattern
  • Documentation: Comprehensive setup and operational docs
  • Code quality: Automated testing, CI/CD pipeline
  • Infrastructure: Terraform for IaC (future consideration)

Technical Requirements

TR-1: Database

Technology: Aurora PostgreSQL
Rationale: Modern data stack alignment, superior JSON handling, downstream systems DB-agnostic

Details:

  • Managed service: AWS RDS Aurora PostgreSQL
  • Schema: Normalized tables with foreign keys
  • JSON support: JSONB for Bronze layer (native operators, indexing)
  • Array support: Native arrays for complex data structures
  • Indexing: Optimize for query patterns, JSONB indexes
  • Backup: Automated backups, point-in-time recovery
  • Strategic shift: Complete modernization (dbt + Postgres) since downstream systems consume CSV exports only

TR-2: Orchestration

Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure, team familiarity

Details:

  • Environment: New MWAA instance (already provisioned)
  • DAGs: Python-based (not PHP-generated like Blower)
  • Tasks:
    • S3 ingestion: Python operator (~50 lines, load JSON to Bronze)
    • dbt transformations: dbt operator (snapshot → Silver → Gold → Export)
    • Data quality checks: dbt tests
  • Scheduling: Hourly batch processing of files delivered since the last run (S3 event notifications signal new files)
  • Monitoring: Airflow UI + CloudWatch
  • Strategic shift: Python + dbt replaces PHP + Blower pattern

TR-3: Transformation

Technology: dbt (data build tool)
Rationale: Strategic move toward dbt-native pipeline to replace Blower pattern

Details:

  • Adapter: dbt-postgres for Aurora PostgreSQL
  • Snapshots: dbt snapshot on Bronze tables before each run (SCD Type 2)
    • Captures point-in-time state of mutable Bronze tables
    • All downstream models read from snapshots
    • Enables consistent runs and historical auditing
  • Models: SQL-based transformations for all stages
    • Snapshot → Silver: Parse JSONB, assign IDs, create value_store
    • Silver → Gold: Join and populate 13 revision tables
    • Delta tracking: Incremental models with seen_status logic
    • Gold → Export: CSV generation with ID resolution
  • JSON handling: Native JSONB operators for parsing inRiver JSON
  • Testing: dbt tests for data quality
  • Documentation: Auto-generated from dbt models
  • Macros: Replace Blower event hooks with dbt macros
  • Strategic direction: Sets precedent for replacing other Blower sources with dbt

TR-4: Storage

Technology: AWS S3
Rationale: inRiver export delivery mechanism

Details:

  • Bucket: Watsco-controlled S3 bucket
  • Event notifications: S3 events trigger pipeline
  • Access: IAM roles for MWAA access
  • Retention: Define retention policy for processed files

TR-5: Deployment

Technology: CI/CD pipeline with automated testing
Rationale: Ensure quality and repeatability

Details:

  • Version control: Git repository
  • CI/CD: Automated testing on commit
  • Deployment: Automated deployment to MWAA
  • Testing: Unit tests (dbt, Python), integration tests, data parity validation

Project Constraints

Timeline

  • MVP: 2-4 weeks (basic pipeline, minimal monitoring, manual validation)
  • Phase 2: Iterate to add comprehensive monitoring and automated testing
  • Phased approach: Deliver working MVP quickly, enhance over time

Dependencies

  • Blower schema documentation: Available now (critical for design)
  • Aperture Labs coordination: Request ChannelNode OutboundLinks exclusion
  • Downstream system analysis: Understand transformation requirements

Team

  • Resources: Mixed team with Airflow/dbt experience but new to this pattern
  • Learning curve: Account for team learning in timeline
  • Support: May need external guidance on best practices

Success Criteria

Primary Success Criteria

  1. Functional Parity: Output tables identical to current Blower database
  2. Data Accuracy: Item-level validation (attributes, media, relationships match)
  3. Processing Reliability: Hourly updates complete successfully
  4. Downstream Compatibility: All consuming systems work without changes

MVP Success Criteria

  1. Basic pipeline operational (S3 → Aurora PostgreSQL → dbt)
  2. Airflow monitoring + data quality checks
  3. Manual validation confirms data parity
  4. Single business unit operational

Phase 2 Success Criteria

  1. All business units operational
  2. Comprehensive monitoring and alerting
  3. Automated data parity testing
  4. CI/CD pipeline operational

Out of Scope (for MVP)

  • Full refresh mode (inRiver limitation)
  • Comprehensive monitoring (Phase 2)
  • Automated end-to-end testing (Phase 2)
  • Infrastructure as Code with Terraform (future consideration)
  • Near real-time processing < 1 hour (hourly acceptable)
  • Soft delete timestamps (hard deletes only)
  • ERP Data Integration (ec_erp_data, pp_p21_data schemas):
    • Current Blower has separate ERP schemas updated with each run
    • Tables: item_attributes, packages, supersedes
    • Decision: Address in Phase 2 after the PIM replacement is working
    • Options: Replicate to PostgreSQL, federated queries, or dual-database pattern

Open Questions

  1. Bronze Retention: How long should we retain Bronze layer data? (30 days, 90 days, indefinitely?)
  2. Initial Load: How to handle initial full load if inRiver only provides deltas? (Historical backfill strategy?)
  3. Late-Arriving Data: How to handle out-of-order updates in Bronze layer?
  4. ChannelNode Links: Confirm Aperture Labs can exclude OutboundLinks to Products
  5. Downstream Systems: Complete inventory of all consuming systems beyond Magento/Shopware/Algolia
  6. Transformation Logic: Document any complex business rules from current Blower PHP transformations
  7. Revision Cleanup: How many Gold revision schemas to keep? (Blower keeps N most recent)
  8. Value Store: Should we implement SHA-1 hashing for value_store or use a simpler approach?

Next Steps

  1. Review and approve updated requirements for the five-stage architecture (Bronze → Snapshot → Silver → Gold → Export)
  2. Proceed to User Stories phase (assess if needed)
  3. Design Bronze, Silver, Gold, and Export layer schemas
  4. Map inRiver JSON entities to Blower PIM2 CSV equivalents
  5. Design dbt transformation models for each stage:
    • Stage 0: dbt snapshot (Bronze point-in-time capture, SCD Type 2)
    • Stage 1: Snapshot → Silver (normalization, ID assignment)
    • Stage 2: Silver → Gold (join and populate revision tables)
    • Stage 3: Delta processing (seen_status tracking)
    • Stage 4: Gold → Export (CSV generation for downstream)
  6. Implement MVP pipeline

Document Version: 1.0
Last Updated: 2026-02-26
Author: AI-DLC Requirements Analysis