
inRiver to Blower - Phase 1 (Fast Track)

Project: inRiver JSONL to Heiler CSV Converter
Date: 2026-03-03
Status: Approved (2026-04-21)
Timeline: 2-3 days


Overview

Goal: Get inRiver PIM data working with existing Blower infrastructure as quickly as possible.

Approach:

  1. Load inRiver JSONL deltas → PostgreSQL accumulator database (JSONB)
  2. Maintain full dataset (merge/upsert deltas)
  3. Generate Heiler CSV files from full dataset
  4. Feed CSVs to existing Blower

Multi-Business Unit Architecture:

  • Single PostgreSQL instance with separate schemas per BU (BK, GM, EC, CEFL)
  • Each schema has independent:
    • Data tables (products, channel_nodes, attribute_file)
    • Event log (file_processing_events)
    • Configuration tables (field_mapping, decorator_data, csv_template_config)
    • Processing schedule (independent DAGs)
  • Rationale: Cost-effective, data isolation via schemas, matches Blower pattern

Key Insight: inRiver delivers deltas, Heiler/Blower expects full files. PostgreSQL accumulator bridges the gap.

Terminology:

  • inRiver Product: The source entity from inRiver. Contains child Items. This is what arrives in JSONL files.
  • Item: The unit of work in Blower and all downstream systems. Our current architecture is item-centric — there is no "product" concept downstream.
  • Flattening: Each inRiver Product is flattened to its child Items. Item-level data takes precedence over Product-level data. All CSVs and Blower tables operate on items.

Strategic Context:

  • Fast track to production (2-3 days per BU)
  • PostgreSQL databases become Bronze layer in Phase 2
  • No throwaway work - everything reused in Phase 2

Functional Requirements

FR-1: inRiver JSONL Ingestion (Per Business Unit)

Priority: High
Description: Read inRiver JSONL export files from S3 as they arrive (separate processing per BU)

Details:

  • Source: S3 bucket with inRiver exports (organized by BU)
  • File types: Product, ChannelNode, AttributeFile (JSONL format)
  • Trigger: S3 event notification (near real-time, per BU)
  • Processing: Load each delta file immediately to BU-specific PostgreSQL database
  • Frequency: On-demand (as files arrive throughout the day)
  • Business Units: BK, GM, EC, CEFL, CEMA, CESE, etc. (separate schemas)
  • Isolation: Each BU processes independently

FR-2: PostgreSQL Accumulator Database (Per Business Unit)

Priority: High
Description: Maintain full dataset by accumulating inRiver deltas in PostgreSQL (one schema per BU)

Details:

  • Single PostgreSQL instance with separate schemas per BU:
    • inriver_bk - Baker Distributing
    • inriver_gm - Gemaire
    • inriver_ec - East Coast Metal Distributors
    • inriver_cefl - Carrier Enterprise Florida (pending: confirm sub-unit separation)
  • Tables per schema: One per entity type (products, channel_nodes, attribute_file)
  • Schema: JSONB column + metadata + audit trail
    • id (text) - inRiver entity ID
    • data (JSONB) - Full inRiver JSON
    • updated_at (timestamp) - Last update time
    • source_file (text) - Most recent S3 file that updated this record
    • deleted (boolean) - Soft delete flag
  • Event Log Table: Track all file processing events
    • id (serial) - Event ID
    • event_time (timestamp) - When event occurred
    • event_type (text) - 'file_start', 'file_complete', 'file_error'
    • source_file (text) - S3 file path
    • entity_type (text) - products, channel_nodes, attribute_file
    • records_processed (integer) - Count of records
    • records_inserted (integer) - New records
    • records_updated (integer) - Updated records
    • records_deleted (integer) - Deleted records
    • error_message (text) - Error details if failed
    • replay_date (timestamp, nullable) - When marked for replay (NULL = valid)
    • metadata (JSONB) - Additional context
  • Operations:
    • Insert: New records from delta (track source_file)
    • Update: Merge changes (JSONB merge, update source_file)
    • Delete: Mark as deleted (soft delete, track source_file)
    • Log: Record every file processing event
  • Purpose: Convert deltas → full dataset + complete audit trail
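
The table layout above could be provisioned per BU roughly as follows. This is a minimal sketch: the helper name `accumulator_ddl`, the `NOT NULL` constraints, and the column defaults are assumptions, not part of the approved design.

```python
# Hypothetical helper that renders the FR-2 tables for one BU schema.
# Constraints and defaults (NOT NULL, DEFAULT NOW()) are assumptions.
DATA_TABLES = ("products", "channel_nodes", "attribute_file")


def accumulator_ddl(bu: str) -> str:
    """Return DDL for one BU, e.g. 'bk' -> schema inriver_bk."""
    if not bu.isalnum():
        # BU codes end up in identifiers, so reject anything unexpected.
        raise ValueError(f"unexpected BU code: {bu!r}")
    schema = f"inriver_{bu.lower()}"
    statements = [f"CREATE SCHEMA IF NOT EXISTS {schema};"]
    for table in DATA_TABLES:
        statements.append(
            f"CREATE TABLE IF NOT EXISTS {schema}.{table} (\n"
            "  id TEXT PRIMARY KEY,\n"
            "  data JSONB NOT NULL,\n"
            "  updated_at TIMESTAMP NOT NULL DEFAULT NOW(),\n"
            "  source_file TEXT,\n"
            "  deleted BOOLEAN NOT NULL DEFAULT FALSE\n"
            ");"
        )
    statements.append(
        f"CREATE TABLE IF NOT EXISTS {schema}.file_processing_events (\n"
        "  id SERIAL PRIMARY KEY,\n"
        "  event_time TIMESTAMP NOT NULL DEFAULT NOW(),\n"
        "  event_type TEXT NOT NULL,\n"
        "  source_file TEXT,\n"
        "  entity_type TEXT,\n"
        "  records_processed INTEGER,\n"
        "  records_inserted INTEGER,\n"
        "  records_updated INTEGER,\n"
        "  records_deleted INTEGER,\n"
        "  error_message TEXT,\n"
        "  replay_date TIMESTAMP,\n"
        "  metadata JSONB\n"
        ");"
    )
    return "\n\n".join(statements)
```

Rendering the DDL from one function keeps every BU schema identical, which matters because the Delta Loader and CSV Generator are shared code.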

FR-3: Delta Merge Logic with Ordered Processing

Priority: High
Description: Merge inRiver delta updates into full dataset with ordered processing and replay capability

Ordered Processing:

  • Files must be processed in chronological order (sequence matters for deltas)
  • Delta Loader queries event log to determine next file to process:
    ```sql
    -- Find last successfully processed file (not marked for replay)
    SELECT MAX(id) FROM file_processing_events
    WHERE event_type = 'file_complete'
      AND replay_date IS NULL;

    -- Process files after that point in order
    ```
  • Only process files that come after last valid event
  • Prevents duplicate processing and maintains data consistency
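
The file-selection step could be sketched in Python as below. It assumes that S3 keys sort chronologically (for example, a timestamp in the file name) and that event rows are available as dictionaries. Both are assumptions, and the function names are hypothetical.

```python
from typing import Iterable, Optional


def last_valid_file(events: Iterable[dict]) -> Optional[str]:
    """Source file of the latest 'file_complete' event not marked for replay."""
    valid = [
        e for e in events
        if e["event_type"] == "file_complete" and e["replay_date"] is None
    ]
    if not valid:
        return None
    return max(valid, key=lambda e: e["id"])["source_file"]


def files_to_process(available_keys: Iterable[str], events: Iterable[dict]) -> list[str]:
    """Return pending files in processing order.

    Assumption: lexicographic key order equals chronological order.
    """
    last = last_valid_file(events)
    ordered = sorted(available_keys)
    if last is None:
        return ordered  # nothing processed yet: take everything, oldest first
    return [key for key in ordered if key > last]
```

If the real export file names do not sort chronologically, the ordering key would have to come from elsewhere, such as the S3 `LastModified` timestamp.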

Replay Capability:

  • Can mark events for replay when bad data detected or reprocessing needed
  • Mark event and all following events:
    ```sql
    UPDATE file_processing_events
    SET replay_date = NOW()
    WHERE id >= [problem_event_id];
    ```
  • Delta Loader detects replay gap (no valid events after certain point)
  • Reprocesses files from marked point forward
  • Creates new events with replay_date = NULL
  • Original events preserved (never deleted) for audit trail
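
To illustrate the replay semantics, here is a small in-memory sketch that mirrors the `UPDATE` statement. The list of dictionaries stands in for `file_processing_events`, and the function names are hypothetical.

```python
from datetime import datetime, timezone
from typing import Optional


def mark_for_replay(events: list[dict], problem_event_id: int,
                    now: Optional[datetime] = None) -> int:
    """Stamp replay_date on the problem event and everything after it.

    Events are only stamped, never removed, so the audit trail survives.
    Returns the number of events marked.
    """
    stamp = now or datetime.now(timezone.utc)
    marked = 0
    for event in events:
        if event["id"] >= problem_event_id:
            event["replay_date"] = stamp
            marked += 1
    return marked


def valid_events(events: list[dict]) -> list[dict]:
    """Events that still count as processed (replay_date IS NULL)."""
    return [e for e in events if e["replay_date"] is None]
```

After marking, the latest valid event moves back to the one before the problem point, so the loader's normal "next file" query picks up the replay without a separate code path.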

Merge Operations:

  • Upsert pattern: INSERT ... ON CONFLICT (id) DO UPDATE
  • JSONB merge for updates (preserve unchanged fields)
  • Delete handling:
    • Delete deltas contain only the entity ID (no data payload)
    • Set deleted = true on the record — do NOT remove data
    • Original data preserved in JSONB column for audit/replay
    • Product-level delete: All child items of that Product excluded from next CSV generation
    • Item-level delete: Only that specific item excluded (other items under same Product unaffected)
    • ⚠️ See Open Question 3: Delete granularity not yet confirmed in inRiver export format
    • Track delete source_file and timestamp
  • Track update timestamps and source file
  • Event logging for every file processed
  • Idempotent (can replay deltas safely)
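
The merge rules above can be illustrated with an in-memory stand-in for one data table. The delta record shape (`{"id": ..., "data": {...}}` for upserts, `{"id": ..., "deleted": True}` for deletes) is an assumption, since the inRiver delete format is not yet confirmed. Reactivating a previously deleted record on update is likewise an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def apply_delta(store: dict, record: dict, source_file: str,
                now: Optional[datetime] = None) -> str:
    """Apply one delta record; returns 'inserted', 'updated' or 'deleted'.

    In PostgreSQL the update branch would roughly correspond to
    INSERT ... ON CONFLICT (id) DO UPDATE SET data = t.data || EXCLUDED.data
    (a shallow JSONB merge).
    """
    ts = now or datetime.now(timezone.utc)
    entity_id = str(record["id"])
    row = store.get(entity_id)

    if record.get("deleted"):
        # Delete deltas carry only the ID: flag the row, keep its data.
        if row is None:
            row = store[entity_id] = {"data": {}}
        row.update(deleted=True, updated_at=ts, source_file=source_file)
        return "deleted"

    if row is None:
        store[entity_id] = {
            "data": dict(record["data"]),
            "deleted": False,
            "updated_at": ts,
            "source_file": source_file,
        }
        return "inserted"

    # Shallow merge: fields absent from the delta keep their old values.
    row["data"] = {**row["data"], **record["data"]}
    row.update(deleted=False, updated_at=ts, source_file=source_file)
    return "updated"
```

Because every branch overwrites with the delta's values rather than incrementing anything, applying the same file twice leaves the row unchanged, which is what makes replay safe.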

Event Immutability:

  • Events are never deleted (permanent audit trail)
  • Replay marks events but preserves history
  • Can always trace processing history

FR-4: CSV Generation from PostgreSQL

Priority: High
Description: Query PostgreSQL full dataset to generate Heiler CSV files (nightly or on-demand)

Schedule:

  • Nightly cron OR manual trigger
  • Frequency: Once per day (or on-demand for testing/urgent updates)
  • Decoupled: Independent from delta loading (deltas accumulate throughout day)

Process:

  1. Query all non-deleted records from PostgreSQL (WHERE deleted = false)
    • Uses transaction isolation (REPEATABLE READ) for consistent read during generation
    • No snapshot step — read directly from accumulator tables (fastest path)
    • Snapshot pattern deferred to Phase 2 (dbt snapshot)
  2. Apply configuration from field_mapping, decorator_data, csv_template_config tables
  3. Transform JSONB → Heiler CSV columns
  4. Generate 12 Heiler CSV files

Output Files:

Daily Files (7 CSVs - shared across all BUs):

  • attributes.csv - Attribute definitions
  • attributemap.csv - Structure-to-attribute mappings
  • structures.csv - Category/structure definitions
  • items.csv - Core item data
  • Item_free_attributes.csv - Free-form attribute values
  • item_structure_features.csv - Structure-specific attribute values
  • media.csv - Item images/assets

Business Unit Files (5 CSVs - per BU):

  • itemdataflex.csv - BU-specific item data and flex fields
  • category.csv - BU-specific category hierarchy
  • purpose.csv - Attribute purpose metadata
  • references.csv - Item cross-references
  • bundles.csv - Item bundle definitions

Mapping:

  • inRiver Product → Heiler daily: items.csv, media.csv, Item_free_attributes.csv, item_structure_features.csv
  • inRiver Product → Heiler BU: itemdataflex.csv, references.csv, bundles.csv
  • inRiver ChannelNode → Heiler BU: category.csv
  • inRiver AttributeFile → Heiler daily: attributes.csv, attributemap.csv, structures.csv
  • Decorator data → Heiler BU: purpose.csv

CSV Format:

  • UTF-8 encoding
  • Headers matching Heiler PIM2 format
  • Column mapping per research/blower-source-pim2/PIM_STRUCTURE.md
  • Full dataset (not deltas)

Delivery:

  • Write to Blower PIM2 input directory (S3)
  • Trigger Blower import after files are written (this is handled by an AWS Lambda function)

FR-5: Configuration and Mapping Tables

Priority: High
Description: Maintain configuration for field mapping and decorator data

Field Mapping Table:

```sql
CREATE TABLE field_mapping (
  id SERIAL PRIMARY KEY,
  inriver_entity TEXT,        -- 'Product', 'ChannelNode', 'AttributeFile'
  inriver_field TEXT,         -- inRiver field name
  heiler_csv TEXT,            -- Target Heiler CSV file
  heiler_column TEXT,         -- Target column in CSV
  transformation_rule TEXT,   -- Optional: transformation logic
  active BOOLEAN DEFAULT TRUE,
  notes TEXT
);
```

Purpose:

  • Map inRiver attributes to multiple Heiler CSVs
  • Example: inRiver Product attributes → attributes.csv, attributemap.csv, itemdataflex.csv
  • Handle one-to-many mappings (one inRiver field → multiple Heiler columns)

Decorator Data Table:

```sql
CREATE TABLE decorator_data (
  id SERIAL PRIMARY KEY,
  entity_type TEXT,           -- 'attribute', 'item', 'category', etc.
  entity_id TEXT,             -- inRiver entity ID
  decorator_type TEXT,        -- 'purpose', 'role', etc.
  decorator_value TEXT,       -- The decorator value
  source TEXT,                -- Where this came from (manual, migration, etc.)
  created_at TIMESTAMP DEFAULT NOW(),
  notes TEXT
);
```

Purpose:

  • Store data that doesn't exist in inRiver (lost fidelity)
  • Example: Heiler "purpose" field not in inRiver
  • Manually maintained (for now)
  • Future: Web UI for management

CSV Template Config Table:

```sql
CREATE TABLE csv_template_config (
  id SERIAL PRIMARY KEY,
  csv_name TEXT,              -- 'items.csv', 'itemdataflex.csv', etc.
  column_name TEXT,           -- Column in CSV
  source_type TEXT,           -- 'inriver_field', 'decorator', 'static', 'computed'
  source_value TEXT,          -- Field name, decorator type, or static value
  default_value TEXT,         -- Default if source is null
  transformation TEXT,        -- Optional: transformation logic
  required BOOLEAN DEFAULT FALSE,
  notes TEXT
);
```

Purpose:

  • Define CSV generation rules per column
  • Flexible: source from inRiver, decorator, static value, or computed
  • CSV Generator uses this config to build CSVs
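
A minimal sketch of how the CSV Generator might apply `csv_template_config` rows to one CSV. The function names, the flat item dictionaries, and the per-item decorator lookup are assumptions. The `computed` source type and the `transformation` column are deliberately not handled here.

```python
import csv
import io


def build_row(item: dict, decorators: dict, template: list[dict]) -> dict:
    """Resolve every configured column for one item."""
    row = {}
    for rule in template:
        kind = rule["source_type"]
        if kind == "inriver_field":
            value = item.get(rule["source_value"])
        elif kind == "decorator":
            value = decorators.get(rule["source_value"])
        elif kind == "static":
            value = rule["source_value"]
        else:
            # 'computed' is out of scope for this sketch.
            raise ValueError(f"unsupported source_type: {kind}")
        if value is None or value == "":
            value = rule.get("default_value")
        if value is None and rule.get("required"):
            raise ValueError(f"missing required column {rule['column_name']}")
        row[rule["column_name"]] = "" if value is None else value
    return row


def render_csv(items: list[dict], decorators_by_id: dict, template: list[dict]) -> str:
    """Render a full CSV (header plus one row per item) as a string."""
    columns = [rule["column_name"] for rule in template]
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns, lineterminator="\n")
    writer.writeheader()
    for item in items:
        decorators = decorators_by_id.get(item.get("EntityId"), {})
        writer.writerow(build_row(item, decorators, template))
    return buffer.getvalue()
```

Column order in the output follows the order of the template rows, so the config table would need a stable ordering (for example, by `id`) when it is queried.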

Management:

  • Phase 1: Manual SQL inserts (load configuration data)
  • Future: Web UI for managing mappings and decorator data

FR-6: Dual-ID Strategy (Legacy PIM + inRiver)

Priority: High
Description: Handle item identification for both migrated items (with legacy Heiler PIM IDs) and new items (inRiver-only IDs)

Context:

  • Migrated items carry old Heiler PIM number in ItemLegacyPIMItemNo attribute
  • New items created in inRiver will NOT have ItemLegacyPIMItemNo
  • Blower currently uses old PIM numbers as blower_id — behavior with new IDs is unknown

ID Resolution Logic:

IF ItemLegacyPIMItemNo IS NOT NULL AND ItemLegacyPIMItemNo != ''
  THEN blower_id = ItemLegacyPIMItemNo   -- backward compatible
ELSE
  blower_id = EntityId (inRiver)          -- new items
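
In Python, the resolution logic above might look like the sketch below. The item shape (an `Attributes` dictionary holding `ItemLegacyPIMItemNo`) follows the mapping section. The function name and the `id_source` label are hypothetical.

```python
def resolve_ids(item: dict) -> dict:
    """Resolve blower_id for one flattened inRiver Item.

    Legacy PIM number wins when present and non-empty;
    otherwise the inRiver EntityId is used.
    """
    entity_id = str(item["EntityId"])
    legacy = item.get("Attributes", {}).get("ItemLegacyPIMItemNo")
    if legacy is not None and legacy != "":
        blower_id, id_source = str(legacy), "legacy"
    else:
        blower_id, id_source = entity_id, "inriver"
    return {
        "blower_id": blower_id,           # goes into the Item# column
        "inriver_entity_id": entity_id,   # always populated
        "id_source": id_source,           # hypothetical label for event-log counts
    }
```

Returning `id_source` alongside the IDs makes it cheap to count legacy-ID versus inRiver-ID items per file for the event log metadata.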

CSV Output Requirements:

  1. Item# column (existing): Populated with resolved blower_id (legacy PIM ID or inRiver EntityId)
  2. inriver_entity_id column (new): Always populated with inRiver EntityId for ALL items
    • Enables Blower/downstream to start tracking inRiver IDs
    • Future-proofs for full migration away from legacy IDs

PostgreSQL Accumulator Schema Update:

  • Data tables store both identifiers:
    • id (text): inRiver EntityId (primary key, always present)
    • data (JSONB): Contains ItemLegacyPIMItemNo when present
  • CSV Generator resolves blower_id at generation time using the logic above

Tracking:

  • Event log metadata should note counts of legacy-ID vs inRiver-ID items per file
  • Enables monitoring of migration progress (ratio of legacy vs new items over time)

⚠️ Open Investigation: How Blower handles items with non-legacy ID formats needs to be tested with a small batch of new-format IDs before full rollout.

FR-7: Error Handling

Priority: High
Description: Handle malformed JSON and conversion errors

Details:

  • Validate JSON structure
  • Log conversion errors
  • Quarantine bad records
  • Continue processing valid records
  • Alert on failures
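
One way to validate, quarantine, and continue is to parse each JSONL line independently. This is a sketch only: the function name, the quarantine record shape, and the choice of `EntityId` as the required key are assumptions.

```python
import json
from typing import Iterable


def parse_jsonl(lines: Iterable[str], required_keys=("EntityId",)):
    """Split JSONL input into valid records and quarantined lines.

    A bad line never aborts the file; it is recorded with its
    line number and error so it can be inspected or alerted on.
    """
    valid, quarantined = [], []
    for line_no, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # ignore blank lines
        try:
            record = json.loads(text)
        except json.JSONDecodeError as exc:
            quarantined.append({"line": line_no, "raw": text, "error": str(exc)})
            continue
        if not isinstance(record, dict):
            quarantined.append({"line": line_no, "raw": text, "error": "not a JSON object"})
            continue
        missing = [key for key in required_keys if key not in record]
        if missing:
            quarantined.append({"line": line_no, "raw": text,
                                "error": f"missing keys: {missing}"})
            continue
        valid.append(record)
    return valid, quarantined
```

The quarantine list could be written to the event log `metadata` column or to a separate S3 prefix; which one is appropriate is not decided in this document.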

Non-Functional Requirements

NFR-1: Performance

Priority: Medium
Description: Process deltas in near real-time, generate CSVs nightly

Details:

  • Delta Loading: <2 minutes per file (near real-time)
  • CSV Generation: <15 minutes (nightly batch)
  • Data volume: <100K items per BU
  • Acceptable: Deltas throughout day, CSVs once nightly
  • Decoupled: Delta loading doesn't block CSV generation

NFR-2: Reliability

Priority: High
Description: Ensure reliable conversion with error recovery and replay capability

Details:

  • Retry logic for transient failures
  • Idempotent processing (can rerun safely)
  • Ordered processing (files processed in sequence)
  • Replay capability (can reprocess from any point)
  • Event immutability (never delete events)
  • State tracking via event log (prevents duplicates)

NFR-3: Maintainability

Priority: Medium
Description: Simple, readable Python code

Details:

  • Clear mapping logic
  • Well-documented
  • Easy to debug
  • Minimal dependencies

Technical Requirements

TR-1: Database (Per Business Unit)

Technology: RDS PostgreSQL
Rationale: Accumulate deltas into full dataset, becomes Bronze layer in Phase 2

Details:

  • Architecture: Single RDS PostgreSQL instance with separate schemas per BU
  • Instance: db.t3.medium (shared across all BUs)
  • Schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl, etc. (pending: sub-unit separation)
  • Each schema contains:
    • Data tables: products, channel_nodes, attribute_file (JSONB)
    • Event log: file_processing_events (audit trail + replay control)
    • Configuration tables:
      • field_mapping - Maps inRiver fields to Heiler CSV columns
      • decorator_data - Stores data lost in inRiver migration (e.g., purpose)
      • csv_template_config - Configuration for CSV generation rules
  • Schema:
    • Data tables: id, data (JSONB), updated_at, source_file, deleted
    • Event log: id, event_time, event_type, source_file, entity_type, records_processed, records_inserted, records_updated, records_deleted, error_message, replay_date (nullable), metadata (JSONB)
  • Indexes:
    • Primary key on id
    • Index on updated_at
    • Index on source_file
    • Index on event_time (event log)
    • Index on replay_date (event log) - for efficient query of valid events
  • Purpose: Delta accumulation, CSV generation source, complete audit trail, replay control

TR-2: Implementation

Technology: Python 3.11+
Rationale: Simple, fast to develop, team familiarity

Components:

  1. Delta Loader (~300 lines): S3 → PostgreSQL
    • Query event log to find next file to process
    • Enforce ordered processing (chronological)
    • Detect replay scenarios (gaps in valid events)
    • Upsert records with source_file tracking
    • Log events to file_processing_events table
    • Track insert/update/delete counts
    • Support replay from any point
  2. CSV Generator (~400 lines): PostgreSQL → Heiler CSVs
    • Query configuration tables (field_mapping, decorator_data, csv_template_config)
    • Apply mapping rules to transform inRiver JSON → Heiler CSV columns
    • Join decorator data where needed
    • Generate 7 daily CSVs + 5 BU-specific CSVs
  3. Configuration Loader (~100 lines): Load initial mapping data
    • SQL scripts to populate field_mapping table
    • SQL scripts to populate decorator_data table
    • SQL scripts to populate csv_template_config table
  4. Libraries: boto3 (S3), psycopg2 (PostgreSQL), pandas (CSV)

TR-3: Orchestration (Per Business Unit)

Technology: AWS MWAA (Managed Airflow)
Rationale: Existing infrastructure

Details:

  • Separate DAGs per Business Unit (BK, GM, EC, CEFL, etc.)

  • Each BU has two DAGs:

    DAG 1: Delta Loader (runs as files arrive)

    • Name: inriver_{bu}_delta_loader
    • Trigger: S3 event notification (near real-time, BU-specific path)
    • Task: Load delta file → BU-specific PostgreSQL database
    • Frequency: On-demand (per file)
    • Duration: ~1-2 minutes per file

    DAG 2: CSV Generator (runs nightly or on-demand)

    • Name: inriver_{bu}_csv_generator
    • Trigger: Cron schedule (nightly) OR manual trigger
    • Tasks:
      1. Generate Heiler CSVs from BU PostgreSQL database
      2. Upload CSVs to BU-specific Blower input location
      3. Trigger existing BU Blower import DAG
    • Frequency: Nightly (or on-demand)
    • Duration: ~10-15 minutes
  • Total DAGs: 8+ (4+ BUs × 2 DAGs each, pending sub-unit clarification)

  • Decoupled Design: Delta loading independent from CSV generation

  • BU Isolation: Each BU processes independently, no cross-BU dependencies

  • Benefit: Accumulate deltas throughout day, generate full CSVs once nightly per BU

  • Monitoring: Airflow UI + CloudWatch (per BU)
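
The DAG naming convention above can be expressed as a small helper so DAG IDs stay consistent across BUs. This is illustrative only: the BU code list is an assumption pending sub-unit clarification, and no Airflow objects are created here.

```python
# Assumed BU codes; the final list depends on sub-unit clarification.
BUSINESS_UNITS = ("bk", "gm", "ec", "cefl")
DAG_KINDS = ("delta_loader", "csv_generator")


def dag_ids(business_units=BUSINESS_UNITS) -> list[str]:
    """Return the two DAG IDs for each BU, e.g. inriver_bk_delta_loader."""
    return [f"inriver_{bu}_{kind}" for bu in business_units for kind in DAG_KINDS]
```

With four BUs this yields the eight DAGs counted above; adding a BU code extends the list by two.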

TR-4: Storage

Technology: AWS S3
Rationale: inRiver delivery mechanism

Details:

  • Input: inRiver JSONL files
  • Output: Heiler CSV files
  • Blower reads from S3 or local mount

Mapping Requirements

inRiver → Heiler Field Mapping

Product Entity:

  • ProductId → items.csv: Item# (via dual-ID resolution — see FR-6)
  • ProductId → itemdataflex.csv: Item# (via dual-ID resolution)
  • ProductId → media.csv: Item_Product
  • ProductId → item-attributes.csv: Item_Product#
  • Items[].EntityId → ALL CSVs: inriver_entity_id (new column, always present)
  • Items[].Attributes.ItemLegacyPIMItemNo → items.csv: Item# (when present, takes priority)
  • FieldValues → item-attributes.csv: Attribute_Name, Default_Value
  • Items[] → Flatten to items.csv (one row per Item, merge Product-level data where Item data not present - Item data takes precedence)
  • Items.Resources.ResourceFilename → media.csv: File_Name
  • Items.Resources.ResourceResourceType → media.csv: Image_Type
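
The flattening rule (one row per Item, Item data over Product data) could be sketched as follows. The payload shape is an assumption: `FieldValues` and `Attributes` are treated as plain dictionaries, and "not present" is interpreted as a missing key or a null value. The function name is hypothetical.

```python
def flatten_product(product: dict) -> list[dict]:
    """Flatten one inRiver Product into one row per child Item."""
    product_fields = product.get("FieldValues", {})
    rows = []
    for item in product.get("Items", []):
        fields = dict(product_fields)  # start from Product-level data
        for name, value in item.get("Attributes", {}).items():
            if value is not None:
                fields[name] = value   # Item-level data takes precedence
        rows.append({
            "EntityId": item["EntityId"],
            "ProductId": product.get("ProductId"),
            "fields": fields,
        })
    return rows
```

A Product with no Items produces no rows, which matches the item-centric downstream model: nothing is emitted for a Product on its own.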

Item ID Resolution (applied to all CSVs referencing Item#):

Item# = ItemLegacyPIMItemNo if present and non-empty, otherwise EntityId
inriver_entity_id = EntityId (always)

ChannelNode Entity:

  • ChannelNodeID → category.csv: Structure_ID
  • ChannelNodeName → category.csv: Structure_Name
  • InboundLinks.SourceEntityId → category.csv: Parent_Structure_ID (*ref)

AttributeFile Entity:

  • FieldTypes[] → attributes.csv: AttributeID, AttributeName, AttributeDataType
  • StructureEntities[] → structures.csv: StructureID, StructureName

Open Mapping Questions:

  1. Attribute ID Source: inRiver doesn't export attribute IDs - need resolution (see Risk 5)
  2. Attribute Splitting: How to determine which attributes go to Item_free_attributes.csv vs item_structure_features.csv?
    • Hypothesis: Structure-specific attributes → item_structure_features.csv
    • Hypothesis: Free-form attributes → Item_free_attributes.csv
    • Need to validate with sample data analysis
  3. Delete Granularity: Can inRiver deltas delete at both Product level and Item level, or only Product level?
    • If Product-level only: all child items removed in one delta
    • If Item-level also: individual items can be removed while sibling items remain
    • Need to confirm with inRiver export documentation or sample delete files

Data Type Mapping

| inRiver Type | Heiler Type |
|---|---|
| String | CHARACTER_STRING |
| Integer | INTEGER |
| Double | DECIMAL |
| Boolean | BOOLEAN |
| CVL | CVL (lookup) |
| LocaleString | CHARACTER_STRING (per locale) |
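
In code, this mapping could live in a single lookup table. The sketch below is an assumption about how the CSV Generator might use it. Raising on an unknown type (rather than defaulting) is a design choice made here, not something the requirements specify.

```python
# inRiver type -> Heiler type, taken from the Data Type Mapping table.
# LocaleString maps to CHARACTER_STRING once per locale; CVL is a lookup type.
INRIVER_TO_HEILER = {
    "String": "CHARACTER_STRING",
    "Integer": "INTEGER",
    "Double": "DECIMAL",
    "Boolean": "BOOLEAN",
    "CVL": "CVL",
    "LocaleString": "CHARACTER_STRING",
}


def heiler_type(inriver_type: str) -> str:
    """Translate an inRiver data type name to its Heiler equivalent."""
    try:
        return INRIVER_TO_HEILER[inriver_type]
    except KeyError:
        # Fail loudly so unmapped types surface during validation.
        raise ValueError(f"no Heiler mapping for inRiver type {inriver_type!r}") from None
```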

Success Criteria

Phase 1 Success

  1. ✅ Converter processes inRiver JSONL files
  2. ✅ PostgreSQL accumulates deltas into full dataset
  3. ✅ Event log tracks all file processing
  4. ✅ Generates valid Heiler CSV files
  5. ✅ Blower imports CSVs successfully
  6. ✅ Downstream systems receive data (no changes needed)
  7. ✅ Single BU operational (BK or GM)
  8. ✅ Audit trail complete (source_file + event log)

Acceptance Criteria

  • Item count matches between inRiver and Blower
  • Attribute values match
  • Category hierarchy correct
  • Media files linked properly
  • Delta updates work (add/update/delete)
  • Every record tracks source_file
  • Event log shows complete processing history

Out of Scope (Phase 1)

  • Database changes to Blower (use existing Blower)
  • dbt transformations (use existing Blower)
  • Full pipeline architecture (Phase 2)
  • Comprehensive monitoring (basic logging only)
  • Automated testing (manual validation)
  • Web UI for configuration management (manual SQL for Phase 1, future enhancement)
  • Complex transformation logic (keep simple, iterate)
  • Multi-BU simultaneous rollout (start with single BU, then expand)

Risks and Mitigations

Risk 1: Incomplete Field Mapping

Mitigation: Reference Heiler PIM2 structure docs, validate with sample data

Risk 2: Delta Handling Complexity

Mitigation: Start with full files, add delta logic incrementally

Risk 3: Blower Compatibility

Mitigation: Match Heiler CSV format exactly, test with existing Blower

Risk 4: Performance Issues

Mitigation: Process in batches, optimize for large files

Risk 5: Missing Attribute IDs from inRiver

Issue: Current PIM attribute IDs are not exported by inRiver, but Heiler CSVs require AttributeID
Impact: Cannot populate attributes.csv, attributemap.csv, Item_free_attributes.csv, item_structure_features.csv without attribute IDs
Mitigation:

  • Option A: Request inRiver to include attribute IDs in AttributeFile export
  • Option B: Generate synthetic attribute IDs in Silver layer (hash-based or sequential)
  • Option C: Use attribute names as IDs (if acceptable to downstream systems)

Status: ⚠️ Critical blocker - needs resolution before implementation
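
If Option B were chosen, a hash-based synthetic ID could be derived from the attribute name. This is a sketch under several assumptions: that a numeric ID is acceptable to Heiler and Blower, that attribute names are stable and unique, and that case and surrounding whitespace should be ignored. None of these has been confirmed.

```python
import hashlib


def synthetic_attribute_id(attribute_name: str, digits: int = 12) -> str:
    """Derive a deterministic numeric ID from an attribute name.

    The same name always yields the same ID, so IDs stay stable
    across runs without a lookup table. Collisions are possible
    in principle and would need a uniqueness check at load time.
    """
    normalized = attribute_name.strip().lower()
    digest = hashlib.sha256(normalized.encode("utf-8")).hexdigest()
    return str(int(digest, 16) % 10**digits).zfill(digits)
```

Renaming an attribute in inRiver would change its synthetic ID, which is the main drawback compared with Option A.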

Risk 6: Legacy ID Preservation and Migration

Issue: Heiler PIM IDs are used in Blower joins, downstream systems, and permalinks
Context:

  • Migrated items: Have old Heiler PIM ID in ItemLegacyPIMItemNo attribute (e.g., "1369225580515")
  • New items: Will NOT have ItemLegacyPIMItemNo — only inRiver identifiers (EntityId, ProductId)
  • Blower currently uses old PIM numbers as blower_id for joins and relationships
  • Unknown: How Blower will handle items with inRiver-format IDs (no legacy PIM number)

Identifier Landscape (from sample data):

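| Level | Field | Example | Migrated Items | New Items |
|---|---|---|---|---|
| Product | EntityId | 19456 | | |
| Product | ProductId | "P100004823" | | |
| Item | EntityId | 3424 | | |
| Item | ItemLegacyPIMItemNo | "1369225580515" | | |
| Item | ItemWatscoItemNo | null or value | Maybe | Maybe |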

Phase 1 ID Strategy (Dual-ID Approach):

  1. Primary ID (blower_id):
    • If ItemLegacyPIMItemNo is present → use it (backward compatibility with Blower)
    • If ItemLegacyPIMItemNo is absent → use inRiver EntityId (new items)
  2. inRiver ID (always sent):
    • ALL items get their inRiver EntityId passed to Blower as an additional field
    • Enables Blower to start tracking inRiver IDs alongside legacy IDs
    • Future-proofs for when all items are inRiver-native
  3. ID source tracking:
    • Track whether blower_id came from legacy PIM or inRiver in metadata
    • Enables auditing and debugging of ID-related issues

Impact:

  • Migrated items: No change — Blower sees same IDs it always has
  • New items: ⚠️ Blower receives unfamiliar ID format — behavior unknown
  • Downstream systems: May need to handle both ID formats

Open Questions (Reduced):

  1. What inRiver attribute stores the legacy Heiler ID? Resolved: ItemLegacyPIMItemNo
  2. What is the ID format/pattern for legacy vs new? Resolved: Legacy = numeric string (e.g., "1369225580515"), inRiver = integer EntityId (e.g., 3424)
  3. How does Blower handle items with non-legacy ID formats? (needs investigation)
  4. Which downstream systems depend on specific ID formats?
  5. Should inRiver EntityId be sent as a new CSV column or mapped to an existing field?

Status: ⚠️ High risk — dual-ID strategy defined, but Blower behavior with new IDs needs investigation


Dependencies

  1. inRiver S3 Access: Credentials and bucket access (organized by BU)
  2. RDS PostgreSQL: Provision single instance (db.t3.medium)
    • Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending: sub-unit clarification)
    • Each schema contains all tables (data, event log, configuration)
  3. Blower PIM2 Docs: CSV format specification (available in research/)
  4. Sample Files: inRiver JSONL samples (available in samples/aperture-export-ecom), Heiler CSV samples (available in samples/heiler-pim)
  5. Blower Input Locations: S3 paths per BU (BK, GM, EC, CEFL, etc.)

Next Steps

  1. Review and approve Phase 1 requirements
  2. Resolve Critical Blockers:
    • Attribute ID issue (see Risk 5)
    • Sub-business unit file separation (see questions.md)
  3. Provision Infrastructure:
    • Single PostgreSQL instance (db.t3.medium), shared across all BUs
    • Create schemas: inriver_bk, inriver_gm, inriver_ec, inriver_cefl (pending sub-unit clarification)
    • S3 paths for inRiver input (per BU)
    • S3 paths for Heiler CSV output (per BU)
  4. Build Components:
    • Delta Loader Python script
    • CSV Generator Python script
    • Airflow DAGs (8+ total: 4+ BUs × 2 DAGs, pending sub-unit clarification)
  5. Test with Single BU (BK or GM):
    • Test with sample inRiver files
    • Validate Heiler CSV output against samples
    • Test with Blower import
  6. Deploy to Production (single BU first)
  7. Monitor and Iterate
  8. Roll Out to Remaining BUs (EC, CEFL, and any others)

Document Version: 1.0
Last Updated: 2026-03-03
Author: AI-DLC Requirements Analysis