inRiver to Blower - Phase 1 (Fast Track)

Project: inRiver JSONL to Heiler CSV Converter
Date: 2026-03-03
Status: Draft - Awaiting Approval
Timeline: 2-3 days


Overview

Goal: Get inRiver PIM data working with existing Blower infrastructure as quickly as possible.

Approach:

  1. Load inRiver JSONL deltas → PostgreSQL accumulator database (JSONB)
  2. Maintain full dataset (merge/upsert deltas)
  3. Generate Heiler CSV files from full dataset
  4. Feed CSVs to existing Blower

Multi-Business Unit Architecture:

  • Single PostgreSQL instance with separate schemas per BU (BK, GM, EC, CEFL)
  • Each schema has independent:
    • Data tables (products, channel_nodes, attribute_file)
    • Event log (file_processing_events)
    • Configuration tables (field_mapping, decorator_data, csv_template_config)
    • Processing schedule (independent DAGs)
  • Rationale: Cost-effective, data isolation via schemas, matches Blower pattern

Key Insight: inRiver delivers deltas, but Heiler/Blower expects full files. The PostgreSQL accumulator bridges that gap.

Strategic Context:

  • Fast track to production (2-3 days per BU)
  • PostgreSQL databases become Bronze layer in Phase 2
  • No throwaway work - everything reused in Phase 2

Functional Requirements

FR-1: inRiver JSONL Ingestion (Per Business Unit)

Priority: High
Description: Read inRiver JSONL export files from S3 as they arrive (separate processing per BU)

Details:

  • Source: S3 bucket with inRiver exports (organized by BU)
  • File types: Product, ChannelNode, AttributeFile (JSONL format)
  • Trigger: S3 event notification (near real-time, per BU)
  • Processing: Load each delta file immediately to BU-specific PostgreSQL database
  • Frequency: On-demand (as files arrive throughout the day)
  • Business Units: BK, GM, EC, CEFL, CEMA, CESE, etc. (separate schemas)
  • Isolation: Each BU processes independently
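
A minimal sketch of this ingestion step, assuming boto3 and an illustrative bucket/key layout (bucket and key names are placeholders, not confirmed paths):

python
import json
import boto3

s3 = boto3.client("s3")

def read_jsonl_records(bucket: str, key: str):
    """Stream records from a single inRiver JSONL delta file in S3."""
    body = s3.get_object(Bucket=bucket, Key=key)["Body"]
    for line in body.iter_lines():
        if line.strip():
            yield json.loads(line)

# Illustrative only: the real per-BU bucket/prefix layout is a dependency (see Dependencies)
for record in read_jsonl_records("inriver-exports", "bk/product/delta-0001.jsonl"):
    print(record.get("ProductId"))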

FR-2: PostgreSQL Accumulator Database (Per Business Unit)

Priority: High
Description: Maintain full dataset by accumulating inRiver deltas in PostgreSQL (one schema per BU)

Details:

  • Single PostgreSQL instance with separate schemas per BU:
    • inriver_bk - Baker Distributing
    • inriver_gm - Gemaire
    • inriver_ec - East Coast Metal Distributors
    • inriver_cefl - Carrier Enterprise Florida (pending: confirm sub-unit separation)
  • Tables per schema: One per entity type (products, channel_nodes, attribute_file)
  • Schema: JSONB column + metadata + audit trail
    • id (text) - inRiver entity ID
    • data (JSONB) - Full inRiver JSON
    • updated_at (timestamp) - Last update time
    • source_file (text) - Most recent S3 file that updated this record
    • deleted (boolean) - Soft delete flag
  • Event Log Table: Track all file processing events
    • id (serial) - Event ID
    • event_time (timestamp) - When event occurred
    • event_type (text) - 'file_start', 'file_complete', 'file_error'
    • source_file (text) - S3 file path
    • entity_type (text) - products, channel_nodes, attribute_file
    • records_processed (integer) - Count of records
    • records_inserted (integer) - New records
    • records_updated (integer) - Updated records
    • records_deleted (integer) - Deleted records
    • error_message (text) - Error details if failed
    • replay_date (timestamp, nullable) - When marked for replay (NULL = valid)
    • metadata (JSONB) - Additional context
  • Operations:
    • Insert: New records from delta (track source_file)
    • Update: Merge changes (JSONB merge, update source_file)
    • Delete: Mark as deleted (soft delete, track source_file)
    • Log: Record every file processing event
  • Purpose: Convert deltas → full dataset + complete audit trail
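
A minimal DDL sketch for one data table plus the event log, created per BU schema via psycopg2 (the connection string and schema name are placeholders):

python
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS {schema}.products (
    id          TEXT PRIMARY KEY,                 -- inRiver entity ID
    data        JSONB NOT NULL,                   -- full inRiver JSON
    updated_at  TIMESTAMP NOT NULL DEFAULT NOW(),
    source_file TEXT,                             -- most recent S3 file that touched this record
    deleted     BOOLEAN NOT NULL DEFAULT FALSE    -- soft delete flag
);

CREATE TABLE IF NOT EXISTS {schema}.file_processing_events (
    id                SERIAL PRIMARY KEY,
    event_time        TIMESTAMP NOT NULL DEFAULT NOW(),
    event_type        TEXT,        -- 'file_start', 'file_complete', 'file_error'
    source_file       TEXT,
    entity_type       TEXT,        -- products, channel_nodes, attribute_file
    records_processed INTEGER,
    records_inserted  INTEGER,
    records_updated   INTEGER,
    records_deleted   INTEGER,
    error_message     TEXT,
    replay_date       TIMESTAMP,   -- NULL = valid event
    metadata          JSONB
);
"""

with psycopg2.connect("dbname=inriver") as conn, conn.cursor() as cur:
    cur.execute(DDL.format(schema="inriver_bk"))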

FR-3: Delta Merge Logic with Ordered Processing

Priority: High
Description: Merge inRiver delta updates into full dataset with ordered processing and replay capability

Ordered Processing:

  • Files must be processed in chronological order (sequence matters for deltas)
  • Delta Loader queries event log to determine next file to process:
    sql
    -- Find last successfully processed file (not marked for replay)
    SELECT MAX(id) FROM file_processing_events 
    WHERE event_type = 'file_complete' 
      AND replay_date IS NULL;
    
    -- Process files after that point in order
  • Only process files that come after the last valid event
  • Prevents duplicate processing and maintains data consistency
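
A sketch of how the Delta Loader could pick the next files from the event log; it assumes S3 keys sort chronologically, which needs confirming against the actual export naming:

python
def last_processed_file(cur, schema: str):
    """Return source_file of the last valid 'file_complete' event, or None."""
    cur.execute(
        f"""
        SELECT source_file
        FROM {schema}.file_processing_events
        WHERE event_type = 'file_complete' AND replay_date IS NULL
        ORDER BY id DESC
        LIMIT 1
        """
    )
    row = cur.fetchone()
    return row[0] if row else None

def files_to_process(all_keys, last_file):
    """Assumes keys sort chronologically; return everything after last_file, in order."""
    pending = sorted(all_keys)
    if last_file in pending:
        pending = pending[pending.index(last_file) + 1:]
    return pending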

Replay Capability:

  • Can mark events for replay when bad data detected or reprocessing needed
  • Mark event and all following events:
    sql
    UPDATE file_processing_events 
    SET replay_date = NOW() 
    WHERE id >= [problem_event_id];
  • Delta Loader detects replay gap (no valid events after certain point)
  • Reprocesses files from marked point forward
  • Creates new events with replay_date = NULL
  • Original events preserved (never deleted) for audit trail

Merge Operations:

  • Upsert pattern: INSERT ... ON CONFLICT UPDATE
  • JSONB merge for updates (preserve unchanged fields)
  • Soft delete for removed records (deleted = true)
  • Track update timestamps and source file
  • Event logging for every file processed
  • Idempotent (can replay deltas safely)
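
A sketch of the upsert, assuming a shallow JSONB merge with the || operator is sufficient (deep-merge behaviour would need to be validated against real inRiver payloads):

python
from psycopg2.extras import Json

UPSERT = """
INSERT INTO {schema}.products AS p (id, data, updated_at, source_file, deleted)
VALUES (%(id)s, %(data)s, NOW(), %(source_file)s, FALSE)
ON CONFLICT (id) DO UPDATE
SET data        = p.data || EXCLUDED.data,   -- shallow JSONB merge, unchanged keys preserved
    updated_at  = NOW(),
    source_file = EXCLUDED.source_file,
    deleted     = FALSE;
"""

SOFT_DELETE = """
UPDATE {schema}.products
SET deleted = TRUE, updated_at = NOW(), source_file = %(source_file)s
WHERE id = %(id)s;
"""

def upsert_record(cur, schema: str, record: dict, source_file: str):
    cur.execute(
        UPSERT.format(schema=schema),
        {"id": record["ProductId"], "data": Json(record), "source_file": source_file},
    )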

Event Immutability:

  • Events are never deleted (permanent audit trail)
  • Replay marks events but preserves history
  • Can always trace processing history

FR-4: CSV Generation from PostgreSQL

Priority: High
Description: Query PostgreSQL full dataset to generate Heiler CSV files (nightly or on-demand)

Schedule:

  • Nightly cron OR manual trigger
  • Frequency: Once per day (or on-demand for testing/urgent updates)
  • Decoupled: Independent from delta loading (deltas accumulate throughout day)

Process:

  1. Query all non-deleted records from PostgreSQL
  2. Apply configuration from field_mapping, decorator_data, csv_template_config tables
  3. Transform JSONB → Heiler CSV columns
  4. Generate 12 Heiler CSV files
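
A simplified sketch of steps 1-4 for one output file (the column shown is illustrative; the real layout is driven by csv_template_config, see FR-5):

python
import pandas as pd
import psycopg2

def export_items_csv(conn, schema: str, out_path: str):
    """Write the full, non-deleted product set as a Heiler-style CSV (not a delta)."""
    with conn.cursor() as cur:
        cur.execute(f"SELECT id, data FROM {schema}.products WHERE NOT deleted")
        rows = cur.fetchall()
    # Illustrative single column; real columns come from csv_template_config
    df = pd.DataFrame({"Item#": [data.get("ProductId") for _, data in rows]})
    df.to_csv(out_path, index=False, encoding="utf-8")

with psycopg2.connect("dbname=inriver") as conn:
    export_items_csv(conn, "inriver_bk", "/tmp/items.csv")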

Output Files:

Daily Files (7 CSVs - shared across all BUs):

  • attributes.csv - Attribute definitions
  • attributemap.csv - Structure-to-attribute mappings
  • structures.csv - Category/structure definitions
  • items.csv - Core product/item data
  • Item_free_attributes.csv - Free-form attribute values
  • item_structure_features.csv - Structure-specific attribute values
  • media.csv - Product images/assets

Business Unit Files (5 CSVs - per BU):

  • itemdataflex.csv - BU-specific item data and flex fields
  • category.csv - BU-specific category hierarchy
  • purpose.csv - Attribute purpose metadata
  • references.csv - Product cross-references
  • bundles.csv - Product bundle definitions

Mapping:

  • inRiver Product → Heiler daily: items.csv, media.csv, Item_free_attributes.csv, item_structure_features.csv
  • inRiver Product → Heiler BU: itemdataflex.csv, references.csv, bundles.csv
  • inRiver ChannelNode → Heiler BU: category.csv
  • inRiver AttributeFile → Heiler daily: attributes.csv, attributemap.csv, structures.csv
  • Decorator data → Heiler BU: purpose.csv

CSV Format:

  • UTF-8 encoding
  • Headers matching Heiler PIM2 format
  • Column mapping per research/blower-source-pim2/PIM_STRUCTURE.md
  • Full dataset (not deltas)

Delivery:

  • Write to Blower PIM2 input directory (S3)
  • Trigger Blower import after files are written (handled by an AWS Lambda function)

FR-5: Configuration and Mapping Tables

Priority: High
Description: Maintain configuration for field mapping and decorator data

Field Mapping Table:

sql
CREATE TABLE field_mapping (
  id SERIAL PRIMARY KEY,
  inriver_entity TEXT,        -- 'Product', 'ChannelNode', 'AttributeFile'
  inriver_field TEXT,         -- inRiver field name
  heiler_csv TEXT,            -- Target Heiler CSV file
  heiler_column TEXT,         -- Target column in CSV
  transformation_rule TEXT,   -- Optional: transformation logic
  active BOOLEAN DEFAULT TRUE,
  notes TEXT
);

Purpose:

  • Map inRiver attributes to multiple Heiler CSVs
  • Example: inRiver Product attributes → attributes.csv, attributemap.csv, itemdataflex.csv
  • Handle one-to-many mappings (one inRiver field → multiple Heiler columns)

Decorator Data Table:

sql
CREATE TABLE decorator_data (
  id SERIAL PRIMARY KEY,
  entity_type TEXT,           -- 'attribute', 'item', 'category', etc.
  entity_id TEXT,             -- inRiver entity ID
  decorator_type TEXT,        -- 'purpose', 'role', etc.
  decorator_value TEXT,       -- The decorator value
  source TEXT,                -- Where this came from (manual, migration, etc.)
  created_at TIMESTAMP DEFAULT NOW(),
  notes TEXT
);

Purpose:

  • Store data that does not exist in inRiver (fidelity lost during migration)
  • Example: Heiler "purpose" field not in inRiver
  • Manually maintained (for now)
  • Future: Web UI for management

CSV Template Config Table:

sql
CREATE TABLE csv_template_config (
  id SERIAL PRIMARY KEY,
  csv_name TEXT,              -- 'items.csv', 'itemdataflex.csv', etc.
  column_name TEXT,           -- Column in CSV
  source_type TEXT,           -- 'inriver_field', 'decorator', 'static', 'computed'
  source_value TEXT,          -- Field name, decorator type, or static value
  default_value TEXT,         -- Default if source is null
  transformation TEXT,        -- Optional: transformation logic
  required BOOLEAN DEFAULT FALSE,
  notes TEXT
);

Purpose:

  • Define CSV generation rules per column
  • Flexible: source from inRiver, decorator, static value, or computed
  • CSV Generator uses this config to build CSVs
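
A simplified sketch of how the generator might resolve one configured column for one record (the 'computed' branch is omitted; the decorator lookup key is an assumption):

python
def resolve_column(rule: dict, record: dict, decorators: dict):
    """Resolve a single csv_template_config rule for a single record."""
    if rule["source_type"] == "inriver_field":
        value = record.get(rule["source_value"])
    elif rule["source_type"] == "decorator":
        # Assumed lookup: (decorator_type, entity_id) -> decorator_value
        value = decorators.get((rule["source_value"], record.get("ProductId")))
    elif rule["source_type"] == "static":
        value = rule["source_value"]
    else:  # 'computed' rules would dispatch to transformation logic
        value = None
    return value if value is not None else rule["default_value"]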

Management:

  • Phase 1: Manual SQL inserts (load configuration data)
  • Future: Web UI for managing mappings and decorator data

FR-6: Error Handling

Priority: High
Description: Handle malformed JSON and conversion errors

Details:

  • Validate JSON structure
  • Log conversion errors
  • Quarantine bad records
  • Continue processing valid records
  • Alert on failures
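
A sketch of the quarantine pattern: malformed lines are logged and written aside while valid records keep flowing (paths and logger name are placeholders):

python
import json
import logging

log = logging.getLogger("delta_loader")

def parse_jsonl(lines, quarantine_path: str):
    """Yield valid JSON records; write malformed lines to a quarantine file and continue."""
    with open(quarantine_path, "a", encoding="utf-8") as quarantine:
        for lineno, line in enumerate(lines, start=1):
            if not line.strip():
                continue
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                log.error("Skipping malformed JSON on line %d: %s", lineno, exc)
                quarantine.write(line.rstrip("\n") + "\n")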

Non-Functional Requirements

NFR-1: Performance

Priority: Medium
Description: Process deltas in near real-time, generate CSVs nightly

Details:

  • Delta Loading: <2 minutes per file (near real-time)
  • CSV Generation: <15 minutes (nightly batch)
  • Data volume: <100K products per BU
  • Acceptable: Deltas throughout day, CSVs once nightly
  • Decoupled: Delta loading doesn't block CSV generation

NFR-2: Reliability

Priority: High
Description: Ensure reliable conversion with error recovery and replay capability

Details:

  • Retry logic for transient failures
  • Idempotent processing (can rerun safely)
  • Ordered processing (files processed in sequence)
  • Replay capability (can reprocess from any point)
  • Event immutability (never delete events)
  • State tracking via event log (prevents duplicates)
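
A minimal sketch of retry handling for transient failures (attempt count and backoff are arbitrary; load_delta_file is a hypothetical callable):

python
import time

def with_retries(fn, attempts: int = 3, backoff_seconds: float = 5.0):
    """Call fn(); on failure retry with linear backoff, re-raising on the final attempt."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise
            time.sleep(backoff_seconds * attempt)

# Usage (hypothetical): with_retries(lambda: load_delta_file(bucket, key))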

NFR-3: Maintainability

Priority: Medium
Description: Simple, readable Python code

Details:

  • Clear mapping logic
  • Well-documented
  • Easy to debug
  • Minimal dependencies

Technical Requirements

TR-1: Database (Per Business Unit)

Technology: RDS PostgreSQL
Rationale: Accumulate deltas into full dataset, becomes Bronze layer in Phase 2

Details:

  • Architecture: Single RDS PostgreSQL instance with separate schemas per BU
  • Instance: db.t3.medium (shared across all BUs)
  • Schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl, etc. (pending: sub-unit separation)
  • Each schema contains:
    • Data tables: products, channel_nodes, attribute_file (JSONB)
    • Event log: file_processing_events (audit trail + replay control)
    • Configuration tables:
      • field_mapping - Maps inRiver fields to Heiler CSV columns
      • decorator_data - Stores data lost in inRiver migration (e.g., purpose)
      • csv_template_config - Configuration for CSV generation rules
  • Schema:
    • Data tables: id, data (JSONB), updated_at, source_file, deleted
    • Event log: id, event_time, event_type, source_file, entity_type, records_processed, records_inserted, records_updated, records_deleted, error_message, replay_date (nullable), metadata (JSONB)
  • Indexes:
    • Primary key on id
    • Index on updated_at
    • Index on source_file
    • Index on event_time (event log)
    • Index on replay_date (event log) - for efficient query of valid events
  • Purpose: Delta accumulation, CSV generation source, complete audit trail, replay control

TR-2: Implementation

Technology: Python 3.11+
Rationale: Simple, fast to develop, team familiarity

Components:

  1. Delta Loader (~300 lines): S3 → PostgreSQL
    • Query event log to find next file to process
    • Enforce ordered processing (chronological)
    • Detect replay scenarios (gaps in valid events)
    • Upsert records with source_file tracking
    • Log events to file_processing_events table
    • Track insert/update/delete counts
    • Support replay from any point
  2. CSV Generator (~400 lines): PostgreSQL → Heiler CSVs
    • Query configuration tables (field_mapping, decorator_data, csv_template_config)
    • Apply mapping rules to transform inRiver JSON → Heiler CSV columns
    • Join decorator data where needed
    • Generate 7 daily CSVs + 5 BU-specific CSVs
  3. Configuration Loader (~100 lines): Load initial mapping data
    • SQL scripts to populate field_mapping table
    • SQL scripts to populate decorator_data table
    • SQL scripts to populate csv_template_config table
  4. Libraries: boto3 (S3), psycopg2 (PostgreSQL), pandas (CSV)

TR-3: Orchestration (Per Business Unit)

Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure

Details:

  • Separate DAGs per Business Unit (BK, GM, EC, CEFL, etc.)

  • Each BU has two DAGs:

    DAG 1: Delta Loader (runs as files arrive)

    • Name: inriver_{bu}_delta_loader
    • Trigger: S3 event notification (near real-time, BU-specific path)
    • Task: Load delta file → BU-specific PostgreSQL database
    • Frequency: On-demand (per file)
    • Duration: ~1-2 minutes per file

    DAG 2: CSV Generator (runs nightly or on-demand)

    • Name: inriver_{bu}_csv_generator
    • Trigger: Cron schedule (nightly) OR manual trigger
    • Tasks:
      1. Generate Heiler CSVs from BU PostgreSQL database
      2. Upload CSVs to BU-specific Blower input location
      3. Trigger existing BU Blower import DAG
    • Frequency: Nightly (or on-demand)
    • Duration: ~10-15 minutes
  • Total DAGs: 8+ (4+ BUs × 2 DAGs each, pending sub-unit clarification)

  • Decoupled Design: Delta loading independent from CSV generation

  • BU Isolation: Each BU processes independently, no cross-BU dependencies

  • Benefit: Accumulate deltas throughout day, generate full CSVs once nightly per BU

  • Monitoring: Airflow UI + CloudWatch (per BU)
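
A minimal sketch of the nightly CSV Generator DAG for one BU (DAG ids, schedule, and callables are placeholders; the real task logic comes from the TR-2 components):

python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

def generate_csvs():
    ...  # placeholder: query PostgreSQL and build the Heiler CSVs

def upload_csvs():
    ...  # placeholder: write CSVs to the BU-specific Blower input location in S3

with DAG(
    dag_id="inriver_bk_csv_generator",
    start_date=datetime(2026, 3, 1),
    schedule_interval="0 2 * * *",   # nightly; manual trigger remains available in the UI
    catchup=False,
) as dag:
    generate = PythonOperator(task_id="generate_csvs", python_callable=generate_csvs)
    upload = PythonOperator(task_id="upload_csvs", python_callable=upload_csvs)
    trigger_blower = TriggerDagRunOperator(
        task_id="trigger_blower_import",
        trigger_dag_id="blower_bk_import",   # placeholder for the existing BU Blower DAG id
    )
    generate >> upload >> trigger_blower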

TR-4: Storage

Technology: AWS S3
Rationale: inRiver delivery mechanism

Details:

  • Input: inRiver JSONL files
  • Output: Heiler CSV files
  • Blower reads from S3 or local mount

Mapping Requirements

inRiver → Heiler Field Mapping

Product Entity:

  • ProductId → items.csv: Item#
  • ProductId → itemdataflex.csv: Item#
  • ProductId → media.csv: Item_Product
  • ProductId → attribute-value CSVs (Item_free_attributes.csv / item_structure_features.csv): Item_Product#
  • FieldValues → attribute-value CSVs: Attribute_Name, Default_Value (split between the two files is pending; see Open Mapping Questions)
  • Items[] → Flatten to items.csv (one row per Item, merge Product-level data where Item data not present - Item data takes precedence)
  • Items.Resources.ResourceFilename → media.csv: File_Name
  • Items.Resources.ResourceResourceType → media.csv: Image_Type
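
A sketch of the Items[] flattening rule above, assuming each Product carries a FieldValues dict and each Item its own FieldValues (the exact payload shape should be confirmed against the samples):

python
def flatten_product(product: dict):
    """Yield one items.csv row per Item; Item-level values override Product-level values."""
    product_fields = product.get("FieldValues", {})
    for item in product.get("Items", []):
        row = dict(product_fields)                # Product-level defaults
        row.update(item.get("FieldValues", {}))   # Item data takes precedence
        row["Item_Product"] = product.get("ProductId")   # parent reference, as used in media.csv
        yield row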

ChannelNode Entity:

  • ChannelNodeID → category.csv: Structure_ID
  • ChannelNodeName → category.csv: Structure_Name
  • InboundLinks.SourceEntityId → category.csv: Parent_Structure_ID (*ref)

AttributeFile Entity:

  • FieldTypes[] → attributes.csv: AttributeID, AttributeName, AttributeDataType
  • StructureEntities[] → structures.csv: StructureID, StructureName

Open Mapping Questions:

  1. Attribute ID Source: inRiver doesn't export attribute IDs - need resolution (see Risk 5)
  2. Attribute Splitting: How to determine which attributes go to Item_free_attributes.csv vs item_structure_features.csv?
    • Hypothesis: Structure-specific attributes → item_structure_features.csv
    • Hypothesis: Free-form attributes → Item_free_attributes.csv
    • Need to validate with sample data analysis

Data Type Mapping

inRiver Type → Heiler Type

  • String → CHARACTER_STRING
  • Integer → INTEGER
  • Double → DECIMAL
  • Boolean → BOOLEAN
  • CVL → CVL (lookup)
  • LocaleString → CHARACTER_STRING (per locale)

Success Criteria

Phase 1 Success

  1. ✅ Converter processes inRiver JSONL files
  2. ✅ PostgreSQL accumulates deltas into full dataset
  3. ✅ Event log tracks all file processing
  4. ✅ Generates valid Heiler CSV files
  5. ✅ Blower imports CSVs successfully
  6. ✅ Downstream systems receive data (no changes needed)
  7. ✅ Single BU operational (BK or GM)
  8. ✅ Audit trail complete (source_file + event log)

Acceptance Criteria

  • Item count matches between inRiver and Blower
  • Attribute values match
  • Category hierarchy correct
  • Media files linked properly
  • Delta updates work (add/update/delete)
  • Every record tracks source_file
  • Event log shows complete processing history

Out of Scope (Phase 1)

  • Database changes to Blower (use existing Blower)
  • dbt transformations (use existing Blower)
  • Full pipeline architecture (Phase 2)
  • Comprehensive monitoring (basic logging only)
  • Automated testing (manual validation)
  • Web UI for configuration management (manual SQL for Phase 1, future enhancement)
  • Complex transformation logic (keep simple, iterate)
  • Multi-BU simultaneous rollout (start with single BU, then expand)

Risks and Mitigations

Risk 1: Incomplete Field Mapping

Mitigation: Reference Heiler PIM2 structure docs, validate with sample data

Risk 2: Delta Handling Complexity

Mitigation: Start with full files, add delta logic incrementally

Risk 3: Blower Compatibility

Mitigation: Match Heiler CSV format exactly, test with existing Blower

Risk 4: Performance Issues

Mitigation: Process in batches, optimize for large files

Risk 5: Missing Attribute IDs from inRiver

Issue: Current PIM attribute IDs are not exported by inRiver, but Heiler CSVs require AttributeID
Impact: Cannot populate attributes.csv, attributemap.csv, Item_free_attributes.csv, item_structure_features.csv without attribute IDs
Mitigation:

  • Option A: Request inRiver to include attribute IDs in AttributeFile export
  • Option B: Generate synthetic attribute IDs in Silver layer (hash-based or sequential)
  • Option C: Use attribute names as IDs (if acceptable to downstream systems)

Status: ⚠️ Critical blocker - needs resolution before implementation

Risk 6: Legacy ID Preservation and Migration

Issue: Heiler PIM IDs are used in permalinks, Blower joins, and downstream systems
Context:

  • Migrated products: Have reference to old Heiler IDs (stored in inRiver item attribute)
  • New products: Will not have Heiler IDs (never existed in old system)
  • Category lineage: Blower likely uses old Heiler category IDs for linking

Impact:

  • Breaking permalinks for existing products
  • Breaking Blower joins and relationships
  • Breaking category hierarchy references
  • Inconsistent ID strategy (old vs new products)

Mitigation:

  • Phase 1 Approach:
    • Use old Heiler ID when present in inRiver attribute
    • Generate new ID when not present (sequential or hash-based)
    • Track ID source in metadata (migrated vs new)
  • Blower Changes Required:
    • Update join logic to handle both old and new ID patterns
    • Update category linking to use new inRiver category IDs
    • Maintain backward compatibility during transition
  • Downstream Impact:
    • Permalinks: May need redirect strategy for new products
    • E-commerce: May need ID mapping tables
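
A sketch of the Phase 1 ID resolution rule described above; the attribute holding the legacy Heiler ID is still an open question, so the field name is a placeholder:

python
import hashlib

LEGACY_ID_FIELD = "HeilerItemId"   # placeholder; actual inRiver attribute TBD (Open Question 1)

def resolve_item_id(record: dict) -> tuple[str, str]:
    """Return (item_id, source) where source is 'migrated' or 'new'."""
    legacy_id = record.get("FieldValues", {}).get(LEGACY_ID_FIELD)
    if legacy_id:
        return str(legacy_id), "migrated"
    # Hash-based synthetic ID for products that never existed in Heiler
    synthetic = hashlib.sha1(record["ProductId"].encode("utf-8")).hexdigest()[:12]
    return synthetic, "new"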

Status: ⚠️ High risk - cannot fully resolve until implementation starts, needs discovery phase

Open Questions:

  1. What inRiver attribute stores the legacy Heiler ID?
  2. What is the ID format/pattern for legacy vs new?
  3. Which downstream systems depend on Heiler IDs?
  4. What is the permalink structure and redirect strategy?
  5. How are category IDs used in Blower joins?

Dependencies

  1. inRiver S3 Access: Credentials and bucket access (organized by BU)
  2. RDS PostgreSQL: Provision single instance (db.t3.medium)
    • Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending: sub-unit clarification)
    • Each schema contains all tables (data, event log, configuration)
  3. Blower PIM2 Docs: CSV format specification (available in research/)
  4. Sample Files: inRiver JSONL samples (available in samples/aperture-export-ecom), Heiler CSV samples (available in samples/heiler-pim)
  5. Blower Input Locations: S3 paths per BU (BK, GM, EC, CEFL, etc.)

Next Steps

  1. Review and approve Phase 1 requirements
  2. Resolve Critical Blockers:
    • Attribute ID issue (see Risk 5)
    • Sub-business unit file separation (see questions.md)
  3. Provision Infrastructure (per BU):
    • Single PostgreSQL instance (db.t3.medium)
    • Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending sub-unit clarification)
    • S3 paths for inRiver input
    • S3 paths for Heiler CSV output
  4. Build Components:
    • Delta Loader Python script
    • CSV Generator Python script
    • Airflow DAGs (8+ total: 4+ BUs × 2 DAGs, pending sub-unit clarification)
  5. Test with Single BU (BK or GM):
    • Test with sample inRiver files
    • Validate Heiler CSV output against samples
    • Test with Blower import
  6. Deploy to Production (single BU first)
  7. Monitor and Iterate
  8. Roll Out to Remaining BUs (EC, CEFL, etc.)

Document Version: 1.0
Last Updated: 2026-03-03
Author: AI-DLC Requirements Analysis