Manage data streams
Data streams are the core mechanism that keeps data in PyRel semantic models up to date with changes in Snowflake source tables and views. This guide explains what data streams are, how they work, and how to manage them for normal operation.
- The RAI Native App is installed in your Snowflake account.
- You have a Snowflake table or view that is already being used as a data source.
What a data stream is
Data streams track changes to tables and views in your Snowflake account, ensuring that queries from PyRel semantic models use the latest data. Each stream corresponds to one source object, which is a table or view that a model reads from.
Here’s how it works:
- Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
- Data streams read these changes and temporarily store them in an internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the CDC service is disabled.
- When the CDC service is enabled, the staged data is processed in batches once every minute and made available to semantic models.
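To make the staging behavior concrete, here is a toy Python model of the flow described above. It is a conceptual sketch only, not how the RAI Native App is implemented: the `ToyDataStream` class, its methods, and the list-based stage are hypothetical stand-ins. It illustrates the key property from the list: changes are staged whether or not CDC is enabled, but they only reach the model when a batch is processed while CDC is enabled.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDataStream:
    """Hypothetical model of one data stream; not the real RAI implementation."""

    cdc_enabled: bool = False
    stage: list = field(default_factory=list)  # changes waiting to be processed
    model_rows: list = field(default_factory=list)  # changes visible to the model

    def capture(self, change: str) -> None:
        # Changes are staged regardless of whether CDC is enabled.
        self.stage.append(change)

    def process_batch(self) -> int:
        # Stand-in for the once-per-minute batch: it only runs when CDC is enabled.
        if not self.cdc_enabled:
            return 0
        batch, self.stage = self.stage, []
        self.model_rows.extend(batch)
        return len(batch)


stream = ToyDataStream()
stream.capture("INSERT id=1")
stream.capture("UPDATE id=1")
print(stream.process_batch(), len(stream.stage))  # → 0 2 (CDC disabled, still staged)
stream.cdc_enabled = True
print(stream.process_batch(), len(stream.stage))  # → 2 0 (CDC enabled, batch drained)
```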
How data streams are created
PyRel automatically creates data streams.
Users can declare Snowflake tables or views as data sources in their PyRel semantic models.
When you run a query that references a table or view, the Python API checks whether a data stream exists for that object.
If a stream does not exist, it is created automatically using the api.create_data_stream() procedure.
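The check-then-create behavior described above can be sketched as follows. This is not PyRel's actual code: `ensure_data_stream`, `list_stream_names`, and `create_stream` are hypothetical names, and the real `api.create_data_stream()` call (whose arguments this guide does not document) is represented by an injected callable. The case-insensitive name comparison is an assumption based on how Snowflake treats unquoted identifiers.

```python
from typing import Callable, Iterable


def ensure_data_stream(
    fq_name: str,
    list_stream_names: Callable[[], Iterable[str]],
    create_stream: Callable[[str], None],
) -> bool:
    """Create a stream for fq_name only if none exists. Returns True if created.

    Hypothetical helper: list_stream_names stands in for reading FQ_OBJECT_NAME
    from api.data_streams, and create_stream for calling api.create_data_stream().
    """
    # Assumption: names are compared case-insensitively (unquoted identifiers).
    existing = {name.casefold() for name in list_stream_names()}
    if fq_name.casefold() in existing:
        return False
    create_stream(fq_name)
    return True


created = []
names = ["example_db.sales.view1"]
print(ensure_data_stream("EXAMPLE_DB.SALES.VIEW1", lambda: names, created.append))  # → False
print(ensure_data_stream("example_db.hr.employees", lambda: names, created.append))  # → True
print(created)  # → ['example_db.hr.employees']
```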
View all data streams
Requires the cdc_admin application role.
Use the list view when you want to confirm that a stream exists and see its high-level state.
To view a list of all data streams, query the api.data_streams view:
SELECT * FROM relationalai.api.data_streams;
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+---------+----------+-----------------+----------------+-------------------------------+
| ID                                  | CREATED_AT              | CREATED_BY               | STATUS   | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME            | RAI_DATABASE | RAI_RELATION              | VERSION | COLUMNS  | REFERENCE_HASH  | SOURCE_DETAILS | CREATED_AT_TZ                 |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+---------+----------+-----------------+----------------+-------------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2025-07-24 19:19:46.955 | john.doe@company.com     | CREATED  | DATA_STREAM_VIEW  | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1    | SalesModel   | example_db.sales.view1    | 1.0     | [{...}]  | eca05e...fe3092 | NULL           | 2025-07-24 19:19:46.955 -0700 |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2025-07-24 19:24:53.265 | maria.garcia@company.com | CREATED  | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees   | HRModel      | example_db.hr.employees   | 1.0     | [{...}]  | bbafd7...1ddbe1 | NULL           | 2025-07-24 19:24:53.265 -0700 |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2025-07-29 17:46:27.281 | mark.jones@company.com   | DELETING | DATA_STREAM_VIEW  | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget | 1.0     | [{...}]  | e58cbc...8cbd89 | NULL           | 2025-07-29 17:46:27.281 -0700 |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+---------+----------+-----------------+----------------+-------------------------------+
To view a list of all data streams, create a Snowpark session from your default PyRel connection with create_config(), then query the api.data_streams view:
from pprint import pprint
from relationalai.config import create_config
session = create_config().get_session()
# List all data streams.
rows = session.sql("SELECT * FROM relationalai.api.data_streams").collect()
pprint(rows)
- Scan the FQ_OBJECT_NAME column first to confirm that the stream exists for the source object you care about.
- Scan the STATUS column next to see the stream’s current high-level lifecycle state.
- The RAI_DATABASE and RAI_RELATION columns are internal only and can be ignored.
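To scan the list programmatically instead of by eye, you can filter the collected rows in Python. This is a minimal sketch: `find_stream` is a hypothetical helper that works on plain dictionaries keyed by the column names shown above, such as the dictionaries you get by calling Snowpark's `Row.as_dict()` on each collected row. The case-insensitive name match is an assumption, not documented behavior.

```python
from typing import Iterable, Mapping, Optional


def find_stream(
    rows: Iterable[Mapping[str, object]], fq_name: str
) -> Optional[Mapping[str, object]]:
    """Return the api.data_streams row whose FQ_OBJECT_NAME matches fq_name."""
    target = fq_name.casefold()  # assumption: case-insensitive match
    for row in rows:
        if str(row.get("FQ_OBJECT_NAME", "")).casefold() == target:
            return row
    return None


# Sample rows modeled on the example output (only the columns used here).
rows = [
    {"FQ_OBJECT_NAME": "example_db.sales.view1", "STATUS": "CREATED"},
    {"FQ_OBJECT_NAME": "example_db.finance.budget", "STATUS": "DELETING"},
]
match = find_stream(rows, "example_db.finance.budget")
print(match["STATUS"] if match else "no stream")  # → DELETING
```

With a live session, you would pass `[row.as_dict() for row in rows]`, built from the `rows` collected in the snippet above.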
View details for one data stream
Requires the cdc_admin application role.
Use the detail view when you want to inspect one stream more closely.
To get details about a specific data stream, pass the fully qualified name of its source table or view to the api.get_data_stream() procedure:
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
+-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID                                      | CREATED_AT              | CREATED_BY           | STATUS | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME                | RAI_DATABASE | RAI_RELATION                  | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS             | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID                    | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel      | <db>.<schema>.<table_or_view> | SYNCED           | 0                     | NULL              | NULL                          | NULL               | {"rows": 10, "size": 512, ...} | 2024-10-23 10:50:00.456       | 02a1b234-5678-1234-abcdef-0123456789ab | []     | STARTED    |
+-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------+
To get details about a specific data stream, create a Snowpark session from your default PyRel connection with create_config(), then call the api.get_data_stream() procedure:
from relationalai.config import create_config
stream_name = "<db>.<schema>.<table_or_view>"
session = create_config().get_session()
rows = session.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')").collect()
print(rows)
- The STATUS column shows the stream’s high-level state.
- The DATA_SYNC_STATUS column shows whether the stream is currently synced or waiting on more processing.
- The PENDING_BATCHES_COUNT and NEXT_BATCH_STATUS columns help you see whether the stream still has work to process.
- The ERRORS column helps you quickly tell whether the stream is in a normal or abnormal state.
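The Python snippets in this guide build the CALL statement with an f-string. If the object name comes from user input, it is safer to escape it as a SQL string literal first. The helpers below are a sketch, not part of the RelationalAI API: `sql_string_literal` and `call_statement` are hypothetical names, and they assume Snowflake's rules for single-quoted string constants, where a backslash starts an escape sequence and a single quote is escaped by doubling it.

```python
def sql_string_literal(value: str) -> str:
    """Quote value as a Snowflake single-quoted string literal (sketch)."""
    # Escape backslashes first, then double any single quotes.
    escaped = value.replace("\\", "\\\\").replace("'", "''")
    return f"'{escaped}'"


def call_statement(procedure: str, fq_name: str) -> str:
    """Build a CALL for a relationalai.api procedure that takes one object name.

    procedure is not escaped, so it must be a fixed, trusted name such as
    "get_data_stream"; only fq_name is treated as untrusted input.
    """
    return f"CALL relationalai.api.{procedure}({sql_string_literal(fq_name)})"


print(call_statement("get_data_stream", "example_db.hr.employees"))
# → CALL relationalai.api.get_data_stream('example_db.hr.employees')
print(sql_string_literal("it's"))  # → 'it''s'
```

You would then run `session.sql(call_statement("get_data_stream", stream_name)).collect()` in place of the f-string version.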
Suspend a data stream
Suspend a stream when you want to intentionally pause change processing for one source object without deleting the stream.
Follow these steps to suspend a data stream.
- Suspend the stream
Call relationalai.api.suspend_data_stream() with the fully qualified name of the source object you want to suspend:
-- Suspend a data stream. Replace the placeholders with your database,
-- schema, and table or view name.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
from relationalai.config import create_config
session = create_config().get_session()
# Suspend a data stream. Replace the placeholders with your database,
# schema, and table or view name.
session.sql("CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>')").collect()
- Verify that the stream is suspended
Run relationalai.api.get_data_stream() and confirm that STATUS is SUSPENDED:
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
from relationalai.config import create_config
stream_name = "<db>.<schema>.<table_or_view>"
session = create_config().get_session()
rows = session.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')").collect()
print(rows)
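The two steps above can be combined into one function. This is a sketch under stated assumptions: `suspend_and_verify` and its `run_sql` parameter are hypothetical, `run_sql` is any callable that runs a statement and returns rows as dictionaries, and the function assumes `get_data_stream()` returns a single row with a `STATUS` column, as in the example output.

```python
from typing import Callable, List, Mapping

RunSql = Callable[[str], List[Mapping[str, object]]]


def suspend_and_verify(run_sql: RunSql, fq_name: str) -> bool:
    """Suspend the stream for fq_name and report whether STATUS is SUSPENDED."""
    # fq_name is interpolated directly, as in the snippets above;
    # escape it first if it comes from untrusted input.
    run_sql(f"CALL relationalai.api.suspend_data_stream('{fq_name}')")
    rows = run_sql(f"CALL relationalai.api.get_data_stream('{fq_name}')")
    return bool(rows) and rows[0].get("STATUS") == "SUSPENDED"


# A fake run_sql for illustration: it records statements and returns a canned row.
executed = []


def fake_run_sql(statement: str) -> List[Mapping[str, object]]:
    executed.append(statement)
    return [{"STATUS": "SUSPENDED"}] if "get_data_stream" in statement else []


print(suspend_and_verify(fake_run_sql, "example_db.hr.employees"))  # → True
```

Against a real account, `run_sql` could wrap the Snowpark session, for example by returning `[row.as_dict() for row in session.sql(statement).collect()]`.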
Resume a suspended data stream
Resume a stream when you want a suspended stream to start processing source changes again.
- Resume the stream
Call relationalai.api.resume_data_stream() for the suspended source object:
-- Resume a data stream. Replace the placeholders with your database,
-- schema, and table or view name.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
from relationalai.config import create_config
session = create_config().get_session()
# Resume a data stream. Replace the placeholders with your database,
# schema, and table or view name.
session.sql("CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>')").collect()
- Verify that the stream is active again
Run relationalai.api.get_data_stream() and confirm that STATUS is ACTIVE:
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
from relationalai.config import create_config
stream_name = "<db>.<schema>.<table_or_view>"
session = create_config().get_session()
rows = session.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')").collect()
print(rows)
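If you script the resume step, you may prefer to re-check the status a few times rather than once, in case the change is not reflected immediately (this guide does not say how quickly STATUS changes). The sketch below polls with a timeout. `wait_for_status`, `get_status`, and the default timings are hypothetical; `get_status` would wrap the `get_data_stream()` call shown above and return the STATUS value.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    expected: str = "ACTIVE",
    timeout_s: float = 120.0,
    interval_s: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll get_status() until it returns expected or timeout_s elapses."""
    deadline = clock() + timeout_s
    while True:
        if get_status() == expected:
            return True
        if clock() >= deadline:
            return False
        sleep(interval_s)


# Illustration with a fake status source that becomes ACTIVE on the third check.
statuses = iter(["SUSPENDED", "SUSPENDED", "ACTIVE"])
print(wait_for_status(lambda: next(statuses), sleep=lambda s: None))  # → True
```

Injecting `sleep` and `clock` keeps the function easy to test without real waiting.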
Verify a stream is syncing changes
Use the detail view to verify whether a stream is advancing normally.
Run api.get_data_stream(), then inspect the stream status fields:
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
+-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID                                      | CREATED_AT              | CREATED_BY           | STATUS | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME                | RAI_DATABASE | RAI_RELATION                  | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS             | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID                    | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel      | <db>.<schema>.<table_or_view> | SYNCED           | 0                     | NULL              | NULL                          | NULL               | {"rows": 10, "size": 512, ...} | 2024-10-23 10:50:00.456       | 02a1b234-5678-1234-abcdef-0123456789ab | []     | STARTED    |
+-----------------------------------------+-------------------------+----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+-----------------------+-------------------+-------------------------------+--------------------+--------------------------------+-------------------------------+----------------------------------------+--------+------------+
from relationalai.config import create_config
stream_name = "<db>.<schema>.<table_or_view>"
session = create_config().get_session()
rows = session.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')").collect()
print(rows)
- The stream is active and fully synced when STATUS is ACTIVE, DATA_SYNC_STATUS is SYNCED, PENDING_BATCHES_COUNT is 0, and CDC_STATUS is STARTED.
- The stream is active but still syncing when STATUS is ACTIVE and PENDING_BATCHES_COUNT is greater than 0.
- If STATUS is SUSPENDED, the stream is paused and not syncing changes. See Suspend a data stream and Resume a suspended data stream for more info.
- If the ERRORS column is not empty (an empty value appears as []), the stream is in an abnormal state and may require troubleshooting. See Fix data stream issues for more info.
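The rules above can be expressed as a small Python function, which is handy when you script checks across several streams. This is a sketch, not an official API: `classify_stream` is a hypothetical helper that takes a dictionary keyed by the column names returned by `get_data_stream()`. It assumes an ERRORS value of `[]`, `"[]"`, an empty string, or `None` means "no errors", based on the `[]` in the example output, and it checks errors first so that an erroring stream is never reported as healthy. Any combination the rules don't cover is reported as "unknown".

```python
from typing import Mapping


def classify_stream(details: Mapping[str, object]) -> str:
    """Map get_data_stream() columns to the states described in this guide."""
    errors = details.get("ERRORS")
    # Assumption: an empty list, "[]", "", or None means "no errors".
    if errors not in (None, "", "[]", []):
        return "error"
    status = details.get("STATUS")
    if status == "SUSPENDED":
        return "suspended"
    pending = int(details.get("PENDING_BATCHES_COUNT") or 0)
    if status == "ACTIVE" and pending > 0:
        return "syncing"
    if (
        status == "ACTIVE"
        and details.get("DATA_SYNC_STATUS") == "SYNCED"
        and pending == 0
        and details.get("CDC_STATUS") == "STARTED"
    ):
        return "synced"
    return "unknown"


# Values taken from the example output above.
example = {
    "STATUS": "ACTIVE",
    "DATA_SYNC_STATUS": "SYNCED",
    "PENDING_BATCHES_COUNT": 0,
    "CDC_STATUS": "STARTED",
    "ERRORS": "[]",
}
print(classify_stream(example))  # → synced
print(classify_stream({**example, "PENDING_BATCHES_COUNT": 2}))  # → syncing
```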