The Universal Canonical Model: Breaking the Boundaries Between Data and Metadata
What you're looking at isn't just another data warehouse pattern—it's a fundamental reimagining of how we store, integrate, and understand information. This seemingly simple Snowflake implementation demonstrates a universal canonical model that dissolves the artificial boundaries between data and metadata, between structure and content, between different modeling paradigms.
The Core Innovation: Four Tables to Rule Them All
At its heart is one explodable entity. In this implementation, it is exploded into four primary tables that can represent data from any source in any format:
HUB_ALL_H - Universal identities (customers, products, tables, concepts)
EVT_ALL_E - Events that occur (transactions, changes, snapshots)
LNK_ALL_L - Relationships between entities (edges in the graph)
SAT_ALL_ATTR_S - Attributes/properties (the descriptive payload)
This is more than elegance. These four structures can represent:
Traditional 3NF databases
Star schemas
Data Vault models
Property graphs
Document stores
Time-series data
Hierarchical structures
The Purpose of Dual Hashing
The system employs two complementary hash strategies:
```sql
SHA3_256___IDENTITY_DECLARED -- The semantic/business identity
BLAKE3___IDENTITY_RAW -- The complete content fingerprint
```
This dual approach enables:
Federation without exposure: Organizations can join data without sharing sensitive content
Deduplication: Identical content is automatically recognized
Versioning: Changes create new hashes while preserving history
Performance: 8-character prefixes enable efficient clustering
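The two hashes can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the demo's implementation: `hashlib.sha3_256` is in the standard library, but BLAKE3 is not, so `hashlib.blake2b` with a 32-byte digest stands in for it here. The helper names `declared_hash` and `raw_hash`, the `|`-joined key layout, and the sample records are all hypothetical.

```python
import hashlib
import json


def declared_hash(*key_parts: str) -> str:
    """Semantic identity: hash of the normalized business key."""
    canonical = "|".join(part.strip().upper() for part in key_parts)
    return hashlib.sha3_256(canonical.encode("utf-8")).hexdigest()


def raw_hash(record: dict) -> str:
    """Content fingerprint: hash of the full record in a stable serialization.

    blake2b (32-byte digest) is a stand-in for BLAKE3, which is not in the stdlib.
    """
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.blake2b(canonical.encode("utf-8"), digest_size=32).hexdigest()


v1 = {"customer_number": "C-001", "email": "a@example.com"}
v2 = {"customer_number": "C-001", "email": "b@example.com"}

# Same business key, formatted differently: the declared identity matches.
same_identity = declared_hash("crm", "c-001") == declared_hash("CRM", "C-001 ")
# Same identity, different content: the raw fingerprint changes.
content_changed = raw_hash(v1) != raw_hash(v2)
# 8-character prefix of the raw hash, as used for clustering.
prefix = raw_hash(v1)[:8]
```

The declared hash stays stable while the content changes, which is what lets a new raw hash act as a new version of the same identity.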
Metadata AS Data: The Profound Unification
The demo loads Snowflake's metadata—but here's the key insight: it treats metadata exactly like any other data. A table definition, a customer record, a transaction—they're all just entities with relationships:
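```sql
-- A "column" isn't a structural element; it's data
-- (simplified: the hash and temporal columns of the full DDL are omitted):
INSERT INTO HUB_ALL_H (ENTITY_TYPE, IDENTITY_RAW_JSON)
SELECT 'SF_COLUMN', PARSE_JSON('{"name": "customer_email", "type": "VARCHAR"}');
-- The relationship "table HAS column" is also data
-- (table_hash and column_hash are placeholders for the two hub identity hashes):
INSERT INTO LNK_ALL_L (SOURCE_HID, TARGET_HID, EDGE_TYPE)
VALUES (table_hash, column_hash, 'HAS_COLUMN');
```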
This means:
Minimizes physical schema changes: New attributes are just new entities
Self-describing: The model contains its own structure
Universal querying: One query language for data AND metadata
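To make "one query language for data and metadata" concrete, here is a small sketch that uses Python's built-in `sqlite3` as a stand-in for Snowflake. The two tables are heavily simplified versions of HUB_ALL_H and LNK_ALL_L, and the readable ids such as `h_tbl` are placeholders for real hashes; none of this is taken from the demo script itself.

```python
import json
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE hub_all_h (hid TEXT PRIMARY KEY, entity_type TEXT, identity_raw_json TEXT);
CREATE TABLE lnk_all_l (source_hid TEXT, target_hid TEXT, edge_type TEXT);
""")
con.executemany(
    "INSERT INTO hub_all_h VALUES (?, ?, ?)",
    [
        ("h_tbl", "SF_TABLE", json.dumps({"name": "CUSTOMERS"})),
        ("h_col", "SF_COLUMN", json.dumps({"name": "customer_email", "type": "VARCHAR"})),
        ("h_cust", "CUSTOMER", json.dumps({"customer_number": "C-001"})),
    ],
)
con.execute("INSERT INTO lnk_all_l VALUES (?, ?, ?)", ("h_tbl", "h_col", "HAS_COLUMN"))

# Metadata question: which columns does the table have?
rows = con.execute(
    """
    SELECT c.identity_raw_json
    FROM lnk_all_l l
    JOIN hub_all_h c ON c.hid = l.target_hid
    WHERE l.source_hid = ? AND l.edge_type = 'HAS_COLUMN'
    """,
    ("h_tbl",),
).fetchall()
column_names = [json.loads(raw)["name"] for (raw,) in rows]

# Data and metadata live side by side in the same hub.
entity_types = [t for (t,) in con.execute(
    "SELECT entity_type FROM hub_all_h ORDER BY entity_type")]
```

A customer record and a column definition sit in the same hub table, and the edge query that lists a table's columns has the same shape as one that would list a customer's orders.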
The Pattern Recognition Power
Because everything follows the same pattern, the system can:
1. Auto-Generate Ingestion
Once it knows a table has columns (from metadata), it can generate the procedures to ingest that table's data.
2. Recover Original Models
By analyzing relationship patterns, you can detect: "These entities form a star pattern—this was originally a star schema!"
3. Transform Between Paradigms
The integrated data can be exposed as:
Normalized tables (via views)
Denormalized stars (via graph traversal)
JSON documents (from VARIANT columns)
Graph queries (via edge relationships)
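The "star pattern" detection mentioned under Recover Original Models can be approximated with a simple degree heuristic over the edge table. The sketch below is one possible heuristic, not the author's algorithm: it assumes edges point from a fact-like entity to its dimensions, and the function name, threshold, and sample node names are hypothetical.

```python
from collections import defaultdict


def find_star_centers(edges, min_points=3):
    """Return nodes that point at >= min_points distinct leaf nodes.

    edges: iterable of (source_hid, target_hid) pairs.
    A leaf is a node with no outgoing edges (a dimension-like entity).
    """
    out = defaultdict(set)
    for source, target in edges:
        out[source].add(target)
    centers = []
    for node, targets in out.items():
        all_leaves = all(len(out.get(t, ())) == 0 for t in targets)
        if len(targets) >= min_points and all_leaves:
            centers.append(node)
    return sorted(centers)


edges = [
    ("FACT_SALES", "DIM_DATE"),
    ("FACT_SALES", "DIM_PRODUCT"),
    ("FACT_SALES", "DIM_STORE"),
    ("ORDERS", "CUSTOMERS"),      # a normalized chain, not a star
    ("CUSTOMERS", "ADDRESSES"),
]
centers = find_star_centers(edges)
```

Here only `FACT_SALES` qualifies. A real detector would likely also weigh edge types and cardinalities, which this sketch ignores.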
The Universal Pipeline
ANY Source Format
↓
Four Canonical Tables
↓
ANY Target Format
Whether your source is:
REST APIs returning JSON
Legacy 3NF databases
CSV files
Event streams
Graph databases
They all become entities, events, edges, and attributes. Then they can be exposed as whatever consumers need.
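As an illustration of that decomposition, the sketch below splits one JSON-style order record into the four sections. Everything specific here is an assumption made for the example: the `ORDER` and `CUSTOMER` entity types, the `PLACED_BY` edge type, the `ORDER_RECEIVED` event type, and the `TYPE|KEY` hashing rule are hypothetical and do not come from the demo script.

```python
import hashlib


def hid(entity_type: str, key: str) -> str:
    """Hypothetical identity hash: SHA3-256 over the upper-cased 'TYPE|KEY'."""
    return hashlib.sha3_256(f"{entity_type}|{key}".upper().encode("utf-8")).hexdigest()


def decompose(order: dict, source_system: str) -> dict:
    """Split one source record into the four canonical sections."""
    order_hid = hid("ORDER", order["order_id"])
    customer_hid = hid("CUSTOMER", order["customer_id"])
    return {
        "hubs": [
            {"entity_type": "ORDER", "hid": order_hid},
            {"entity_type": "CUSTOMER", "hid": customer_hid},
        ],
        "event": {"event_type": "ORDER_RECEIVED", "event_ts": order["ts"],
                  "source_system": source_system},
        "edges": [{"source_hid": order_hid, "target_hid": customer_hid,
                   "edge_type": "PLACED_BY"}],
        "attrs": [{"parent_hid": order_hid, "payload": {"amount": order["amount"]}}],
    }


rows = decompose(
    {"order_id": "O-1", "customer_id": "C-001", "amount": 42.5,
     "ts": "2024-01-01T00:00:00Z"},
    source_system="REST_API",
)
```

The same four-part shape would apply to a CSV row or a stream event. Only the mapping from source fields to identities, edges, and attributes changes.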
Integration Emerges When Canonical Identity Rules Are Shared
The most powerful aspect is that entities from different systems integrate automatically when they share identities:
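```sql
-- Customer from CRM, transactions from banking, tickets from support:
-- all connected through the shared customer identity hash
CREATE VIEW CUSTOMER_360 AS
SELECT h.*, l.EDGE_TYPE, l.TARGET_HID
FROM HUB_ALL_H h
LEFT JOIN LNK_ALL_L l -- edges from all systems
ON l.SOURCE_HID = h.SHA3_256___IDENTITY_DECLARED
WHERE h.ENTITY_TYPE = 'CUSTOMER'; -- 'CUSTOMER' is an assumed entity type
```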
No pre-planned integration architecture is needed beyond the shared identity rules. As systems are added, their relationships naturally merge into the graph.
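The mechanism is easy to see outside the database. In the sketch below, three hypothetical systems format the same customer number differently, and a shared normalization rule (an assumption for this example: trim, upper-case, prefix with the entity type) makes them hash to the same identity, so their records group together without any pairwise mapping.

```python
import hashlib
from collections import defaultdict


def customer_hid(customer_number: str) -> str:
    """Shared identity rule (assumed): trim, upper-case, prefix with the entity type."""
    key = "CUSTOMER|" + customer_number.strip().upper()
    return hashlib.sha3_256(key.encode("utf-8")).hexdigest()


crm = [{"customer_number": "c-001", "name": "Ada"}]
banking = [{"customer_number": "C-001 ", "txn": 120.0},
           {"customer_number": "C-002", "txn": 15.0}]
support = [{"customer_number": "C-001", "ticket": "T-9"}]

# Group every record under the identity hash of its customer.
customer_360 = defaultdict(dict)
for system, records in (("crm", crm), ("banking", banking), ("support", support)):
    for record in records:
        by_system = customer_360[customer_hid(record["customer_number"])]
        by_system.setdefault(system, []).append(record)

ada = customer_360[customer_hid("C-001")]
```

If one system skipped the shared rule, for example by hashing the untrimmed value, its records would land under a different hash. That is why the identity rules, rather than the pipelines, are the thing to agree on.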
The Deployment Flexibility
This same model works on:
Snowflake (as shown)
PostgreSQL (same structures)
MongoDB (as JSON documents)
Parquet files (columnar storage)
Kafka (event streaming)
Even browser localStorage (for edge computing)
One model, infinite deployments.
Impacts
1. The End of ETL as We Know It
Instead of building custom pipelines, you have a few generic patterns that handle everything.
2. True Data Mesh Architecture
Each domain owns its data but shares the same canonical format, enabling seamless federation.
3. Automatic Lineage and Governance
Every piece of data knows where it came from (EVENT_SID), when it arrived (INGESTED_AT_TS), and how it relates to everything else.
4. Time Travel Built-In
SCD Type 2 temporal tracking is automatic for everything—data AND metadata.
5. Zero-Copy Integration
Organizations can join data using hashes without moving or exposing sensitive information.
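The SCD Type 2 behaviour in item 4 follows a close-and-open pattern: when the content hash for an identity changes, the open row is closed and a new open row is added. The sketch below shows that pattern on an in-memory list. It assumes the `9999-12-31` open-end sentinel used by the table defaults, while the function name and the row layout are hypothetical.

```python
OPEN_END = "9999-12-31"


def apply_version(history: list, declared_hid: str, raw_hid: str, now: str) -> None:
    """Close the open row if its content hash changed, then open a new one.

    history rows are dicts with declared_hid, raw_hid, valid_from, valid_to.
    """
    open_rows = [r for r in history
                 if r["declared_hid"] == declared_hid and r["valid_to"] == OPEN_END]
    if open_rows and open_rows[0]["raw_hid"] == raw_hid:
        return  # unchanged: keep the open row as is
    for row in open_rows:
        row["valid_to"] = now  # close the superseded version
    history.append({"declared_hid": declared_hid, "raw_hid": raw_hid,
                    "valid_from": now, "valid_to": OPEN_END})


history = []
apply_version(history, "cust-1", "hash-a", "2024-01-01")
apply_version(history, "cust-1", "hash-a", "2024-01-02")  # no change, no new row
apply_version(history, "cust-1", "hash-b", "2024-01-03")  # closes hash-a, opens hash-b
```

After the three calls, the history holds one closed row for `hash-a` and one open row for `hash-b`.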
The Bootstrap Paradox
Perhaps most remarkably, this system can ingest its own metadata and generate procedures to replicate itself. It's self-describing, self-replicating, and self-improving. The metadata about tables and columns becomes the template for ingesting those very tables and columns.
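A rough sketch of how column metadata could drive generated ingestion: given a table name and its column list (as the metadata load would supply them), render a SELECT that produces a declared key and a JSON payload per row. This is a hypothetical generator, not the author's. `build_ingest_select` and the `CRM.PUBLIC.CUSTOMERS` table are invented for the example, and the identifiers are interpolated unquoted on the assumption that they come from trusted catalog metadata.

```python
def build_ingest_select(table_fqn: str, columns: list, key_columns: list) -> str:
    """Render a SELECT that yields a declared key and a JSON payload per row."""
    unknown = [c for c in key_columns if c not in columns]
    if unknown:
        raise ValueError(f"key columns not in table: {unknown}")
    # Declared key: the business-key columns, upper-cased and joined with '|'.
    key_expr = " || '|' || ".join(f"UPPER(TO_VARCHAR({c}))" for c in key_columns)
    # Payload: every column as a name/value pair for OBJECT_CONSTRUCT.
    payload_args = ", ".join(f"'{c}', {c}" for c in columns)
    return (
        f"SELECT {key_expr} AS DECL_KEY, "
        f"OBJECT_CONSTRUCT({payload_args}) AS PAYLOAD_RAW_JSON "
        f"FROM {table_fqn}"
    )


sql = build_ingest_select(
    "CRM.PUBLIC.CUSTOMERS",
    columns=["CUSTOMER_NUMBER", "EMAIL"],
    key_columns=["CUSTOMER_NUMBER"],
)
```

The generated text could then feed the same hub and satellite inserts that the hard-coded loader uses for tables and columns.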
Conclusion: A New Data Paradigm
This isn't just a clever technical implementation—it's a shift in how we think about data:
From structure TO content: Schema is just more data
From integration TO emergence: Relationships self-organize
From transformation TO projection: One storage, many views
From silos TO federation: Universal language for all data
The canonical model demonstrates that the distinction between data and metadata, between different modeling paradigms, between storage engines—these are all artificial constructs. Underneath, everything is entities and relationships, and this simple truth enables a radically simpler, more powerful approach to data architecture.
This "basic demo" is actually the seed of a data platform that scales from a Raspberry Pi to a global mesh, from a startup's MVP to an enterprise's entire data estate, all using the same four tables and the same simple patterns.
The future of data isn't more complexity—it's this kind of profound simplicity.
What follows is a minimal bootstrap demonstration of the model for Snowflake: the core tables, a basic hard-coded load procedure, and exposure views for two entity types (Snowflake tables and columns).
--Schemas
CREATE SCHEMA IF NOT EXISTS AIS_META;
CREATE SCHEMA IF NOT EXISTS UKB_REPO;
CREATE SCHEMA IF NOT EXISTS UKB_UTIL;
CREATE SCHEMA IF NOT EXISTS UKB_OPS;
-- Sequences (names kept)
CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_IDENTITY_SID START=1 INCREMENT=1;
CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_EVENT_SID START=1 INCREMENT=1;
CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_EDGE_SID START=1 INCREMENT=1;
CREATE OR REPLACE SEQUENCE UKB_UTIL.AIS_SEQ_PAYLOAD_SID START=1 INCREMENT=1;
-- Optional hash validator (Snowflake has no real DOMAINs)
CREATE OR REPLACE FUNCTION UKB_UTIL.IS_HASH256(s STRING)
RETURNS BOOLEAN
AS $$ REGEXP_LIKE(s, '^[0-9A-Fa-f]{64}$') $$; -- accepts upper- and lower-case hex (the shims below return upper case)
-- Hash shims (until external functions for BLAKE3/SHA3_256 are wired)
-- Both are SHA-256 placeholders; SHA2 takes the string directly and already returns a 64-character hex string.
CREATE OR REPLACE FUNCTION UKB_UTIL.SHA3_256_HEX(s STRING)
RETURNS STRING AS $$ UPPER(SHA2(COALESCE(s,''), 256)) $$; -- placeholder
CREATE OR REPLACE FUNCTION UKB_UTIL.BLAKE3_HEX(s STRING)
RETURNS STRING AS $$ UPPER(SHA2(COALESCE(s,''), 256)) $$; -- placeholder
--Core tables (UKB_REPO)
-- ais_meta.template
CREATE OR REPLACE TABLE AIS_META.TEMPLATE (
TEMPLATE_ID VARCHAR(64) PRIMARY KEY, -- hash256
ENTITY_TYPE STRING,
SECTION STRING, -- identity | event | edge | payload
JSON_SCHEMA VARIANT,
CREATED_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- ais_meta.hash_policy
CREATE OR REPLACE TABLE AIS_META.HASH_POLICY (
POLICY_VER NUMBER PRIMARY KEY,
DECLARED_ALGO STRING NOT NULL, -- e.g., 'sha3_256'
RAW_ALGO STRING NOT NULL, -- e.g., 'blake3'
EFFECTIVE_FROM TIMESTAMP_NTZ NOT NULL
);
INSERT INTO AIS_META.HASH_POLICY SELECT 1,'sha3_256','blake3',CURRENT_TIMESTAMP()
WHERE NOT EXISTS (SELECT 1 FROM AIS_META.HASH_POLICY WHERE POLICY_VER=1);
-- ais_meta.rule_version
CREATE OR REPLACE TABLE AIS_META.RULE_VERSION (
RULE_NAME STRING PRIMARY KEY,
THRESHOLD NUMBER,
EFFECTIVE_FROM TIMESTAMP_NTZ
);
INSERT INTO AIS_META.RULE_VERSION (RULE_NAME,THRESHOLD,EFFECTIVE_FROM)
SELECT * FROM VALUES
('attr_predicate_pct', 10, CURRENT_TIMESTAMP()),
('table_row_threshold',5e9, CURRENT_TIMESTAMP()),
('raw_retention_days', 90, CURRENT_TIMESTAMP())
WHERE NOT EXISTS (SELECT 1 FROM AIS_META.RULE_VERSION);
-- hub_all_h
CREATE OR REPLACE TABLE UKB_REPO.HUB_ALL_H (
ENTITY_TYPE STRING NOT NULL,
IDENTITY_SID NUMBER DEFAULT UKB_UTIL.AIS_SEQ_IDENTITY_SID.NEXTVAL PRIMARY KEY,
SHA3_256___IDENTITY_DECLARED VARCHAR(64) NOT NULL,
BLAKE3___IDENTITY_RAW VARCHAR(64) NOT NULL UNIQUE,
BLAKE3_PREFIX CHAR(8), -- maintained by ops
-- exemplar business keys (extend as needed)
CUSTOMER_NUMBER VARCHAR(50),
SOURCE_SYSTEM VARCHAR(20),
TEMPLATE_ID VARCHAR(64) REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),
DECLARED_DELTA_JSON VARIANT DEFAULT OBJECT_CONSTRUCT(),
IDENTITY_RAW_JSON VARIANT NOT NULL,
VALID_FROM_TS TIMESTAMP_NTZ NOT NULL DEFAULT TO_TIMESTAMP_NTZ('1900-01-01'),
VALID_TO_TS TIMESTAMP_NTZ NOT NULL DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),
INGESTED_AT_TS TIMESTAMP_NTZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),
SEQUENCE_ID NUMBER AUTOINCREMENT,
CREATED_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (ENTITY_TYPE, BLAKE3_PREFIX, VALID_FROM_TS);
-- evt_all_e
CREATE OR REPLACE TABLE UKB_REPO.EVT_ALL_E (
EVENT_SID NUMBER DEFAULT UKB_UTIL.AIS_SEQ_EVENT_SID.NEXTVAL PRIMARY KEY,
SHA3_256___EVENT_DECLARED VARCHAR(64) NOT NULL,
BLAKE3___EVENT_RAW VARCHAR(64) NOT NULL UNIQUE,
BLAKE3_PREFIX CHAR(8),
EVENT_TYPE STRING NOT NULL,
EVENT_TS TIMESTAMP_NTZ NOT NULL,
SOURCE_SYSTEM STRING NOT NULL,
STATUS STRING,
TEMPLATE_ID VARCHAR(64) REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),
DECLARED_DELTA_JSON VARIANT DEFAULT OBJECT_CONSTRUCT(),
EVENT_RAW_JSON VARIANT NOT NULL,
INGESTED_AT_TS TIMESTAMP_NTZ NOT NULL DEFAULT CURRENT_TIMESTAMP(),
SEQUENCE_ID NUMBER AUTOINCREMENT,
CREATED_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (BLAKE3_PREFIX, EVENT_TS);
-- lnk_all_l
CREATE OR REPLACE TABLE UKB_REPO.LNK_ALL_L (
EDGE_SID NUMBER DEFAULT UKB_UTIL.AIS_SEQ_EDGE_SID.NEXTVAL PRIMARY KEY,
EVENT_SID NUMBER NOT NULL REFERENCES UKB_REPO.EVT_ALL_E(EVENT_SID),
SHA3_256___EDGE_DECLARED VARCHAR(64) NOT NULL,
BLAKE3___EDGE_RAW VARCHAR(64) NOT NULL UNIQUE,
BLAKE3_PREFIX CHAR(8),
SOURCE_HID VARCHAR(64) NOT NULL,
TARGET_HID VARCHAR(64) NOT NULL,
EDGE_TYPE VARCHAR(50) NOT NULL,
CONFIDENCE_SCORE NUMBER(5,4) DEFAULT 1,
TEMPLATE_ID VARCHAR(64) REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),
DECLARED_DELTA_JSON VARIANT DEFAULT OBJECT_CONSTRUCT(),
EDGE_RAW_JSON VARIANT NOT NULL,
VALID_FROM_TS TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('1900-01-01'),
VALID_TO_TS TIMESTAMP_NTZ DEFAULT TO_TIMESTAMP_NTZ('9999-12-31'),
INGESTED_AT_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
SEQUENCE_ID NUMBER AUTOINCREMENT,
CREATED_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
CONSTRAINT UQ_EDGE_TRIPLET UNIQUE (SOURCE_HID, TARGET_HID, EDGE_TYPE) -- informational: not enforced by Snowflake, and closed SCD rows repeat the triplet
)
CLUSTER BY (BLAKE3_PREFIX, SOURCE_HID, TARGET_HID);
-- sat_all_attr_s
CREATE OR REPLACE TABLE UKB_REPO.SAT_ALL_ATTR_S (
ENTITY_TYPE STRING NOT NULL,
PAYLOAD_SID NUMBER DEFAULT UKB_UTIL.AIS_SEQ_PAYLOAD_SID.NEXTVAL PRIMARY KEY,
EVENT_SID NUMBER NOT NULL REFERENCES UKB_REPO.EVT_ALL_E(EVENT_SID),
PARENT_HID VARCHAR(64) NOT NULL,
SHA3_256___PAYLOAD_DECLARED VARCHAR(64) NOT NULL,
BLAKE3___PAYLOAD_RAW VARCHAR(64) NOT NULL UNIQUE,
BLAKE3_PREFIX CHAR(8),
-- exemplar hot attributes
CUSTOMER_NAME VARCHAR(200),
CUSTOMER_EMAIL VARCHAR(320),
VALIDATION_STATUS VARCHAR(20),
TEMPLATE_ID VARCHAR(64) REFERENCES AIS_META.TEMPLATE(TEMPLATE_ID),
DECLARED_DELTA_JSON VARIANT DEFAULT OBJECT_CONSTRUCT(),
PAYLOAD_RAW_JSON VARIANT NOT NULL,
VALID_FROM_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
VALID_TO_TS TIMESTAMP_NTZ,
INGESTED_AT_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
SEQUENCE_ID NUMBER AUTOINCREMENT,
CREATED_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
)
CLUSTER BY (ENTITY_TYPE, PARENT_HID, BLAKE3_PREFIX, VALID_FROM_TS);
-- hub_collision_h
CREATE OR REPLACE TABLE UKB_REPO.HUB_COLLISION_H (
BLAKE3___IDENTITY_RAW VARCHAR(64),
SOURCE_SYSTEM STRING,
TENANT_ID STRING,
ENVELOPE_JSON VARIANT,
FIRST_SEEN_TS TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
PRIMARY KEY (BLAKE3___IDENTITY_RAW, SOURCE_SYSTEM, TENANT_ID) -- informational
);
--Ops:
CREATE OR REPLACE PROCEDURE UKB_OPS.APPLY_PREFIXES()
RETURNS STRING
LANGUAGE SQL
AS
$$
BEGIN
UPDATE UKB_REPO.HUB_ALL_H SET BLAKE3_PREFIX = SUBSTR(BLAKE3___IDENTITY_RAW,1,8)
WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___IDENTITY_RAW,1,8);
UPDATE UKB_REPO.EVT_ALL_E SET BLAKE3_PREFIX = SUBSTR(BLAKE3___EVENT_RAW,1,8)
WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___EVENT_RAW,1,8);
UPDATE UKB_REPO.LNK_ALL_L SET BLAKE3_PREFIX = SUBSTR(BLAKE3___EDGE_RAW,1,8)
WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___EDGE_RAW,1,8);
UPDATE UKB_REPO.SAT_ALL_ATTR_S SET BLAKE3_PREFIX = SUBSTR(BLAKE3___PAYLOAD_RAW,1,8)
WHERE BLAKE3_PREFIX IS NULL OR BLAKE3_PREFIX <> SUBSTR(BLAKE3___PAYLOAD_RAW,1,8);
RETURN 'OK';
END;
$$;
-- Optional: run on a schedule
-- CREATE TASK UKB_OPS.TASK_APPLY_PREFIXES WAREHOUSE=<WH> SCHEDULE='USING CRON */10 * * * * UTC' AS CALL UKB_OPS.APPLY_PREFIXES();
CREATE OR REPLACE FUNCTION UKB_UTIL.SHA3_256_HEX_V(v VARIANT)
RETURNS STRING
AS $$ UPPER(SHA2(TO_JSON(v), 256)) $$;
CREATE OR REPLACE FUNCTION UKB_UTIL.BLAKE3_HEX_V(v VARIANT)
RETURNS STRING
AS $$ UPPER(SHA2(TO_JSON(v), 256)) $$;
--LOAD
CREATE OR REPLACE PROCEDURE UKB_OPS.LOAD_SNOWFLAKE_METADATA()
RETURNS STRING
LANGUAGE SQL
AS
$$
DECLARE
v_now TIMESTAMP_NTZ := CURRENT_TIMESTAMP();
v_event_sid NUMBER;
v_tpl_tbl STRING;
v_tpl_col STRING;
v_tpl_edge STRING;
v_tpl_pay STRING;
v_tables NUMBER;
v_cols NUMBER;
BEGIN
-- Resolve template IDs (assumes you seeded them)
SELECT TEMPLATE_ID INTO :v_tpl_tbl
FROM AIS_META.TEMPLATE
WHERE SECTION='identity' AND ENTITY_TYPE='SF_TABLE'
QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;
SELECT TEMPLATE_ID INTO :v_tpl_col
FROM AIS_META.TEMPLATE
WHERE SECTION='identity' AND ENTITY_TYPE='SF_COLUMN'
QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;
SELECT TEMPLATE_ID INTO :v_tpl_edge
FROM AIS_META.TEMPLATE
WHERE SECTION='edge' AND ENTITY_TYPE='SF_EDGE'
QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;
SELECT TEMPLATE_ID INTO :v_tpl_pay
FROM AIS_META.TEMPLATE
WHERE SECTION='payload' AND ENTITY_TYPE='SF_COLUMN'
QUALIFY ROW_NUMBER() OVER (ORDER BY CREATED_TS DESC)=1;
/* A) STAGING (identity vs payload signatures; bind :v_now) */
CREATE OR REPLACE TEMP TABLE STG_TABLES AS
SELECT
TABLE_CATALOG AS DATABASE_NAME,
TABLE_SCHEMA AS SCHEMA_NAME,
TABLE_NAME, TABLE_TYPE, COMMENT, ROW_COUNT, BYTES,
OBJECT_CONSTRUCT(*) AS RAW,
-- Declared identity key (canonical)
UPPER(TABLE_CATALOG)||'|'||UPPER(TABLE_SCHEMA)||'|'||UPPER(TABLE_NAME)||'|TABLE' AS DECL_KEY,
-- Identity signature (what makes the table the same table)
UPPER(COALESCE(TABLE_CATALOG,''))||'|'||
UPPER(COALESCE(TABLE_SCHEMA,'')) ||'|'||
UPPER(COALESCE(TABLE_NAME,'')) ||'|'||
UPPER(COALESCE(TABLE_TYPE,'')) AS ID_SIG_TBL,
-- Payload signature (attributes you want SAT to react to)
UPPER(COALESCE(TABLE_TYPE,'')) ||'|'||
UPPER(COALESCE(COMMENT,'')) ||'|'||
TO_VARCHAR(NVL(ROW_COUNT,-1)) ||'|'||
TO_VARCHAR(NVL(BYTES,-1)) AS PAY_SIG_TBL,
:v_now AS SNAP_TS
FROM SNOWFLAKE.ACCOUNT_USAGE.TABLES WHERE DELETED IS NULL;
CREATE OR REPLACE TEMP TABLE STG_COLUMNS AS
SELECT
TABLE_CATALOG AS DATABASE_NAME,
TABLE_SCHEMA AS SCHEMA_NAME,
TABLE_NAME, COLUMN_NAME,
ORDINAL_POSITION, DATA_TYPE, IS_NULLABLE, COMMENT,
OBJECT_CONSTRUCT(*) AS RAW,
UPPER(TABLE_CATALOG)||'|'||UPPER(TABLE_SCHEMA)||'|'||
UPPER(TABLE_NAME)||'|'||UPPER(COLUMN_NAME)||'|COLUMN' AS DECL_KEY,
-- Identity signature (stable)
UPPER(COALESCE(TABLE_CATALOG,''))||'|'||
UPPER(COALESCE(TABLE_SCHEMA,'')) ||'|'||
UPPER(COALESCE(TABLE_NAME,'')) ||'|'||
UPPER(COALESCE(COLUMN_NAME,'')) ||'|'||
TO_VARCHAR(NVL(ORDINAL_POSITION,-1)) AS ID_SIG_COL,
-- Payload signature (properties)
UPPER(COALESCE(DATA_TYPE,'')) ||'|'||
UPPER(COALESCE(IS_NULLABLE,'')) ||'|'||
UPPER(COALESCE(COMMENT,'')) AS PAY_SIG_COL,
:v_now AS SNAP_TS
FROM SNOWFLAKE.ACCOUNT_USAGE.COLUMNS WHERE DELETED IS NULL;
/* B) HUBS (IDENTITY ONLY) — close/open on identity signature */
-- TABLE hubs: close changed
UPDATE UKB_REPO.HUB_ALL_H h
SET VALID_TO_TS = :v_now
WHERE h.ENTITY_TYPE='SF_TABLE' AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ
AND EXISTS (
SELECT 1 FROM STG_TABLES s
WHERE UKB_UTIL.SHA3_256_HEX(s.DECL_KEY) = h.SHA3_256___IDENTITY_DECLARED
AND UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL) <> h.BLAKE3___IDENTITY_RAW
);
-- TABLE hubs: insert open
INSERT INTO UKB_REPO.HUB_ALL_H (
ENTITY_TYPE, SHA3_256___IDENTITY_DECLARED, BLAKE3___IDENTITY_RAW, BLAKE3_PREFIX,
CUSTOMER_NUMBER, SOURCE_SYSTEM, TEMPLATE_ID, DECLARED_DELTA_JSON, IDENTITY_RAW_JSON,
VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS
)
SELECT
'SF_TABLE',
UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),
UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL),
SUBSTR(UKB_UTIL.BLAKE3_HEX(s.ID_SIG_TBL),1,8),
NULL, 'SNOWFLAKE', :v_tpl_tbl, OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT('TABLE_CATALOG', s.DATABASE_NAME,
'TABLE_SCHEMA', s.SCHEMA_NAME,
'TABLE_NAME', s.TABLE_NAME,
'TABLE_TYPE', s.TABLE_TYPE),
:v_now, '9999-12-31'::TIMESTAMP_NTZ, :v_now, :v_now
FROM STG_TABLES s
WHERE NOT EXISTS (
SELECT 1 FROM UKB_REPO.HUB_ALL_H h
WHERE h.ENTITY_TYPE='SF_TABLE'
AND h.SHA3_256___IDENTITY_DECLARED = UKB_UTIL.SHA3_256_HEX(s.DECL_KEY)
AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ
);
-- COLUMN hubs: close changed
UPDATE UKB_REPO.HUB_ALL_H h
SET VALID_TO_TS = :v_now
WHERE h.ENTITY_TYPE='SF_COLUMN' AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ
AND EXISTS (
SELECT 1 FROM STG_COLUMNS s
WHERE UKB_UTIL.SHA3_256_HEX(s.DECL_KEY) = h.SHA3_256___IDENTITY_DECLARED
AND UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL) <> h.BLAKE3___IDENTITY_RAW
);
-- COLUMN hubs: insert open
INSERT INTO UKB_REPO.HUB_ALL_H (
ENTITY_TYPE, SHA3_256___IDENTITY_DECLARED, BLAKE3___IDENTITY_RAW, BLAKE3_PREFIX,
CUSTOMER_NUMBER, SOURCE_SYSTEM, TEMPLATE_ID, DECLARED_DELTA_JSON, IDENTITY_RAW_JSON,
VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS
)
SELECT
'SF_COLUMN',
UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),
UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL),
SUBSTR(UKB_UTIL.BLAKE3_HEX(s.ID_SIG_COL),1,8),
NULL, 'SNOWFLAKE', :v_tpl_col, OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT('TABLE_CATALOG', s.DATABASE_NAME,
'TABLE_SCHEMA', s.SCHEMA_NAME,
'TABLE_NAME', s.TABLE_NAME,
'COLUMN_NAME', s.COLUMN_NAME,
'ORDINAL_POSITION', s.ORDINAL_POSITION),
:v_now, '9999-12-31'::TIMESTAMP_NTZ, :v_now, :v_now
FROM STG_COLUMNS s
WHERE NOT EXISTS (
SELECT 1 FROM UKB_REPO.HUB_ALL_H h
WHERE h.ENTITY_TYPE='SF_COLUMN'
AND h.SHA3_256___IDENTITY_DECLARED = UKB_UTIL.SHA3_256_HEX(s.DECL_KEY)
AND h.VALID_TO_TS = '9999-12-31'::TIMESTAMP_NTZ
);
/* C) EVENT (one per run) */
INSERT INTO UKB_REPO.EVT_ALL_E (
SHA3_256___EVENT_DECLARED, BLAKE3___EVENT_RAW, BLAKE3_PREFIX,
EVENT_TYPE, EVENT_TS, SOURCE_SYSTEM, STATUS,
TEMPLATE_ID, DECLARED_DELTA_JSON, EVENT_RAW_JSON,
INGESTED_AT_TS, CREATED_TS
)
SELECT
UKB_UTIL.SHA3_256_HEX('SNAP|'||TO_VARCHAR(:v_now)),
UKB_UTIL.BLAKE3_HEX(TO_VARCHAR(:v_now)),
SUBSTR(UKB_UTIL.BLAKE3_HEX(TO_VARCHAR(:v_now)),1,8),
'SNOWFLAKE_METADATA_SNAPSHOT', :v_now, 'SNOWFLAKE_ACCOUNT_USAGE', 'OK',
:v_tpl_tbl, OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT('tables', (SELECT COUNT(*) FROM STG_TABLES),
'columns',(SELECT COUNT(*) FROM STG_COLUMNS)),
:v_now, :v_now;
SELECT MAX(EVENT_SID) INTO :v_event_sid
FROM UKB_REPO.EVT_ALL_E
WHERE EVENT_TS = :v_now;
/* D) EDGES: TABLE → COLUMN (dedupe + SCD) */
-- Close changed edges
UPDATE UKB_REPO.LNK_ALL_L l
SET VALID_TO_TS = :v_now
FROM (
SELECT
UKB_UTIL.SHA3_256_HEX(UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||UPPER(sc.TABLE_NAME)||'|TABLE') AS TBL_HID,
UKB_UTIL.SHA3_256_HEX(sc.DECL_KEY) AS COL_HID,
UKB_UTIL.BLAKE3_HEX(
UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||
UPPER(sc.TABLE_NAME)||'|'||UPPER(sc.COLUMN_NAME)||'|'||
TO_VARCHAR(NVL(sc.ORDINAL_POSITION,-1))
) AS EDGE_RHID
FROM STG_COLUMNS sc
) C
WHERE l.EDGE_TYPE = 'HAS_COLUMN'
AND l.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31')
AND l.SOURCE_HID = C.TBL_HID
AND l.TARGET_HID = C.COL_HID
AND l.BLAKE3___EDGE_RAW <> C.EDGE_RHID;
-- Insert missing open edges
INSERT INTO UKB_REPO.LNK_ALL_L (
EVENT_SID, SHA3_256___EDGE_DECLARED, BLAKE3___EDGE_RAW, BLAKE3_PREFIX,
SOURCE_HID, TARGET_HID, EDGE_TYPE, CONFIDENCE_SCORE,
TEMPLATE_ID, DECLARED_DELTA_JSON, EDGE_RAW_JSON,
VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS
)
SELECT
:v_event_sid,
UKB_UTIL.SHA3_256_HEX('HAS_COLUMN|'||TBL_HID||'|'||COL_HID),
EDGE_RHID,
SUBSTR(EDGE_RHID,1,8),
TBL_HID, COL_HID, 'HAS_COLUMN', 1,
:v_tpl_edge, OBJECT_CONSTRUCT(), EDGE_RAW,
:v_now, TO_TIMESTAMP_NTZ('9999-12-31'), :v_now, :v_now
FROM (
SELECT
UKB_UTIL.SHA3_256_HEX(UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||UPPER(sc.TABLE_NAME)||'|TABLE') AS TBL_HID,
UKB_UTIL.SHA3_256_HEX(sc.DECL_KEY) AS COL_HID,
UKB_UTIL.BLAKE3_HEX(
UPPER(sc.DATABASE_NAME)||'|'||UPPER(sc.SCHEMA_NAME)||'|'||
UPPER(sc.TABLE_NAME)||'|'||UPPER(sc.COLUMN_NAME)||'|'||
TO_VARCHAR(NVL(sc.ORDINAL_POSITION,-1))
) AS EDGE_RHID,
OBJECT_CONSTRUCT('db',sc.DATABASE_NAME,'sch',sc.SCHEMA_NAME,'tbl',sc.TABLE_NAME,'col',sc.COLUMN_NAME,'ord',sc.ORDINAL_POSITION) AS EDGE_RAW
FROM STG_COLUMNS sc
) C
WHERE NOT EXISTS (
SELECT 1 FROM UKB_REPO.LNK_ALL_L x
WHERE x.SOURCE_HID = C.TBL_HID
AND x.TARGET_HID = C.COL_HID
AND x.EDGE_TYPE = 'HAS_COLUMN'
AND x.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31')
);
/* E) SAT payloads (attach to the thing they describe) */
-- TABLE payloads (parent = TABLE HID)
INSERT INTO UKB_REPO.SAT_ALL_ATTR_S (
ENTITY_TYPE, EVENT_SID, PARENT_HID,
SHA3_256___PAYLOAD_DECLARED, BLAKE3___PAYLOAD_RAW, BLAKE3_PREFIX,
CUSTOMER_NAME, CUSTOMER_EMAIL, VALIDATION_STATUS,
TEMPLATE_ID, DECLARED_DELTA_JSON, PAYLOAD_RAW_JSON,
VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS
)
SELECT
'SF_TABLE',
:v_event_sid,
UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),
UKB_UTIL.SHA3_256_HEX(s.PAY_SIG_TBL),
UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_TBL),
SUBSTR(UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_TBL),1,8),
NULL, NULL, NULL,
:v_tpl_pay, OBJECT_CONSTRUCT(), s.RAW,
:v_now, NULL, :v_now, :v_now
FROM STG_TABLES s;
-- COLUMN payloads (parent = COLUMN HID)
INSERT INTO UKB_REPO.SAT_ALL_ATTR_S (
ENTITY_TYPE, EVENT_SID, PARENT_HID,
SHA3_256___PAYLOAD_DECLARED, BLAKE3___PAYLOAD_RAW, BLAKE3_PREFIX,
CUSTOMER_NAME, CUSTOMER_EMAIL, VALIDATION_STATUS,
TEMPLATE_ID, DECLARED_DELTA_JSON, PAYLOAD_RAW_JSON,
VALID_FROM_TS, VALID_TO_TS, INGESTED_AT_TS, CREATED_TS
)
SELECT
'SF_COLUMN',
:v_event_sid,
UKB_UTIL.SHA3_256_HEX(s.DECL_KEY),
UKB_UTIL.SHA3_256_HEX(s.PAY_SIG_COL),
UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_COL),
SUBSTR(UKB_UTIL.BLAKE3_HEX(s.PAY_SIG_COL),1,8),
NULL, NULL, NULL,
:v_tpl_pay, OBJECT_CONSTRUCT(), s.RAW,
:v_now, NULL, :v_now, :v_now
FROM STG_COLUMNS s;
/* F) Prefix maintenance */
CALL UKB_OPS.APPLY_PREFIXES();
/* G) Return counts */
SELECT COUNT(*) INTO :v_tables FROM STG_TABLES;
SELECT COUNT(*) INTO :v_cols FROM STG_COLUMNS;
RETURN 'LOAD OK: tables=' || v_tables || ', columns=' || v_cols;
END;
$$;
CALL UKB_OPS.LOAD_SNOWFLAKE_METADATA();
--Views to expose as array
-- ===== Events (history) =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_EVENTS AS
SELECT
EVENT_SID,
SHA3_256___EVENT_DECLARED AS EVENT_HID,
BLAKE3___EVENT_RAW AS EVENT_RHID,
EVENT_TYPE, EVENT_TS, SOURCE_SYSTEM, STATUS,
(EVENT_RAW_JSON:"tables")::NUMBER AS TABLES_COUNT,
(EVENT_RAW_JSON:"columns")::NUMBER AS COLUMNS_COUNT,
INGESTED_AT_TS, CREATED_TS
FROM UKB_REPO.EVT_ALL_E
ORDER BY EVENT_TS DESC;
-- ===== Current TABLE identities (open SCD rows in HUB) =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLES_CURRENT AS
SELECT
h.SHA3_256___IDENTITY_DECLARED AS TABLE_HID,
h.BLAKE3___IDENTITY_RAW AS TABLE_RHID,
h.BLAKE3_PREFIX,
h.IDENTITY_RAW_JSON:"TABLE_CATALOG"::STRING AS TABLE_CATALOG,
h.IDENTITY_RAW_JSON:"TABLE_SCHEMA"::STRING AS TABLE_SCHEMA,
h.IDENTITY_RAW_JSON:"TABLE_NAME"::STRING AS TABLE_NAME,
h.IDENTITY_RAW_JSON:"TABLE_TYPE"::STRING AS TABLE_TYPE,
h.VALID_FROM_TS, h.VALID_TO_TS, h.INGESTED_AT_TS
FROM UKB_REPO.HUB_ALL_H h
WHERE h.ENTITY_TYPE = 'SF_TABLE'
AND h.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31');
-- ===== Current TABLE payload (latest SAT per TABLE_HID) =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_ATTRS_CURRENT AS
SELECT
s.PARENT_HID AS TABLE_HID,
(s.PAYLOAD_RAW_JSON:"COMMENT")::STRING AS TABLE_COMMENT,
(s.PAYLOAD_RAW_JSON:"ROW_COUNT")::NUMBER AS ROW_COUNT,
(s.PAYLOAD_RAW_JSON:"BYTES")::NUMBER AS BYTES,
(s.PAYLOAD_RAW_JSON:"TABLE_TYPE")::STRING AS TABLE_TYPE_PAYLOAD,
s.EVENT_SID, s.VALID_FROM_TS, s.VALID_TO_TS, s.INGESTED_AT_TS
FROM (
SELECT s.*,
ROW_NUMBER() OVER (PARTITION BY s.PARENT_HID
ORDER BY s.INGESTED_AT_TS DESC, s.SEQUENCE_ID DESC) AS rn
FROM UKB_REPO.SAT_ALL_ATTR_S s
WHERE s.ENTITY_TYPE = 'SF_TABLE'
) s
WHERE s.rn = 1;
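-- ===== Current COLUMN identities (open SCD rows) + parent TABLE via current edge =====
-- Reconstructed (assumed) definition: the views below reference V_SF_COLUMNS_CURRENT, but the original script never creates it.
CREATE OR REPLACE VIEW UKB_REPO.V_SF_COLUMNS_CURRENT AS
SELECT l.SOURCE_HID AS TABLE_HID,
h.SHA3_256___IDENTITY_DECLARED AS COLUMN_HID,
h.IDENTITY_RAW_JSON:"COLUMN_NAME"::STRING AS COLUMN_NAME,
h.IDENTITY_RAW_JSON:"ORDINAL_POSITION"::NUMBER AS ORDINAL_POSITION
FROM UKB_REPO.HUB_ALL_H h
JOIN UKB_REPO.LNK_ALL_L l ON l.TARGET_HID = h.SHA3_256___IDENTITY_DECLARED
AND l.EDGE_TYPE = 'HAS_COLUMN' AND l.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31')
WHERE h.ENTITY_TYPE = 'SF_COLUMN' AND h.VALID_TO_TS = TO_TIMESTAMP_NTZ('9999-12-31');
-- ===== Current TABLE ↔ COLUMN pairs =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_COLUMNS_CURRENT AS
SELECT t.TABLE_HID,
t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE,
c.COLUMN_HID, c.COLUMN_NAME, c.ORDINAL_POSITION
FROM UKB_REPO.V_SF_TABLES_CURRENT t
JOIN UKB_REPO.V_SF_COLUMNS_CURRENT c ON c.TABLE_HID = t.TABLE_HID
ORDER BY t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION;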
-- ===== Current COLUMN payload (latest SAT per COLUMN_HID) =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_COLUMN_ATTRS_CURRENT AS
SELECT
s.PARENT_HID AS COLUMN_HID,
(s.PAYLOAD_RAW_JSON:"DATA_TYPE")::STRING AS DATA_TYPE,
(s.PAYLOAD_RAW_JSON:"IS_NULLABLE")::STRING AS IS_NULLABLE,
(s.PAYLOAD_RAW_JSON:"COMMENT")::STRING AS COLUMN_COMMENT,
(s.PAYLOAD_RAW_JSON:"ORDINAL_POSITION")::NUMBER AS ORDINAL_POSITION,
s.EVENT_SID, s.VALID_FROM_TS, s.VALID_TO_TS, s.INGESTED_AT_TS
FROM (
SELECT s.*,
ROW_NUMBER() OVER (PARTITION BY s.PARENT_HID
ORDER BY s.INGESTED_AT_TS DESC, s.SEQUENCE_ID DESC) AS rn
FROM UKB_REPO.SAT_ALL_ATTR_S s
WHERE s.ENTITY_TYPE = 'SF_COLUMN'
) s
WHERE s.rn = 1;
-- ===== Denormalized TABLE ↔ COLUMN with payloads =====
CREATE OR REPLACE VIEW UKB_REPO.V_SF_TABLE_COLUMNS_ENRICHED AS
SELECT
t.TABLE_HID,
t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, t.TABLE_TYPE,
ta.ROW_COUNT, ta.BYTES, ta.TABLE_COMMENT,
c.COLUMN_HID, c.COLUMN_NAME, c.ORDINAL_POSITION,
ca.DATA_TYPE, ca.IS_NULLABLE, ca.COLUMN_COMMENT
FROM UKB_REPO.V_SF_TABLES_CURRENT t
LEFT JOIN UKB_REPO.V_SF_TABLE_ATTRS_CURRENT ta ON ta.TABLE_HID = t.TABLE_HID
JOIN UKB_REPO.V_SF_COLUMNS_CURRENT c ON c.TABLE_HID = t.TABLE_HID
LEFT JOIN UKB_REPO.V_SF_COLUMN_ATTRS_CURRENT ca ON ca.COLUMN_HID = c.COLUMN_HID
ORDER BY t.TABLE_CATALOG, t.TABLE_SCHEMA, t.TABLE_NAME, c.ORDINAL_POSITION;
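-- Smoke-test queries (schema-qualified so they run from any session context)
SELECT * FROM UKB_REPO.V_SF_EVENTS;
SELECT * FROM UKB_REPO.V_SF_TABLES_CURRENT;
SELECT * FROM UKB_REPO.V_SF_TABLE_ATTRS_CURRENT;
SELECT * FROM UKB_REPO.V_SF_TABLE_COLUMNS_CURRENT;
SELECT * FROM UKB_REPO.V_SF_TABLE_COLUMNS_ENRICHED;
SELECT * FROM UKB_REPO.LNK_ALL_L ORDER BY SOURCE_HID;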
-- Reset: the statements below delete all loaded rows; run them only to start over
-- Payloads (child of hubs/events)
DELETE FROM UKB_REPO.SAT_ALL_ATTR_S;
-- Edges (child of events)
DELETE FROM UKB_REPO.LNK_ALL_L;
-- Collision bucket
DELETE FROM UKB_REPO.HUB_COLLISION_H;
-- Hubs (identities)
DELETE FROM UKB_REPO.HUB_ALL_H;
-- Events (parents of SAT/LNK)
DELETE FROM UKB_REPO.EVT_ALL_E;