inRiver ETL Pipeline - Phase 2 (Full Modernization)

Project: inRiver PIM to Aurora PostgreSQL ETL Pipeline
Date: 2026-02-26
Last Updated: 2026-03-03 (Split into Phase 2)
Status: Future - After Phase 1 Complete
Timeline: 4-6 weeks


Overview

Goal: Replace Blower PHP/MySQL infrastructure with modern dbt + PostgreSQL pipeline.

Strategic Context: Phase 1 (Python converter) gets inRiver working quickly. Phase 2 modernizes the entire stack and sets precedent for replacing other Blower sources.

Prerequisites: Phase 1 operational and stable


Intent Analysis

User Request Summary

Migrate from the current Blower ETL system (nightly full load, PHP-generated Airflow DAGs) to the new inRiver PIM system (delta load, near real-time updates). Load inRiver JSONL exports into an Aurora PostgreSQL database and use dbt transformations to maintain functional parity with the current Blower database outputs.

Request Classification

  • Type: Migration + New ETL Pipeline Development
  • Scope: System-wide (new data pipeline with multiple components)
  • Complexity: Complex (multi-technology integration, data model transformation, multiple downstream systems)

Architectural Shift

  • From: Nightly full load batch processing
  • To: Hourly delta load incremental processing
  • Impact: Requires merge/upsert logic, state management, change data capture patterns

Medallion Architecture (Bronze-Silver-Gold + Export)

Bronze Layer (Raw Landing Zone):

  • Raw inRiver JSON files with audit metadata (load_date, file_name, business_unit, s3_key)
  • Storage: PostgreSQL JSONB column for efficient querying
  • Immutable append-only storage
  • Equivalent to Blower's PIM2 schema
  • Purpose: Historical record of all deltas received

Silver Layer (Normalization - REQUIRED):

  • Normalized entities with integer ID assignment
  • SHA-1 hashed value store for attribute values (deduplication)
  • Equivalent to Blower's Central schema
  • Tables: items, attributes, categories, media, attribute_sets, value_store
  • Each entity: id (integer) + blower_id (source reference)
  • Purpose: Normalize Bronze data, assign IDs, enable efficient joins

Gold Layer (Business-Ready):

  • 13 Blower revision tables with normalized integer IDs
  • Full product dataset maintained (not just deltas)
  • Uses Silver IDs via joins (Bronze + Silver → Gold)
  • All tables have seen_status_id for delta tracking (added/deleted/modified/unchanged)
  • Equivalent to Blower's Revision schema
  • Purpose: Optimized for queries, maintains full state with change tracking

Export Layer (Downstream Consumption):

  • dbt models that transform Gold → CSV format
  • Converts internal IDs back to source IDs (blower_id)
  • Resolves foreign keys to names (e.g., type_id → type name)
  • Removes internal columns (seen_status_id)
  • Equivalent to Blower's Export classes
  • Purpose: Downstream systems consume CSV exports (not direct DB access)

Target Schema

Blower Revision Schema (*_revision_XXX):

  • 13 normalized tables documented in docs/blower-target/
  • Core tables: items, attributes, categories, relationships, media
  • Multi-BU: Separate schema per business unit (bk_*, gm_*, ec_*, etc.)
  • EAV pattern for attributes with relationship tables
  • All tables include seen_status_id column for delta tracking

Blower Central Schema (Silver layer normalization):

  • Assigns integer IDs to all entities
  • value_store table with SHA-1 hashes for attribute values
  • Lookup tables: items, attributes, categories, media, attribute_sets
  • Each entity has id (integer) + blower_id (source reference)

Transformation Pattern (from Blower PIM source):

sql
-- Example: Bronze + Silver → Gold
INSERT INTO {revision}.items
  SELECT item.id, item.name, it.id, ag.id, @SEEN_ID
  FROM bronze.items b
    INNER JOIN silver.items item ON item.blower_id = b.blower_id
    INNER JOIN silver.item_types it ON it.name = b.item_type
    LEFT JOIN silver.attribute_sets ag ON ag.blower_id = b.attribute_set_blower_id

Functional Requirements

FR-1: Bronze Layer - Raw Data Ingestion

Priority: High
Description: Ingest inRiver JSONL export files from S3 to Bronze layer with audit metadata

Details:

  • Source: S3 bucket with inRiver exports (controlled by Watsco)
  • File types: Product, Item, ChannelNode, AttributeFile (JSON format)
  • Storage: PostgreSQL table with JSONB column + metadata columns
    • data (JSONB) - Raw inRiver JSON for SQL querying
    • load_date (timestamp) - When file was loaded
    • file_name (text) - Original S3 file name
    • business_unit (text) - BU identifier (BAK, CEFL, ECM, GEM)
    • s3_key (text) - S3 object key for reference
    • entity_type (text) - Product, ChannelNode, AttributeFile, etc.
  • Pattern: Immutable append-only (historical record of all deltas)
  • Trigger: S3 event notifications on file upload
  • Processing: Hourly batch processing acceptable
  • Rationale: JSONB enables efficient SQL querying for Silver transformations
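
A minimal sketch of the Bronze landing table, assuming a single table in a bronze schema shared across entity types; all names are illustrative, not final.

sql
-- Illustrative Bronze landing table (schema and names are assumptions, not final)
CREATE TABLE IF NOT EXISTS bronze.inriver_raw (
  id            BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  data          JSONB       NOT NULL,  -- raw inRiver entity payload
  load_date     TIMESTAMPTZ NOT NULL DEFAULT now(),
  file_name     TEXT        NOT NULL,
  business_unit TEXT        NOT NULL,  -- BAK, CEFL, ECM, GEM
  s3_key        TEXT        NOT NULL,
  entity_type   TEXT        NOT NULL   -- Product, Item, ChannelNode, AttributeFile
);
-- Append-only: the loader only ever INSERTs; no UPDATE or DELETE on this table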

FR-2: Silver Layer - Normalization and ID Assignment

Priority: High
Description: Normalize Bronze data and assign integer IDs to all entities

Details:

  • Input: Bronze layer (raw JSON with audit metadata)
  • Process: Parse JSON, extract entities, assign integer IDs
  • Tables:
    • items - id, blower_id (inRiver Product/Item ID), name, item_type
    • attributes - id, blower_id (inRiver Field ID), code, name, data_type
    • categories - id, blower_id (inRiver ChannelNode ID), name, parent_id
    • media - id, blower_id (inRiver Resource ID), filename, path
    • attribute_sets - id, blower_id (inRiver Structure ID), name
    • value_store - id, sha1_hash, value (deduplicated attribute values)
  • Pattern: Each entity gets integer id + source blower_id reference
  • Purpose: Enable efficient joins for Gold layer population
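
A minimal sketch of the value_store deduplication, assuming the pgcrypto extension is available for SHA-1 hashing; the staging relation name is hypothetical.

sql
-- Sketch: SHA-1 keyed value_store (assumes pgcrypto; staging_attribute_values is hypothetical)
CREATE EXTENSION IF NOT EXISTS pgcrypto;

CREATE TABLE IF NOT EXISTS silver.value_store (
  id        BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
  sha1_hash TEXT NOT NULL UNIQUE,
  value     TEXT NOT NULL
);

-- Insert only values whose hash is not already present (deduplication)
INSERT INTO silver.value_store (sha1_hash, value)
SELECT DISTINCT encode(digest(v.value, 'sha1'), 'hex'), v.value
FROM staging_attribute_values v
ON CONFLICT (sha1_hash) DO NOTHING;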

FR-3: Gold Layer - Business-Ready Data

Priority: High
Description: Transform Bronze + Silver to Gold layer matching Blower revision schema

Details:

  • Target: 13 normalized Blower tables (items, attributes, categories, relationships, media)
  • Schema: *_revision_XXX per business unit (e.g., bk_revision_2191)
  • Tables:
    • items - Core product/SKU data with Silver IDs
    • attributes, attribute_sets, attribute_set_attributes, attribute_roles - Attribute metadata
    • item_attributes - Product attribute values (EAV pattern with value_store IDs)
    • categories, category_attributes, item_categories - Category hierarchy
    • item_associations, item_bundles, item_groups - Product relationships
    • item_media - Product images/assets
    • report - Data quality reports
  • Transformation: Join Bronze entities with Silver IDs to populate Gold
  • Full dataset: Gold maintains complete product catalog (not just deltas)
  • Update strategy: Bronze deltas trigger merge/upsert into Gold
  • Seen status: All tables have seen_status_id (seen/added/deleted/modified/unchanged)
  • Purpose: Downstream systems consume Gold directly without transformation

FR-4: dbt Transformations (Bronze → Silver → Gold → Export)

Priority: High
Description: Transform raw inRiver JSON through four-stage pipeline using dbt

Stage 1: Bronze → Silver (Normalization):

  • Parse inRiver JSONB entities (Product, Item, ChannelNode, Resource, AttributeFile)
  • Use PostgreSQL JSONB operators for efficient extraction
  • Assign integer IDs to all entities
  • Create SHA-1 hashes for attribute values in value_store
  • Build lookup tables: items, attributes, categories, media, attribute_sets
  • Pattern: blower_id (source) → id (normalized integer)
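
A minimal dbt model sketch for Stage 1, assuming the Bronze table sketched under FR-1 and illustrative JSON paths (displayName, entityTypeId) that must be confirmed against the actual inRiver payload.

sql
-- models/silver/items.sql (sketch; JSON paths and source names are assumptions)
SELECT
  row_number() OVER (ORDER BY b.data ->> 'id') AS id,          -- assigned integer ID
  b.data ->> 'id'                              AS blower_id,   -- source inRiver entity ID
  b.data ->> 'displayName'                     AS name,
  b.data ->> 'entityTypeId'                    AS item_type
FROM {{ source('bronze', 'inriver_raw') }} b
WHERE b.entity_type IN ('Product', 'Item')

In practice the integer IDs would need to be stable across runs (for example, persisted in a mapping table or drawn from a sequence) rather than recomputed with row_number on every build.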

Stage 2: Silver → Gold (Business-Ready):

  • Join Bronze entities with Silver IDs
  • Populate 13 Blower revision tables
  • Map inRiver entities to Blower tables:
    • Product/Item → items table
    • Field definitions → attributes, attribute_sets, attribute_roles
    • Field values → item_attributes (EAV with value_store IDs)
    • ChannelNode → categories, category_attributes, item_categories
    • Product links → item_associations, item_bundles, item_groups
    • Resources → item_media
  • Set seen_status_id = 'seen' for initial load
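
A minimal sketch of the EAV mapping for item_attributes, complementing the items example above; the flattened bronze_field_values relation and all column names are assumptions, and SHA-1 hashing again assumes pgcrypto.

sql
-- Sketch: populate item_attributes (EAV) using Silver IDs and value_store references
INSERT INTO gold.item_attributes (item_id, attribute_id, value_id, seen_status_id)
SELECT i.id, a.id, vs.id, 1                    -- 1 = 'seen' on initial load (assumed lookup value)
FROM bronze_field_values f                     -- hypothetical: one row per (entity, field, value)
JOIN silver.items       i  ON i.blower_id = f.entity_blower_id
JOIN silver.attributes  a  ON a.blower_id = f.field_blower_id
JOIN silver.value_store vs ON vs.sha1_hash = encode(digest(f.value, 'sha1'), 'hex');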

Stage 3: Delta Processing (Change Tracking):

  • Compare current Gold vs previous Gold
  • Update seen_status_id: added/deleted/modified/unchanged
  • Merge/upsert changed records
  • Hard delete removed records
  • Maintain full dataset in Gold
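
A minimal sketch of the comparison, assuming the previous Gold state is still queryable (the gold_current and gold_previous schema names are hypothetical) and that a small set of tracked columns defines "modified".

sql
-- Sketch: classify each item by comparing the current build against the previous Gold state
SELECT
  id,
  CASE
    WHEN prev.id IS NULL THEN 'added'
    WHEN cur.id  IS NULL THEN 'deleted'
    WHEN cur.name    IS DISTINCT FROM prev.name
      OR cur.type_id IS DISTINCT FROM prev.type_id THEN 'modified'  -- compare tracked columns (or a row hash)
    ELSE 'unchanged'
  END AS seen_status
FROM gold_current.items cur
FULL OUTER JOIN gold_previous.items prev USING (id);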

Stage 4: Gold → Export (CSV Generation):

  • Transform Gold tables to CSV format for downstream systems
  • Convert internal IDs back to source IDs (join Silver for blower_id)
  • Resolve foreign keys to names (e.g., type_id → type name)
  • Remove internal columns (seen_status_id)
  • Generate CSV files matching Blower export format
  • Upload to S3 or local directory
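
A minimal sketch of an items export query, assuming Gold column names that mirror the Blower pattern shown earlier; COPY writes server-side, so a client-side \copy or an application-level export step may be preferable in practice.

sql
-- Sketch: items export with internal IDs mapped back to source IDs and names, internal columns dropped
COPY (
  SELECT
    si.blower_id AS id,                -- source inRiver ID, not the internal integer
    g.name,
    it.name      AS type,              -- type_id resolved to its name
    sa.blower_id AS attribute_set
  FROM gold.items g
  JOIN silver.items               si ON si.id = g.id
  JOIN silver.item_types          it ON it.id = g.type_id
  LEFT JOIN silver.attribute_sets sa ON sa.id = g.attribute_set_id
  -- seen_status_id intentionally excluded
) TO '/tmp/items.csv' WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');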

Success criteria: Functional parity with current Blower outputs

FR-5: Downstream System Outputs

Priority: High
Description: Generate CSV exports for multiple downstream systems

Details:

  • Export format: CSV files with headers (UTF-8 BOM)
  • Export types:
    • Full export: All data from Gold layer
    • Delta export: Only changed records (based on seen_status)
  • CSV structure: Denormalized, downstream-friendly format
    • Use source IDs (blower_id), not internal IDs
    • Resolve foreign keys to names (e.g., type_id → type name)
    • Exclude internal columns (seen_status_id)
  • Downstream systems:
    • E-commerce platforms: Magento, Shopware (consume CSV imports)
    • Search: Algolia indexes (consume CSV imports)
    • Other systems: Additional consumers with own ETL
  • Delivery: S3 upload or local file system
  • Equivalent to Blower's Export classes (Items, Attributes, Categories, etc.)

CSV Exporters (one per table):

  • items.csv, attributes.csv, attribute_sets.csv, attribute_set_attributes.csv, attribute_roles.csv
  • categories.csv, category_attributes.csv, item_categories.csv
  • item_attributes.csv, item_media.csv, item_associations.csv, item_bundles.csv, item_groups.csv

FR-6: ChannelNode Relationship Processing

Priority: Medium
Description: Process ChannelNode relationships excluding outbound product links

Details:

  • Exclusion: Request Aperture Labs to exclude OutboundLinks to Products from ChannelNode exports
  • Rationale: Scalability concern with large link arrays (categories with 1000+ products)
  • Alternative: Process InboundLinks from Products only (via Product assortment)
  • Mapping: Product assortment → item_categories table

FR-7: AttributeFile Processing

Priority: High
Description: Load daily AttributeFile to Silver layer for field definitions

Details:

  • Frequency: Daily system-wide delivery
  • Purpose: Field definitions and metadata for all entity types
  • Storage: Load to Silver attributes, attribute_sets tables
  • Usage: Referenced by Bronze → Silver → Gold transformations

FR-8: Delta Update Processing

Priority: High
Description: Process incremental updates with insert, update, delete operations

Details:

  • Insert: Add new entities to Silver, propagate to Gold with seen_status = 'added'
  • Update: Merge changes to existing entities, mark seen_status = 'modified'
  • Delete: Hard delete removed entities, mark seen_status = 'deleted' before removal
  • Unchanged: Mark seen_status = 'unchanged' for records with no changes
  • State management: Track processed files in Bronze to avoid duplicates
  • Comparison: Compare current Gold vs previous Gold to determine changes
  • Pattern: Equivalent to Blower's History class logic
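
A minimal sketch of the merge/upsert and hard-delete steps, assuming a staging relation produced by the Gold comparison described above; all names are hypothetical.

sql
-- Sketch: merge a classified delta into Gold, then hard delete removed entities
INSERT INTO gold.items (id, name, type_id, attribute_set_id, seen_status_id)
SELECT d.id, d.name, d.type_id, d.attribute_set_id, d.seen_status_id
FROM staged_items_delta d              -- hypothetical output of the seen_status comparison
WHERE d.seen_status <> 'deleted'
ON CONFLICT (id) DO UPDATE SET
  name             = EXCLUDED.name,
  type_id          = EXCLUDED.type_id,
  attribute_set_id = EXCLUDED.attribute_set_id,
  seen_status_id   = EXCLUDED.seen_status_id;

DELETE FROM gold.items g
USING staged_items_delta d
WHERE g.id = d.id AND d.seen_status = 'deleted';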

Non-Functional Requirements

NFR-1: Performance

Priority: Medium
Description: Process updates within hourly window

Details:

  • Data volume: <100K products per business unit
  • Processing frequency: Hourly batch
  • Latency: Hourly updates acceptable for downstream systems
  • Scalability: Design for current volume, plan for growth

NFR-2: Data Quality

Priority: High
Description: Ensure data parity with current Blower outputs

Details:

  • Validation: Schema validation, business rule checks
  • Testing: Data parity validation - output tables must be identical to Blower
  • Example: Item 1234 with 5 attributes and 2 media assets should match exactly
  • Error handling: Standard validation, quarantine bad records, alerting
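
A minimal sketch of a parity check, assuming the legacy Blower output has been loaded into a reference schema (here blower_ref) in the same database; any rows returned indicate a parity break.

sql
-- Sketch: symmetric difference between the new Gold output and the legacy Blower reference copy
(SELECT id, name, type_id FROM gold.items
 EXCEPT
 SELECT id, name, type_id FROM blower_ref.items)
UNION ALL
(SELECT id, name, type_id FROM blower_ref.items
 EXCEPT
 SELECT id, name, type_id FROM gold.items);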

NFR-3: Monitoring and Observability

Priority: Medium (MVP), High (Phase 2)
Description: Monitor pipeline health and data quality

MVP Requirements:

  • Airflow task success/failure monitoring
  • Data quality checks: record counts, schema validation
  • Basic alerting on failures

Phase 2 Requirements:

  • Comprehensive observability: metrics, logs, traces
  • SLA monitoring
  • Data quality dashboards

NFR-4: Reliability

Priority: High
Description: Ensure reliable data processing with error recovery

Details:

  • Error handling: Log errors, quarantine bad records, continue processing
  • Retry logic: Automated retry for transient failures
  • Alerting: Notify on persistent failures
  • Recovery: Manual intervention for quarantined records

NFR-5: Maintainability

Priority: High
Description: Code and infrastructure should be maintainable by team

Details:

  • Team: Mixed experience - familiar with Airflow/dbt but not this pattern
  • Documentation: Comprehensive setup and operational docs
  • Code quality: Automated testing, CI/CD pipeline
  • Infrastructure: Terraform for IaC (future consideration)

Technical Requirements

TR-1: Database

Technology: Aurora PostgreSQL
Rationale: Modern data stack alignment, superior JSON handling, downstream systems DB-agnostic

Details:

  • Managed service: AWS RDS Aurora PostgreSQL
  • Schema: Normalized tables with foreign keys
  • JSON support: JSONB for Bronze layer (native operators, indexing)
  • Array support: Native arrays for complex data structures
  • Indexing: Optimize for query patterns, JSONB indexes
  • Backup: Automated backups, point-in-time recovery
  • Strategic shift: Complete modernization (dbt + Postgres) since downstream systems consume CSV exports only
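
A minimal sketch of JSONB indexing and querying on the Bronze table assumed earlier; the index type and query shape are illustrative.

sql
-- Sketch: GIN index to serve containment queries against the Bronze JSONB column
CREATE INDEX IF NOT EXISTS idx_inriver_raw_data
  ON bronze.inriver_raw USING gin (data jsonb_path_ops);

-- Example containment lookup the index can accelerate
SELECT s3_key, load_date
FROM bronze.inriver_raw
WHERE data @> '{"entityTypeId": "Product"}';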

TR-2: Orchestration

Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure, team familiarity

Details:

  • Environment: New MWAA instance (already provisioned)
  • DAGs: Python-based (not PHP-generated like Blower)
  • Tasks:
    • S3 ingestion: Python operator (~50 lines, load JSON to Bronze)
    • dbt transformations: dbt operator (Bronze → Silver → Gold)
    • Data quality checks: dbt tests
  • Scheduling: Hourly batch processing triggered by S3 events
  • Monitoring: Airflow UI + CloudWatch
  • Strategic shift: Python + dbt replaces PHP + Blower pattern

TR-3: Transformation

Technology: dbt (data build tool)
Rationale: Strategic move toward dbt-native pipeline to replace Blower pattern

Details:

  • Adapter: dbt-postgres for Aurora PostgreSQL
  • Models: SQL-based transformations for all three stages
    • Bronze → Silver: Parse JSONB, assign IDs, create value_store
    • Silver → Gold: Join and populate 13 revision tables
    • Delta tracking: Incremental models with seen_status logic
  • JSON handling: Native JSONB operators for parsing inRiver JSON
  • Testing: dbt tests for data quality
  • Documentation: Auto-generated from dbt models
  • Macros: Replace Blower event hooks with dbt macros
  • Strategic direction: Sets precedent for replacing other Blower sources with dbt
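
A minimal dbt incremental model sketch for the delta-tracking pattern, using standard dbt constructs (config, ref, is_incremental); the updated_at column and model names are assumptions.

sql
-- models/gold/items.sql (sketch; column and model names are assumptions)
{{ config(
    materialized = 'incremental',
    unique_key   = 'id'
) }}

SELECT
  i.id,
  i.name,
  i.type_id,
  i.attribute_set_id,
  i.updated_at
FROM {{ ref('silver_items') }} i

{% if is_incremental() %}
  -- only rebuild rows touched since the last successful run
  WHERE i.updated_at > (SELECT max(updated_at) FROM {{ this }})
{% endif %}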

TR-4: Storage

Technology: AWS S3
Rationale: inRiver export delivery mechanism

Details:

  • Bucket: Watsco-controlled S3 bucket
  • Event notifications: S3 events trigger pipeline
  • Access: IAM roles for MWAA access
  • Retention: Define retention policy for processed files

TR-5: Deployment

Technology: CI/CD pipeline with automated testing
Rationale: Ensure quality and repeatability

Details:

  • Version control: Git repository
  • CI/CD: Automated testing on commit
  • Deployment: Automated deployment to MWAA
  • Testing: Unit tests (dbt, Python), integration tests, data parity validation

Project Constraints

Timeline

  • MVP: 2-4 weeks (basic pipeline, minimal monitoring, manual validation)
  • Phase 2: Iterate to add comprehensive monitoring and automated testing
  • Phased approach: Deliver working MVP quickly, enhance over time

Dependencies

  • Blower schema documentation: Available now (critical for design)
  • Aperture Labs coordination: Request ChannelNode OutboundLinks exclusion
  • Downstream system analysis: Understand transformation requirements

Team

  • Resources: Mixed team with Airflow/dbt experience but new to this pattern
  • Learning curve: Account for team learning in timeline
  • Support: May need external guidance on best practices

Success Criteria

Primary Success Criteria

  1. Functional Parity: Output tables identical to current Blower database
  2. Data Accuracy: Item-level validation (attributes, media, relationships match)
  3. Processing Reliability: Hourly updates complete successfully
  4. Downstream Compatibility: All consuming systems work without changes

MVP Success Criteria

  1. Basic pipeline operational (S3 → Aurora PostgreSQL → dbt)
  2. Airflow monitoring + data quality checks
  3. Manual validation confirms data parity
  4. Single business unit operational

Phase 2 Success Criteria

  1. All business units operational
  2. Comprehensive monitoring and alerting
  3. Automated data parity testing
  4. CI/CD pipeline operational

Out of Scope (for MVP)

  • Full refresh mode (inRiver limitation)
  • Comprehensive monitoring (Phase 2)
  • Automated end-to-end testing (Phase 2)
  • Infrastructure as Code with Terraform (future consideration)
  • Near real-time processing (latency under 1 hour); hourly batches are acceptable
  • Soft delete timestamps (hard deletes only)
  • ERP Data Integration (ec_erp_data, pp_p21_data schemas):
    • Current Blower has separate ERP schemas updated with each run
    • Tables: item_attributes, packages, supersedes
    • Decision: Address in Phase 2 after PIM replacement working
    • Options: Replicate to PostgreSQL, federated queries, or dual-database pattern

Open Questions

  1. Bronze Retention: How long should we retain Bronze layer data? (30 days, 90 days, indefinitely?)
  2. Initial Load: How to handle initial full load if inRiver only provides deltas? (Historical backfill strategy?)
  3. Late-Arriving Data: How to handle out-of-order updates in Bronze layer?
  4. ChannelNode Links: Confirm Aperture Labs can exclude OutboundLinks to Products
  5. Downstream Systems: Complete inventory of all consuming systems beyond Magento/Shopware/Algolia
  6. Transformation Logic: Document any complex business rules from current Blower PHP transformations
  7. Revision Cleanup: How many Gold revision schemas to keep? (Blower keeps N most recent)
  8. Value Store: Should we implement SHA-1 hashing for value_store or use simpler approach?

Next Steps

  1. Review and approve updated requirements with Bronze-Silver-Gold-Export four-stage architecture
  2. Proceed to User Stories phase (assess if needed)
  3. Design Bronze, Silver, Gold, and Export layer schemas
  4. Map inRiver JSON entities to Blower PIM2 CSV equivalents
  5. Design dbt transformation models for each stage:
    • Stage 1: Bronze → Silver (normalization, ID assignment)
    • Stage 2: Silver → Gold (join and populate revision tables)
    • Stage 3: Delta processing (seen_status tracking)
    • Stage 4: Gold → Export (CSV generation for downstream)
  6. Implement MVP pipeline

Document Version: 1.0
Last Updated: 2026-03-03
Author: AI-DLC Requirements Analysis