AI-DLC Audit Trail
User Request (2026-02-26T15:27:02-07:00)
Raw Input:
this repo contains documentation about our new inriver pim files as well as some sample files; we use a system called blower that runs on apache airflow with php generated dags to do ETL from our current pim systems raw inputs to the outputs expected by downstream systems. We are migrating to this new pim system that outputs pim data in the formats described in this repo and represented in the ecom samples `samples/aperature-export-ecom`. Our plan is to keep the downstream outputs the same but to get the inriver jsonl product files into a postgresql database where they can be manipulated leveraging dbt on airflow into the same format as our current blower database tables. understand this repo and then help us by running aidlc to start producing a project plan.
Workspace Detection (2026-02-26T15:27:02-07:00)
Findings:
- Repository Type: Documentation repository (VitePress)
- Existing Code: Only utility script (parse_email.py) - not application code
- Project Type: Greenfield - New ETL pipeline project
- Context:
- Current system: Blower (Apache Airflow + PHP-generated DAGs)
- New PIM: inRiver with JSONL exports
- Goal: Load inRiver JSONL → PostgreSQL → dbt transformations → Same output format as current Blower tables
- Documentation exists: Technical specs, sample files, ERD, schema docs
Repository Structure Analysis:
- docs/ - VitePress documentation site with technical specs
- samples/aperture-export-ecom/ - Sample inRiver JSON exports (Product, ChannelNode, AttributeFile)
- correspondence/ - Email archives with stakeholder communications
- meetings/ - Meeting notes
- terraform/ - Infrastructure code (likely for documentation hosting)
Key Documentation Reviewed:
- Technical spec: JSONL export format, CVL fields, delivery schedule
- ERD: Entity relationships (Channel, ChannelNode, Product, Item, Resource)
- Sample data: Product, ChannelNode, AttributeFile JSON structures
- Assumptions: Business unit handling, cardinality, scalability concerns
- Open questions: ChannelNode OutboundLinks scalability, delta update strategy
Decision:
- Brownfield: No
- Next Phase: Requirements Analysis
- Rationale: This is a new ETL pipeline project. Documentation exists but no application code.
User Clarification (2026-02-26T15:27:02-07:00)
Additional Context Provided:
in this you should also note that we are moving from a system that is "nightly full load" to a system that is always delta load
Key Insight:
- Current System (Blower): Nightly full load pattern
- New System (inRiver): Always delta load (near real-time updates)
- Architectural Shift: From batch full refresh to incremental delta processing
- Impact: Requires change data capture, merge/upsert logic, and state management
Requirements Analysis - Initial Questions (2026-02-26T15:46:35-07:00)
User Response: "requirements doc has been updated"
Answers Received:
- Blower Schema: Can provide documentation (A)
- Downstream Systems: E-commerce (Magento, Shopware) + Algolia + other systems (A + detail)
- Business Unit Handling: Separate pipelines per BU (A)
- Data Volume: <100K products, hourly updates acceptable (A)
- Delta Strategy: Hard deletes (B)
- Error Handling: Standard validation, quarantine, alerting (B)
- Airflow: Existing MWAA available (A)
- S3 Access: Control bucket, can add event notifications (A)
- Database: MariaDB on RDS, might consider Aurora (C + detail) [CONTRADICTION: Original request said PostgreSQL]
- Transformation Complexity: Not sure, needs analysis (D)
- Nested Data: Flatten to normalized tables (A) + note: "current process deals with items, new files have products with items as children"
- Monitoring: Comprehensive observability (C)
- Testing: Data parity validation - tables must be identical (E)
- Deployment: CI/CD with automated testing (B)
- Incremental/Full: Incremental only - inRiver limitation (A)
- AttributeFile: Load to table, joined by incremental data (E)
- ChannelNode: Will ask to exclude outbound links to products (E)
- Timeline: Urgent 2-4 weeks (A) [CONTRADICTION: Complex project with unknowns]
- Team: Mixed experience, familiar with Airflow/dbt but not this pattern (D)
- Success: Functional parity with Blower (A)
Contradictions Detected:
- Database Technology: PostgreSQL (original) vs MariaDB (Q9)
- Timeline vs Complexity: 2-4 weeks (Q18) vs comprehensive monitoring (Q12) + unknown transformations (Q10)
- Performance: "Near real-time delta" (original) vs "hourly acceptable" (Q4)
Ambiguities Detected:
- Transformation complexity unknown (Q10)
- Monitoring scope vs timeline mismatch
- Product → Item hierarchy change impact
- Blower schema availability timing
- Multiple downstream systems scope
Action:
Created requirements-clarification-questions.md with 8 questions to resolve contradictions and ambiguities.
Requirements Clarification - Resolved (2026-02-26T22:42:53-07:00)
User Response: "I've updated the contradictions doc"
Clarification Answers:
- Database Technology: Aurora MySQL (B)
- Timeline: Phased approach - MVP in 2-4 weeks, iterate for monitoring/testing (D)
- Performance: Hourly batch acceptable (C)
- Transformation Analysis: Provide Blower schema now, analyze before design (A)
- Monitoring Scope: MVP with Airflow monitoring + data quality checks (B)
- Product/Item Model: Flatten Products to Items - maintain item-centric model (A)
- Blower Schema: Available now (A)
- Downstream Systems: Multiple output schemas per system type (B)
Contradictions Resolved:
✅ Database: Aurora MySQL confirmed
✅ Timeline: Phased approach with MVP focus
✅ Performance: Hourly batch processing confirmed
✅ All ambiguities clarified
Next Action:
Proceed to generate requirements document incorporating all answers.
Requirements Analysis Complete (2026-02-26T22:42:53-07:00)
Status: Requirements document generated and ready for review
Location: aidlc-docs/inception/requirements/requirements.md
Summary:
- 7 Functional Requirements defined
- 5 Non-Functional Requirements defined
- 5 Technical Requirements defined
- Project constraints documented
- Success criteria established
- MVP vs Phase 2 scope clarified
Key Requirements:
- Data ingestion from S3 (hourly batch)
- Aurora MySQL database (upgrade from MariaDB)
- dbt transformations for multiple downstream schemas
- Flatten Product → Item hierarchy for backward compatibility
- Hard delete strategy for delta updates
- Phased delivery: MVP in 2-4 weeks, iterate for monitoring
Awaiting: User approval to proceed to next phase
User Input - Bronze-Silver-Gold Architecture (2026-02-27T08:28:26-07:00)
User Request:
I think we should use a bronze-silver-gold model for the ingestion of the data; the bronze model should be the raw json files with load dates, the gold model should be data formatted in the current ingestion table format. To minimize the downstream work those "gold" tables will include the entire product data set and the deltas from bronze will just update the records that are in each delta load; lets explore this idea
Key Concepts Introduced:
- Bronze Layer: Raw JSON files with load dates (immutable landing zone)
- Gold Layer: Current Blower table format (downstream-ready)
- Delta Processing: Bronze deltas update Gold tables (merge/upsert pattern)
- Full Dataset in Gold: Gold maintains complete product dataset, not just deltas
- Minimize Downstream Work: Downstream systems consume Gold directly without transformation
Architectural Implications:
- Medallion architecture (Bronze → Silver → Gold)
- Bronze = raw inRiver JSON with audit metadata
- Silver = normalized/cleaned data (potentially intermediate transformations)
- Gold = business-ready data matching current Blower schema
- Gold tables are stateful (full dataset), Bronze is append-only (deltas)
Questions to Explore:
- What role does Silver layer play? (Bronze → Silver → Gold or Bronze → Gold?)
- How to handle initial load vs incremental updates in Gold?
- What audit metadata in Bronze? (load_date, file_name, business_unit, etc.)
- How to handle late-arriving data or out-of-order updates?
- Retention policy for Bronze layer?
- How does this affect dbt model structure?
Status: Exploring architectural approach before finalizing requirements
User Input - Blower Target Schema Documentation Added (2026-03-02T13:17:17-07:00)
User Update:
I've added docs/blower-target lets continue with aidlc
Documentation Added:
- docs/blower-target/overview.md - Target schema mapping and ERD
- Table-specific documentation for all 13 Blower tables:
  - items.md - Core product/SKU table
  - attributes.md - Attribute definitions
  - attribute_sets.md, attribute_set_attributes.md, attribute_roles.md - Attribute metadata
  - item_attributes.md - Product attribute values
  - categories.md, category_attributes.md, item_categories.md - Category hierarchy
  - item_associations.md, item_bundles.md, item_groups.md - Product relationships
  - item_media.md - Product images/assets
- ERD diagram (D2 source + SVG)
Key Insights from Blower Schema:
- Target: *_revision_XXX schemas (e.g., bk_revision_2191)
- 13 Core Tables: Items, attributes, categories, and relationship tables
- Multi-BU: Each business unit has a separate schema prefix (bk_*, gm_*, ec_*, etc.)
- Normalized Structure: EAV pattern for attributes, separate relationship tables
- 18 Relationships: Complex entity relationships documented
Impact on Requirements:
- Gold layer target structure now fully documented
- Transformation complexity can be assessed (inRiver JSON → Blower tables)
- Data model mapping requirements clarified
Next Action: Continue AI-DLC workflow - proceed to User Stories phase
Requirements Analysis - Approval Prompt (2026-03-02T13:17:17-07:00)
Status: Requirements document updated with:
- Bronze-Silver-Gold medallion architecture
- Blower target schema details (13 tables documented)
- Updated transformation requirements
- Refined open questions
Awaiting: User approval to proceed to next phase
User Question - Blower Revision Table Generation (2026-03-02T13:36:20-07:00)
User Question:
do we understand how these revision tables are made in blower? we may have to look at the blower documentation and source code in the blower directory
Analysis Performed: Examined Blower framework source code in blower/ directory
Key Findings:
Blower Architecture (from README.md):
- Framework for importing product data from sources (Heiler PIM)
- Normalizes, validates, creates reports, exports to Magento/Bloomreach
- Each BU creates own "Blower" application depending on this package
Revision System:
- Revisions: Snapshot of product data at a point in time
- Revision Schema: *_revision_XXX (e.g., bk_revision_2191)
- History Schema: Past revisions stored separately
- Central Schema: Shared data, revision tracking, normalization
Database Schema Structure (from catalog-common.sql):
- 13 tables matching documented Blower target schema
- All tables have seen_status_id column (tracks add/delete/modified)
- Foreign keys reference *_central schema for normalization
- Tables: items, attributes, attribute_sets, attribute_set_attributes, attribute_roles, categories, category_attributes, item_categories, item_attributes, item_media, item_associations, item_groups, item_bundles
Data Flow:
- Sources: Packages that fetch data from PIM and transform it
- ImporterInterface: Sources implement this to populate revision tables
- RevisionManager: Creates new revision schema, tracks in central.revisions table
- History: Compares current vs previous revision, stores deltas with seen_status (add/delete/modified)
- Cleanup: Old revision schemas deleted, keeping N most recent
Critical Insight:
- Blower sources directly populate revision tables via ImporterInterface
- Each source has own schema, then data flows to revision schema
- Revision tables are the "Gold" layer - downstream systems consume these
- History schema tracks changes between revisions (delta tracking)
Implications for inRiver ETL:
- We need to replicate Blower's revision table population logic
- Bronze layer = inRiver raw JSON (equivalent to Blower source schemas)
- Gold layer = revision tables (same structure as Blower)
- dbt transformations must implement the ImporterInterface logic
- Need to understand how Blower sources transform PIM data → revision tables
Open Question: Do we have access to a Blower source implementation (e.g., Heiler PIM source) to understand the transformation logic from source data to revision tables?
Status: Awaiting user response on Blower source availability
Blower PIM Source Analysis (2026-03-02T14:43:55-07:00)
User Update:
the blower source repos have been moved to a research directory; i've added the blower-pim repo; review
Analysis Performed: Examined research/blower-source-pim2/ - Heiler PIM source implementation
Complete Data Flow Discovered:
1. Source Layer (PIM2 Schema)
- Input: 17 CSV files from Heiler PIM (items.csv, attributes.csv, category.csv, etc.)
- Process: Import CSVs to pim2 schema tables (raw, non-normalized)
- Commands: pim2:import:* (Items, Attributes, Categories, ItemAttributes, etc.)
- Key Files:
  - src/Import/*.php - Importer classes implementing ImporterInterface
  - src/Input/*.php - CSV reader classes
  - PIM_STRUCTURE.md - Complete CSV → DB mapping
2. Central Layer (Normalization)
- Input: pim2 schema tables
- Process: Normalize data, assign integer IDs, create SHA-1 hashes for values
- Commands: pim2:pending:central-* (Items, Attributes, Categories, ValueStore, Media, etc.)
- Key Tables:
  - central.items - Normalized items with integer IDs
  - central.attributes - Normalized attributes
  - central.value_store - SHA-1 hashed attribute values
  - central.categories - Normalized categories
  - central.media - Normalized media files
3. Revision Layer (Gold)
- Input: pim2 + central schemas
- Process: Join pim2 → central IDs, insert to revision tables
- Commands: pim2:pending:pending-* (Items, Attributes, Categories, ItemAttributes, etc.)
- Schema: *_revision_XXX (e.g., bk_revision_2191)
- Example SQL (from Items.php):
INSERT IGNORE INTO {revision}.items
SELECT item.id, item.name, it.id, ag.id, @SEEN_ID
FROM pim2.items pis
INNER JOIN central.items item ON item.blower_id = pis.blower_id
INNER JOIN central.item_types it ON it.name = pis.item_type
LEFT JOIN central.attribute_sets ag ON ag.blower_id = pis.attribute_set_blower_id
4. History Layer (Delta Tracking)
- Input: Current revision vs previous revision
- Process: Compare schemas, mark seen_status (add/delete/modified)
- Commands: Blower framework History class
- Output: *_history schema with delta records
Critical Insights:
Three-Stage Transformation:
- Stage 1: CSV → pim2 schema (raw import, validation)
- Stage 2: pim2 → central (normalization, ID assignment)
- Stage 3: pim2 + central → revision (join and populate gold tables)
Blower ID Pattern:
- Source tables have a blower_id column (original PIM identifier)
- Central tables have id (integer) + blower_id (source reference)
- Revision tables use central id (normalized integer IDs)
EAV Implementation:
- item_attributes uses value_id → central.value_store.id
- value_store contains SHA-1 hash + actual value
- Enables deduplication and efficient storage
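The value_store pattern can be sketched in a few lines. This is an illustrative in-memory version of the dedup-by-SHA-1 idea, not Blower's actual implementation; table and function names here are made up for the example:

```python
import hashlib

def value_key(value: str) -> str:
    """SHA-1 digest used as the deduplication key for an attribute value."""
    return hashlib.sha1(value.encode("utf-8")).hexdigest()

# A minimal in-memory stand-in for central.value_store: hash -> (id, value)
value_store = {}

def store_value(value: str) -> int:
    """Return the integer id for a value, inserting it only if unseen."""
    key = value_key(value)
    if key not in value_store:
        value_store[key] = (len(value_store) + 1, value)
    return value_store[key][0]

# Duplicate values map to the same id, so item_attributes rows
# can all reference one shared value_id.
a = store_value("Stainless Steel")
b = store_value("Stainless Steel")
c = store_value("Aluminum")
print(a, b, c)  # -> 1 1 2
```

In a database this same idea becomes a unique index on the hash column plus an insert-if-absent, which is what makes repeated attribute values cheap to store.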
Seen Status Tracking:
- All revision tables have a seen_status_id column
- Values: seen (initial), added, deleted, modified, unchanged
- History class compares revisions to set status
Mapping to inRiver ETL:
| Blower Layer | inRiver Equivalent | Purpose |
|---|---|---|
| PIM2 schema | Bronze layer | Raw data landing zone |
| Central schema | Silver layer | Normalization + ID assignment |
| Revision schema | Gold layer | Business-ready data |
| History schema | Delta tracking | Change data capture |
Transformation Requirements for inRiver:
Bronze → Silver (equivalent to pim2 → central):
- Parse inRiver JSON to extract entities
- Assign integer IDs to all entities
- Create value_store with SHA-1 hashes
- Normalize references (blower_id → id)
Silver → Gold (equivalent to pim2 + central → revision):
- Join Bronze entities with Silver IDs
- Populate 13 revision tables
- Set seen_status_id = 'seen' initially
- Apply business rules and validation
Gold Delta Processing (equivalent to History):
- Compare current Gold vs previous Gold
- Mark records as add/delete/modified
- Update seen_status_id accordingly
- Maintain full dataset in Gold (merge/upsert)
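The compare-and-mark step above can be sketched as a pure-Python diff of two snapshots. This illustrates the seen_status logic only; the real pipeline would express it as a dbt model over the Gold tables:

```python
def diff_revisions(previous: dict, current: dict) -> dict:
    """Assign a seen_status to each record id across two Gold snapshots.

    previous/current map record id -> record payload. Deleted records stay
    in the result so the change is visible downstream.
    """
    statuses = {}
    for rec_id, payload in current.items():
        if rec_id not in previous:
            statuses[rec_id] = "added"
        elif previous[rec_id] != payload:
            statuses[rec_id] = "modified"
        else:
            statuses[rec_id] = "unchanged"
    for rec_id in previous:
        if rec_id not in current:
            statuses[rec_id] = "deleted"
    return statuses

prev = {"A": {"name": "Widget"}, "B": {"name": "Gadget"}}
curr = {"A": {"name": "Widget v2"}, "C": {"name": "Gizmo"}}
statuses = diff_revisions(prev, curr)
print(statuses)  # -> {'A': 'modified', 'C': 'added', 'B': 'deleted'}
```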
Next Steps:
- Map inRiver JSON entities to Blower CSV equivalents
- Design Silver layer schema (central equivalent)
- Design dbt models for Bronze → Silver → Gold transformations
- Implement seen_status tracking for delta updates
Status: Ready to update requirements with complete transformation understanding
Requirements Update - Blower Source Analysis Complete (2026-03-02T14:49:51-07:00)
User Approval: "yy"
Updates Applied:
Three-Layer Architecture Confirmed:
- Bronze: Raw inRiver JSON (equivalent to Blower PIM2 schema)
- Silver: Normalization + ID assignment (equivalent to Blower Central schema) - REQUIRED
- Gold: Business-ready revision tables (equivalent to Blower Revision schema)
Silver Layer Details Added:
- Integer ID assignment for all entities
- SHA-1 hashed value_store for attribute values
- Lookup tables: items, attributes, categories, media, attribute_sets
- Pattern: blower_id (source) → id (normalized)
Transformation Pattern Documented:
- Stage 1: Bronze → Silver (parse JSON, assign IDs, create value_store)
- Stage 2: Silver → Gold (join Bronze + Silver IDs, populate 13 tables)
- Stage 3: Delta processing (compare Gold revisions, update seen_status)
Seen Status Tracking:
- All Gold tables have a seen_status_id column
- Values: seen, added, deleted, modified, unchanged
- Enables delta tracking and change data capture
Functional Requirements Updated:
- FR-1: Bronze layer ingestion
- FR-2: Silver layer normalization (NEW)
- FR-3: Gold layer business-ready data
- FR-4: Three-stage dbt transformations
- FR-6: ChannelNode link handling
- FR-7: AttributeFile processing (NEW)
- FR-8: Delta update processing with seen_status
Open Questions Updated:
- Silver layer confirmed as required (removed from questions)
- Added: Revision cleanup strategy, value_store hashing approach
Status: Requirements document updated, ready for final approval
Architectural Decision - dbt-Native Approach (2026-03-02T15:39:53-07:00)
User Question:
should we be creating an importer following the paradigm in pim2 instead of using dbt?
User Direction:
my leaning is that eventually blower would be replaced by dbt; we are just trying to take shortest path that keeps us on this trajectory
Decision: All-dbt pipeline with minimal Python glue
Approach:
- Bronze ingestion: Airflow Python operator (~50 lines) - S3 → JSON to database
- Silver transformations: dbt models - Parse JSON, assign IDs, create value_store
- Gold transformations: dbt models - Join and populate 13 revision tables
- Delta tracking: dbt incremental models with seen_status logic
Rationale:
- Strategic move toward dbt-native pipeline
- Sets precedent for replacing other Blower sources
- Team already familiar with dbt
- Shortest path to modern data stack
- No PHP dependency
- Easier to maintain and hire for
What We Skip from Blower:
- PHP ImporterInterface classes
- Event-driven hooks (use dbt macros instead)
- PIM2 schema pattern (Bronze is simpler JSON table)
- Complex batch processing (dbt handles this)
Requirements Updated:
- TR-2: Orchestration - Added Python + dbt task details
- TR-3: Transformation - Emphasized dbt-native approach with strategic rationale
User Approval: "approved"
Status: Requirements ready for final approval to proceed to User Stories phase
Database Technology Decision - PostgreSQL (2026-03-02T15:53:15-07:00)
User Question:
since we are doing all of this is aurora mysql a good choice or should we move to postgresql
Context Discovery:
downstream systems are dependent on csv exports
Decision: Switch to Aurora PostgreSQL
Rationale:
Downstream Systems DB-Agnostic:
- Consume CSV exports, not direct DB connections
- No breaking changes to downstream systems
Superior JSON Handling:
- Bronze layer stores raw inRiver JSON
- PostgreSQL JSONB: native operators, indexing, efficient querying
- MySQL JSON: limited operators, less efficient
Complete Modernization:
- dbt + Postgres = industry standard modern data stack
- If replacing Blower pattern, modernize DB too
- Future-proof architecture
Better for Data Pipelines:
- Advanced SQL features (CTEs, window functions, arrays)
- dbt has richer feature set for Postgres
- Native array support for complex structures
No Migration Risk:
- Downstream systems unaffected (CSV exports remain same)
- Change before any code written
- Minimal effort (just DB engine + dbt adapter)
Requirements Updated:
- TR-1: Aurora MySQL → Aurora PostgreSQL with JSONB support
- TR-3: dbt-mysql → dbt-postgres with JSONB operators
- Architecture: Bronze layer uses JSONB column
- FR-4: Stage 1 uses PostgreSQL JSONB operators for parsing
User Approval: "approved"
Status: Complete modernization approved (dbt + Postgres), requirements ready for final approval
Bronze Layer Storage Clarification (2026-03-02T22:21:26-07:00)
User Question:
do we have to change more docs on the bronze model? originally it was just going to contain meta data and the json was going to be left in s3
User Decision:
should be option 1 now that we are using postgresql
Clarification: Bronze layer stores JSON in database (not just metadata)
Approach:
- PostgreSQL table with JSONB column for raw inRiver JSON
- Metadata columns: load_date, file_name, business_unit, s3_key, entity_type
- Immutable append-only pattern
Rationale:
- PostgreSQL JSONB enables efficient SQL querying
- dbt can transform directly from Bronze table (no S3 reads)
- Simpler dbt models (pure SQL, no external data sources)
- JSONB indexing for performance
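The Bronze pattern above can be sketched in miniature. This is a hedged illustration only: it uses SQLite's json_extract (via Python's stdlib sqlite3) as a stand-in for PostgreSQL's JSONB operators (data->>'name', jsonb_path_query, etc.), and the sample record fields are invented:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE bronze_products (
        s3_key TEXT,
        file_name TEXT,
        business_unit TEXT,
        entity_type TEXT,
        load_date TEXT DEFAULT CURRENT_TIMESTAMP,
        data TEXT  -- JSONB column in PostgreSQL; plain JSON text here
    )
""")

# Append one raw inRiver record with its audit metadata (immutable insert).
record = {"id": "P-100", "name": "Widget", "items": [{"sku": "W-1"}]}
conn.execute(
    "INSERT INTO bronze_products (s3_key, file_name, business_unit, entity_type, data) "
    "VALUES (?, ?, ?, ?, ?)",
    ("exports/products_001.jsonl", "products_001.jsonl", "bk", "products",
     json.dumps(record)),
)

# json_extract stands in for PostgreSQL's data->>'id' / data->>'name';
# dbt models can query Bronze this way without touching S3.
name = conn.execute(
    "SELECT json_extract(data, '$.name') FROM bronze_products "
    "WHERE json_extract(data, '$.id') = 'P-100'"
).fetchone()[0]
print(name)  # -> Widget
```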
Requirements Updated:
- FR-1: Explicitly defines Bronze table structure with JSONB column
- Clarified: JSON stored in database, not just S3 reference
Status: Bronze layer storage approach confirmed and documented
ERP Data Integration - Deferred to Phase 2 (2026-03-02T23:16:54-07:00)
User Question:
how fresh is the data in ec_erp_data
Analysis Performed: Connected to production Blower database and analyzed ERP schemas
Findings:
ERP Schemas Exist (only 3 BUs):
- ec_erp_data - Enterprise Comfort (item_attributes, packages, supersedes)
- ec_m1_data - Enterprise Comfort M1 legacy
- pp_p21_data - Peirce-Phelps P21 ERP
Data Freshness:
- ERP data updated: 2026-03-03 02:38:25
- Revision created: 2026-03-03 02:40:20
- Synchronized: ERP updated with each Blower run (within 2 minutes)
Schema Structure:
- ERP data in separate schemas (not merged into revision)
- Likely joined at export time by downstream systems
- ~292K ERP records for EC
Implications:
- Moving to PostgreSQL separates PIM from ERP data
- Downstream systems may need ERP data access
- Most BUs don't have ERP schemas (only EC and PP)
User Decision:
lets commit to postgresql we'll figure out the erp data later
Resolution:
- Commit to Aurora PostgreSQL for inRiver PIM data
- Defer ERP integration to Phase 2
- Added to Out of Scope: ERP data integration with options documented
Phase 2 Options for ERP Data:
- Replicate ERP schemas to PostgreSQL
- Federated queries (postgres_fdw to MySQL)
- Dual-database pattern (PIM in Postgres, ERP in MySQL)
- Consolidate ERP sources into dbt pipeline
Status: PostgreSQL confirmed, ERP data deferred, requirements ready for approval
Export Layer Analysis - CSV vs Revision Tables (2026-03-03T07:43:25-07:00)
User Question:
do the CSV outputs seem to match the revision tables in structure?
Analysis Performed: Examined Blower export classes and compared with revision table structure
Finding: NO - CSV exports are DIFFERENT from revision tables
Revision Table items:
id int(10) unsigned -- Internal ID
name varchar(128)
type_id tinyint(3) -- Foreign key to item_types
attribute_set_id int(10) -- Foreign key to attribute_sets
seen_status_id tinyint(3) -- Delta tracking
CSV Export items.csv:
item_id -- blower_id (original source ID)
name
type -- type NAME (not ID)
attribute_set_id -- blower_id (not internal ID)
Key Differences:
- IDs are converted: Internal IDs → blower_ids (source IDs)
- Foreign keys resolved: type_id → type name (lookup via central)
- No seen_status_id: Internal tracking not exported
- Simplified structure: Downstream-friendly format
Export SQL Pattern (from Items.php):
SELECT
central_i.blower_id item_id, -- Source ID
i.name,
it.name type, -- Resolved name
cas.blower_id attribute_set_id -- Source ID
FROM revision.items i
INNER JOIN central.items central_i ON central_i.id = i.id
INNER JOIN central.item_types it ON it.id = i.type_id
LEFT JOIN central.attribute_sets cas ON cas.id = i.attribute_set_id
Implications for PostgreSQL Pipeline:
Gold layer ≠ Export format
- Gold = normalized with integer IDs (performance)
- Export = denormalized with source IDs (compatibility)
Need Export Layer
- Separate dbt models for CSV generation
- Transform Gold → Export format
- Join Silver to get blower_ids
- Resolve foreign keys to names
Four-Stage Pipeline:
- Bronze → Silver (normalization)
- Silver → Gold (business-ready)
- Gold → Gold (delta tracking)
- Gold → Export (CSV generation)
Requirements Updated:
- Architecture: Added Export Layer description
- FR-4: Added Stage 4 (Gold → Export)
- FR-5: Updated with CSV export details
- Next Steps: Updated to four-stage architecture
User Approval: "yes"
Status: Export layer added to requirements, four-stage pipeline documented
Phased Approach Decision (2026-03-03T08:07:43-07:00)
User Question:
would the fastest path be to have a separate system that massages the jsonl files into the import tables; I wonder if we have a fasttrack project and a rebuild project, what we've defined here seems like it may be alot
User Decision:
we need this moving quickly the challenge is that delta files - Option 3 would be the proper option; I want to make blower more maintainable longterm
Decision: Two-Phase Approach
Phase 1: Fast Track (2-3 days)
Approach: Python converter - inRiver JSONL → Heiler CSV format
Rationale:
- Business urgency - need inRiver working quickly
- Low risk - use existing Blower infrastructure
- Minimal code - ~500 lines Python
- Proven pattern - Blower already works
Deliverable: Python script that converts inRiver files to Heiler format, drops into Blower input
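A minimal sketch of that converter idea, using only the stdlib. The field mapping here (id, name, entityType → item_id, name, type) is hypothetical; the real column set would come from PIM_STRUCTURE.md and the inRiver technical spec:

```python
import csv
import io
import json

# Hypothetical mapping: Heiler items.csv column -> inRiver JSON field.
FIELDS = {"item_id": "id", "name": "name", "type": "entityType"}

def jsonl_to_items_csv(jsonl_text: str) -> str:
    """Convert inRiver JSONL lines into a Heiler-style items.csv string."""
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=list(FIELDS))
    writer.writeheader()
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue  # skip blank lines between records
        record = json.loads(line)
        writer.writerow({col: record.get(src, "") for col, src in FIELDS.items()})
    return out.getvalue()

sample = '{"id": "P-100", "name": "Widget", "entityType": "Product"}\n'
csv_text = jsonl_to_items_csv(sample)
print(csv_text)
```

The real script would repeat this per entity type (17 output files) and, per the accumulator decision below, read from PostgreSQL rather than straight from the delta files.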
Phase 2: Full Modernization (4-6 weeks)
Approach: dbt + PostgreSQL pipeline (Bronze → Silver → Gold → Export)
Rationale:
- Strategic goal - make Blower more maintainable long-term
- Sets precedent for replacing other Blower sources
- Modern data stack - easier to hire/train for
- Better architecture - scalable and testable
Deliverable: Complete replacement of Blower with dbt-native pipeline
Documents Created:
- requirements-overview.md - Two-phase strategy summary
- phase1-requirements.md - Fast track Python converter requirements
- phase2-requirements.md - Full modernization requirements (renamed from requirements.md)
Status: Phase 1 requirements ready for approval, Phase 2 deferred
PostgreSQL Accumulator Addition to Phase 1 (2026-03-03T08:25:00-07:00)
User Insight:
since we are dealing with deltas and, I believe, heiler delivers fulls, it seems like we should standup a postgresql server in RDS as part of phase1 that loads the jsonl; this will eventually be bronze in phase 2 but can be used to "mimic" the full database to create the csvs?
Critical Realization:
- Heiler: Delivers full files (nightly full load)
- inRiver: Delivers delta files only (incremental updates)
- Blower: Expects full CSV files (all products, not just changes)
Problem: Can't generate full CSVs from delta files alone - need to accumulate deltas into full dataset
Solution: PostgreSQL Accumulator Database
Updated Phase 1 Architecture:
inRiver S3 (deltas)
→ Python Delta Loader
→ PostgreSQL (full dataset, JSONB)
→ Python CSV Generator
→ Heiler CSVs (full files)
→ Blower
PostgreSQL Schema:
CREATE TABLE products (
id TEXT PRIMARY KEY,
data JSONB,
updated_at TIMESTAMP,
deleted BOOLEAN DEFAULT FALSE
);
Operations:
- Insert: New records from delta
- Update: Merge JSONB (ON CONFLICT UPDATE)
- Delete: Soft delete (deleted = true)
- Query: SELECT all non-deleted for CSV generation
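The four operations above can be sketched with stdlib sqlite3, whose ON CONFLICT ... DO UPDATE syntax mirrors PostgreSQL's. Two caveats: this sketch replaces the stored JSON wholesale rather than merging JSONB, and the "action": "deleted" marker is a hypothetical stand-in for however inRiver actually flags deletes:

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE products (
        id TEXT PRIMARY KEY,
        data TEXT,               -- JSONB in the real PostgreSQL accumulator
        updated_at TEXT,
        deleted INTEGER DEFAULT 0
    )
""")

def apply_delta(record: dict) -> None:
    """Upsert one delta record; a 'deleted' action becomes a soft delete."""
    if record.get("action") == "deleted":
        conn.execute(
            "UPDATE products SET deleted = 1, updated_at = datetime('now') "
            "WHERE id = ?", (record["id"],))
    else:
        conn.execute(
            "INSERT INTO products (id, data, updated_at) "
            "VALUES (?, ?, datetime('now')) "
            "ON CONFLICT (id) DO UPDATE SET "
            "data = excluded.data, updated_at = excluded.updated_at, deleted = 0",
            (record["id"], json.dumps(record)),
        )

apply_delta({"id": "P-1", "name": "Widget"})
apply_delta({"id": "P-1", "name": "Widget v2"})   # upsert overwrites in place
apply_delta({"id": "P-2", "name": "Gadget"})
apply_delta({"id": "P-2", "action": "deleted"})   # soft delete

# CSV generation reads the accumulated full dataset, minus soft deletes.
live = conn.execute("SELECT id FROM products WHERE deleted = 0").fetchall()
print(live)  # -> [('P-1',)]
```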
Benefits:
- ✅ Converts deltas → full dataset
- ✅ PostgreSQL ready for Phase 2 (becomes Bronze layer)
- ✅ JSONB for flexible schema
- ✅ No throwaway work - reused in Phase 2
Phase 1 Requirements Updated:
- FR-2: PostgreSQL Accumulator Database
- FR-3: Delta Merge Logic
- FR-4: CSV Generation from PostgreSQL
- TR-1: Database (RDS PostgreSQL)
- TR-2: Implementation (Delta Loader + CSV Generator)
- TR-3: Orchestration (Two-step DAG)
Status: Phase 1 requirements updated with PostgreSQL accumulator
Audit Trail Enhancement (2026-03-03T08:38:54-07:00)
User Request:
lets make sure in phase1 we track the most recent source file in addition to the timestamp for each json date object in the table. We should also maintain an event log table that tracks the loading of these files through time
Enhancements Added:
1. Source File Tracking
Data Tables Schema Updated:
CREATE TABLE products (
id TEXT PRIMARY KEY,
data JSONB,
updated_at TIMESTAMP,
source_file TEXT, -- NEW: Track which file last updated this record
deleted BOOLEAN DEFAULT FALSE
);
Purpose:
- Know which S3 file last modified each record
- Debug data issues by tracing back to source
- Audit trail for compliance
2. Event Log Table
New Table:
CREATE TABLE file_processing_events (
id SERIAL PRIMARY KEY,
event_time TIMESTAMP DEFAULT NOW(),
event_type TEXT, -- 'file_start', 'file_complete', 'file_error'
source_file TEXT,
entity_type TEXT, -- 'products', 'channel_nodes', 'attribute_file'
records_processed INTEGER,
records_inserted INTEGER,
records_updated INTEGER,
records_deleted INTEGER,
error_message TEXT,
metadata JSONB
);
Purpose:
- Complete history of all file processing
- Track success/failure rates
- Monitor data flow over time
- Debug processing issues
- Compliance and audit requirements
3. Event Logging Flow
For each file processed:
- Log 'file_start' event
- Process records (track counts)
- Log 'file_complete' event with statistics
- On error: Log 'file_error' event with details
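The flow above can be sketched as follows (SQLite standing in for PostgreSQL, and a reduced set of event columns for brevity):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE file_processing_events (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        event_time TEXT DEFAULT (datetime('now')),
        event_type TEXT,          -- 'file_start', 'file_complete', 'file_error'
        source_file TEXT,
        entity_type TEXT,
        records_processed INTEGER,
        error_message TEXT
    )
""")

def log_event(event_type, source_file, entity_type, **extra):
    conn.execute(
        "INSERT INTO file_processing_events "
        "(event_type, source_file, entity_type, records_processed, error_message) "
        "VALUES (?, ?, ?, ?, ?)",
        (event_type, source_file, entity_type,
         extra.get("records_processed"), extra.get("error_message")),
    )

def process_file(source_file, entity_type, records):
    """Bracket the load with start/complete events; log errors instead of a complete."""
    log_event("file_start", source_file, entity_type)
    try:
        count = len(records)  # a real loader would upsert each record here
        log_event("file_complete", source_file, entity_type,
                  records_processed=count)
    except Exception as exc:
        log_event("file_error", source_file, entity_type, error_message=str(exc))
        raise

process_file("s3://bucket/products_delta_001.jsonl", "products", [{"id": "P-1"}])
events = [row[0] for row in conn.execute(
    "SELECT event_type FROM file_processing_events ORDER BY id")]
print(events)  # -> ['file_start', 'file_complete']
```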
Example Event Log:
2026-03-03 08:00:00 | file_start | s3://bucket/products_delta_001.jsonl | products | ...
2026-03-03 08:00:15 | file_complete | s3://bucket/products_delta_001.jsonl | products | 1000 processed, 50 inserted, 900 updated, 50 deleted
2026-03-03 08:05:00 | file_start | s3://bucket/products_delta_002.jsonl | products | ...
Phase 1 Requirements Updated:
- FR-2: Added source_file column and event log table
- FR-3: Added event logging to delta merge logic
- TR-1: Updated database schema with event log
- TR-2: Updated Delta Loader to log events
- Success Criteria: Added audit trail requirements
Status: Phase 1 requirements updated with complete audit trail
Decoupled Processing Schedule (2026-03-03T08:56:30-07:00)
User Clarification:
in phase 1 we will run the delta imports as they come but will only need to generate the CSVs nightly (cron or on demand)
Architecture Update: Decoupled DAGs
DAG 1: Delta Loader (Near Real-Time)
- Trigger: S3 event notification (as files arrive)
- Frequency: On-demand (per file, throughout the day)
- Duration: ~1-2 minutes per file
- Purpose: Accumulate deltas into PostgreSQL immediately
DAG 2: CSV Generator (Nightly)
- Trigger: Cron schedule (nightly) OR manual trigger
- Frequency: Once per day (or on-demand)
- Duration: ~10-15 minutes
- Purpose: Generate full Heiler CSVs from accumulated deltas
Benefits:
- ✅ Near real-time delta accumulation (data always current)
- ✅ Nightly CSV generation (matches Blower pattern)
- ✅ Decoupled processing (delta loading doesn't block CSV generation)
- ✅ On-demand CSV generation (for testing or urgent updates)
- ✅ Efficient (accumulate all day, generate once)
Data Flow:
Throughout Day:
inRiver delta files → S3 event → Delta Loader → PostgreSQL
Nightly (or on-demand):
PostgreSQL (full dataset) → CSV Generator → Heiler CSVs → Blower
Phase 1 Requirements Updated:
- FR-1: Delta loading as files arrive (near real-time)
- FR-4: CSV generation nightly or on-demand
- NFR-1: Performance requirements for decoupled processing
- TR-3: Two separate DAGs with different triggers
Status: Phase 1 requirements updated with decoupled processing schedule
Ordered Processing & Replay Capability (2026-03-03T09:27:42-07:00)
User Requirements:
we need to add notes that the jsonl need to be load in order and loading new files should leverage the events table to decide what files to load. we should be able to mark an event and following events as reload and the process should restart from that point forward. Events themselves should not be deleted.
Requirements Added:
1. Ordered Processing
Requirement: JSONL files must be processed in chronological order (deltas depend on sequence)
Implementation:
-- Find last successfully processed file (not marked for replay)
SELECT MAX(id) FROM file_processing_events
WHERE event_type = 'file_complete'
AND replay_date IS NULL;
-- Process files after that point in order
Benefits:
- Maintains data consistency
- Prevents duplicate processing
- Event log drives processing logic
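The event-log-driven selection can be sketched in plain Python over rows shaped like file_processing_events records. The dict keys and helper names here are illustrative assumptions, not the actual loader code:

```python
# Sketch: decide which files the Delta Loader should pick up next, driven by
# the event log. Events are plain dicts mirroring file_processing_events rows.

def last_valid_event_id(events):
    """Highest id of a completed event not marked for replay (the watermark)."""
    ids = [e["id"] for e in events
           if e["event_type"] == "file_complete" and e["replay_date"] is None]
    return max(ids, default=0)

def files_to_process(events, available_files):
    """Files not yet covered by a valid file_complete event, in name order.

    A real loader would combine the watermark from last_valid_event_id with
    the S3 listing; this sketch uses a simple set difference for brevity.
    """
    done = {e["file"] for e in events
            if e["event_type"] == "file_complete" and e["replay_date"] is None}
    return sorted(f for f in available_files if f not in done)

events = [
    {"id": 1, "event_type": "file_complete", "file": "file_001.jsonl", "replay_date": None},
    {"id": 2, "event_type": "file_complete", "file": "file_002.jsonl", "replay_date": "2026-03-03"},
]
# file_002 was marked for replay, so it is selected again along with file_003
print(files_to_process(events, ["file_001.jsonl", "file_002.jsonl", "file_003.jsonl"]))
```

Because replayed events keep their rows but get a non-NULL replay_date, marking a file for replay automatically makes it eligible again without deleting any history.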
2. Replay Capability
Requirement: Can mark events for replay and reprocess from that point forward
Implementation:
-- Mark event and all following events for replay
UPDATE file_processing_events
SET replay_date = NOW()
WHERE id >= [problem_event_id];
Replay Scenario Example:
Initial State:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = NULL) ← bad data detected
3. file_003.jsonl - processed ✓ (replay_date = NULL)
Action: Mark events #2 and #3 for replay
UPDATE file_processing_events SET replay_date = NOW() WHERE id >= 2;
After Marking:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00')
3. file_003.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00')
Delta Loader Query:
- Finds last valid event: #1
- Detects gap (no valid events after #1)
- Reprocesses: file_002.jsonl, file_003.jsonl
- Creates new events with replay_date = NULL
Final State:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00') [original]
3. file_003.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00') [original]
4. file_002.jsonl - processed ✓ (replay_date = NULL) [replay]
5. file_003.jsonl - processed ✓ (replay_date = NULL) [replay]
3. Event Immutability
Requirement: Events are never deleted (permanent audit trail)
Benefits:
- Complete processing history
- Can trace any data issue
- Replay depends on event history
- Compliance and audit requirements
4. Event Log Schema Update
Added Column:
replay_date (timestamp, nullable) - When event was marked for replay
- NULL = valid event
- NOT NULL = marked for replay (excluded from processing)
Query Pattern:
-- Find next file to process
SELECT * FROM file_processing_events
WHERE replay_date IS NULL
ORDER BY event_time;
Phase 1 Requirements Updated:
- FR-2: Added replay_date column to event log schema
- FR-3: Added ordered processing and replay logic
- TR-1: Added replay_date index for efficient queries
- TR-2: Updated Delta Loader with ordered processing logic
- NFR-2: Added ordered processing and replay requirements
Status: Phase 1 requirements updated with ordered processing and replay capability
Multi-Business Unit Architecture (2026-03-03T09:31:42-07:00)
User Clarification:
do we capture the multi business unit nature of this project. There will be a separate aggregator database for each business unit
Architecture Update: Separate Database Per BU
Database Architecture
One PostgreSQL database per business unit:
- inriver_bak - Baker Distributing
- inriver_cefl - Carrier Enterprise Florida
- inriver_ecm - Enterprise Comfort
- inriver_gem - Gemaire
Deployment Options:
- Option A: 4 separate RDS instances (complete isolation, higher cost)
- Option B: 1 shared RDS instance with 4 databases (cost-effective, recommended)
DAG Architecture
8 Total DAGs (4 BUs × 2 DAGs each):
Per Business Unit:
- inriver_{bu}_delta_loader - Load deltas as they arrive
- inriver_{bu}_csv_generator - Generate CSVs nightly
Examples:
- inriver_bak_delta_loader
- inriver_bak_csv_generator
- inriver_cefl_delta_loader
- inriver_cefl_csv_generator
- etc.
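Since the 4 BUs × 2 DAGs naming is fully regular, the DAG ids (and, in Airflow, the DAG objects themselves) could be generated from a single BU list. A minimal sketch, assuming the BU codes above:

```python
# Sketch: derive the 8 DAG ids from the BU list. In Airflow the same loop
# would construct the actual DAG objects; here we only generate the names.

BUSINESS_UNITS = ["bak", "cefl", "ecm", "gem"]  # Baker, Carrier FL, Enterprise Comfort, Gemaire
DAG_KINDS = ["delta_loader", "csv_generator"]

def dag_ids(bus=BUSINESS_UNITS, kinds=DAG_KINDS):
    return [f"inriver_{bu}_{kind}" for bu in bus for kind in kinds]

print(dag_ids())  # 8 ids: inriver_bak_delta_loader, inriver_bak_csv_generator, ...
```

Generating rather than hand-writing the 8 DAGs keeps per-BU behavior identical and makes adding a fifth BU a one-line change.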
Benefits of Separate Databases
- ✅ Isolation: BU data completely separate
- ✅ Independent Processing: Each BU processes at own pace
- ✅ Failure Isolation: Issue in one BU doesn't affect others
- ✅ Matches Blower Pattern: Blower has separate schemas per BU
- ✅ Easier Rollout: Deploy one BU at a time
- ✅ Phase 2 Ready: Each database becomes Bronze layer for that BU
Data Flow Per BU
inRiver S3 (BU-specific path)
↓
Delta Loader DAG (BU-specific)
↓
PostgreSQL Database (BU-specific)
↓
CSV Generator DAG (BU-specific)
↓
Heiler CSVs (BU-specific path)
↓
Blower (BU-specific)
Phase 1 Requirements Updated:
- Overview: Added multi-BU architecture explanation
- FR-1: Per-BU processing
- FR-2: Separate database per BU
- TR-1: Database architecture options (separate instances vs shared)
- TR-3: 8 DAGs total (4 BUs × 2 DAGs)
- Dependencies: Per-BU infrastructure provisioning
- Next Steps: Per-BU rollout plan
Status: Phase 1 requirements updated with multi-BU architecture
Configuration and Mapping Tables (2026-03-03T09:39:29-07:00)
User Requirement:
we have an additional problem. there will need to be some mapping data (e.g. heiler creates a itemdataflex, item-attributes, products and maybe more from data that will all be in attributes in inriver) we will need a table that describes what goes where potentially. Also we lose some fidelity - for example we may not have a purpose anymore and will need a table where we can keep this decorator data. For now we can load those tables manually - may eventually need a quick web frontend for this system
Problem Identified:
Complex Mapping: inRiver attributes → multiple Heiler CSVs
- One inRiver field may map to multiple Heiler columns
- Example: Product attributes → itemdataflex.csv, item-attributes.csv, products.csv
Lost Fidelity: Data exists in Heiler but not in inRiver
- Example: "purpose" field in Heiler
- Need to store this decorator data somewhere
Solution: Configuration Tables
1. Field Mapping Table
Purpose: Map inRiver fields to Heiler CSV columns
Schema:
CREATE TABLE field_mapping (
id SERIAL PRIMARY KEY,
inriver_entity TEXT, -- 'Product', 'ChannelNode', etc.
inriver_field TEXT, -- inRiver field name
heiler_csv TEXT, -- Target CSV file
heiler_column TEXT, -- Target column
transformation_rule TEXT, -- Optional transformation
active BOOLEAN DEFAULT TRUE,
notes TEXT
);
Example Data:
INSERT INTO field_mapping VALUES
(1, 'Product', 'ProductId', 'items.csv', 'Item#', NULL, true, 'Direct mapping'),
(2, 'Product', 'ProductId', 'itemdataflex.csv', 'Item#', NULL, true, 'Same ID in multiple CSVs'),
(3, 'Product', 'FieldValues.ShortDescription', 'itemdataflex.csv', 'BU_ISD', NULL, true, 'Extract from field values');
2. Decorator Data Table
Purpose: Store data that doesn't exist in inRiver
Schema:
CREATE TABLE decorator_data (
id SERIAL PRIMARY KEY,
entity_type TEXT, -- 'attribute', 'item', 'category'
entity_id TEXT, -- inRiver entity ID
decorator_type TEXT, -- 'purpose', 'role', etc.
decorator_value TEXT, -- The value
source TEXT, -- 'manual', 'migration', etc.
created_at TIMESTAMP DEFAULT NOW(),
notes TEXT
);
Example Data:
INSERT INTO decorator_data VALUES
(1, 'attribute', 'ATTR_001', 'purpose', 'searchable', 'manual', NOW(), 'Migrated from Heiler'),
(2, 'attribute', 'ATTR_002', 'purpose', 'filterable', 'manual', NOW(), 'Migrated from Heiler');
3. CSV Template Config Table
Purpose: Define CSV generation rules per column
Schema:
CREATE TABLE csv_template_config (
id SERIAL PRIMARY KEY,
csv_name TEXT, -- 'items.csv', etc.
column_name TEXT, -- Column in CSV
source_type TEXT, -- 'inriver_field', 'decorator', 'static', 'computed'
source_value TEXT, -- Field name or value
default_value TEXT, -- Default if null
transformation TEXT, -- Optional transformation
required BOOLEAN DEFAULT FALSE,
notes TEXT
);
Example Data:
INSERT INTO csv_template_config VALUES
(1, 'items.csv', 'Item#', 'inriver_field', 'ProductId', NULL, NULL, true, 'Primary key'),
(2, 'purpose.csv', 'Feature_Purpose', 'decorator', 'purpose', NULL, NULL, false, 'From decorator_data table');
CSV Generator Logic
Uses configuration tables:
- Query csv_template_config for the target CSV
- For each column:
  - If source_type = 'inriver_field': Extract from JSON
  - If source_type = 'decorator': Join with decorator_data
  - If source_type = 'static': Use static value
  - If source_type = 'computed': Apply transformation
- Apply field_mapping for complex mappings
- Generate CSV row
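The per-column branching can be sketched as a small resolver over csv_template_config rows. The dict shapes, the callable 'computed' handling, and the in-memory decorator lookup are assumptions for illustration; the real generator would read all of this from PostgreSQL:

```python
# Sketch: resolve one CSV row from csv_template_config entries. Config rows,
# the inRiver record, and decorator lookups are plain dicts here.

def resolve_column(cfg, record, decorators):
    src = cfg["source_type"]
    if src == "inriver_field":
        value = record.get(cfg["source_value"])        # extract from the JSON record
    elif src == "decorator":
        value = decorators.get((record["id"], cfg["source_value"]))  # join decorator_data
    elif src == "static":
        value = cfg["source_value"]                    # literal value from config
    elif src == "computed":
        value = cfg["transformation"](record)          # assumed: a callable in this sketch
    else:
        raise ValueError(f"unknown source_type: {src}")
    return value if value is not None else cfg.get("default_value")

configs = [
    {"column_name": "Item#", "source_type": "inriver_field", "source_value": "ProductId"},
    {"column_name": "Feature_Purpose", "source_type": "decorator", "source_value": "purpose"},
]
record = {"id": "ATTR_001", "ProductId": "P-100"}
decorators = {("ATTR_001", "purpose"): "searchable"}

row = {c["column_name"]: resolve_column(c, record, decorators) for c in configs}
print(row)  # {'Item#': 'P-100', 'Feature_Purpose': 'searchable'}
```

Keeping the branching in one resolver means a new source_type (or a changed mapping) is a config-table edit plus, at most, one new branch, rather than a change to every CSV's generation code.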
Management Approach
Phase 1: Manual SQL inserts
- Load initial configuration via SQL scripts
- Maintain decorator data via SQL updates
- Simple, no UI needed initially
Future Enhancement: Web UI
- CRUD interface for field_mapping
- CRUD interface for decorator_data
- CSV template editor
- Out of scope for Phase 1
Phase 1 Requirements Updated:
- TR-1: Added configuration tables to database schema
- FR-6: New requirement for configuration and mapping tables
- TR-2: Updated CSV Generator to use configuration tables
- Out of Scope: Added web UI as future enhancement
Status: Phase 1 requirements updated with configuration management
Database Architecture Clarification (2026-03-03T09:49:14-07:00)
User Clarification:
we want separate databases(schemas) in one postgresql instance
Architecture Update: Single Instance with Separate Schemas
PostgreSQL Architecture
Single RDS PostgreSQL instance:
- Instance size: db.t3.medium (shared across all BUs)
- 4 Schemas (one per BU):
- inriver_bak - Baker Distributing
- inriver_cefl - Carrier Enterprise Florida
- inriver_ecm - Enterprise Comfort
- inriver_gem - Gemaire
Schema Structure
Each schema contains identical table structure:
inriver_bak/
├── products (data table)
├── channel_nodes (data table)
├── attribute_file (data table)
├── file_processing_events (event log)
├── field_mapping (configuration)
├── decorator_data (configuration)
└── csv_template_config (configuration)
inriver_cefl/
├── (same tables)
...
Benefits
- ✅ Cost-effective: Single instance vs 4 separate instances
- ✅ Data isolation: Schemas provide logical separation
- ✅ Simplified management: One instance to maintain
- ✅ Shared resources: Better resource utilization
- ✅ Easy backup: Single instance backup covers all BUs
- ✅ Matches Blower pattern: Blower uses schemas per BU (bk_, gm_, etc.)
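Because all four schemas carry identical tables, the bootstrap DDL can be generated from one table list. A sketch that only builds the statements, with placeholder column bodies (real definitions would come from the schema docs):

```python
# Sketch: emit CREATE statements for the per-BU schemas. Only skeleton DDL is
# produced; actual column definitions depend on each entity's documented schema.

SCHEMAS = ["inriver_bak", "inriver_cefl", "inriver_ecm", "inriver_gem"]
TABLES = ["products", "channel_nodes", "attribute_file",
          "file_processing_events", "field_mapping",
          "decorator_data", "csv_template_config"]

def bootstrap_statements(schemas=SCHEMAS, tables=TABLES):
    stmts = []
    for schema in schemas:
        stmts.append(f"CREATE SCHEMA IF NOT EXISTS {schema};")
        for table in tables:
            # placeholder body; swap in the real columns per table
            stmts.append(f"CREATE TABLE IF NOT EXISTS {schema}.{table} (id SERIAL PRIMARY KEY);")
    return stmts

print(len(bootstrap_statements()))  # 4 schemas + 4*7 tables = 32 statements
```

Generating the DDL this way guarantees the schemas never drift apart, and onboarding a new BU is just an entry in SCHEMAS.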
Connection Pattern
DAGs connect with schema-specific search_path:
# BAK Delta Loader
conn = psycopg2.connect(
host='rds-endpoint',
database='inriver',
user='inriver_user',
options='-c search_path=inriver_bak'
)
# CEFL Delta Loader
conn = psycopg2.connect(
host='rds-endpoint',
database='inriver',
user='inriver_user',
options='-c search_path=inriver_cefl'
)
Phase 1 Requirements Updated:
- Overview: Single instance with separate schemas
- FR-2: Single instance with schemas per BU
- TR-1: Single db.t3.medium instance with 4 schemas
- Dependencies: Single instance provisioning
- Next Steps: Create schemas in single instance
Status: Phase 1 requirements updated with single-instance architecture
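The per-BU connection snippets above differ only in search_path, so a small helper could build the psycopg2 kwargs once. The host, database, and user values are placeholders matching the examples above:

```python
# Sketch: build psycopg2 connection kwargs for a given BU schema.
# Connection values are placeholders from the examples in this section.

def connect_kwargs(bu):
    return {
        "host": "rds-endpoint",
        "database": "inriver",
        "user": "inriver_user",
        "options": f"-c search_path=inriver_{bu}",  # route to the BU schema
    }

# usage (not run here): conn = psycopg2.connect(**connect_kwargs("bak"))
print(connect_kwargs("cefl")["options"])  # -c search_path=inriver_cefl
```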