Prepare your data sources
Use this guide to make sure a Snowflake table or view is ready to stream into the RAI Native App.
It covers supported object and column types, how to reshape unsupported VARIANT data into supported columns, how to enable change tracking, and what access-control constraints to check before you create a stream.
- The RAI Native App is installed in your Snowflake account.
- You have a Snowflake table or view that you want to use as a data source.
What Snowflake objects can be used as data sources
Section titled “What Snowflake objects can be used as data sources”Use this table to check whether your Snowflake object type can be used as a stream source:
| Snowflake object type | Supported? |
|---|---|
| Standard table | |
| Standard view | |
| Snowflake-managed Iceberg table Preview | |
| Temporary table | |
| Transient table | |
| Dynamic table | |
| External table | |
| External view |
What column data types are supported
Section titled “What column data types are supported”Data streams support the following Snowflake column types:
If your table or view uses only the column types listed above, you can use it as a stream source after you enable change tracking. If it includes unsupported column types, reshape the data into supported columns before you create the stream.
Reshape VARIANT data into supported columns
Section titled “Reshape VARIANT data into supported columns”Snowflake tables or views that contain VARIANT columns can’t be streamed directly into the RAI Native App.
To use JSON data in your models, expand it into supported column types before you create a data stream.
Use this table to choose the right reshaping pattern:
| What to use | When to use it |
|---|---|
| Flatten into a view | Use when each source row should stay as one output row and JSON fields can become scalar columns. |
| Normalize into a child table | Use when a JSON array must become multiple rows that you want to query independently. |
Flatten into a view
Section titled “Flatten into a view”Use this pattern when you want to keep a one-to-one mapping between rows in the raw table and rows in the streamable source. This works well when array values can be safely collapsed into a scalar representation such as a comma-separated string.
For example, consider the following purchase-events table with a VARIANT column:
-- Source table with a VARIANT columnCREATE OR REPLACE TABLE PURCHASE_EVENTS_RAW ( event_id NUMBER, order_id NUMBER, customer_id NUMBER, occurred_at TIMESTAMP_NTZ, EVENT_DATA VARIANT);
-- Sample rows with nested JSON dataINSERT INTO PURCHASE_EVENTS_RAWSELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ, PARSE_JSON('{"channel":"web","status":"submitted","region":"NA","items":["SKU-123","SKU-456"]}')UNION ALLSELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ, PARSE_JSON('{"channel":"mobile","status":"shipped","region":"EMEA","items":["SKU-789"]}');Follow these steps to create a view with supported columns:
-
Create a flat view with supported columns
Extract the JSON fields into scalar columns that the stream can use:
CREATE OR REPLACE VIEW PURCHASE_EVENTS_CLEAN ASSELECTevent_id,order_id,customer_id,occurred_at,EVENT_DATA:channel::STRING AS channel,EVENT_DATA:status::STRING AS status,EVENT_DATA:region::STRING AS region,ARRAY_TO_STRING(EVENT_DATA:items, ',') AS itemsFROM PURCHASE_EVENTS_RAW;EVENT_DATA:channel::STRING,EVENT_DATA:status::STRING, andEVENT_DATA:region::STRINGextract scalar values from theVARIANTpayload and cast them to supportedSTRINGcolumns.ARRAY_TO_STRING(EVENT_DATA:items, ',')converts the JSON array initemsinto a single supported scalar value.- The resulting
PURCHASE_EVENTS_CLEANview keeps one row per source record, but replaces the unsupportedVARIANTcolumn with streamable columns.
-
Enable change tracking on the table and view
The underlying table for the view must also have change tracking enabled:
ALTER TABLE PURCHASE_EVENTS_RAW SET CHANGE_TRACKING = TRUE;ALTER VIEW PURCHASE_EVENTS_CLEAN SET CHANGE_TRACKING = TRUE; -
Verify the reshaped view
Query the view and confirm that the JSON fields now appear as supported scalar columns:
SELECT * FROM PURCHASE_EVENTS_CLEAN;Output +--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------+| EVENT_ID| ORDER_ID | CUSTOMER_ID| OCCURRED_AT | CHANNEL | STATUS | REGION | ITEMS ||--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------|| 1 | 501 | 10001 | 2025-07-01 10:15:00 | web | submitted | NA | SKU-123,SKU-456 || 2 | 502 | 10002 | 2025-07-01 10:20:00 | mobile | shipped | EMEA | SKU-789 |+--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------+PURCHASE_EVENTS_CLEANuses supported column types and can be used by the RAI Native App.
Normalize into a child table
Section titled “Normalize into a child table”Use this pattern when your JSON data includes arrays that need to be represented as multiple rows. The child table becomes the streamable source object.
For example, consider the following purchase-events table with a VARIANT column:
-- Source table with a VARIANT columnCREATE OR REPLACE TABLE PURCHASE_EVENTS_RAW ( event_id NUMBER, order_id NUMBER, customer_id NUMBER, occurred_at TIMESTAMP_NTZ, EVENT_DATA VARIANT);
-- Sample rows with nested JSON dataINSERT INTO PURCHASE_EVENTS_RAWSELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ, PARSE_JSON('{"channel":"web","status":"submitted","region":"NA","items":["SKU-123","SKU-456"]}')UNION ALLSELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ, PARSE_JSON('{"channel":"mobile","status":"shipped","region":"EMEA","items":["SKU-789"]}');Follow these steps to create a child table with supported columns:
-
Create a child table with one row per array value
Use
LATERAL FLATTENto expand the JSON array into streamable rows:CREATE OR REPLACE TABLE PURCHASE_EVENT_ITEMS ASSELECTp.event_id,p.order_id,p.customer_id,p.occurred_at,item.value::STRING AS itemFROM PURCHASE_EVENTS_RAW AS p,LATERAL FLATTEN(INPUT => p.EVENT_DATA:items) AS item;LATERAL FLATTEN(INPUT => p.EVENT_DATA:items)expands the unsupported JSON array into one result row per array element.item.value::STRING AS itemcasts each expanded JSON value to a supportedSTRINGcolumn.- The parent row metadata stays attached through
event_id,order_id,customer_id, andoccurred_at, so each purchased item can be streamed as its own row.
-
Enable change tracking on the child table
Turn on change tracking for the reshaped source:
ALTER TABLE PURCHASE_EVENT_ITEMS SET CHANGE_TRACKING = TRUE; -
Verify the child table
Query the child table and confirm that each array value is now stored in its own row:
SELECT * FROM PURCHASE_EVENT_ITEMS;Output +--------+-----------+--------+---------------------+---------+| EVENT_ID| ORDER_ID | CUSTOMER_ID| OCCURRED_AT | ITEM ||--------+-----------+--------+---------------------+---------|| 1 | 501 | 10001 | 2025-07-01 10:15:00 | SKU-123 || 1 | 501 | 10001 | 2025-07-01 10:15:00 | SKU-456 || 2 | 502 | 10002 | 2025-07-01 10:20:00 | SKU-789 |+--------+-----------+--------+---------------------+---------+PURCHASE_EVENT_ITEMSuses supported column types and can be used by the RAI Native App.
Enable change tracking
Section titled “Enable change tracking”The RAI Native App uses Snowflake’s native change tracking feature to capture changes to tables and views and synchronize them with PyRel semantic models built on top of those sources. You must enable change tracking on each source object before PyRel can use it as a source.
Follow these steps to enable change tracking on a Snowflake table:
-
Check whether change tracking is already enabled
Use
SHOW TABLESand inspect theCHANGE_TRACKINGcolumn:SHOW TABLES LIKE 'MyTable';If change tracking is enabled, the
CHANGE_TRACKINGcolumn is set toON.from relationalai.config import create_configsession = create_config().get_session()rows = session.sql("SHOW TABLES LIKE 'MyTable'").collect()print(rows)If change tracking is enabled, the
CHANGE_TRACKINGcolumn is set toON. -
Enable change tracking if needed
Turn on change tracking for the table:
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;from relationalai.config import create_configsession = create_config().get_session()session.sql("ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE").collect() -
Verify the setting
Run the same
SHOW TABLEScheck again and confirm thatCHANGE_TRACKINGis nowON:SHOW TABLES LIKE 'MyTable';from relationalai.config import create_configsession = create_config().get_session()rows = session.sql("SHOW TABLES LIKE 'MyTable'").collect()print(rows)
Views require change tracking on two layers. The underlying tables that the view depends on must have change tracking enabled, and the view itself must also have change tracking enabled before you can stream from it.
Follow these steps to enable change tracking on a Snowflake view and its underlying tables:
-
Identify the underlying tables
Query Snowflake dependency metadata to identify the objects the view depends on. If the results include another view, repeat the check for that view until you reach only base tables.
Query the dependency metadata for the view:
SELECTreferenced_database,referenced_schema,referenced_object_name,referenced_object_domainFROM snowflake.account_usage.object_dependenciesWHERE referencing_database = 'MyDatabase'AND referencing_schema = 'MySchema'AND referencing_object_name = 'MyView'AND referencing_object_domain = 'VIEW';Query the dependency metadata for the view:
from relationalai.config import create_configsession = create_config().get_session()rows = session.sql("""SELECTreferenced_database,referenced_schema,referenced_object_name,referenced_object_domainFROM snowflake.account_usage.object_dependenciesWHERE referencing_database = 'MyDatabase'AND referencing_schema = 'MySchema'AND referencing_object_name = 'MyView'AND referencing_object_domain = 'VIEW'""").collect()print(rows) -
Enable change tracking on the underlying tables
Turn on change tracking for every base table used by the view:
ALTER TABLE MyBaseTable SET CHANGE_TRACKING = TRUE;from relationalai.config import create_configsession = create_config().get_session()session.sql("ALTER TABLE MyBaseTable SET CHANGE_TRACKING = TRUE").collect() -
Enable change tracking on the view
After the underlying tables are ready, enable change tracking on the view:
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;from relationalai.config import create_configsession = create_config().get_session()session.sql("ALTER VIEW MyView SET CHANGE_TRACKING = TRUE").collect() -
Verify the setting
Confirm that change tracking is enabled on the view:
SHOW VIEWS LIKE 'MyView';If change tracking is enabled, the
CHANGE_TRACKINGcolumn is set toON.from relationalai.config import create_configsession = create_config().get_session()rows = session.sql("SHOW VIEWS LIKE 'MyView'").collect()print(rows)If change tracking is enabled, the
CHANGE_TRACKINGcolumn is set toON.