inRiver ETL Pipeline - Phase 2 (Full Modernization)
Project: inRiver PIM to Aurora PostgreSQL ETL Pipeline
Date: 2026-02-26
Last Updated: 2026-03-03 (Split into Phase 2)
Status: Future - After Phase 1 Complete
Timeline: 4-6 weeks
Overview
Goal: Replace Blower PHP/MySQL infrastructure with modern dbt + PostgreSQL pipeline.
Strategic Context: Phase 1 (Python converter) gets inRiver working quickly. Phase 2 modernizes the entire stack and sets precedent for replacing other Blower sources.
Prerequisites: Phase 1 operational and stable
Intent Analysis
User Request Summary
Migrate from the current Blower ETL system (nightly full load, PHP-generated Airflow DAGs) to the new inRiver PIM system (delta load, near real-time updates). Load inRiver JSONL exports into an Aurora PostgreSQL database and use dbt transformations to maintain functional parity with the current Blower database outputs.
Request Classification
- Type: Migration + New ETL Pipeline Development
- Scope: System-wide (new data pipeline with multiple components)
- Complexity: Complex (multi-technology integration, data model transformation, multiple downstream systems)
Architectural Shift
- From: Nightly full load batch processing
- To: Hourly delta load incremental processing
- Impact: Requires merge/upsert logic, state management, change data capture patterns
Medallion Architecture (Bronze-Silver-Gold + Export)
Bronze Layer (Raw Landing Zone):
- Raw inRiver JSON files with audit metadata (load_date, file_name, business_unit, s3_key)
- Storage: PostgreSQL JSONB column for efficient querying
- Immutable append-only storage
- Equivalent to Blower's PIM2 schema
- Purpose: Historical record of all deltas received
Silver Layer (Normalization - REQUIRED):
- Normalized entities with integer ID assignment
- SHA-1 hashed value store for attribute values (deduplication)
- Equivalent to Blower's Central schema
- Tables: items, attributes, categories, media, attribute_sets, value_store
- Each entity: `id` (integer) + `blower_id` (source reference)
- Purpose: Normalize Bronze data, assign IDs, enable efficient joins
Gold Layer (Business-Ready):
- 13 Blower revision tables with normalized integer IDs
- Full product dataset maintained (not just deltas)
- Uses Silver IDs via joins (Bronze + Silver → Gold)
- All tables have `seen_status_id` for delta tracking (added/deleted/modified/unchanged)
- Equivalent to Blower's Revision schema
- Purpose: Optimized for queries, maintains full state with change tracking
Export Layer (Downstream Consumption):
- dbt models that transform Gold → CSV format
- Converts internal IDs back to source IDs (blower_id)
- Resolves foreign keys to names (e.g., type_id → type name)
- Removes internal columns (seen_status_id)
- Equivalent to Blower's Export classes
- Purpose: Downstream systems consume CSV exports (not direct DB access)
Target Schema
Blower Revision Schema (`*_revision_XXX`):
- 13 normalized tables documented in `docs/blower-target/`
- Core tables: items, attributes, categories, relationships, media
- Multi-BU: Separate schema per business unit (bk_, gm_, ec_*, etc.)
- EAV pattern for attributes with relationship tables
- All tables include a `seen_status_id` column for delta tracking
Blower Central Schema (Silver layer normalization):
- Assigns integer IDs to all entities
- `value_store` table with SHA-1 hashes for attribute values
- Lookup tables: items, attributes, categories, media, attribute_sets
- Each entity has `id` (integer) + `blower_id` (source reference)
Transformation Pattern (from Blower PIM source):
```sql
-- Example: Bronze + Silver → Gold
INSERT INTO {revision}.items
SELECT item.id, item.name, it.id, ag.id, @SEEN_ID
FROM bronze.items b
INNER JOIN silver.items item ON item.blower_id = b.blower_id
INNER JOIN silver.item_types it ON it.name = b.item_type
LEFT JOIN silver.attribute_sets ag ON ag.blower_id = b.attribute_set_blower_id
```

Functional Requirements
FR-1: Bronze Layer - Raw Data Ingestion
Priority: High
Description: Ingest inRiver JSONL export files from S3 to Bronze layer with audit metadata
Details:
- Source: S3 bucket with inRiver exports (controlled by Watsco)
- File types: Product, Item, ChannelNode, AttributeFile (JSON format)
- Storage: PostgreSQL table with JSONB column + metadata columns:
  - `data` (JSONB) - Raw inRiver JSON for SQL querying
  - `load_date` (timestamp) - When the file was loaded
  - `file_name` (text) - Original S3 file name
  - `business_unit` (text) - BU identifier (BAK, CEFL, ECM, GEM)
  - `s3_key` (text) - S3 object key for reference
  - `entity_type` (text) - Product, ChannelNode, AttributeFile, etc.
- Pattern: Immutable append-only (historical record of all deltas)
- Trigger: S3 event notifications on file upload
- Processing: Hourly batch processing acceptable
- Rationale: JSONB enables efficient SQL querying for Silver transformations
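
A minimal sketch of what the Bronze landing table could look like, assuming the metadata columns listed above (table and index names are illustrative, not final):

```sql
-- Hypothetical Bronze landing table; names are illustrative.
CREATE TABLE IF NOT EXISTS bronze.inriver_raw (
    id            bigserial   PRIMARY KEY,
    data          jsonb       NOT NULL,               -- raw inRiver entity payload
    load_date     timestamptz NOT NULL DEFAULT now(), -- when the file was loaded
    file_name     text        NOT NULL,               -- original S3 file name
    business_unit text        NOT NULL,               -- BAK, CEFL, ECM, GEM
    s3_key        text        NOT NULL,               -- S3 object key
    entity_type   text        NOT NULL                -- Product, Item, ChannelNode, AttributeFile
);

-- Append-only: rows are never updated or deleted; downstream stages filter by load_date.
CREATE INDEX IF NOT EXISTS idx_inriver_raw_bu_type_date
    ON bronze.inriver_raw (business_unit, entity_type, load_date);
```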
FR-2: Silver Layer - Normalization and ID Assignment
Priority: High
Description: Normalize Bronze data and assign integer IDs to all entities
Details:
- Input: Bronze layer (raw JSON with audit metadata)
- Process: Parse JSON, extract entities, assign integer IDs
- Tables:
  - `items` - id, blower_id (inRiver Product/Item ID), name, item_type
  - `attributes` - id, blower_id (inRiver Field ID), code, name, data_type
  - `categories` - id, blower_id (inRiver ChannelNode ID), name, parent_id
  - `media` - id, blower_id (inRiver Resource ID), filename, path
  - `attribute_sets` - id, blower_id (inRiver Structure ID), name
  - `value_store` - id, sha1_hash, value (deduplicated attribute values)
- Pattern: Each entity gets an integer `id` + a source `blower_id` reference
- Purpose: Enable efficient joins for Gold layer population
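
A sketch of how the `value_store` deduplication could be expressed as a dbt incremental model, assuming a `bronze.inriver_raw` source is declared, SHA-1 hashing via the pgcrypto extension, and a `fields` object in the raw JSON (all assumptions, not confirmed inRiver export shape):

```sql
-- models/silver/value_store.sql -- hypothetical sketch; JSON shape and names are assumptions.
-- SHA-1 hashing uses digest() from the pgcrypto extension (must be installed).
{{ config(materialized='incremental', unique_key='sha1_hash') }}

with raw_values as (
    select distinct
        field.value ->> 'value' as value   -- assumes each field is an object with a "value" key
    from {{ source('bronze', 'inriver_raw') }} b,
         jsonb_each(b.data -> 'fields') as field
    where field.value ->> 'value' is not null
)

select
    encode(digest(value, 'sha1'), 'hex') as sha1_hash,
    value
from raw_values
{% if is_incremental() %}
where encode(digest(value, 'sha1'), 'hex') not in (select sha1_hash from {{ this }})
{% endif %}
-- The integer id column could be added as a generated identity on the target table.
```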
FR-3: Gold Layer - Business-Ready Data
Priority: High
Description: Transform Bronze + Silver to Gold layer matching Blower revision schema
Details:
- Target: 13 normalized Blower tables (items, attributes, categories, relationships, media)
- Schema: `*_revision_XXX` per business unit (e.g., `bk_revision_2191`)
- Tables:
  - `items` - Core product/SKU data with Silver IDs
  - `attributes`, `attribute_sets`, `attribute_set_attributes`, `attribute_roles` - Attribute metadata
  - `item_attributes` - Product attribute values (EAV pattern with value_store IDs)
  - `categories`, `category_attributes`, `item_categories` - Category hierarchy
  - `item_associations`, `item_bundles`, `item_groups` - Product relationships
  - `item_media` - Product images/assets
  - `report` - Data quality reports
- Transformation: Join Bronze entities with Silver IDs to populate Gold
- Full dataset: Gold maintains complete product catalog (not just deltas)
- Update strategy: Bronze deltas trigger merge/upsert into Gold
- Seen status: All tables have `seen_status_id` (seen/added/deleted/modified/unchanged)
- Purpose: Business-ready, full-state dataset that the Export layer consumes without further structural transformation (downstream systems receive CSV exports, not direct DB access)
FR-4: dbt Transformations (Bronze → Silver → Gold → Export)
Priority: High
Description: Transform raw inRiver JSON through four-stage pipeline using dbt
Stage 1: Bronze → Silver (Normalization):
- Parse inRiver JSONB entities (Product, Item, ChannelNode, Resource, AttributeFile)
- Use PostgreSQL JSONB operators for efficient extraction
- Assign integer IDs to all entities
- Create SHA-1 hashes for attribute values in value_store
- Build lookup tables: items, attributes, categories, media, attribute_sets
- Pattern: `blower_id` (source) → `id` (normalized integer)
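
As a sketch of Stage 1, a Silver `items` model might extract and normalize Product/Item entities from the Bronze JSONB as follows (field paths such as `displayName` are assumptions about the export shape):

```sql
-- models/silver/items.sql -- hypothetical Stage 1 sketch; JSON paths are assumptions.
{{ config(materialized='table') }}

with products as (
    select distinct on (b.data ->> 'id')
        b.data ->> 'id'          as blower_id,   -- inRiver Product/Item id (source reference)
        b.data ->> 'displayName' as name,
        b.data ->> 'entityType'  as item_type
    from {{ source('bronze', 'inriver_raw') }} b
    where b.entity_type in ('Product', 'Item')
    order by b.data ->> 'id', b.load_date desc   -- keep the latest delta per entity
)

select
    row_number() over (order by blower_id) as id,  -- normalized integer id used by Gold joins
    blower_id,
    name,
    item_type
from products
```

In practice the integer ids would need to stay stable across runs (e.g., via a persisted sequence or mapping table); the `row_number()` here only illustrates the `blower_id` → `id` assignment.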
Stage 2: Silver → Gold (Business-Ready):
- Join Bronze entities with Silver IDs
- Populate 13 Blower revision tables
- Map inRiver entities to Blower tables:
- Product/Item → items table
- Field definitions → attributes, attribute_sets, attribute_roles
- Field values → item_attributes (EAV with value_store IDs)
- ChannelNode → categories, category_attributes, item_categories
- Product links → item_associations, item_bundles, item_groups
- Resources → item_media
- Set seen_status_id = 'seen' for initial load
Stage 3: Delta Processing (Change Tracking):
- Compare current Gold vs previous Gold
- Update seen_status_id: added/deleted/modified/unchanged
- Merge/upsert changed records
- Hard delete removed records
- Maintain full dataset in Gold
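
A hedged sketch of the Stage 3 comparison, assuming a newly built and a previous Gold `items` relation (schema and column names are placeholders):

```sql
-- Hypothetical change detection: full outer join the new build against the prior revision.
select
    coalesce(cur.id, prev.id) as id,
    case
        when prev.id is null then 'added'
        when cur.id  is null then 'deleted'
        when (cur.name, cur.item_type_id) is distinct from (prev.name, prev.item_type_id)
             then 'modified'
        else 'unchanged'
    end as seen_status
from gold_current.items cur
full outer join gold_previous.items prev on prev.id = cur.id;
```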
Stage 4: Gold → Export (CSV Generation):
- Transform Gold tables to CSV format for downstream systems
- Convert internal IDs back to source IDs (join Silver for blower_id)
- Resolve foreign keys to names (e.g., type_id → type name)
- Remove internal columns (seen_status_id)
- Generate CSV files matching Blower export format
- Upload to S3 or local directory
Success criteria: Functional parity with current Blower outputs
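
A sketch of the Stage 4 export shape, with illustrative table names: the query converts internal ids back to source ids, resolves foreign keys to names, drops internal columns, and streams CSV with a header row (e.g., run via `psql -c` with output redirected to a file):

```sql
-- Hypothetical Stage 4 export; table names are assumptions.
copy (
    select
        i.blower_id as item_id,        -- source id, not the internal integer id
        g.name,
        it.name     as item_type       -- foreign key resolved to a display name
    from gold.items g
    join silver.items      i  on i.id  = g.id
    join silver.item_types it on it.id = g.item_type_id
    -- seen_status_id and other internal columns are intentionally omitted
) to stdout with (format csv, header true);
```

Note that `COPY` does not emit a UTF-8 BOM; the BOM required by FR-5 would have to be prepended by the export step that writes the file.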
FR-5: Downstream System Outputs
Priority: High
Description: Generate CSV exports for multiple downstream systems
Details:
- Export format: CSV files with headers (UTF-8 BOM)
- Export types:
- Full export: All data from Gold layer
- Delta export: Only changed records (based on seen_status)
- CSV structure: Denormalized, downstream-friendly format
- Use source IDs (blower_id), not internal IDs
- Resolve foreign keys to names (e.g., type_id → type name)
- Exclude internal columns (seen_status_id)
- Downstream systems:
- E-commerce platforms: Magento, Shopware (consume CSV imports)
- Search: Algolia indexes (consume CSV imports)
- Other systems: Additional consumers with own ETL
- Delivery: S3 upload or local file system
- Equivalent to Blower's Export classes (Items, Attributes, Categories, etc.)
CSV Exporters (one per table):
- items.csv, attributes.csv, attribute_sets.csv, attribute_set_attributes.csv, attribute_roles.csv
- categories.csv, category_attributes.csv, item_categories.csv
- item_attributes.csv, item_media.csv, item_associations.csv, item_bundles.csv, item_groups.csv
FR-6: ChannelNode Link Handling
Priority: Medium
Description: Process ChannelNode relationships excluding outbound product links
Details:
- Exclusion: Request Aperture Labs to exclude OutboundLinks to Products from ChannelNode exports
- Rationale: Scalability concern with large link arrays (categories with 1000+ products)
- Alternative: Process InboundLinks from Products only (via Product assortment)
- Mapping: Product assortment → item_categories table
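
If the Product-side assortment data is a simple array of ChannelNode ids (an assumption about the export shape), the mapping into `item_categories` might look like this sketch:

```sql
-- Hypothetical mapping of Product assortment links to item_categories; JSON shape assumed.
select
    i.id as item_id,
    c.id as category_id
from bronze.inriver_raw b
cross join lateral jsonb_array_elements_text(b.data -> 'assortment') as a(channel_node_id)
join silver.items      i on i.blower_id = b.data ->> 'id'
join silver.categories c on c.blower_id = a.channel_node_id
where b.entity_type = 'Product';
```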
FR-7: AttributeFile Processing
Priority: High
Description: Load daily AttributeFile to Silver layer for field definitions
Details:
- Frequency: Daily system-wide delivery
- Purpose: Field definitions and metadata for all entity types
- Storage: Load to Silver attributes, attribute_sets tables
- Usage: Referenced by Bronze → Silver → Gold transformations
FR-8: Delta Update Processing
Priority: High
Description: Process incremental updates with insert, update, delete operations
Details:
- Insert: Add new entities to Silver, propagate to Gold with seen_status = 'added'
- Update: Merge changes to existing entities, mark seen_status = 'modified'
- Delete: Hard delete removed entities, mark seen_status = 'deleted' before removal
- Unchanged: Mark seen_status = 'unchanged' for records with no changes
- State management: Track processed files in Bronze to avoid duplicates
- Comparison: Compare current Gold vs previous Gold to determine changes
- Pattern: Equivalent to Blower's History class logic
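
A sketch of the merge/upsert step using PostgreSQL's `ON CONFLICT`, assuming a staging relation and numeric seen-status codes (both illustrative):

```sql
-- Hypothetical merge/upsert for delta processing; table names and status ids are assumptions.
insert into gold.items as tgt (id, name, item_type_id, attribute_set_id, seen_status_id)
select id, name, item_type_id, attribute_set_id,
       1                                   -- assumed seen_status_id for 'added'
from staging.items
on conflict (id) do update
   set name             = excluded.name,
       item_type_id     = excluded.item_type_id,
       attribute_set_id = excluded.attribute_set_id,
       seen_status_id   = 2                -- assumed seen_status_id for 'modified'
 where (tgt.name, tgt.item_type_id, tgt.attribute_set_id)
       is distinct from (excluded.name, excluded.item_type_id, excluded.attribute_set_id);
```

The `WHERE` clause on the update keeps untouched rows from being rewritten, so they can later be marked 'unchanged' by the comparison step.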
Non-Functional Requirements
NFR-1: Performance
Priority: Medium
Description: Process updates within hourly window
Details:
- Data volume: <100K products per business unit
- Processing frequency: Hourly batch
- Latency: Hourly updates acceptable for downstream systems
- Scalability: Design for current volume, plan for growth
NFR-2: Data Quality
Priority: High
Description: Ensure data parity with current Blower outputs
Details:
- Validation: Schema validation, business rule checks
- Testing: Data parity validation - output tables must be identical to Blower
- Example: Item 1234 with 5 attributes and 2 media should match exactly
- Error handling: Standard validation, quarantine bad records, alerting
NFR-3: Monitoring and Observability
Priority: Medium (MVP), High (Phase 2)
Description: Monitor pipeline health and data quality
MVP Requirements:
- Airflow task success/failure monitoring
- Data quality checks: record counts, schema validation
- Basic alerting on failures
Phase 2 Requirements:
- Comprehensive observability: metrics, logs, traces
- SLA monitoring
- Data quality dashboards
NFR-4: Reliability
Priority: High
Description: Ensure reliable data processing with error recovery
Details:
- Error handling: Log errors, quarantine bad records, continue processing
- Retry logic: Automated retry for transient failures
- Alerting: Notify on persistent failures
- Recovery: Manual intervention for quarantined records
NFR-5: Maintainability
Priority: High
Description: Code and infrastructure should be maintainable by team
Details:
- Team: Mixed experience - familiar with Airflow/dbt but not this pattern
- Documentation: Comprehensive setup and operational docs
- Code quality: Automated testing, CI/CD pipeline
- Infrastructure: Terraform for IaC (future consideration)
Technical Requirements
TR-1: Database
Technology: Aurora PostgreSQL
Rationale: Modern data stack alignment, superior JSON handling, downstream systems DB-agnostic
Details:
- Managed service: AWS RDS Aurora PostgreSQL
- Schema: Normalized tables with foreign keys
- JSON support: JSONB for Bronze layer (native operators, indexing)
- Array support: Native arrays for complex data structures
- Indexing: Optimize for query patterns, JSONB indexes
- Backup: Automated backups, point-in-time recovery
- Strategic shift: Complete modernization (dbt + Postgres) since downstream systems consume CSV exports only
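
For illustration, JSONB indexing on the Bronze table (table name as assumed earlier) could combine a GIN index for containment queries with a B-tree expression index for single-key lookups:

```sql
-- Hypothetical index definitions for the Bronze landing table.
CREATE INDEX idx_inriver_raw_data_gin
    ON bronze.inriver_raw USING gin (data jsonb_path_ops);

CREATE INDEX idx_inriver_raw_entity_id
    ON bronze.inriver_raw ((data ->> 'id'));

-- Example containment query served by the GIN index:
SELECT * FROM bronze.inriver_raw WHERE data @> '{"entityType": "Product"}';
```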
TR-2: Orchestration
Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure, team familiarity
Details:
- Environment: New MWAA instance (already provisioned)
- DAGs: Python-based (not PHP-generated like Blower)
- Tasks:
- S3 ingestion: Python operator (~50 lines, load JSON to Bronze)
- dbt transformations: dbt operator (Bronze → Silver → Gold)
- Data quality checks: dbt tests
- Scheduling: Hourly batch processing triggered by S3 events
- Monitoring: Airflow UI + CloudWatch
- Strategic shift: Python + dbt replaces PHP + Blower pattern
TR-3: Transformation
Technology: dbt (data build tool)
Rationale: Strategic move toward dbt-native pipeline to replace Blower pattern
Details:
- Adapter: dbt-postgres for Aurora PostgreSQL
- Models: SQL-based transformations for all three stages
- Bronze → Silver: Parse JSONB, assign IDs, create value_store
- Silver → Gold: Join and populate 13 revision tables
- Delta tracking: Incremental models with seen_status logic
- JSON handling: Native JSONB operators for parsing inRiver JSON
- Testing: dbt tests for data quality
- Documentation: Auto-generated from dbt models
- Macros: Replace Blower event hooks with dbt macros
- Strategic direction: Sets precedent for replacing other Blower sources with dbt
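
As a sketch of how a Blower event hook might be replaced, a dbt macro invoked from a model's `post_hook` could write a run-log row after each build (macro, table, and column names are illustrative):

```sql
-- macros/log_model_run.sql -- hypothetical macro; ops.model_run_log is an assumed table.
{% macro log_model_run(model_name) %}
    insert into ops.model_run_log (model_name, run_at)
    values ('{{ model_name }}', now())
{% endmacro %}
```

A model could then declare `{{ config(post_hook=["{{ log_model_run(this.name) }}"]) }}` so the insert fires after the model builds.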
TR-4: Storage
Technology: AWS S3
Rationale: inRiver export delivery mechanism
Details:
- Bucket: Watsco-controlled S3 bucket
- Event notifications: S3 events trigger pipeline
- Access: IAM roles for MWAA access
- Retention: Define retention policy for processed files
TR-5: Deployment
Technology: CI/CD pipeline with automated testing
Rationale: Ensure quality and repeatability
Details:
- Version control: Git repository
- CI/CD: Automated testing on commit
- Deployment: Automated deployment to MWAA
- Testing: Unit tests (dbt, Python), integration tests, data parity validation
Project Constraints
Timeline
- MVP: 2-4 weeks (basic pipeline, minimal monitoring, manual validation)
- Phase 2: Iterate to add comprehensive monitoring and automated testing
- Phased approach: Deliver working MVP quickly, enhance over time
Dependencies
- Blower schema documentation: Available now (critical for design)
- Aperture Labs coordination: Request ChannelNode OutboundLinks exclusion
- Downstream system analysis: Understand transformation requirements
Team
- Resources: Mixed team with Airflow/dbt experience but new to this pattern
- Learning curve: Account for team learning in timeline
- Support: May need external guidance on best practices
Success Criteria
Primary Success Criteria
- Functional Parity: Output tables identical to current Blower database
- Data Accuracy: Item-level validation (attributes, media, relationships match)
- Processing Reliability: Hourly updates complete successfully
- Downstream Compatibility: All consuming systems work without changes
MVP Success Criteria
- Basic pipeline operational (S3 → Aurora PostgreSQL → dbt)
- Airflow monitoring + data quality checks
- Manual validation confirms data parity
- Single business unit operational
Phase 2 Success Criteria
- All business units operational
- Comprehensive monitoring and alerting
- Automated data parity testing
- CI/CD pipeline operational
Out of Scope (for MVP)
- Full refresh mode (inRiver limitation)
- Comprehensive monitoring (Phase 2)
- Automated end-to-end testing (Phase 2)
- Infrastructure as Code with Terraform (future consideration)
- Near real-time processing < 1 hour (hourly acceptable)
- Soft delete timestamps (hard deletes only)
- ERP Data Integration (ec_erp_data, pp_p21_data schemas):
- Current Blower has separate ERP schemas updated with each run
- Tables: item_attributes, packages, supersedes
- Decision: Address in Phase 2 after PIM replacement working
- Options: Replicate to PostgreSQL, federated queries, or dual-database pattern
Open Questions
- Bronze Retention: How long should we retain Bronze layer data? (30 days, 90 days, indefinitely?)
- Initial Load: How to handle initial full load if inRiver only provides deltas? (Historical backfill strategy?)
- Late-Arriving Data: How to handle out-of-order updates in Bronze layer?
- ChannelNode Links: Confirm Aperture Labs can exclude OutboundLinks to Products
- Downstream Systems: Complete inventory of all consuming systems beyond Magento/Shopware/Algolia
- Transformation Logic: Document any complex business rules from current Blower PHP transformations
- Revision Cleanup: How many Gold revision schemas to keep? (Blower keeps N most recent)
- Value Store: Should we implement SHA-1 hashing for value_store or use simpler approach?
Next Steps
- Review and approve updated requirements with Bronze-Silver-Gold-Export four-stage architecture
- Proceed to User Stories phase (assess if needed)
- Design Bronze, Silver, Gold, and Export layer schemas
- Map inRiver JSON entities to Blower PIM2 CSV equivalents
- Design dbt transformation models for each stage:
- Stage 1: Bronze → Silver (normalization, ID assignment)
- Stage 2: Silver → Gold (join and populate revision tables)
- Stage 3: Delta processing (seen_status tracking)
- Stage 4: Gold → Export (CSV generation for downstream)
- Implement MVP pipeline
Document Version: 1.0
Last Updated: 2026-02-26
Author: AI-DLC Requirements Analysis