AI-DLC Audit Trail

User Request (2026-02-26T15:27:02-07:00)

Raw Input:

this repo contains documentation about our new inriver pim files as well as some sample files; we use a system called blower that runs on apache airflow with php generated dags to do ETL from our current pim systems raw inputs to the outputs expected by downstream systems.  We are migrating to this new pim system that outputs pim data in the formats described in this repo and represented in the ecom samples `samples/aperature-export-ecom`.  Our plan is to keep the downstream outputs the same but to get the inriver jsonl product files into a postgresql database where they can be manipulated leveraging dbt on airflow into the same format as our current blower database tables. understand this repo and then help us by running aidlc to start producing a project plan.

Workspace Detection (2026-02-26T15:27:02-07:00)

Findings:

  • Repository Type: Documentation repository (VitePress)
  • Existing Code: Only utility script (parse_email.py) - not application code
  • Project Type: Greenfield - New ETL pipeline project
  • Context:
    • Current system: Blower (Apache Airflow + PHP-generated DAGs)
    • New PIM: inRiver with JSONL exports
    • Goal: Load inRiver JSONL → PostgreSQL → dbt transformations → Same output format as current Blower tables
    • Documentation exists: Technical specs, sample files, ERD, schema docs

Repository Structure Analysis:

  • docs/ - VitePress documentation site with technical specs
  • samples/aperture-export-ecom/ - Sample inRiver JSON exports (Product, ChannelNode, AttributeFile)
  • correspondence/ - Email archives with stakeholder communications
  • meetings/ - Meeting notes
  • terraform/ - Infrastructure code (likely for documentation hosting)

Key Documentation Reviewed:

  • Technical spec: JSONL export format, CVL fields, delivery schedule
  • ERD: Entity relationships (Channel, ChannelNode, Product, Item, Resource)
  • Sample data: Product, ChannelNode, AttributeFile JSON structures
  • Assumptions: Business unit handling, cardinality, scalability concerns
  • Open questions: ChannelNode OutboundLinks scalability, delta update strategy

Decision:

  • Brownfield: No
  • Next Phase: Requirements Analysis
  • Rationale: This is a new ETL pipeline project. Documentation exists but no application code.

User Clarification (2026-02-26T15:27:02-07:00)

Additional Context Provided:

in this you should also note that we are moving from a system that is "nightly full load" to a system that is always delta load

Key Insight:

  • Current System (Blower): Nightly full load pattern
  • New System (inRiver): Always delta load (near real-time updates)
  • Architectural Shift: From batch full refresh to incremental delta processing
  • Impact: Requires change data capture, merge/upsert logic, and state management

Requirements Analysis - Initial Questions (2026-02-26T15:46:35-07:00)

User Response: "requirements doc has been updated"

Answers Received:

  1. Blower Schema: Can provide documentation (A)
  2. Downstream Systems: E-commerce (Magento, Shopware) + Algolia + other systems (A + detail)
  3. Business Unit Handling: Separate pipelines per BU (A)
  4. Data Volume: <100K products, hourly updates acceptable (A)
  5. Delta Strategy: Hard deletes (B)
  6. Error Handling: Standard validation, quarantine, alerting (B)
  7. Airflow: Existing MWAA available (A)
  8. S3 Access: Control bucket, can add event notifications (A)
  9. Database: MariaDB on RDS, might consider Aurora (C + detail) [CONTRADICTION: Original request said PostgreSQL]
  10. Transformation Complexity: Not sure, needs analysis (D)
  11. Nested Data: Flatten to normalized tables (A) + note: "current process deals with items, new files have products with items as children"
  12. Monitoring: Comprehensive observability (C)
  13. Testing: Data parity validation - tables must be identical (E)
  14. Deployment: CI/CD with automated testing (B)
  15. Incremental/Full: Incremental only - inRiver limitation (A)
  16. AttributeFile: Load to table, joined with incremental data (E)
  17. ChannelNode: Will ask to exclude outbound links to products (E)
  18. Timeline: Urgent 2-4 weeks (A) [CONTRADICTION: Complex project with unknowns]
  19. Team: Mixed experience, familiar with Airflow/dbt but not this pattern (D)
  20. Success: Functional parity with Blower (A)

Contradictions Detected:

  1. Database Technology: PostgreSQL (original) vs MariaDB (Q9)
  2. Timeline vs Complexity: 2-4 weeks (Q18) vs comprehensive monitoring (Q12) + unknown transformations (Q10)
  3. Performance: "Near real-time delta" (original) vs "hourly acceptable" (Q4)

Ambiguities Detected:

  1. Transformation complexity unknown (Q10)
  2. Monitoring scope vs timeline mismatch
  3. Product → Item hierarchy change impact
  4. Blower schema availability timing
  5. Multiple downstream systems scope

Action:

Created requirements-clarification-questions.md with 8 questions to resolve contradictions and ambiguities.


Requirements Clarification - Resolved (2026-02-26T22:42:53-07:00)

User Response: "I've updated the contradictions doc"

Clarification Answers:

  1. Database Technology: Aurora MySQL (B)
  2. Timeline: Phased approach - MVP in 2-4 weeks, iterate for monitoring/testing (D)
  3. Performance: Hourly batch acceptable (C)
  4. Transformation Analysis: Provide Blower schema now, analyze before design (A)
  5. Monitoring Scope: MVP with Airflow monitoring + data quality checks (B)
  6. Product/Item Model: Flatten Products to Items - maintain item-centric model (A)
  7. Blower Schema: Available now (A)
  8. Downstream Systems: Multiple output schemas per system type (B)

Contradictions Resolved:

✅ Database: Aurora MySQL confirmed
✅ Timeline: Phased approach with MVP focus
✅ Performance: Hourly batch processing confirmed
✅ All ambiguities clarified

Next Action:

Proceed to generate requirements document incorporating all answers.


Requirements Analysis Complete (2026-02-26T22:42:53-07:00)

Status: Requirements document generated and ready for review

Location: aidlc-docs/inception/requirements/requirements.md

Summary:

  • 7 Functional Requirements defined
  • 5 Non-Functional Requirements defined
  • 5 Technical Requirements defined
  • Project constraints documented
  • Success criteria established
  • MVP vs Phase 2 scope clarified

Key Requirements:

  • Data ingestion from S3 (hourly batch)
  • Aurora MySQL database (upgrade from MariaDB)
  • dbt transformations for multiple downstream schemas
  • Flatten Product → Item hierarchy for backward compatibility
  • Hard delete strategy for delta updates
  • Phased delivery: MVP in 2-4 weeks, iterate for monitoring

Awaiting: User approval to proceed to next phase


User Input - Bronze-Silver-Gold Architecture (2026-02-27T08:28:26-07:00)

User Request:

I think we should use a bronze-silver-gold model for the ingestion of the data; the bronze model should be the raw json files with load dates, the gold model should be data formatted in the current ingestion table format.  To minimize the downstream work those "gold" tables will include the entire product data set and the deltas from bronze will just update the records that are in each delta load; lets explore this idea

Key Concepts Introduced:

  • Bronze Layer: Raw JSON files with load dates (immutable landing zone)
  • Gold Layer: Current Blower table format (downstream-ready)
  • Delta Processing: Bronze deltas update Gold tables (merge/upsert pattern)
  • Full Dataset in Gold: Gold maintains complete product dataset, not just deltas
  • Minimize Downstream Work: Downstream systems consume Gold directly without transformation

Architectural Implications:

  • Medallion architecture (Bronze → Silver → Gold)
  • Bronze = raw inRiver JSON with audit metadata
  • Silver = normalized/cleaned data (potentially intermediate transformations)
  • Gold = business-ready data matching current Blower schema
  • Gold tables are stateful (full dataset), Bronze is append-only (deltas)

Questions to Explore:

  1. What role does Silver layer play? (Bronze → Silver → Gold or Bronze → Gold?)
  2. How to handle initial load vs incremental updates in Gold?
  3. What audit metadata in Bronze? (load_date, file_name, business_unit, etc.)
  4. How to handle late-arriving data or out-of-order updates?
  5. Retention policy for Bronze layer?
  6. How does this affect dbt model structure?

Status: Exploring architectural approach before finalizing requirements


User Input - Blower Target Schema Documentation Added (2026-03-02T13:17:17-07:00)

User Update:

I've added docs/blower-target lets continue with aidlc

Documentation Added:

  • docs/blower-target/overview.md - Target schema mapping and ERD
  • Table-specific documentation for all 13 Blower tables:
    • items.md - Core product/SKU table
    • attributes.md - Attribute definitions
    • attribute_sets.md, attribute_set_attributes.md, attribute_roles.md - Attribute metadata
    • item_attributes.md - Product attribute values
    • categories.md, category_attributes.md, item_categories.md - Category hierarchy
    • item_associations.md, item_bundles.md, item_groups.md - Product relationships
    • item_media.md - Product images/assets
  • ERD diagram (D2 source + SVG)

Key Insights from Blower Schema:

  • Target: *_revision_XXX schemas (e.g., bk_revision_2191)
  • 13 Core Tables: Items, attributes, categories, and relationship tables
  • Multi-BU: Each business unit has separate schema (bk_*, gm_*, ec_*, etc.)
  • Normalized Structure: EAV pattern for attributes, separate relationship tables
  • 18 Relationships: Complex entity relationships documented

Impact on Requirements:

  • Gold layer target structure now fully documented
  • Transformation complexity can be assessed (inRiver JSON → Blower tables)
  • Data model mapping requirements clarified

Next Action: Continue AI-DLC workflow - proceed to User Stories phase


Requirements Analysis - Approval Prompt (2026-03-02T13:17:17-07:00)

Status: Requirements document updated with:

  • Bronze-Silver-Gold medallion architecture
  • Blower target schema details (13 tables documented)
  • Updated transformation requirements
  • Refined open questions

Awaiting: User approval to proceed to next phase


User Question - Blower Revision Table Generation (2026-03-02T13:36:20-07:00)

User Question:

do we understand how these revision tables are made in blower? we may have to look at the blower documentation and source code in the blower directory

Analysis Performed: Examined Blower framework source code in blower/ directory

Key Findings:

  1. Blower Architecture (from README.md):

    • Framework for importing product data from sources (Heiler PIM)
    • Normalizes, validates, creates reports, exports to Magento/Bloomreach
    • Each BU creates own "Blower" application depending on this package
  2. Revision System:

    • Revisions: Snapshot of product data at a point in time
    • Revision Schema: *_revision_XXX (e.g., bk_revision_2191)
    • History Schema: Past revisions stored separately
    • Central Schema: Shared data, revision tracking, normalization
  3. Database Schema Structure (from catalog-common.sql):

    • 13 tables matching documented Blower target schema
    • All tables have seen_status_id column (tracks add/delete/modified)
    • Foreign keys reference *_central schema for normalization
    • Tables: items, attributes, attribute_sets, attribute_set_attributes, attribute_roles, categories, category_attributes, item_categories, item_attributes, item_media, item_associations, item_groups, item_bundles
  4. Data Flow:

    • Sources: Packages that fetch data from PIM and transform it
    • ImporterInterface: Sources implement this to populate revision tables
    • RevisionManager: Creates new revision schema, tracks in central.revisions table
    • History: Compares current vs previous revision, stores deltas with seen_status (add/delete/modified)
    • Cleanup: Old revision schemas deleted, keeping N most recent
  5. Critical Insight:

    • Blower sources directly populate revision tables via ImporterInterface
    • Each source has own schema, then data flows to revision schema
    • Revision tables are the "Gold" layer - downstream systems consume these
    • History schema tracks changes between revisions (delta tracking)

Implications for inRiver ETL:

  • We need to replicate Blower's revision table population logic
  • Bronze layer = inRiver raw JSON (equivalent to Blower source schemas)
  • Gold layer = revision tables (same structure as Blower)
  • dbt transformations must implement the ImporterInterface logic
  • Need to understand how Blower sources transform PIM data → revision tables

Open Question: Do we have access to a Blower source implementation (e.g., Heiler PIM source) to understand the transformation logic from source data to revision tables?

Status: Awaiting user response on Blower source availability


Blower PIM Source Analysis (2026-03-02T14:43:55-07:00)

User Update:

the blower source repos have been moved to a research directory; i've added the blower-pim repo; review

Analysis Performed: Examined research/blower-source-pim2/ - Heiler PIM source implementation

Complete Data Flow Discovered:

1. Source Layer (PIM2 Schema)

  • Input: 17 CSV files from Heiler PIM (items.csv, attributes.csv, category.csv, etc.)
  • Process: Import CSVs to pim2 schema tables (raw, non-normalized)
  • Commands: pim2:import:* (Items, Attributes, Categories, ItemAttributes, etc.)
  • Key Files:
    • src/Import/*.php - Importer classes implementing ImporterInterface
    • src/Input/*.php - CSV reader classes
    • PIM_STRUCTURE.md - Complete CSV → DB mapping

2. Central Layer (Normalization)

  • Input: pim2 schema tables
  • Process: Normalize data, assign integer IDs, create SHA-1 hashes for values
  • Commands: pim2:pending:central-* (Items, Attributes, Categories, ValueStore, Media, etc.)
  • Key Tables:
    • central.items - Normalized items with integer IDs
    • central.attributes - Normalized attributes
    • central.value_store - SHA-1 hashed attribute values
    • central.categories - Normalized categories
    • central.media - Normalized media files

3. Revision Layer (Gold)

  • Input: pim2 + central schemas
  • Process: Join pim2 → central IDs, insert to revision tables
  • Commands: pim2:pending:pending-* (Items, Attributes, Categories, ItemAttributes, etc.)
  • Schema: *_revision_XXX (e.g., bk_revision_2191)
  • Example SQL (from Items.php):
sql
INSERT IGNORE INTO {revision}.items
  SELECT item.id, item.name, it.id, ag.id, @SEEN_ID
  FROM pim2.items pis
    INNER JOIN central.items item ON item.blower_id = pis.blower_id
    INNER JOIN central.item_types it ON it.name = pis.item_type
    LEFT JOIN central.attribute_sets ag ON ag.blower_id = pis.attribute_set_blower_id

4. History Layer (Delta Tracking)

  • Input: Current revision vs previous revision
  • Process: Compare schemas, mark seen_status (add/delete/modified)
  • Commands: Blower framework History class
  • Output: *_history schema with delta records

Critical Insights:

  1. Three-Stage Transformation:

    • Stage 1: CSV → pim2 schema (raw import, validation)
    • Stage 2: pim2 → central (normalization, ID assignment)
    • Stage 3: pim2 + central → revision (join and populate gold tables)
  2. Blower ID Pattern:

    • Source tables have blower_id column (original PIM identifier)
    • Central tables have id (integer) + blower_id (source reference)
    • Revision tables use central id (normalized integer IDs)
  3. EAV Implementation:

    • item_attributes.value_id references central.value_store.id
    • value_store contains SHA-1 hash + actual value
    • Enables deduplication and efficient storage (lookup sketched after this list)
  4. Seen Status Tracking:

    • All revision tables have seen_status_id column
    • Values: seen (initial), added, deleted, modified, unchanged
    • History class compares revisions to set status
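
A minimal illustration of the EAV lookup, with schema and column names inferred from the findings above (not verified against the Blower source):

sql
-- Resolve an item's attribute values through the value_store
-- (revision/central schema names and join columns are assumptions)
SELECT ia.item_id,
       a.name   AS attribute_name,
       vs.value AS attribute_value
FROM bk_revision_2191.item_attributes ia
INNER JOIN central.attributes a   ON a.id  = ia.attribute_id
INNER JOIN central.value_store vs ON vs.id = ia.value_id;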

Mapping to inRiver ETL:

Blower Layer    | inRiver Equivalent | Purpose
----------------|--------------------|-------------------------------
PIM2 schema     | Bronze layer       | Raw data landing zone
Central schema  | Silver layer       | Normalization + ID assignment
Revision schema | Gold layer         | Business-ready data
History schema  | Delta tracking     | Change data capture

Transformation Requirements for inRiver:

  1. Bronze → Silver (equivalent to pim2 → central):

    • Parse inRiver JSON to extract entities
    • Assign integer IDs to all entities
    • Create value_store with SHA-1 hashes
    • Normalize references (blower_id → id)
  2. Silver → Gold (equivalent to pim2 + central → revision):

    • Join Bronze entities with Silver IDs
    • Populate 13 revision tables
    • Set seen_status_id = 'seen' initially
    • Apply business rules and validation
  3. Gold Delta Processing (equivalent to History):

    • Compare current Gold vs previous Gold
    • Mark records as add/delete/modified
    • Update seen_status_id accordingly
    • Maintain full dataset in Gold (merge/upsert)
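
A sketch of how the Stage 3 comparison could look in SQL. Table and column names are placeholders; MySQL has no FULL OUTER JOIN, so deletes would come from a reverse LEFT JOIN:

sql
-- Classify changes between the current and previous Gold snapshots
-- (gold_items / gold_items_previous are illustrative names)
SELECT curr.id,
       CASE
         WHEN prev.id IS NULL               THEN 'added'
         WHEN NOT (curr.name <=> prev.name) THEN 'modified'
         ELSE 'unchanged'
       END AS seen_status
FROM gold_items curr
LEFT JOIN gold_items_previous prev ON prev.id = curr.id;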

Next Steps:

  1. Map inRiver JSON entities to Blower CSV equivalents
  2. Design Silver layer schema (central equivalent)
  3. Design dbt models for Bronze → Silver → Gold transformations
  4. Implement seen_status tracking for delta updates

Status: Ready to update requirements with complete transformation understanding


Requirements Update - Blower Source Analysis Complete (2026-03-02T14:49:51-07:00)

User Approval: "yy"

Updates Applied:

  1. Three-Layer Architecture Confirmed:

    • Bronze: Raw inRiver JSON (equivalent to Blower PIM2 schema)
    • Silver: Normalization + ID assignment (equivalent to Blower Central schema) - REQUIRED
    • Gold: Business-ready revision tables (equivalent to Blower Revision schema)
  2. Silver Layer Details Added:

    • Integer ID assignment for all entities
    • SHA-1 hashed value_store for attribute values
    • Lookup tables: items, attributes, categories, media, attribute_sets
    • Pattern: blower_id (source) → id (normalized)
  3. Transformation Pattern Documented:

    • Stage 1: Bronze → Silver (parse JSON, assign IDs, create value_store)
    • Stage 2: Silver → Gold (join Bronze + Silver IDs, populate 13 tables)
    • Stage 3: Delta processing (compare Gold revisions, update seen_status)
  4. Seen Status Tracking:

    • All Gold tables have seen_status_id column
    • Values: seen, added, deleted, modified, unchanged
    • Enables delta tracking and change data capture
  5. Functional Requirements Updated:

    • FR-1: Bronze layer ingestion
    • FR-2: Silver layer normalization (NEW)
    • FR-3: Gold layer business-ready data
    • FR-4: Three-stage dbt transformations
    • FR-6: ChannelNode link handling
    • FR-7: AttributeFile processing (NEW)
    • FR-8: Delta update processing with seen_status
  6. Open Questions Updated:

    • Silver layer confirmed as required (removed from questions)
    • Added: Revision cleanup strategy, value_store hashing approach

Status: Requirements document updated, ready for final approval


Architectural Decision - dbt-Native Approach (2026-03-02T15:39:53-07:00)

User Question:

should we be creating an importer following the paradigm in pim2 instead of using dbt?

User Direction:

my leaning is that eventually blower would be replaced by dbt; we are just trying to take shortest path that keeps us on this trajectory

Decision: All-dbt pipeline with minimal Python glue

Approach:

  • Bronze ingestion: Airflow Python operator (~50 lines) - S3 JSON → database
  • Silver transformations: dbt models - Parse JSON, assign IDs, create value_store
  • Gold transformations: dbt models - Join and populate 13 revision tables
  • Delta tracking: dbt incremental models with seen_status logic
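
A hypothetical shape for one such incremental model (model, column, and source names are assumptions, not the final design):

sql
-- models/gold/gold_items.sql (name illustrative)
{{ config(materialized='incremental', unique_key='id') }}

select
    s.id,
    s.name,
    s.updated_at,
    {% if is_incremental() %}
    case when exists (select 1 from {{ this }} t where t.id = s.id)
         then 'modified' else 'added' end as seen_status
    {% else %}
    'seen' as seen_status
    {% endif %}
from {{ ref('silver_items') }} s
{% if is_incremental() %}
where s.updated_at > (select max(updated_at) from {{ this }})
{% endif %}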

Rationale:

  • Strategic move toward dbt-native pipeline
  • Sets precedent for replacing other Blower sources
  • Team already familiar with dbt
  • Shortest path to modern data stack
  • No PHP dependency
  • Easier to maintain and hire for

What We Skip from Blower:

  • PHP ImporterInterface classes
  • Event-driven hooks (use dbt macros instead)
  • PIM2 schema pattern (Bronze is simpler JSON table)
  • Complex batch processing (dbt handles this)

Requirements Updated:

  • TR-2: Orchestration - Added Python + dbt task details
  • TR-3: Transformation - Emphasized dbt-native approach with strategic rationale

User Approval: "approved"

Status: Requirements ready for final approval to proceed to User Stories phase


Database Technology Decision - PostgreSQL (2026-03-02T15:53:15-07:00)

User Question:

since we are doing all of this is aurora mysql a good choice or should we move to postgresql

Context Discovery:

downstream systems are dependent on csv exports

Decision: Switch to Aurora PostgreSQL

Rationale:

  1. Downstream Systems DB-Agnostic:

    • Consume CSV exports, not direct DB connections
    • No breaking changes to downstream systems
  2. Superior JSON Handling:

    • Bronze layer stores raw inRiver JSON
    • PostgreSQL JSONB: native operators, indexing, efficient querying (example query after this list)
    • MySQL JSON: limited operators, less efficient
  3. Complete Modernization:

    • dbt + Postgres = industry standard modern data stack
    • If replacing Blower pattern, modernize DB too
    • Future-proof architecture
  4. Better for Data Pipelines:

    • Advanced SQL features (CTEs, window functions, arrays)
    • dbt has richer feature set for Postgres
    • Native array support for complex structures
  5. No Migration Risk:

    • Downstream systems unaffected (CSV exports remain same)
    • Change before any code written
    • Minimal effort (just DB engine + dbt adapter)
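
To make the JSONB benefit concrete, a hypothetical query against the future Bronze table (table, column, and JSON field names are assumptions):

sql
-- Pull recently loaded products straight out of raw JSON
SELECT data ->> 'ProductId'                    AS product_id,
       data -> 'FieldValues' ->> 'ProductName' AS product_name
FROM bronze_products
WHERE data @> '{"EntityType": "Product"}'
  AND load_date > NOW() - INTERVAL '1 day';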

Requirements Updated:

  • TR-1: Aurora MySQL → Aurora PostgreSQL with JSONB support
  • TR-3: dbt-mysql → dbt-postgres with JSONB operators
  • Architecture: Bronze layer uses JSONB column
  • FR-4: Stage 1 uses PostgreSQL JSONB operators for parsing

User Approval: "approved"

Status: Complete modernization approved (dbt + Postgres), requirements ready for final approval


Bronze Layer Storage Clarification (2026-03-02T22:21:26-07:00)

User Question:

do we have to change more docs on the bronze model? originally it was just going to contain meta data and the json was going to be left in s3

User Decision:

should be option 1 now that we are using postgresql

Clarification: Bronze layer stores JSON in database (not just metadata)

Approach:

  • PostgreSQL table with JSONB column for raw inRiver JSON
  • Metadata columns: load_date, file_name, business_unit, s3_key, entity_type
  • Immutable append-only pattern
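
One possible shape for this table, using the metadata columns listed above (names and indexing are a sketch, not the final design):

sql
CREATE TABLE bronze_products (
  id            BIGSERIAL PRIMARY KEY,
  load_date     TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  file_name     TEXT NOT NULL,
  business_unit TEXT NOT NULL,
  s3_key        TEXT NOT NULL,
  entity_type   TEXT NOT NULL,
  data          JSONB NOT NULL
);

-- GIN index to support JSONB containment/key queries in dbt models
CREATE INDEX bronze_products_data_idx ON bronze_products USING GIN (data);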

Rationale:

  • PostgreSQL JSONB enables efficient SQL querying
  • dbt can transform directly from Bronze table (no S3 reads)
  • Simpler dbt models (pure SQL, no external data sources)
  • JSONB indexing for performance

Requirements Updated:

  • FR-1: Explicitly defines Bronze table structure with JSONB column
  • Clarified: JSON stored in database, not just S3 reference

Status: Bronze layer storage approach confirmed and documented


ERP Data Integration - Deferred to Phase 2 (2026-03-02T23:16:54-07:00)

User Question:

how fresh is the data in ec_erp_data

Analysis Performed: Connected to production Blower database and analyzed ERP schemas

Findings:

  1. ERP Schemas Exist (3 schemas across 2 BUs):

    • ec_erp_data - Enterprise Comfort (item_attributes, packages, supersedes)
    • ec_m1_data - Enterprise Comfort M1 legacy
    • pp_p21_data - Peirce-Phelps P21 ERP
  2. Data Freshness:

    • ERP data updated: 2026-03-03 02:38:25
    • Revision created: 2026-03-03 02:40:20
    • Synchronized: ERP updated with each Blower run (within 2 minutes)
  3. Schema Structure:

    • ERP data in separate schemas (not merged into revision)
    • Likely joined at export time by downstream systems
    • ~292K ERP records for EC

Implications:

  • Moving to PostgreSQL separates PIM from ERP data
  • Downstream systems may need ERP data access
  • Most BUs don't have ERP schemas (only EC and PP)

User Decision:

lets commit to postgresql we'll figure out the erp data later

Resolution:

  • Commit to Aurora PostgreSQL for inRiver PIM data
  • Defer ERP integration to Phase 2
  • Added to Out of Scope: ERP data integration with options documented

Phase 2 Options for ERP Data:

  1. Replicate ERP schemas to PostgreSQL
  2. Federated queries (e.g., mysql_fdw from PostgreSQL to MySQL)
  3. Dual-database pattern (PIM in Postgres, ERP in MySQL)
  4. Consolidate ERP sources into dbt pipeline

Status: PostgreSQL confirmed, ERP data deferred, requirements ready for approval


Export Layer Analysis - CSV vs Revision Tables (2026-03-03T07:43:25-07:00)

User Question:

do the CSV outputs seem to match the revision tables in structure?

Analysis Performed: Examined Blower export classes and compared with revision table structure

Finding: NO - CSV exports are DIFFERENT from revision tables

Revision Table items:

sql
id int(10) unsigned          -- Internal ID
name varchar(128)
type_id tinyint(3)           -- Foreign key to item_types
attribute_set_id int(10)     -- Foreign key to attribute_sets
seen_status_id tinyint(3)    -- Delta tracking

CSV Export items.csv:

sql
item_id                      -- blower_id (original source ID)
name
type                         -- type NAME (not ID)
attribute_set_id             -- blower_id (not internal ID)

Key Differences:

  1. IDs are converted: Internal IDs → blower_ids (source IDs)
  2. Foreign keys resolved: type_id → type name (lookup via central)
  3. No seen_status_id: Internal tracking not exported
  4. Simplified structure: Downstream-friendly format

Export SQL Pattern (from Items.php):

sql
SELECT
  central_i.blower_id item_id,        -- Source ID
  i.name,
  it.name type,                        -- Resolved name
  cas.blower_id attribute_set_id       -- Source ID
FROM revision.items i
  INNER JOIN central.items central_i ON central_i.id = i.id
  INNER JOIN central.item_types it ON it.id = i.type_id
  LEFT JOIN central.attribute_sets cas ON cas.id = i.attribute_set_id

Implications for PostgreSQL Pipeline:

  1. Gold layer ≠ Export format

    • Gold = normalized with integer IDs (performance)
    • Export = denormalized with source IDs (compatibility)
  2. Need Export Layer

    • Separate dbt models for CSV generation
    • Transform Gold → Export format
    • Join Silver to get blower_ids
    • Resolve foreign keys to names
  3. Four-Stage Pipeline:

    • Bronze → Silver (normalization)
    • Silver → Gold (business-ready)
    • Gold → Gold (delta tracking)
    • Gold → Export (CSV generation)

Requirements Updated:

  • Architecture: Added Export Layer description
  • FR-4: Added Stage 4 (Gold → Export)
  • FR-5: Updated with CSV export details
  • Next Steps: Updated to four-stage architecture

User Approval: "yes"

Status: Export layer added to requirements, four-stage pipeline documented


Phased Approach Decision (2026-03-03T08:07:43-07:00)

User Question:

would the fastest path be to have a separate system that massages the jsonl files into the import tables; I wonder if we have a fasttrack project and a rebuild project, what we've defined here seems like it may be alot

User Decision:

we need this moving quickly the challenge is that delta files - Option 3 would be the proper option; I want to make blower more maintainable longterm

Decision: Two-Phase Approach

Phase 1: Fast Track (2-3 days)

Approach: Python converter - inRiver JSONL → Heiler CSV format

Rationale:

  • Business urgency - need inRiver working quickly
  • Low risk - use existing Blower infrastructure
  • Minimal code - ~500 lines Python
  • Proven pattern - Blower already works

Deliverable: Python script that converts inRiver files to Heiler format, drops into Blower input

Phase 2: Full Modernization (4-6 weeks)

Approach: dbt + PostgreSQL pipeline (Bronze → Silver → Gold → Export)

Rationale:

  • Strategic goal - make Blower more maintainable long-term
  • Sets precedent for replacing other Blower sources
  • Modern data stack - easier to hire/train for
  • Better architecture - scalable and testable

Deliverable: Complete replacement of Blower with dbt-native pipeline

Documents Created:

  1. requirements-overview.md - Two-phase strategy summary
  2. phase1-requirements.md - Fast track Python converter requirements
  3. phase2-requirements.md - Full modernization requirements (renamed from requirements.md)

Status: Phase 1 requirements ready for approval, Phase 2 deferred


PostgreSQL Accumulator Addition to Phase 1 (2026-03-03T08:25:00-07:00)

User Insight:

since we are dealing with deltas and, I believe, heiler delivers fulls, it seems like we should standup a postgresql server in RDS as part of phase1 that loads the jsonl; this will eventually be bronze in phase 2 but can be used to "mimic" the full database to create the csvs?

Critical Realization:

  • Heiler: Delivers full files (nightly full load)
  • inRiver: Delivers delta files only (incremental updates)
  • Blower: Expects full CSV files (all products, not just changes)

Problem: Can't generate full CSVs from delta files alone - need to accumulate deltas into full dataset

Solution: PostgreSQL Accumulator Database

Updated Phase 1 Architecture:

inRiver S3 (deltas) 
  → Python Delta Loader 
  → PostgreSQL (full dataset, JSONB) 
  → Python CSV Generator 
  → Heiler CSVs (full files)
  → Blower

PostgreSQL Schema:

sql
CREATE TABLE products (
  id TEXT PRIMARY KEY,
  data JSONB,
  updated_at TIMESTAMP,
  deleted BOOLEAN DEFAULT FALSE
);

Operations:

  • Insert: New records from delta
  • Update: Merge JSONB (ON CONFLICT UPDATE)
  • Delete: Soft delete (deleted = true)
  • Query: SELECT all non-deleted for CSV generation
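
A minimal sketch of the merge against the products table above (:id / :data are driver placeholders; exact parameter syntax depends on the client library):

sql
-- Upsert one delta record into the accumulator
INSERT INTO products (id, data, updated_at, deleted)
VALUES (:id, :data, NOW(), FALSE)
ON CONFLICT (id) DO UPDATE
SET data       = EXCLUDED.data,
    updated_at = EXCLUDED.updated_at,
    deleted    = FALSE;

-- Soft delete for records the delta marks as removed
UPDATE products SET deleted = TRUE, updated_at = NOW() WHERE id = :id;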

Benefits:

  1. ✅ Converts deltas → full dataset
  2. ✅ PostgreSQL ready for Phase 2 (becomes Bronze layer)
  3. ✅ JSONB for flexible schema
  4. ✅ No throwaway work - reused in Phase 2

Phase 1 Requirements Updated:

  • FR-2: PostgreSQL Accumulator Database
  • FR-3: Delta Merge Logic
  • FR-4: CSV Generation from PostgreSQL
  • TR-1: Database (RDS PostgreSQL)
  • TR-2: Implementation (Delta Loader + CSV Generator)
  • TR-3: Orchestration (Two-step DAG)

Status: Phase 1 requirements updated with PostgreSQL accumulator


Audit Trail Enhancement (2026-03-03T08:38:54-07:00)

User Request:

lets make sure in phase1 we track the most recent source file in addition to the timestamp for each json date object in the table. We should also maintain an event log table that tracks the loading of these files through time

Enhancements Added:

1. Source File Tracking

Data Tables Schema Updated:

sql
CREATE TABLE products (
  id TEXT PRIMARY KEY,
  data JSONB,
  updated_at TIMESTAMP,
  source_file TEXT,  -- NEW: Track which file last updated this record
  deleted BOOLEAN DEFAULT FALSE
);

Purpose:

  • Know which S3 file last modified each record
  • Debug data issues by tracing back to source
  • Audit trail for compliance

2. Event Log Table

New Table:

sql
CREATE TABLE file_processing_events (
  id SERIAL PRIMARY KEY,
  event_time TIMESTAMP DEFAULT NOW(),
  event_type TEXT,  -- 'file_start', 'file_complete', 'file_error'
  source_file TEXT,
  entity_type TEXT,  -- 'products', 'channel_nodes', 'attribute_file'
  records_processed INTEGER,
  records_inserted INTEGER,
  records_updated INTEGER,
  records_deleted INTEGER,
  error_message TEXT,
  metadata JSONB
);

Purpose:

  • Complete history of all file processing
  • Track success/failure rates
  • Monitor data flow over time
  • Debug processing issues
  • Compliance and audit requirements

3. Event Logging Flow

For each file processed:

  1. Log 'file_start' event
  2. Process records (track counts)
  3. Log 'file_complete' event with statistics
  4. On error: Log 'file_error' event with details
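
The corresponding writes, sketched against the schema above (values illustrative):

sql
INSERT INTO file_processing_events (event_type, source_file, entity_type)
VALUES ('file_start', 's3://bucket/products_delta_001.jsonl', 'products');

-- On success, record the processing statistics
INSERT INTO file_processing_events
  (event_type, source_file, entity_type,
   records_processed, records_inserted, records_updated, records_deleted)
VALUES
  ('file_complete', 's3://bucket/products_delta_001.jsonl', 'products',
   1000, 50, 900, 50);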

Example Event Log:

2026-03-03 08:00:00 | file_start    | s3://bucket/products_delta_001.jsonl | products | ...
2026-03-03 08:00:15 | file_complete | s3://bucket/products_delta_001.jsonl | products | 1000 processed, 50 inserted, 900 updated, 50 deleted
2026-03-03 08:05:00 | file_start    | s3://bucket/products_delta_002.jsonl | products | ...

Phase 1 Requirements Updated:

  • FR-2: Added source_file column and event log table
  • FR-3: Added event logging to delta merge logic
  • TR-1: Updated database schema with event log
  • TR-2: Updated Delta Loader to log events
  • Success Criteria: Added audit trail requirements

Status: Phase 1 requirements updated with complete audit trail


Decoupled Processing Schedule (2026-03-03T08:56:30-07:00)

User Clarification:

in phase 1 we will run the delta imports as they come but will only need to generate the CSVs nightly (cron or on demand)

Architecture Update: Decoupled DAGs

DAG 1: Delta Loader (Near Real-Time)

  • Trigger: S3 event notification (as files arrive)
  • Frequency: On-demand (per file, throughout the day)
  • Duration: ~1-2 minutes per file
  • Purpose: Accumulate deltas into PostgreSQL immediately

DAG 2: CSV Generator (Nightly)

  • Trigger: Cron schedule (nightly) OR manual trigger
  • Frequency: Once per day (or on-demand)
  • Duration: ~10-15 minutes
  • Purpose: Generate full Heiler CSVs from accumulated deltas

Benefits:

  1. ✅ Near real-time delta accumulation (data always current)
  2. ✅ Nightly CSV generation (matches Blower pattern)
  3. ✅ Decoupled processing (delta loading doesn't block CSV generation)
  4. ✅ On-demand CSV generation (for testing or urgent updates)
  5. ✅ Efficient (accumulate all day, generate once)

Data Flow:

Throughout Day:
  inRiver delta files → S3 event → Delta Loader → PostgreSQL

Nightly (or on-demand):
  PostgreSQL (full dataset) → CSV Generator → Heiler CSVs → Blower

Phase 1 Requirements Updated:

  • FR-1: Delta loading as files arrive (near real-time)
  • FR-4: CSV generation nightly or on-demand
  • NFR-1: Performance requirements for decoupled processing
  • TR-3: Two separate DAGs with different triggers

Status: Phase 1 requirements updated with decoupled processing schedule


Ordered Processing & Replay Capability (2026-03-03T09:27:42-07:00)

User Requirements:

we need to add notes that the jsonl need to be load in order and loading new files should leverage the events table to decide what files to load. we should be able to mark an event and following events as reload and the process should restart from that point forward. Events themselves should not be deleted.

Requirements Added:

1. Ordered Processing

Requirement: JSONL files must be processed in chronological order (deltas depend on sequence)

Implementation:

sql
-- Find last successfully processed file (not marked for replay)
SELECT MAX(id) FROM file_processing_events 
WHERE event_type = 'file_complete' 
  AND replay_date IS NULL;

-- Process files after that point in order

Benefits:

  • Maintains data consistency
  • Prevents duplicate processing
  • Event log drives processing logic

2. Replay Capability

Requirement: Can mark events for replay and reprocess from that point forward

Implementation:

sql
-- Mark event and all following events for replay
UPDATE file_processing_events 
SET replay_date = NOW() 
WHERE id >= [problem_event_id];

Replay Scenario Example:

Initial State:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = NULL) ← bad data detected
3. file_003.jsonl - processed ✓ (replay_date = NULL)

Action: Mark events #2 and #3 for replay
UPDATE file_processing_events SET replay_date = NOW() WHERE id >= 2;

After Marking:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00')
3. file_003.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00')

Delta Loader Query:
- Finds last valid event: #1
- Detects gap (no valid events after #1)
- Reprocesses: file_002.jsonl, file_003.jsonl
- Creates new events with replay_date = NULL

Final State:
1. file_001.jsonl - processed ✓ (replay_date = NULL)
2. file_002.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00') [original]
3. file_003.jsonl - processed ✓ (replay_date = '2026-03-03 09:00:00') [original]
4. file_002.jsonl - processed ✓ (replay_date = NULL) [replay]
5. file_003.jsonl - processed ✓ (replay_date = NULL) [replay]

3. Event Immutability

Requirement: Events are never deleted (permanent audit trail)

Benefits:

  • Complete processing history
  • Can trace any data issue
  • Replay depends on event history
  • Compliance and audit requirements

4. Event Log Schema Update

Added Column:

  • replay_date (timestamp, nullable) - When event was marked for replay
  • NULL = valid event
  • NOT NULL = marked for replay (excluded from processing)

Query Pattern:

sql
-- Only valid (non-replayed) completion events drive processing decisions
SELECT source_file
FROM file_processing_events
WHERE event_type = 'file_complete'
  AND replay_date IS NULL
ORDER BY event_time;

Phase 1 Requirements Updated:

  • FR-2: Added replay_date column to event log schema
  • FR-3: Added ordered processing and replay logic
  • TR-1: Added replay_date index for efficient queries
  • TR-2: Updated Delta Loader with ordered processing logic
  • NFR-2: Added ordered processing and replay requirements

Status: Phase 1 requirements updated with ordered processing and replay capability


Multi-Business Unit Architecture (2026-03-03T09:31:42-07:00)

User Clarification:

do we capture the multi business unit nature of this project. There will be a separate aggregator database for each business unit

Architecture Update: Separate Database Per BU

Database Architecture

One PostgreSQL database per business unit:

  • inriver_bak - Baker Distributing
  • inriver_cefl - Carrier Enterprise Florida
  • inriver_ecm - Enterprise Comfort
  • inriver_gem - Gemaire

Deployment Options:

  • Option A: 4 separate RDS instances (complete isolation, higher cost)
  • Option B: 1 shared RDS instance with 4 databases (cost-effective, recommended)

DAG Architecture

8 Total DAGs (4 BUs × 2 DAGs each):

Per Business Unit:

  1. inriver_{bu}_delta_loader - Load deltas as they arrive
  2. inriver_{bu}_csv_generator - Generate CSVs nightly

Examples:

  • inriver_bak_delta_loader
  • inriver_bak_csv_generator
  • inriver_cefl_delta_loader
  • inriver_cefl_csv_generator
  • etc.

Benefits of Separate Databases

  1. Isolation: BU data completely separate
  2. Independent Processing: Each BU processes at own pace
  3. Failure Isolation: Issue in one BU doesn't affect others
  4. Matches Blower Pattern: Blower has separate schemas per BU
  5. Easier Rollout: Deploy one BU at a time
  6. Phase 2 Ready: Each database becomes Bronze layer for that BU

Data Flow Per BU

inRiver S3 (BU-specific path)
  → Delta Loader DAG (BU-specific)
  → PostgreSQL Database (BU-specific)
  → CSV Generator DAG (BU-specific)
  → Heiler CSVs (BU-specific path)
  → Blower (BU-specific)

Phase 1 Requirements Updated:

  • Overview: Added multi-BU architecture explanation
  • FR-1: Per-BU processing
  • FR-2: Separate database per BU
  • TR-1: Database architecture options (separate instances vs shared)
  • TR-3: 8 DAGs total (4 BUs × 2 DAGs)
  • Dependencies: Per-BU infrastructure provisioning
  • Next Steps: Per-BU rollout plan

Status: Phase 1 requirements updated with multi-BU architecture


Configuration and Mapping Tables (2026-03-03T09:39:29-07:00)

User Requirement:

we have an additional problem. there will need to be some mapping data (e.g. heiler creates a itemdataflex, item-attributes, products and maybe more from data that will all be in attributes in inriver) we will need a table that describes what goes where potentially. Also we lose some fidelity - for example we may not have a purpose anymore and will need a table where we can keep this decorator data. For now we can load those tables manually - may eventually need a quick web frontend for this system

Problem Identified:

  1. Complex Mapping: inRiver attributes → multiple Heiler CSVs

    • One inRiver field may map to multiple Heiler columns
    • Example: Product attributes → itemdataflex.csv, item-attributes.csv, products.csv
  2. Lost Fidelity: Data exists in Heiler but not in inRiver

    • Example: "purpose" field in Heiler
    • Need to store this decorator data somewhere

Solution: Configuration Tables

1. Field Mapping Table

Purpose: Map inRiver fields to Heiler CSV columns

Schema:

sql
CREATE TABLE field_mapping (
  id SERIAL PRIMARY KEY,
  inriver_entity TEXT,        -- 'Product', 'ChannelNode', etc.
  inriver_field TEXT,         -- inRiver field name
  heiler_csv TEXT,            -- Target CSV file
  heiler_column TEXT,         -- Target column
  transformation_rule TEXT,   -- Optional transformation
  active BOOLEAN DEFAULT TRUE,
  notes TEXT
);

Example Data:

sql
INSERT INTO field_mapping VALUES
  (1, 'Product', 'ProductId', 'items.csv', 'Item#', NULL, true, 'Direct mapping'),
  (2, 'Product', 'ProductId', 'itemdataflex.csv', 'Item#', NULL, true, 'Same ID in multiple CSVs'),
  (3, 'Product', 'FieldValues.ShortDescription', 'itemdataflex.csv', 'BU_ISD', NULL, true, 'Extract from field values');

2. Decorator Data Table

Purpose: Store data that doesn't exist in inRiver

Schema:

sql
CREATE TABLE decorator_data (
  id SERIAL PRIMARY KEY,
  entity_type TEXT,           -- 'attribute', 'item', 'category'
  entity_id TEXT,             -- inRiver entity ID
  decorator_type TEXT,        -- 'purpose', 'role', etc.
  decorator_value TEXT,       -- The value
  source TEXT,                -- 'manual', 'migration', etc.
  created_at TIMESTAMP DEFAULT NOW(),
  notes TEXT
);

Example Data:

sql
INSERT INTO decorator_data VALUES
  (1, 'attribute', 'ATTR_001', 'purpose', 'searchable', 'manual', NOW(), 'Migrated from Heiler'),
  (2, 'attribute', 'ATTR_002', 'purpose', 'filterable', 'manual', NOW(), 'Migrated from Heiler');

3. CSV Template Config Table

Purpose: Define CSV generation rules per column

Schema:

sql
CREATE TABLE csv_template_config (
  id SERIAL PRIMARY KEY,
  csv_name TEXT,              -- 'items.csv', etc.
  column_name TEXT,           -- Column in CSV
  source_type TEXT,           -- 'inriver_field', 'decorator', 'static', 'computed'
  source_value TEXT,          -- Field name or value
  default_value TEXT,         -- Default if null
  transformation TEXT,        -- Optional transformation
  required BOOLEAN DEFAULT FALSE,
  notes TEXT
);

Example Data:

sql
INSERT INTO csv_template_config VALUES
  (1, 'items.csv', 'Item#', 'inriver_field', 'ProductId', NULL, NULL, true, 'Primary key'),
  (2, 'purpose.csv', 'Feature_Purpose', 'decorator', 'purpose', NULL, NULL, false, 'From decorator_data table');

CSV Generator Logic

Uses configuration tables:

  1. Query csv_template_config for target CSV
  2. For each column:
    • If source_type = 'inriver_field': Extract from JSON
    • If source_type = 'decorator': Join with decorator_data
    • If source_type = 'static': Use static value
    • If source_type = 'computed': Apply transformation
  3. Apply field_mapping for complex mappings
  4. Generate CSV row
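
For example, the 'decorator' branch could resolve a column like this (tables from the schemas above; join keys assumed):

sql
-- Resolve the Heiler 'purpose' value for each live item
SELECT p.id              AS item_id,
       d.decorator_value AS purpose
FROM products p
LEFT JOIN decorator_data d
       ON d.entity_type    = 'item'
      AND d.entity_id      = p.id
      AND d.decorator_type = 'purpose'
WHERE p.deleted = FALSE;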

Management Approach

Phase 1: Manual SQL inserts

  • Load initial configuration via SQL scripts
  • Maintain decorator data via SQL updates
  • Simple, no UI needed initially

Future Enhancement: Web UI

  • CRUD interface for field_mapping
  • CRUD interface for decorator_data
  • CSV template editor
  • Out of scope for Phase 1

Phase 1 Requirements Updated:

  • TR-1: Added configuration tables to database schema
  • FR-6: New requirement for configuration and mapping tables
  • TR-2: Updated CSV Generator to use configuration tables
  • Out of Scope: Added web UI as future enhancement

Status: Phase 1 requirements updated with configuration management


Database Architecture Clarification (2026-03-03T09:49:14-07:00)

User Clarification:

we want separate databases(schemas) in one postgresql instance

Architecture Update: Single Instance with Separate Schemas

PostgreSQL Architecture

Single RDS PostgreSQL instance:

  • Instance size: db.t3.medium (shared across all BUs)
  • 4 Schemas (one per BU):
    • inriver_bak - Baker Distributing
    • inriver_cefl - Carrier Enterprise Florida
    • inriver_ecm - Enterprise Comfort
    • inriver_gem - Gemaire

Schema Structure

Each schema contains identical table structure:

inriver_bak/
  ├── products (data table)
  ├── channel_nodes (data table)
  ├── attribute_file (data table)
  ├── file_processing_events (event log)
  ├── field_mapping (configuration)
  ├── decorator_data (configuration)
  └── csv_template_config (configuration)

inriver_cefl/
  ├── (same tables)
  ...

Benefits

  1. Cost-effective: Single instance vs 4 separate instances
  2. Data isolation: Schemas provide logical separation
  3. Simplified management: One instance to maintain
  4. Shared resources: Better resource utilization
  5. Easy backup: Single instance backup covers all BUs
  6. Matches Blower pattern: Blower uses schemas per BU (bk_*, gm_*, etc.)

Connection Pattern

DAGs connect with schema-specific search_path:

python
# BAK Delta Loader
conn = psycopg2.connect(
    host='rds-endpoint',
    database='inriver',
    user='inriver_user',
    options='-c search_path=inriver_bak'
)

# CEFL Delta Loader
conn = psycopg2.connect(
    host='rds-endpoint',
    database='inriver',
    user='inriver_user',
    options='-c search_path=inriver_cefl'
)

Phase 1 Requirements Updated:

  • Overview: Single instance with separate schemas
  • FR-2: Single instance with schemas per BU
  • TR-1: Single db.t3.medium instance with 4 schemas
  • Dependencies: Single instance provisioning
  • Next Steps: Create schemas in single instance

Status: Phase 1 requirements updated with single-instance architecture