inRiver to Blower - Phase 1 (Fast Track)
Project: inRiver JSONL to Heiler CSV Converter
Date: 2026-03-03
Status: Draft - Awaiting Approval
Timeline: 2-3 days
Overview
Goal: Get inRiver PIM data working with existing Blower infrastructure as quickly as possible.
Approach:
- Load inRiver JSONL deltas → PostgreSQL accumulator database (JSONB)
- Maintain full dataset (merge/upsert deltas)
- Generate Heiler CSV files from full dataset
- Feed CSVs to existing Blower
Multi-Business Unit Architecture:
- Single PostgreSQL instance with separate schemas per BU (BK, GM, EC, CEFL)
- Each schema has independent:
- Data tables (products, channel_nodes, attribute_file)
- Event log (file_processing_events)
- Configuration tables (field_mapping, decorator_data, csv_template_config)
- Processing schedule (independent DAGs)
- Rationale: Cost-effective, data isolation via schemas, matches Blower pattern
Key Insight: inRiver delivers deltas, Heiler/Blower expects full files. PostgreSQL accumulator bridges the gap.
Strategic Context:
- Fast track to production (2-3 days per BU)
- PostgreSQL databases become Bronze layer in Phase 2
- No throwaway work - everything reused in Phase 2
Functional Requirements
FR-1: inRiver JSONL Ingestion (Per Business Unit)
Priority: High
Description: Read inRiver JSONL export files from S3 as they arrive (separate processing per BU)
Details:
- Source: S3 bucket with inRiver exports (organized by BU)
- File types: Product, ChannelNode, AttributeFile (JSONL format)
- Trigger: S3 event notification (near real-time, per BU)
- Processing: Load each delta file immediately to BU-specific PostgreSQL database
- Frequency: On-demand (as files arrive throughout the day)
- Business Units: BK, GM, EC, CEFL, CEMA, CESE, etc. (separate schemas)
- Isolation: Each BU processes independently
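As an illustration of FR-1, a minimal JSONL reader sketch — the bucket and key names are hypothetical, and the real trigger wiring (S3 event → DAG) is described under TR-3:

```python
import json
import boto3

def read_jsonl_from_s3(bucket: str, key: str) -> list[dict]:
    """Stream an inRiver JSONL export from S3 and parse one JSON object per line."""
    s3 = boto3.client("s3")
    obj = s3.get_object(Bucket=bucket, Key=key)
    records = []
    for line in obj["Body"].iter_lines():
        if line.strip():  # skip blank lines
            records.append(json.loads(line))
    return records

# Hypothetical usage: the per-BU key layout is an assumption, not confirmed
records = read_jsonl_from_s3("inriver-exports", "bk/products/2026-03-03-delta.jsonl")
```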
FR-2: PostgreSQL Accumulator Database (Per Business Unit)
Priority: High
Description: Maintain full dataset by accumulating inRiver deltas in PostgreSQL (one schema per BU)
Details:
- Single PostgreSQL instance with separate schemas per BU:
  - inriver_bk - Baker Distributing
  - inriver_gm - Gemaire
  - inriver_ec - East Coast Metal Distributors
  - inriver_cefl - Carrier Enterprise Florida (pending: confirm sub-unit separation)
- Tables per schema: One per entity type (products, channel_nodes, attribute_file)
- Schema: JSONB column + metadata + audit trail
  - id (text) - inRiver entity ID
  - data (JSONB) - Full inRiver JSON
  - updated_at (timestamp) - Last update time
  - source_file (text) - Most recent S3 file that updated this record
  - deleted (boolean) - Soft delete flag
- Event Log Table: Track all file processing events
  - id (serial) - Event ID
  - event_time (timestamp) - When event occurred
  - event_type (text) - 'file_start', 'file_complete', 'file_error'
  - source_file (text) - S3 file path
  - entity_type (text) - products, channel_nodes, attribute_file
  - records_processed (integer) - Count of records
  - records_inserted (integer) - New records
  - records_updated (integer) - Updated records
  - records_deleted (integer) - Deleted records
  - error_message (text) - Error details if failed
  - replay_date (timestamp, nullable) - When marked for replay (NULL = valid)
  - metadata (JSONB) - Additional context
- Operations:
- Insert: New records from delta (track source_file)
- Update: Merge changes (JSONB merge, update source_file)
- Delete: Mark as deleted (soft delete, track source_file)
- Log: Record every file processing event
- Purpose: Convert deltas → full dataset + complete audit trail
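A minimal DDL sketch for one data table and its indexes, assuming the column set above; connection parameters and the `inriver_bk` schema name are illustrative:

```python
import psycopg2

# Connection parameters are placeholders; schema name follows the inriver_{bu} convention
conn = psycopg2.connect(host="...", dbname="inriver", user="...", password="...")

DDL = """
CREATE TABLE IF NOT EXISTS inriver_bk.products (
    id          TEXT PRIMARY KEY,                  -- inRiver entity ID
    data        JSONB NOT NULL,                    -- full inRiver JSON
    updated_at  TIMESTAMP NOT NULL DEFAULT NOW(),  -- last update time
    source_file TEXT,                              -- most recent S3 file touching this record
    deleted     BOOLEAN NOT NULL DEFAULT FALSE     -- soft delete flag
);
CREATE INDEX IF NOT EXISTS idx_products_updated_at  ON inriver_bk.products (updated_at);
CREATE INDEX IF NOT EXISTS idx_products_source_file ON inriver_bk.products (source_file);
"""

with conn, conn.cursor() as cur:
    cur.execute(DDL)  # channel_nodes, attribute_file, and the event log follow the same pattern
```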
FR-3: Delta Merge Logic with Ordered Processing
Priority: High
Description: Merge inRiver delta updates into full dataset with ordered processing and replay capability
Ordered Processing:
- Files must be processed in chronological order (sequence matters for deltas)
- Delta Loader queries event log to determine next file to process:
```sql
-- Find last successfully processed file (not marked for replay)
SELECT MAX(id) FROM file_processing_events
WHERE event_type = 'file_complete'
  AND replay_date IS NULL;
-- Process files after that point in order
```
- Only process files that come after the last valid event
- Prevents duplicate processing and maintains data consistency
Replay Capability:
- Can mark events for replay when bad data detected or reprocessing needed
- Mark the event and all following events:
```sql
UPDATE file_processing_events
SET replay_date = NOW()
WHERE id >= [problem_event_id];
```
- Delta Loader detects the replay gap (no valid events after a certain point)
- Reprocesses files from marked point forward
- Creates new events with replay_date = NULL
- Original events preserved (never deleted) for audit trail
Merge Operations:
- Upsert pattern: INSERT ... ON CONFLICT UPDATE
- JSONB merge for updates (preserve unchanged fields)
- Soft delete for removed records (deleted = true)
- Track update timestamps and source file
- Event logging for every file processed
- Idempotent (can replay deltas safely)
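As a sketch of the upsert pattern above — PostgreSQL's `ON CONFLICT` combined with the `||` operator performs a shallow (top-level) JSONB merge, which preserves fields absent from the delta. Whether inRiver deltas ever carry partial nested objects (which would require a recursive merge) still needs confirmation; the `ProductId` key field is an assumption:

```python
import json

# Alias "p" lets ON CONFLICT reference existing row values for the JSONB merge
UPSERT_SQL = """
INSERT INTO inriver_bk.products AS p (id, data, updated_at, source_file, deleted)
VALUES (%(id)s, %(data)s, NOW(), %(source_file)s, FALSE)
ON CONFLICT (id) DO UPDATE SET
    data        = p.data || EXCLUDED.data,  -- shallow JSONB merge: delta keys win
    updated_at  = NOW(),
    source_file = EXCLUDED.source_file,
    deleted     = FALSE                     -- a delta update revives a soft-deleted record
"""

def upsert_record(cur, record: dict, source_file: str) -> None:
    cur.execute(UPSERT_SQL, {
        "id": record["ProductId"],  # assumed key field; confirm against sample exports
        "data": json.dumps(record),
        "source_file": source_file,
    })
```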
Event Immutability:
- Events are never deleted (permanent audit trail)
- Replay marks events but preserves history
- Can always trace processing history
FR-4: CSV Generation from PostgreSQL
Priority: High
Description: Query PostgreSQL full dataset to generate Heiler CSV files (nightly or on-demand)
Schedule:
- Nightly cron OR manual trigger
- Frequency: Once per day (or on-demand for testing/urgent updates)
- Decoupled: Independent from delta loading (deltas accumulate throughout day)
Process:
- Query all non-deleted records from PostgreSQL
- Apply configuration from field_mapping, decorator_data, csv_template_config tables
- Transform JSONB → Heiler CSV columns
- Generate 12 Heiler CSV files
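To make the transform concrete, a sketch of projecting JSONB into one CSV. The `ProductName` → `Item_Name` pair is hypothetical; real column rules come from csv_template_config:

```python
import csv

def generate_items_csv(conn, schema: str, out_path: str) -> None:
    """Query all non-deleted products and project JSONB fields into Heiler CSV columns."""
    # schema comes from trusted config (inriver_{bu}), never user input
    query = f"""
        SELECT data->>'ProductId'   AS "Item#",
               data->>'ProductName' AS "Item_Name"  -- hypothetical field/column pair
        FROM {schema}.products
        WHERE deleted = FALSE
    """
    with conn.cursor() as cur, open(out_path, "w", newline="", encoding="utf-8") as f:
        cur.execute(query)
        writer = csv.writer(f)
        writer.writerow([col.name for col in cur.description])  # header row
        writer.writerows(cur)
```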
Output Files:
Daily Files (7 CSVs - shared across all BUs):
- attributes.csv - Attribute definitions
- attributemap.csv - Structure-to-attribute mappings
- structures.csv - Category/structure definitions
- items.csv - Core product/item data
- Item_free_attributes.csv - Free-form attribute values
- item_structure_features.csv - Structure-specific attribute values
- media.csv - Product images/assets
Business Unit Files (5 CSVs - per BU):
- itemdataflex.csv - BU-specific item data and flex fields
- category.csv - BU-specific category hierarchy
- purpose.csv - Attribute purpose metadata
- references.csv - Product cross-references
- bundles.csv - Product bundle definitions
Mapping:
- inRiver Product → Heiler daily: items.csv, media.csv, Item_free_attributes.csv, item_structure_features.csv
- inRiver Product → Heiler BU: itemdataflex.csv, references.csv, bundles.csv
- inRiver ChannelNode → Heiler BU: category.csv
- inRiver AttributeFile → Heiler daily: attributes.csv, attributemap.csv, structures.csv
- Decorator data → Heiler BU: purpose.csv
CSV Format:
- UTF-8 encoding
- Headers matching Heiler PIM2 format
- Column mapping per research/blower-source-pim2/PIM_STRUCTURE.md
- Full dataset (not deltas)
Delivery:
- Write to Blower PIM2 input directory (S3)
- Trigger Blower import after files are written (handled by an existing AWS Lambda function)
FR-5: Configuration and Mapping Tables
Priority: High
Description: Maintain configuration for field mapping and decorator data
Field Mapping Table:
```sql
CREATE TABLE field_mapping (
    id SERIAL PRIMARY KEY,
    inriver_entity TEXT,       -- 'Product', 'ChannelNode', 'AttributeFile'
    inriver_field TEXT,        -- inRiver field name
    heiler_csv TEXT,           -- Target Heiler CSV file
    heiler_column TEXT,        -- Target column in CSV
    transformation_rule TEXT,  -- Optional: transformation logic
    active BOOLEAN DEFAULT TRUE,
    notes TEXT
);
```
Purpose:
- Map inRiver attributes to multiple Heiler CSVs
- Example: inRiver Product attributes → attributes.csv, attributemap.csv, itemdataflex.csv
- Handle one-to-many mappings (one inRiver field → multiple Heiler columns)
Decorator Data Table:
```sql
CREATE TABLE decorator_data (
    id SERIAL PRIMARY KEY,
    entity_type TEXT,      -- 'attribute', 'item', 'category', etc.
    entity_id TEXT,        -- inRiver entity ID
    decorator_type TEXT,   -- 'purpose', 'role', etc.
    decorator_value TEXT,  -- The decorator value
    source TEXT,           -- Where this came from (manual, migration, etc.)
    created_at TIMESTAMP DEFAULT NOW(),
    notes TEXT
);
```
Purpose:
- Store data that doesn't exist in inRiver (lost fidelity)
- Example: Heiler "purpose" field not in inRiver
- Manually maintained (for now)
- Future: Web UI for management
CSV Template Config Table:
```sql
CREATE TABLE csv_template_config (
    id SERIAL PRIMARY KEY,
    csv_name TEXT,        -- 'items.csv', 'itemdataflex.csv', etc.
    column_name TEXT,     -- Column in CSV
    source_type TEXT,     -- 'inriver_field', 'decorator', 'static', 'computed'
    source_value TEXT,    -- Field name, decorator type, or static value
    default_value TEXT,   -- Default if source is null
    transformation TEXT,  -- Optional: transformation logic
    required BOOLEAN DEFAULT FALSE,
    notes TEXT
);
```
Purpose:
- Define CSV generation rules per column
- Flexible: source from inRiver, decorator, static value, or computed
- CSV Generator uses this config to build CSVs
Management:
- Phase 1: Manual SQL inserts (load configuration data)
- Future: Web UI for managing mappings and decorator data
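A sketch of the Phase 1 manual configuration load. The example rows mirror mappings from the Mapping Requirements section, but exact values must be validated against sample data:

```python
# Illustrative mapping rows: (entity, inriver_field, heiler_csv, heiler_column, rule)
FIELD_MAPPINGS = [
    ("Product", "ProductId", "items.csv", "Item#", None),
    ("ChannelNode", "ChannelNodeName", "category.csv", "Structure_Name", None),
]

def load_field_mappings(cur) -> None:
    cur.executemany(
        """INSERT INTO field_mapping
           (inriver_entity, inriver_field, heiler_csv, heiler_column, transformation_rule)
           VALUES (%s, %s, %s, %s, %s)""",
        FIELD_MAPPINGS,
    )
```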
FR-6: Error Handling
Priority: High
Description: Handle malformed JSON and conversion errors
Details:
- Validate JSON structure
- Log conversion errors
- Quarantine bad records
- Continue processing valid records
- Alert on failures
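A minimal sketch of the quarantine-and-continue behavior, reusing `upsert_record` from the FR-3 sketch; the quarantine file location is a placeholder, not specified here:

```python
import json
import logging

logger = logging.getLogger("delta_loader")

def process_lines(lines: list[bytes], cur, source_file: str) -> tuple[int, int]:
    """Parse and upsert each JSONL line; quarantine bad records, keep processing the rest."""
    ok, bad = 0, 0
    for lineno, raw in enumerate(lines, start=1):
        try:
            record = json.loads(raw)
            upsert_record(cur, record, source_file)  # from the FR-3 sketch
            ok += 1
        except (json.JSONDecodeError, KeyError) as exc:
            bad += 1
            logger.error("Quarantined %s line %d: %s", source_file, lineno, exc)
            # Hypothetical quarantine target; the spec does not fix a location
            with open("quarantine.jsonl", "ab") as q:
                q.write(raw + b"\n")
    return ok, bad
```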
Non-Functional Requirements
NFR-1: Performance
Priority: Medium
Description: Process deltas in near real-time, generate CSVs nightly
Details:
- Delta Loading: <2 minutes per file (near real-time)
- CSV Generation: <15 minutes (nightly batch)
- Data volume: <100K products per BU
- Acceptable: Deltas throughout day, CSVs once nightly
- Decoupled: Delta loading doesn't block CSV generation
NFR-2: Reliability
Priority: High
Description: Ensure reliable conversion with error recovery and replay capability
Details:
- Retry logic for transient failures
- Idempotent processing (can rerun safely)
- Ordered processing (files processed in sequence)
- Replay capability (can reprocess from any point)
- Event immutability (never delete events)
- State tracking via event log (prevents duplicates)
NFR-3: Maintainability
Priority: Medium
Description: Simple, readable Python code
Details:
- Clear mapping logic
- Well-documented
- Easy to debug
- Minimal dependencies
Technical Requirements
TR-1: Database (Per Business Unit)
Technology: RDS PostgreSQL
Rationale: Accumulate deltas into full dataset, becomes Bronze layer in Phase 2
Details:
- Architecture: Single RDS PostgreSQL instance with separate schemas per BU
- Instance: db.t3.medium (shared across all BUs)
- Schemas:
  inriver_bk, inriver_gm, inriver_ec, inriver_cefl, etc. (pending: sub-unit separation)
- Each schema contains:
- Data tables: products, channel_nodes, attribute_file (JSONB)
- Event log: file_processing_events (audit trail + replay control)
- Configuration tables:
  - field_mapping - Maps inRiver fields to Heiler CSV columns
  - decorator_data - Stores data lost in inRiver migration (e.g., purpose)
  - csv_template_config - Configuration for CSV generation rules
- Schema:
- Data tables: id, data (JSONB), updated_at, source_file, deleted
- Event log: id, event_time, event_type, source_file, entity_type, records_processed, records_inserted, records_updated, records_deleted, error_message, replay_date (nullable), metadata (JSONB)
- Indexes:
- Primary key on id
- Index on updated_at
- Index on source_file
- Index on event_time (event log)
- Index on replay_date (event log) - for efficient query of valid events
- Purpose: Delta accumulation, CSV generation source, complete audit trail, replay control
TR-2: Implementation
Technology: Python 3.11+
Rationale: Simple, fast to develop, team familiarity
Components:
- Delta Loader (~300 lines): S3 → PostgreSQL (see the sketch after this components list)
- Query event log to find next file to process
- Enforce ordered processing (chronological)
- Detect replay scenarios (gaps in valid events)
- Upsert records with source_file tracking
- Log events to file_processing_events table
- Track insert/update/delete counts
- Support replay from any point
- CSV Generator (~400 lines): PostgreSQL → Heiler CSVs
- Query configuration tables (field_mapping, decorator_data, csv_template_config)
- Apply mapping rules to transform inRiver JSON → Heiler CSV columns
- Join decorator data where needed
- Generate 7 daily CSVs + 5 BU-specific CSVs
- Configuration Loader (~100 lines): Load initial mapping data
- SQL scripts to populate field_mapping table
- SQL scripts to populate decorator_data table
- SQL scripts to populate csv_template_config table
- Libraries: boto3 (S3), psycopg2 (PostgreSQL), pandas (CSV)
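A sketch of the Delta Loader's ordered-processing check. It assumes file names sort chronologically (e.g., timestamped S3 keys); excluding replay-marked events from the "done" set is what makes marked files eligible for reprocessing:

```python
def next_files_to_process(cur, entity_type: str, all_files: list[str]) -> list[str]:
    """Return S3 files, in chronological order, not yet covered by a valid
    'file_complete' event (replay-marked events don't count as done)."""
    cur.execute(
        """SELECT source_file FROM file_processing_events
           WHERE event_type = 'file_complete'
             AND entity_type = %s
             AND replay_date IS NULL""",
        (entity_type,),
    )
    done = {row[0] for row in cur.fetchall()}
    # Assumes lexicographic order equals chronological order for these keys
    return [f for f in sorted(all_files) if f not in done]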
TR-3: Orchestration (Per Business Unit)
Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure
Details:
Separate DAGs per Business Unit (BK, GM, EC, CEFL, etc.)
Each BU has two DAGs:
DAG 1: Delta Loader (runs as files arrive)
- Name: inriver_{bu}_delta_loader
- Trigger: S3 event notification (near real-time, BU-specific path)
- Task: Load delta file → BU-specific PostgreSQL database
- Frequency: On-demand (per file)
- Duration: ~1-2 minutes per file
DAG 2: CSV Generator (runs nightly or on-demand)
- Name: inriver_{bu}_csv_generator
- Trigger: Cron schedule (nightly) OR manual trigger
- Tasks:
- Generate Heiler CSVs from BU PostgreSQL database
- Upload CSVs to BU-specific Blower input location
- Trigger existing BU Blower import DAG
- Frequency: Nightly (or on-demand)
- Duration: ~10-15 minutes
Total DAGs: 8+ (4+ BUs × 2 DAGs each, pending sub-unit clarification)
Decoupled Design: Delta loading independent from CSV generation
BU Isolation: Each BU processes independently, no cross-BU dependencies
Benefit: Accumulate deltas throughout day, generate full CSVs once nightly per BU
Monitoring: Airflow UI + CloudWatch (per BU)
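A skeleton of the nightly CSV generator DAG for one BU; the task callables are placeholders for the TR-2 components, and the cron time is an assumption:

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="inriver_bk_csv_generator",
    schedule_interval="0 2 * * *",  # nightly; exact run time is an assumption
    start_date=datetime(2026, 3, 1),
    catchup=False,
) as dag:
    generate = PythonOperator(task_id="generate_csvs", python_callable=lambda: None)
    upload = PythonOperator(task_id="upload_to_blower_input", python_callable=lambda: None)
    generate >> upload
```

Triggering the existing BU Blower import DAG as a final task could use Airflow's TriggerDagRunOperator.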
TR-4: Storage
Technology: AWS S3
Rationale: inRiver delivery mechanism
Details:
- Input: inRiver JSONL files
- Output: Heiler CSV files
- Blower reads from S3 or local mount
Mapping Requirements
inRiver → Heiler Field Mapping
Product Entity:
- ProductId → items.csv: Item#
- ProductId → itemdataflex.csv: Item#
- ProductId → media.csv: Item_Product
- ProductId → item-attributes.csv: Item_Product#
- FieldValues → item-attributes.csv: Attribute_Name, Default_Value
- Items[] → Flatten to items.csv (one row per Item; merge Product-level data where Item data is not present - Item data takes precedence; see the flattening sketch after these mapping lists)
- Items.Resources.ResourceFilename → media.csv: File_Name
- Items.Resources.ResourceResourceType → media.csv: Image_Type
ChannelNode Entity:
- ChannelNodeID → category.csv: Structure_ID
- ChannelNodeName → category.csv: Structure_Name
- InboundLinks.SourceEntityId → category.csv: Parent_Structure_ID (*ref)
AttributeFile Entity:
- FieldTypes[] → attributes.csv: AttributeID, AttributeName, AttributeDataType
- StructureEntities[] → structures.csv: StructureID, StructureName
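A sketch of the Items[] flattening rule from the Product mapping above. The 'Items' key name and flat-merge behavior are assumptions to validate against sample exports:

```python
def flatten_product(product: dict) -> list[dict]:
    """One items.csv row per Item; Item-level values override Product-level values."""
    rows = []
    product_fields = {k: v for k, v in product.items() if k != "Items"}
    for item in product.get("Items", []):
        row = dict(product_fields)  # start from Product-level defaults
        row.update(item)            # Item data takes precedence
        rows.append(row)
    return rows
```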
Open Mapping Questions:
- Attribute ID Source: inRiver doesn't export attribute IDs - need resolution (see Risk 5)
- Attribute Splitting: How to determine which attributes go to Item_free_attributes.csv vs item_structure_features.csv?
- Hypothesis: Structure-specific attributes → item_structure_features.csv
- Hypothesis: Free-form attributes → Item_free_attributes.csv
- Need to validate with sample data analysis
Data Type Mapping
| inRiver Type | Heiler Type |
|---|---|
| String | CHARACTER_STRING |
| Integer | INTEGER |
| Double | DECIMAL |
| Boolean | BOOLEAN |
| CVL | CVL (lookup) |
| LocaleString | CHARACTER_STRING (per locale) |
Success Criteria
Phase 1 Success
- ✅ Converter processes inRiver JSONL files
- ✅ PostgreSQL accumulates deltas into full dataset
- ✅ Event log tracks all file processing
- ✅ Generates valid Heiler CSV files
- ✅ Blower imports CSVs successfully
- ✅ Downstream systems receive data (no changes needed)
- ✅ Single BU operational (BK or GM)
- ✅ Audit trail complete (source_file + event log)
Acceptance Criteria
- Item count matches between inRiver and Blower
- Attribute values match
- Category hierarchy correct
- Media files linked properly
- Delta updates work (add/update/delete)
- Every record tracks source_file
- Event log shows complete processing history
Out of Scope (Phase 1)
- Database changes to Blower (use existing Blower)
- dbt transformations (use existing Blower)
- Full pipeline architecture (Phase 2)
- Comprehensive monitoring (basic logging only)
- Automated testing (manual validation)
- Web UI for configuration management (manual SQL for Phase 1, future enhancement)
- Complex transformation logic (keep simple, iterate)
- Multi-BU simultaneous rollout (start with single BU, then expand)
Risks and Mitigations
Risk 1: Incomplete Field Mapping
Mitigation: Reference Heiler PIM2 structure docs, validate with sample data
Risk 2: Delta Handling Complexity
Mitigation: Start with full files, add delta logic incrementally
Risk 3: Blower Compatibility
Mitigation: Match Heiler CSV format exactly, test with existing Blower
Risk 4: Performance Issues
Mitigation: Process in batches, optimize for large files
Risk 5: Missing Attribute IDs from inRiver
Issue: Current PIM attribute IDs are not exported by inRiver, but Heiler CSVs require AttributeID
Impact: Cannot populate attributes.csv, attributemap.csv, Item_free_attributes.csv, item_structure_features.csv without attribute IDs
Mitigation:
- Option A: Request inRiver to include attribute IDs in AttributeFile export
- Option B: Generate synthetic attribute IDs in Silver layer (hash-based or sequential)
- Option C: Use attribute names as IDs (if acceptable to downstream systems)
Status: ⚠️ Critical blocker - needs resolution before implementation
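To illustrate Option B, a hash-based generator sketch; the prefix and ID length are arbitrary choices, and downstream acceptance of such IDs is unconfirmed:

```python
import hashlib

def synthetic_attribute_id(attribute_name: str) -> str:
    """Deterministic hash-based ID: the same name always yields the same ID,
    so regenerated CSVs stay stable across runs."""
    digest = hashlib.sha1(attribute_name.encode("utf-8")).hexdigest()
    return f"ATTR_{digest[:12].upper()}"
```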
Risk 6: Legacy ID Preservation and Migration
Issue: Heiler PIM IDs are used in permalinks, Blower joins, and downstream systems
Context:
- Migrated products: Have reference to old Heiler IDs (stored in inRiver item attribute)
- New products: Will not have Heiler IDs (never existed in old system)
- Category lineage: Blower likely uses old Heiler category IDs for linking
Impact:
- Breaking permalinks for existing products
- Breaking Blower joins and relationships
- Breaking category hierarchy references
- Inconsistent ID strategy (old vs new products)
Mitigation:
- Phase 1 Approach:
- Use old Heiler ID when present in inRiver attribute
- Generate new ID when not present (sequential or hash-based)
- Track ID source in metadata (migrated vs new)
- Blower Changes Required:
- Update join logic to handle both old and new ID patterns
- Update category linking to use new inRiver category IDs
- Maintain backward compatibility during transition
- Downstream Impact:
- Permalinks: May need redirect strategy for new products
- E-commerce: May need ID mapping tables
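A sketch of the Phase 1 ID resolution rule above. Which inRiver attribute holds the legacy Heiler ID is still an open question (see below), so `LegacyHeilerId` is a placeholder name:

```python
import hashlib

def resolve_item_id(record: dict) -> tuple[str, str]:
    """Return (heiler_id, id_source) where id_source is 'migrated' or 'new'."""
    legacy = record.get("LegacyHeilerId")  # placeholder attribute name
    if legacy:
        return str(legacy), "migrated"
    # Deterministic generated ID for products that never existed in Heiler
    digest = hashlib.sha1(record["ProductId"].encode("utf-8")).hexdigest()
    return f"GEN_{digest[:10].upper()}", "new"
```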
Status: ⚠️ High risk - cannot fully resolve until implementation starts, needs discovery phase
Open Questions:
- What inRiver attribute stores the legacy Heiler ID?
- What is the ID format/pattern for legacy vs new?
- Which downstream systems depend on Heiler IDs?
- What is the permalink structure and redirect strategy?
- How are category IDs used in Blower joins?
Dependencies
- inRiver S3 Access: Credentials and bucket access (organized by BU)
- RDS PostgreSQL: Provision single instance (db.t3.medium)
- Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending: sub-unit clarification)
- Each schema contains all tables (data, event log, configuration)
- Blower PIM2 Docs: CSV format specification (available in research/)
- Sample Files: inRiver JSONL samples (available in samples/aperture-export-ecom), Heiler CSV samples (available in samples/heiler-pim)
- Blower Input Locations: S3 paths per BU (BK, GM, EC, CEFL, etc.)
Next Steps
- Review and approve Phase 1 requirements
- Resolve Critical Blockers:
- Attribute ID issue (see Risk 5)
- Sub-business unit file separation (see questions.md)
- Provision Infrastructure (per BU):
- Single PostgreSQL instance (db.t3.medium)
- Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending sub-unit clarification)
- S3 paths for inRiver input
- S3 paths for Heiler CSV output
- Build Components:
- Delta Loader Python script
- CSV Generator Python script
- Airflow DAGs (8+ total: 4+ BUs × 2 DAGs, pending sub-unit clarification)
- Test with Single BU (BK or GM):
- Test with sample inRiver files
- Validate Heiler CSV output against samples
- Test with Blower import
- Deploy to Production (single BU first)
- Monitor and Iterate
- Roll Out to Remaining BUs (EC, CEFL, remaining)
Document Version: 1.0
Last Updated: 2026-03-03
Author: AI-DLC Requirements Analysis