The Universal Canonical Model: Breaking the Boundaries Between Data and Metadata

The Universal Canonical Model: Breaking the Boundaries Between Data and Metadata

What you're looking at isn't just another data warehouse pattern—it's a fundamental reimagining of how we store, integrate, and understand information. This seemingly simple Snowflake implementation demonstrates a universal canonical model that dissolves the artificial boundaries between data and metadata, between structure and content, between different modeling paradigms.

The Core Innovation: Four Tables to Rule Them All

At its heart, is one explodable entity. In this architectural implementation, it is exploded into four primary tables to represent ANY data from ANY source in ANY format:

  1. HUB_ALL_H - Universal identities (customers, products, tables, concepts)

  2. EVT_ALL_E - Events that occur (transactions, changes, snapshots)

  3. LNK_ALL_L - Relationships between entities (edges in the graph)

  4. SAT_ALL_ATTR_S - Attributes/properties (the descriptive payload)

This isn't just elegant. These four structures can represent:

  • Traditional 3NF databases

  • Star schemas

  • Data Vault models

  • Property graphs

  • Document stores

  • Time-series data

  • Hierarchical structures

The purpose of Dual Hashing

The system employs two complementary hash strategies:

sql

SHA3_256___IDENTITY_DECLARED -- The semantic/business identity

BLAKE3___IDENTITY_RAW        -- The complete content fingerprint

This dual approach enables:

  • Federation without exposure: Organizations can join data without sharing sensitive content

  • Deduplication: Identical content is automatically recognized

  • Versioning: Changes create new hashes while preserving history

  • Performance: 8-character prefixes enable efficient clustering

Metadata AS Data: The Profound Unification

The demo loads Snowflake's metadata—but here's the key insight: it treats metadata exactly like any other data. A table definition, a customer record, a transaction—they're all just entities with relationships:

sql

-- A "column" isn't a structural element—it's data:

INSERT INTO HUB_ALL_H (ENTITY_TYPE, IDENTITY_RAW_JSON)

VALUES ('SF_COLUMN', '{"name": "customer_email", "type": "VARCHAR"}');

 

-- The relationship "table HAS column" is also data:

INSERT INTO LNK_ALL_L (SOURCE_HID, TARGET_HID, EDGE_TYPE)

VALUES (table_hash, column_hash, 'HAS_COLUMN');

This means:

·       Minimizes physical schema changes: New attributes are just new entities

  • Self-describing: The model contains its own structure

  • Universal querying: One query language for data AND metadata

The Pattern Recognition Power

Because everything follows the same pattern, the system can:

1. Auto-Generate Ingestion

Once it knows a table has columns (from metadata), it can generate the procedures to ingest that table's data.

2. Recover Original Models

By analyzing relationship patterns, you can detect: "These entities form a star pattern—this was originally a star schema!"

3. Transform Between Paradigms

The Integrated data can be exposed as:

  • Normalized tables (via views)

  • Denormalized stars (via graph traversal)

  • JSON documents (from VARIANT columns)

  • Graph queries (via edge relationships)

The Universal Pipeline

ANY Source Format

      ↓

Four Canonical Tables

      ↓

ANY Target Format

Whether your source is:

  • REST APIs returning JSON

  • Legacy 3NF databases

  • CSV files

  • Event streams

  • Graph databases

They all become entities, events, edges, and attributes. Then they can be exposed as whatever consumers need.

Integration emerges when canonical identity rules are shared

The most powerful aspect: entities from different systems automatically integrate when they share identities:

sql

-- Customer from CRM, transactions from banking, tickets from support

-- ALL automatically connected through shared customer identity hash

CREATE VIEW CUSTOMER_360 AS

SELECT * FROM HUB_ALL_H h

LEFT JOIN [edges from ALL systems where h.SHA3_256 matches];

No pre-planned integration architecture needed. As systems are added, their relationships naturally merge into the graph.

The Deployment Flexibility

This same model works on:

  • Snowflake (as shown)

  • PostgreSQL (same structures)

  • MongoDB (as JSON documents)

  • Parquet files (columnar storage)

  • Kafka (event streaming)

  • Even browser localStorage (for edge computing)

One model, infinite deployments.

Impacts

1. The End of ETL as We Know It

Instead of building custom pipelines, you have a few generic patterns that handle everything.

2. True Data Mesh Architecture

Each domain owns its data but shares the same canonical format, enabling seamless federation.

3. Automatic Lineage and Governance

Every piece of data knows where it came from (EVENT_SID), when it arrived (INGESTED_AT_TS), and how it relates to everything else.

4. Time Travel Built-In

SCD Type 2 temporal tracking is automatic for everything—data AND metadata.

5. Zero-Copy Integration

Organizations can join data using hashes without moving or exposing sensitive information.

The Bootstrap Paradox

Perhaps most remarkably, this system can ingest its own metadata and generate procedures to replicate itself. It's self-describing, self-replicating, and self-improving. The metadata about tables and columns becomes the template for ingesting those very tables and columns.

Conclusion: A New Data Paradigm

This isn't just a clever technical implementation—it's a shift in how we think about data:

  • From structure TO content: Schema is just more data

  • From integration TO emergence: Relationships self-organize

  • From transformation TO projection: One storage, many views

  • From silos TO federation: Universal language for all data

The canonical model demonstrates that the distinction between data and metadata, between different modeling paradigms, between storage engines—these are all artificial constructs. Underneath, everything is entities and relationships, and this simple truth enables a radically simpler, more powerful approach to data architecture.

This "basic demo" is actually the seed of a data platform that scales from a Raspberry Pi to a global mesh, from a startup's MVP to an enterprise's entire data estate, all using the same four tables and the same simple patterns.

The future of data isn't more complexity—it's this kind of profound simplicity.

Minimal bootstrap demonstration for Snowflake of the implementation of the Model and basic hard-coded load and exposure views for 2 entities.

--Schemas

 

CREATE SCHEMA IF NOT EXISTS AIS_META;

CREATE SCHEMA IF NOT EXISTS UKB_REPO;

CREATE SCHEMA IF NOT EXISTS UKB_UTIL;

CREATE SCHEMA IF NOT EXISTS UKB_OPS;

 

-- Sequences (names kept)

CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_IDENTITY_SID START=1 INCREMENT=1;

CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_EVENT_SID    START=1 INCREMENT=1;

CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_EDGE_SID     START=1 INCREMENT=1;

CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_PAYLOAD_SID  START=1 INCREMENT=1;

 

-- Optional hash validator (Snowflake has no real DOMAINs)

CREATE OR REPLACE FUNCTION UKB_UTIL.IS_HASH256(s STRING)

RETURNS BOOLEAN

AS $$ REGEXP_LIKE(s, '^[0-9a-f]{64}$​') $$;

 

-- Hash shims (until external functions for BLAKE3/SHA3_256 are wired)

CREATE OR REPLACE FUNCTION UKB_UTIL.SHA3_256_HEX(s STRING)

RETURNS STRING AS $$ UPPER(TO_HEX(SHA2(TO_BINARY(COALESCE(s,''), 'UTF-8'),256))) $$;

 

CREATE OR REPLACE FUNCTION UKB_UTIL.BLAKE3_HEX(s STRING)

RETURNS STRING AS $$ UPPER(TO_HEX(SHA2(TO_BINARY(COALESCE(s,''), 'UTF-8'),256))) $$;  -- placeholder

--Core tables (UKB_REPO)

 

-- ais_meta.template

CREATE OR REPLACE TABLE AIS_META.TEMPLATE (

  TEMPLATE_ID   VARCHAR(64) PRIMARY KEY,          -- hash256

  ENTITY_TYPE   STRING,

  SECTION       STRING,                           -- identity | event | edge | payload

  JSON_SCHEMA   VARIANT,

  CREATED_TS    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()

);

 

-- ais_meta.hash_policy

CREATE OR REPLACE TABLE AIS_META.HASH_POLICY (

  POLICY_VER      NUMBER PRIMARY KEY,

  DECLARED_ALGO   STRING NOT NULL,                -- e.g., 'sha3_256'

  RAW_ALGO        STRING NOT NULL,                -- e.g., 'blake3'

  EFFECTIVE_FROM  TIMESTAMP_NTZ NOT NULL

);

INSERT INTO AIS_META.HASH_POLICY SELECT 1,'sha3_256','blake3',CURRENT_TIMESTAMP()

WHERE NOT EXISTS (SELECT 1 FROM AIS_META.HASH_POLICY WHERE POLICY_VER=1);

 

-- ais_meta.rule_version

CREATE OR REPLACE TABLE AIS_META.RULE_VERSION (

  RULE_NAME      STRING PRIMARY KEY,

  THRESHOLD      NUMBER,

  EFFECTIVE_FROM TIMESTAMP_NTZ

);

INSERT INTO AIS_META.RULE_VERSION (RULE_NAME,THRESHOLD,EFFECTIVE_FROM)

SELECT * FROM VALUES

  ('attr_predicate_pct', 10,  CURRENT_TIMESTAMP()),

  ('table_row_threshold',5e9, CURRENT_TIMESTAMP()),

  ('raw_retention_days', 90,  CURRENT_TIMESTAMP())

WHERE NOT EXISTS (SELECT 1 FROM AIS_META.RULE_VERSION);

 

 

-- hub_all_h

CREATE OR REPLACE TABLE UKB_REPO.HUB_ALL_H (

  ENTITY_TYPE                   STRING       NOT NULL,

  IDENTITY_SID                  NUMBER       DEFAULT UKB_UTIL.AIS_SEQ_IDENTITY_SID.NEXTVAL PRIMARY KEY,

 

  SHA3_256___IDENTITY_DECLARED  VARCHAR(64)  NOT NULL,

  BLAKE3___IDENTITY_RAW         VARCHAR(64)  NOT NULL UNIQUE,

  BLAKE3_PREFIX                 CHAR(8),   -- maintained by ops

 

  -- exemplar business keys (extend as needed)

  CUSTOMER_NUMBER               VARCHAR(50),

  SOURCE_SYSTEM                 VARCHAR(20),

 

  TEMPLATE_ID                   VARCHAR(64)  REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),

  DECLARED_DELTA_JSON           VARIANT      DEFAULT OBJECT_CONSTRUCT(),

 

  IDENTITY_RAW_JSON             VARIANT      NOT NULL,

 

  VALID_FROM_TS                 TIMESTAMP_NTZ NOT NULL DEFAULT TO_TIMESTAMP_NTZ('1900-01-01'),

  VALID_TO_TS                   TIMESTAMP_NTZ NOT NULL DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),

  INGESTED_AT_TS                TIMESTAMP_NTZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),

  SEQUENCE_ID                   NUMBER        AUTOINCREMENT,

 

  CREATED_TS                    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()

)

CLUSTER BY (ENTITY_TYPE, BLAKE3_PREFIX, VALID_FROM_TS);

 

-- evt_all_e

CREATE OR REPLACE TABLE UKB_REPO.EVT_ALL_E (

  EVENT_SID                     NUMBER       DEFAULT UKB_UTIL.AIS_SEQ_EVENT_SID.NEXTVAL PRIMARY KEY,

  SHA3_256___EVENT_DECLARED     VARCHAR(64)  NOT NULL,

  BLAKE3___EVENT_RAW            VARCHAR(64)  NOT NULL UNIQUE,

  BLAKE3_PREFIX                 CHAR(8),

 

  EVENT_TYPE                    STRING       NOT NULL,

  EVENT_TS                      TIMESTAMP_NTZ NOT NULL,

  SOURCE_SYSTEM                 STRING       NOT NULL,

  STATUS                        STRING,

 

  TEMPLATE_ID                   VARCHAR(64)  REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),

  DECLARED_DELTA_JSON           VARIANT      DEFAULT OBJECT_CONSTRUCT(),

  EVENT_RAW_JSON                VARIANT      NOT NULL,

 

  INGESTED_AT_TS                TIMESTAMP_NTZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),

  SEQUENCE_ID                   NUMBER        AUTOINCREMENT,

  CREATED_TS                    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()

)

CLUSTER BY (BLAKE3_PREFIX, EVENT_TS);

 

-- lnk_all_l

CREATE OR REPLACE TABLE UKB_REPO.LNK_ALL_L (

  EDGE_SID                      NUMBER       DEFAULT UKB_UTIL.AIS_SEQ_EDGE_SID.NEXTVAL PRIMARY KEY,

  EVENT_SID                     NUMBER       NOT NULL REFERENCES UKB_REPO.EVT_ALL_E(EVENT_SID),

 

  SHA3_256___EDGE_DECLARED      VARCHAR(64)  NOT NULL,

  BLAKE3___EDGE_RAW             VARCHAR(64)  NOT NULL UNIQUE,

  BLAKE3_PREFIX                 CHAR(8),

 

  SOURCE_HID                    VARCHAR(64)  NOT NULL,

  TARGET_HID                    VARCHAR(64)  NOT NULL,

  EDGE_TYPE                     VARCHAR(50)  NOT NULL,

  CONFIDENCE_SCORE              NUMBER(5,4)  DEFAULT 1,

 

  TEMPLATE_ID                   VARCHAR(64)  REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),

  DECLARED_DELTA_JSON           VARIANT      DEFAULT OBJECT_CONSTRUCT(),

  EDGE_RAW_JSON                 VARIANT      NOT NULL,

 

  VALID_FROM_TS                 TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('1900-01-01'),

  VALID_TO_TS                   TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),

  INGESTED_AT_TS                TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  SEQUENCE_ID                   NUMBER        AUTOINCREMENT,

 

  CREATED_TS                    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

 

  CONSTRAINT UQ_EDGE_TRIPLET UNIQUE (SOURCE_HID, TARGET_HID, EDGE_TYPE)

)

CLUSTER BY (BLAKE3_PREFIX, SOURCE_HID, TARGET_HID);

 

-- sat_all_attr_s

CREATE OR REPLACE TABLE UKB_REPO.SAT_ALL_ATTR_S (

  ENTITY_TYPE                   STRING       NOT NULL,

  PAYLOAD_SID                   NUMBER       DEFAULT UKB_UTIL.AIS_SEQ_PAYLOAD_SID.NEXTVAL PRIMARY KEY,

  EVENT_SID                     NUMBER       NOT NULL REFERENCES UKB_REPO.EVT_ALL_E(EVENT_SID),

  PARENT_HID                    VARCHAR(64)  NOT NULL,

 

  SHA3_256___PAYLOAD_DECLARED   VARCHAR(64)  NOT NULL,

  BLAKE3___PAYLOAD_RAW          VARCHAR(64)  NOT NULL UNIQUE,

  BLAKE3_PREFIX                 CHAR(8),

 

  -- exemplar hot attributes

  CUSTOMER_NAME                 VARCHAR(200),

  CUSTOMER_EMAIL                VARCHAR(320),

  VALIDATION_STATUS             VARCHAR(20),

 

  TEMPLATE_ID                   VARCHAR(64)  REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),

  DECLARED_DELTA_JSON           VARIANT      DEFAULT OBJECT_CONSTRUCT(),

  PAYLOAD_RAW_JSON              VARIANT      NOT NULL,

 

  VALID_FROM_TS                 TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  VALID_TO_TS                   TIMESTAMP_NTZ,

  INGESTED_AT_TS                TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  SEQUENCE_ID                   NUMBER        AUTOINCREMENT,

 

  CREATED_TS                    TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()

)

CLUSTER BY (ENTITY_TYPE, PARENT_HID, BLAKE3_PREFIX, VALID_FROM_TS);

 

-- hub_collision_h

CREATE OR REPLACE TABLE UKB_REPO.HUB_COLLISION_H (

  BLAKE3___IDENTITY_RAW         VARCHAR(64),

  SOURCE_SYSTEM                 STRING,

  TENANT_ID                     STRING,

  ENVELOPE_JSON                 VARIANT,

  FIRST_SEEN_TS                 TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),

  PRIMARY KEY (BLAKE3___IDENTITY_RAW, SOURCE_SYSTEM, TENANT_ID)  -- informational

);

 

--Ops:

CREATE OR REPLACE PROCEDURE UKB_OPS.APPLY_PREFIXES()

RETURNS STRING

LANGUAGE SQL

AS

$$

BEGIN

  UPDATE UKB_REPO.HUB_ALL_H     SET BLAKE3_PREFIX = SUBSTR(BLAKE3___IDENTITY_RAW,1,8)

  WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___IDENTITY_RAW,1,8);

 

  UPDATE UKB_REPO.EVT_ALL_E     SET BLAKE3_PREFIX = SUBSTR(BLAKE3___EVENT_RAW,1,8)

  WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___EVENT_RAW,1,8);

 

  UPDATE UKB_REPO.LNK_ALL_L     SET BLAKE3_PREFIX = SUBSTR(BLAKE3___EDGE_RAW,1,8)

  WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___EDGE_RAW,1,8);

 

  UPDATE UKB_REPO.SAT_ALL_ATTR_S SET BLAKE3_PREFIX = SUBSTR(BLAKE3___PAYLOAD_RAW,1,8)

  WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___PAYLOAD_RAW,1,8);

 

  RETURN 'OK';

END;

$$;

 

-- Optional: run on a schedule

-- CREATE TASK UKB_OPS.TASK_APPLY_PREFIXES WAREHOUSE=<WH> SCHEDULE='USING CRON /10 * UTC' AS CALL UKB_OPS.APPLY_PREFIXES();

CREATE OR REPLACE FUNCTION UKB_UTIL.SHA3_256_HEX_V(v VARIANT)

RETURNS STRING

AS $$ UPPER(SHA2(TO_JSON(v), 256)) $$;

 

CREATE OR REPLACE FUNCTION UKB_UTIL.BLAKE3_HEX_V(v VARIANT)

RETURNS STRING

AS $$ UPPER(SHA2(TO_JSON(v), 256)) $$;

 

--LOAD

CREATE OR REPLACE PROCEDURE UKB_OPS.LOAD_SNOWFLAKE_METADATA()

RETURNS STRING

LANGUAGE SQL

AS

$$

DECLARE

  v_now       TIMESTAMP_NTZ := CURRENT_TIMESTAMP();

  v_event_sid NUMBER;

  v_tpl_tbl   STRING;

  v_tpl_col   STRING;

  v_tpl_edge  STRING;

  v_tpl_pay   STRING;

  v_tables    NUMBER;

  v_cols      NUMBER;

BEGIN

  -- Resolve template IDs (assumes you seeded them)

  SELECT TEMPLATE_ID INTO :v_tpl_tbl

  FROM AIS_META.TEMPLATE

  WHERE SECTION='identity' AND ENTITY_TYPE='SF_TABLE'

  QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;

 

  SELECT TEMPLATE_ID INTO :v_tpl_col

  FROM AIS_META.TEMPLATE

  WHERE SECTION='identity' AND ENTITY_TYPE='SF_COLUMN'

  QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;

 

  SELECT TEMPLATE_ID INTO :v_tpl_edge

  FROM AIS_META.TEMPLATE

  WHERE SECTION='edge' AND ENTITY_TYPE='SF_EDGE'

  QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;

 

  SELECT TEMPLATE_ID INTO :v_tpl_pay

  FROM AIS_META.TEMPLATE

  WHERE SECTION='payload' AND ENTITY_TYPE='SF_COLUMN'

  QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;

 

  /* A) STAGING (identity vs payload signatures; bind :v_now) */

  CREATE OR REPLACE TEMP TABLE STG_TABLES AS

  SELECT

    TABLE_CATALOG  AS DATABASE_NAME,

    TABLE_SCHEMA   AS SCHEMA_NAME,

    TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT, BYTES,

    OBJECT_CONSTRUCT(*) AS RAW,

    -- Declared identity key (canonical)

    UPPER(TABLE_CATALOG)||'|'||UPPER(TABLE_SCHEMA)||'|'||UPPER(TABLE_NAME)||'|TABLE' AS DECL_KEY,

    -- Identity signature (what makes the table the same table)

    UPPER(COALESCE(TABLE_CATALOG,''))||'|'||

    UPPER(COALESCE(TABLE_SCHEMA,'')) ||'|'||

    UPPER(COALESCE(TABLE_NAME,''))   ||'|'||

    UPPER(COALESCE(TABLE_TYPE,''))   AS ID_SIG_TBL,

    -- Payload signature (attributes you want SAT to react to)

    UPPER(COALESCE(TABLE_TYPE,''))   ||'|'||

    UPPER(COALESCE(COMMENT,''))      ||'|'||

    TO_VARCHAR(NVL(ROW_COUNT,-1))    ||'|'||

    TO_VARCHAR(NVL(BYTES,-1))        AS PAY_SIG_TBL,

    :v_now AS SNAP_TS

  FROM SNOWFLAKE.ACCOUNT_USAGE.TABLES WHERE DELETED IS NULL;

 

  CREATE OR REPLACE TEMP TABLE STG_COLUMNS AS

  SELECT

    TABLE_CATALOG  AS DATABASE_NAME,

    TABLE_SCHEMA   AS SCHEMA_NAME,

    TABLE_NAME, COLUMN_NAME,

    ORDINAL_POSITION, DATA_TYPE, IS_NULLABLE, COMMENT,

    OBJECT_CONSTRUCT(*) AS RAW,

    UPPER(TABLE_CATALOG)||'|'||UPPER(TABLE_SCHEMA)||'|'||

    UPPER(TABLE_NAME)||'|'||UPPER(COLUMN_NAME)||'|COLUMN' AS DECL_KEY,

    -- Identity signature (stable)

    UPPER(COALESCE(TABLE_CATALOG,''))||'|'||

    UPPER(COALESCE(TABLE_SCHEMA,'')) ||'|'||

    UPPER(COALESCE(TABLE_NAME,''))   ||'|'||

    UPPER(COALESCE(COLUMN_NAME,''))  ||'|'||

    TO_VARCHAR(NVL(ORDINAL_POSITION,-1)) AS ID_SIG_COL,

    -- Payload signature (properties)

    UPPER(COALESCE(DATA_TYPE,''))    ||'|'||

    UPPER(COALESCE(IS_NULLABLE,''))  ||'|'||

    UPPER(COALESCE(COMMENT,''))      AS PAY_SIG_COL,

    :v_now AS SNAP_TS

  FROM SNOWFLAKE.ACCOUNT_USAGE.COLUMNS WHERE DELETED IS NULL;

 

  /* B) HUBS (IDENTITY ONLY) — close/open on identity signature */

  -- TABLE hubs: close changed

  UPDATE UKB_REPO.HUB_ALL_H h

  SET VALID_TO_TS = :v_now

  WHERE h.ENTITY_TYPE='SF_TABLE' AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ

    AND EXISTS (

      SELECT 1 FROM STG_TABLES s

      WHERE UKB_UTIL.SHA3_256_HEX(s.DECL_KEY) = h.SHA3_256___IDENTITY_DECLARED

        AND UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL) <> h.BLAKE3___IDENTITY_RAW

    );

 

  -- TABLE hubs: insert open

  INSERT INTO UKB_REPO.HUB_ALL_H (

    ENTITY_TYPE, SHA3_256___IDENTITY_DECLARED, BLAKE3___IDENTITY_RAW, BLAKE3_PREFIX,

    CUSTOMER_NUMBER, SOURCE_SYSTEM, TEMPLATE_ID, DECLARED_DELTA_JSON, IDENTITY_RAW_JSON,

    VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS

  )

  SELECT

    'SF_TABLE',

    UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),

    UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL),

    SUBSTR(UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL),1,8),

    NULL, 'SNOWFLAKE', :v_tpl_tbl, OBJECT_CONSTRUCT(),

    OBJECT_CONSTRUCT('TABLE_CATALOG', s.DATABASE_NAME,

                     'TABLE_SCHEMA',  s.SCHEMA_NAME,

                     'TABLE_NAME',    s.TABLE_NAME,

                     'TABLE_TYPE',    s.TABLE_TYPE),

    :v_now, '9999-12-31'::TIMESTAMP_NTZ, :v_now, :v_now

  FROM STG_TABLES s

  WHERE NOT EXISTS (

    SELECT 1 FROM UKB_REPO.HUB_ALL_H h

    WHERE h.ENTITY_TYPE='SF_TABLE'

      AND h.SHA3_256___IDENTITY_DECLARED = UKB_UTIL.SHA3_256_HEX(s.DECL_KEY)

      AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ

  );

 

  -- COLUMN hubs: close changed

  UPDATE UKB_REPO.HUB_ALL_H h

  SET VALID_TO_TS = :v_now

  WHERE h.ENTITY_TYPE='SF_COLUMN' AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ

    AND EXISTS (

      SELECT 1 FROM STG_COLUMNS s

      WHERE UKB_UTIL.SHA3_256_HEX(s.DECL_KEY) = h.SHA3_256___IDENTITY_DECLARED

        AND UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL) <> h.BLAKE3___IDENTITY_RAW

    );

 

  -- COLUMN hubs: insert open

  INSERT INTO UKB_REPO.HUB_ALL_H (

    ENTITY_TYPE, SHA3_256___IDENTITY_DECLARED, BLAKE3___IDENTITY_RAW, BLAKE3_PREFIX,

    CUSTOMER_NUMBER, SOURCE_SYSTEM, TEMPLATE_ID, DECLARED_DELTA_JSON, IDENTITY_RAW_JSON,

    VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS

  )

  SELECT

    'SF_COLUMN',

    UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),

    UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL),

    SUBSTR(UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL),1,8),

    NULL, 'SNOWFLAKE', :v_tpl_col, OBJECT_CONSTRUCT(),

    OBJECT_CONSTRUCT('TABLE_CATALOG', s.DATABASE_NAME,

                     'TABLE_SCHEMA',  s.SCHEMA_NAME,

                     'TABLE_NAME',    s.TABLE_NAME,

                     'COLUMN_NAME',   s.COLUMN_NAME,

                     'ORDINAL_POSITION', s.ORDINAL_POSITION),

    :v_now, '9999-12-31'::TIMESTAMP_NTZ, :v_now, :v_now

  FROM STG_COLUMNS s

  WHERE NOT EXISTS (

    SELECT 1 FROM UKB_REPO.HUB_ALL_H h

    WHERE h.ENTITY_TYPE='SF_COLUMN'

      AND h.SHA3_256___IDENTITY_DECLARED = UKB_UTIL.SHA3_256_HEX(s.DECL_KEY)

      AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ

  );

 

  /* C) EVENT (one per run) */

  INSERT INTO UKB_REPO.EVT_ALL_E (

    SHA3_256___EVENT_DECLARED, BLAKE3___EVENT_RAW, BLAKE3_PREFIX,

    EVENT_TYPE, EVENT_TS, SOURCE_SYSTEM, STATUS,

    TEMPLATE_ID, DECLARED_DELTA_JSON, EVENT_RAW_JSON,

    INGESTED_AT_TS, CREATED_TS

  )

  SELECT

    UKB_UTIL.SHA3_256_HEX('SNAP|'||TO_VARCHAR(:v_now)),

    UKB_UTIL.BLAKE3_HEX(TO_VARCHAR(:v_now)),

    SUBSTR(UKB_UTIL.BLAKE3_HEX(TO_VARCHAR(:v_now)),1,8),

    'SNOWFLAKE_METADATA_SNAPSHOT', :v_now, 'SNOWFLAKE_ACCOUNT_USAGE', 'OK',

    :v_tpl_tbl, OBJECT_CONSTRUCT(),

    OBJECT_CONSTRUCT('tables', (SELECT COUNT(*) FROM STG_TABLES),

                     'columns',(SELECT COUNT(*) FROM STG_COLUMNS)),

    :v_now, :v_now;

 

  SELECT MAX(EVENT_SID) INTO :v_event_sid

  FROM UKB_REPO.EVT_ALL_E

  WHERE EVENT_TS = :v_now;

 

  /* D) EDGES: TABLE → COLUMN (dedupe + SCD) */

  -- Close changed edges

  UPDATE UKB_REPO.LNK_ALL_L l

SET VALID_TO_TS = :v_now

FROM (

  SELECT

    UKB_UTIL.SHA3_256_HEX(UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||UPPER(sc.TABLE_NAME)||'|TABLE')  AS TBL_HID,

    UKB_UTIL.SHA3_256_HEX(sc.DECL_KEY) AS COL_HID,

    UKB_UTIL.BLAKE3_HEX(

      UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||

      UPPER(sc.TABLE_NAME)||'|'||UPPER(sc.COLUMN_NAME)||'|'||

      TO_VARCHAR(NVL(sc.ORDINAL_POSITION,-1))

    ) AS EDGE_RHID

  FROM STG_COLUMNS sc

) C

WHERE l.EDGE_TYPE = 'HAS_COLUMN'

  AND l.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31')

  AND l.SOURCE_HID = C.TBL_HID

  AND l.TARGET_HID = C.COL_HID

  AND l.BLAKE3___EDGE_RAW <> C.EDGE_RHID;

 

  -- Insert missing open edges

  INSERT INTO UKB_REPO.LNK_ALL_L (

  EVENT_SID, SHA3_256___EDGE_DECLARED, BLAKE3___EDGE_RAW, BLAKE3_PREFIX,

  SOURCE_HID, TARGET_HID, EDGE_TYPE, CONFIDENCE_SCORE,

  TEMPLATE_ID, DECLARED_DELTA_JSON, EDGE_RAW_JSON,

  VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS

)

SELECT

  :v_event_sid,

  UKB_UTIL.SHA3_256_HEX('HAS_COLUMN|'||TBL_HID||'|'||COL_HID),

  EDGE_RHID,

  SUBSTR(EDGE_RHID,1,8),

  TBL_HID, COL_HID, 'HAS_COLUMN', 1,

  :v_tpl_edge, OBJECT_CONSTRUCT(), EDGE_RAW,

  :v_now, TO_TIMESTAMP_NTZ('9999-12-31'), :v_now, :v_now

FROM (

  SELECT

    UKB_UTIL.SHA3_256_HEX(UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||UPPER(sc.TABLE_NAME)||'|TABLE')  AS TBL_HID,

    UKB_UTIL.SHA3_256_HEX(sc.DECL_KEY) AS COL_HID,

    UKB_UTIL.BLAKE3_HEX(

      UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||

      UPPER(sc.TABLE_NAME)||'|'||UPPER(sc.COLUMN_NAME)||'|'||

      TO_VARCHAR(NVL(sc.ORDINAL_POSITION,-1))

    ) AS EDGE_RHID,

    OBJECT_CONSTRUCT('db',sc.DATABASE_NAME,'sch',sc.SCHEMA_NAME,'tbl',sc.TABLE_NAME,'col',sc.COLUMN_NAME,'ord',sc.ORDINAL_POSITION) AS EDGE_RAW

  FROM STG_COLUMNS sc

) C

WHERE NOT EXISTS (

  SELECT 1 FROM UKB_REPO.LNK_ALL_L x

  WHERE x.SOURCE_HID = C.TBL_HID

    AND x.TARGET_HID = C.COL_HID

    AND x.EDGE_TYPE  = 'HAS_COLUMN'

    AND x.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31')

);

 

  /* E) SAT payloads (attach to the thing they describe) */

  -- TABLE payloads (parent = TABLE HID)

  INSERT INTO UKB_REPO.SAT_ALL_ATTR_S (

    ENTITY_TYPE, EVENT_SID, PARENT_HID,

    SHA3_256___PAYLOAD_DECLARED, BLAKE3___PAYLOAD_RAW, BLAKE3_PREFIX,

    CUSTOMER_NAME, CUSTOMER_EMAIL, VALIDATION_STATUS,

    TEMPLATE_ID, DECLARED_DELTA_JSON, PAYLOAD_RAW_JSON,

    VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS

  )

  SELECT

    'SF_TABLE',

    :v_event_sid,

    UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),

    UKB_UTIL.SHA3_256_HEX(s.PAY_SIG_TBL),

    UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_TBL),

    SUBSTR(UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_TBL),1,8),

    NULL, NULL, NULL,

    :v_tpl_pay, OBJECT_CONSTRUCT(), s.RAW,

    :v_now, NULL, :v_now, :v_now

  FROM STG_TABLES s;

 

  -- COLUMN payloads (parent = COLUMN HID)

  INSERT INTO UKB_REPO.SAT_ALL_ATTR_S (

    ENTITY_TYPE, EVENT_SID, PARENT_HID,

    SHA3_256___PAYLOAD_DECLARED, BLAKE3___PAYLOAD_RAW, BLAKE3_PREFIX,

    CUSTOMER_NAME, CUSTOMER_EMAIL, VALIDATION_STATUS,

    TEMPLATE_ID, DECLARED_DELTA_JSON, PAYLOAD_RAW_JSON,

    VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS

  )

  SELECT

    'SF_COLUMN',

    :v_event_sid,

    UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),

    UKB_UTIL.SHA3_256_HEX(s.PAY_SIG_COL),

    UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_COL),

    SUBSTR(UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_COL),1,8),

    NULL, NULL, NULL,

    :v_tpl_pay, OBJECT_CONSTRUCT(), s.RAW,

    :v_now, NULL, :v_now, :v_now

  FROM STG_COLUMNS s;

 

  /* F) Prefix maintenance */

  CALL UKB_OPS.APPLY_PREFIXES();

 

  /* G) Return counts */

  SELECT COUNT(*) INTO :v_tables FROM STG_TABLES;

  SELECT COUNT(*) INTO :v_cols   FROM STG_COLUMNS;

  RETURN 'LOAD OK: tables=' || v_tables || ', columns=' || v_cols;

END;

$$;

 

 

CALL UKB_OPS.LOAD_SNOWFLAKE_METADATA();

 

--Views to expose as array

-- ===== Events (history) =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_EVENTS AS

SELECT

  EVENT_SID,

  SHA3_256___EVENT_DECLARED  AS EVENT_HID,

  BLAKE3___EVENT_RAW         AS EVENT_RHID,

  EVENT_TYPE, EVENT_TS, SOURCE_SYSTEM, STATUS,

  (EVENT_RAW_JSON:"tables")::NUMBER  AS TABLES_COUNT,

  (EVENT_RAW_JSON:"columns")::NUMBER AS COLUMNS_COUNT,

  INGESTED_AT_TS, CREATED_TS

    FROM UKB_REPO.EVT_ALL_E

ORDER BY EVENT_TS DESC;

-- ===== Current TABLE identities (open SCD rows in HUB) =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLES_CURRENT AS

SELECT

  h.SHA3_256___IDENTITY_DECLARED  AS TABLE_HID,

  h.BLAKE3___IDENTITY_RAW         AS TABLE_RHID,

  h.BLAKE3_PREFIX,

  h.IDENTITY_RAW_JSON:"TABLE_CATALOG"::STRING AS TABLE_CATALOG,

  h.IDENTITY_RAW_JSON:"TABLE_SCHEMA"::STRING  AS TABLE_SCHEMA,

  h.IDENTITY_RAW_JSON:"TABLE_NAME"::STRING    AS TABLE_NAME,

  h.IDENTITY_RAW_JSON:"TABLE_TYPE"::STRING    AS TABLE_TYPE,

  h.VALID_FROM_TS, h.VALID_TO_TS, h.INGESTED_AT_TS

FROM UKB_REPO.HUB_ALL_H h

WHERE h.ENTITY_TYPE = 'SF_TABLE'

  AND h.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31');

  -- ===== Current TABLE payload (latest SAT per TABLE_HID) =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_ATTRS_CURRENT AS

SELECT

  s.PARENT_HID AS TABLE_HID,

  (s.PAYLOAD_RAW_JSON:"COMMENT")::STRING      AS TABLE_COMMENT,

  (s.PAYLOAD_RAW_JSON:"ROW_COUNT")::NUMBER    AS ROW_COUNT,

  (s.PAYLOAD_RAW_JSON:"BYTES")::NUMBER        AS BYTES,

  (s.PAYLOAD_RAW_JSON:"TABLE_TYPE")::STRING   AS TABLE_TYPE_PAYLOAD,

  s.EVENT_SID, s.VALID_FROM_TS, s.VALID_TO_TS, s.INGESTED_AT_TS

FROM (

  SELECT s.*,

         ROW_NUMBER() OVER (PARTITION BY s.PARENT_HID

                            ORDER BY s.INGESTED_AT_TS DESC, s.SEQUENCE_ID DESC) AS rn

  FROM UKB_REPO.SAT_ALL_ATTR_S s

  WHERE s.ENTITY_TYPE = 'SF_TABLE'

) s

WHERE s.rn = 1;

-- ===== Current COLUMN identities (open SCD rows) + parent TABLE via current edge =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_COLUMNS_CURRENT AS

SELECT

  t.TABLE_HID,

  t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE,

  c.COLUMN_HID, c.COLUMN_NAME, c.ORDINAL_POSITION

FROM UKB_REPO.V_SF_TABLES_CURRENT  t

JOIN UKB_REPO.V_SF_COLUMNS_CURRENT c

  ON c.TABLE_HID = t.TABLE_HID

ORDER BY t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION;

 

-- ===== Current COLUMN payload (latest SAT per COLUMN_HID) =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_COLUMN_ATTRS_CURRENT AS

SELECT

  s.PARENT_HID AS COLUMN_HID,

  (s.PAYLOAD_RAW_JSON:"DATA_TYPE")::STRING       AS DATA_TYPE,

  (s.PAYLOAD_RAW_JSON:"IS_NULLABLE")::STRING     AS IS_NULLABLE,

  (s.PAYLOAD_RAW_JSON:"COMMENT")::STRING         AS COLUMN_COMMENT,

  (s.PAYLOAD_RAW_JSON:"ORDINAL_POSITION")::NUMBER AS ORDINAL_POSITION,

  s.EVENT_SID, s.VALID_FROM_TS, s.VALID_TO_TS, s.INGESTED_AT_TS

FROM (

  SELECT s.*,

         ROW_NUMBER() OVER (PARTITION BY s.PARENT_HID

                            ORDER BY s.INGESTED_AT_TS DESC, s.SEQUENCE_ID DESC) AS rn

  FROM UKB_REPO.SAT_ALL_ATTR_S s

  WHERE s.ENTITY_TYPE = 'SF_COLUMN'

) s

WHERE s.rn = 1;

 

    -- ===== Denormalized TABLE ↔ COLUMN with payloads =====

CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_COLUMNS_ENRICHED AS

SELECT

  t.TABLE_HID,

  t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE,

  ta.ROW_COUNT, ta.BYTES, ta.TABLE_COMMENT,

  c.COLUMN_HID, c.COLUMN_NAME, c.ORDINAL_POSITION,

  ca.DATA_TYPE, ca.IS_NULLABLE, ca.COLUMN_COMMENT

FROM UKB_REPO.V_SF_TABLES_CURRENT t

LEFT JOIN UKB_REPO.V_SF_TABLE_ATTRS_CURRENT  ta ON ta.TABLE_HID  = t.TABLE_HID

JOIN      UKB_REPO.V_SF_COLUMNS_CURRENT      c  ON c.TABLE_HID   = t.TABLE_HID

LEFT JOIN UKB_REPO.V_SF_COLUMN_ATTRS_CURRENT ca ON ca.COLUMN_HID = c.COLUMN_HID

ORDER BY t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION;

 

select * from V_SF_EVENTS;

select * from V_SF_TABLES_CURRENT;

select * from V_SF_TABLE_ATTRS_CURRENT ;

select * from V_SF_TABLE_COLUMNS_CURRENT ;

select * from V_SF_TABLE_COLUMNS_ENRICHED  ;

select * from LNK_ALL_L order by source_hid ;

 

 

 

-- Payloads (child of hubs/events)

DELETE FROM UKB_REPO.SAT_ALL_ATTR_S;

 

-- Edges (child of events)

DELETE FROM UKB_REPO.LNK_ALL_L;

 

-- Collision bucket

DELETE FROM UKB_REPO.HUB_COLLISION_H;

 

-- Hubs (identities)

DELETE FROM UKB_REPO.HUB_ALL_H;

 

-- Events (parents of SAT/LNK)

DELETE FROM UKB_REPO.EVT_ALL_E;

 

 

 

 

To view or add a comment, sign in

Explore topics