
Manage data streams

Data streams are the core mechanism that keeps data in PyRel semantic models up to date with changes in Snowflake source tables and views. This guide explains what data streams are, how they work, and how to manage them for normal operation.

Before you begin, make sure that:

  • The RAI Native App is installed in your Snowflake account.
  • You have a Snowflake table or view that is already being used as a data source.

Data streams track changes to tables and views in your Snowflake account, ensuring that queries from PyRel semantic models use the latest data. Each stream corresponds to one source object, which is a table or view that a model reads from.

Here’s how it works:

  • Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
  • Data streams read these changes and temporarily store them in an internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the change data capture (CDC) service is disabled.
  • When the CDC service is enabled, the staged data is processed in batches once every minute and made available to semantic models.
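
The following toy model is a rough sketch of that flow, not the RAI Native App’s implementation. The ToyDataStream class and its methods are hypothetical; the sketch only illustrates that captured changes accumulate in the stage regardless of CDC, and move to the model in one batch per processing cycle when CDC is enabled:

```python
from dataclasses import dataclass, field


@dataclass
class ToyDataStream:
    """Toy model of one data stream. NOT the RAI Native App's implementation."""

    source: str  # fully qualified source object name
    staged: list = field(default_factory=list)   # captured changes waiting in the stage
    applied: list = field(default_factory=list)  # changes visible to semantic models

    def capture(self, change: str) -> None:
        # Committed DML changes are staged even when CDC is disabled.
        self.staged.append(change)

    def process_batch(self, cdc_enabled: bool) -> int:
        # Intended to be called once per simulated minute.
        # Moves all staged changes in one batch, but only if CDC is enabled.
        if not cdc_enabled:
            return 0
        batch, self.staged = self.staged, []
        self.applied.extend(batch)
        return len(batch)


stream = ToyDataStream("example_db.hr.employees")
stream.capture("INSERT id=1")
stream.capture("UPDATE id=1")
print(stream.process_batch(cdc_enabled=False))  # 0: changes remain staged
print(stream.process_batch(cdc_enabled=True))   # 2: both changes applied in one batch
```

In this model, disabling CDC never loses changes; it only delays when they become visible.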

PyRel creates data streams automatically. When you declare a Snowflake table or view as a data source in a PyRel semantic model and then run a query that references it, the Python API checks whether a data stream exists for that object. If no stream exists, the API creates one by calling the api.create_data_stream() procedure.
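
The existence check can be sketched as a small Python function. This is a hypothetical helper that illustrates the decision, not PyRel’s code: it takes the source objects a model references and the FQ_OBJECT_NAME values already present in api.data_streams, and returns the objects that would still need a stream. Comparing names case-insensitively is an assumption:

```python
def sources_needing_streams(referenced: list[str], existing: list[str]) -> list[str]:
    """Return referenced source objects that have no data stream yet.

    Hypothetical helper: illustrates the check-then-create decision only.
    Case-insensitive comparison is an assumption about identifier handling.
    """
    seen = {name.lower() for name in existing}
    missing: list[str] = []
    for name in referenced:
        key = name.lower()
        if key not in seen:
            missing.append(name)  # a stream would be created for this object
            seen.add(key)         # don't report the same object twice
    return missing


existing = ["example_db.sales.view1", "example_db.hr.employees"]
referenced = ["EXAMPLE_DB.HR.EMPLOYEES", "example_db.sales.orders", "example_db.sales.orders"]
print(sources_needing_streams(referenced, existing))  # ['example_db.sales.orders']
```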

Use the list view when you want to confirm that a stream exists and see its high-level state.

To list all data streams, query the api.data_streams view:

SELECT * FROM relationalai.api.data_streams;
Output
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+---------+----------+-----------------+----------------+-------------------------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | VERSION | COLUMNS | REFERENCE_HASH | SOURCE_DETAILS | CREATED_AT_TZ |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+---------+----------+-----------------+----------------+-------------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2025-07-24 19:19:46.955 | john.doe@company.com | CREATED | DATA_STREAM_VIEW | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1 | SalesModel | example_db.sales.view1 | 1.0 | [{...}] | eca05e...fe3092 | NULL | 2025-07-24 19:19:46.955 -0700 |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2025-07-24 19:24:53.265 | maria.garcia@company.com | CREATED | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees | HRModel | example_db.hr.employees | 1.0 | [{...}] | bbafd7...1ddbe1 | NULL | 2025-07-24 19:24:53.265 -0700 |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2025-07-29 17:46:27.281 | mark.jones@company.com | DELETING | DATA_STREAM_VIEW | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget| 1.0 | [{...}] | e58cbc...8cbd89 | NULL | 2025-07-29 17:46:27.281 -0700 |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+---------+----------+-----------------+----------------+-------------------------------+
  • Scan the FQ_OBJECT_NAME column first to confirm that the stream exists for the source object you care about.
  • Scan the STATUS column next to see the stream’s current high-level lifecycle state.
  • The RAI_DATABASE and RAI_RELATION columns are internal only and can be ignored.
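
If you fetch the api.data_streams rows into Python (for example, as dictionaries keyed by column name), a lookup like the following answers both questions at once. The helper name, the dictionary row format, and the lowercased keys are assumptions for illustration:

```python
def stream_status_by_object(rows: list[dict]) -> dict[str, str]:
    """Map each source object's fully qualified name to its stream STATUS.

    Assumes `rows` are dicts keyed by the api.data_streams column names.
    RAI_DATABASE and RAI_RELATION are internal, so they are ignored.
    Keys are lowercased (an assumption about identifier case).
    """
    return {row["FQ_OBJECT_NAME"].lower(): row["STATUS"] for row in rows}


rows = [
    {"FQ_OBJECT_NAME": "example_db.sales.view1", "STATUS": "CREATED"},
    {"FQ_OBJECT_NAME": "example_db.hr.employees", "STATUS": "CREATED"},
    {"FQ_OBJECT_NAME": "example_db.finance.budget", "STATUS": "DELETING"},
]
statuses = stream_status_by_object(rows)
print(statuses.get("example_db.finance.budget"))  # DELETING
print(statuses.get("example_db.sales.orders"))    # None: no stream exists yet
```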

Use the detail view when you want to inspect one stream more closely.

To get details about a specific data stream, pass the fully qualified name of its source table or view to the api.get_data_stream() procedure:

-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
Output
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
  • The STATUS column shows the stream’s high-level state.
  • The DATA_SYNC_STATUS column shows whether the stream is currently synced or waiting on more processing.
  • The PENDING_BATCHES_COUNT and NEXT_BATCH_STATUS columns help you see whether the stream still has work to process.
  • The ERRORS column lists any errors recorded for the stream. An empty list ([]) means no errors are recorded; a non-empty list means the stream is in an abnormal state.
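
A hypothetical helper that pulls these fields out of a get_data_stream() result row might look like this. It assumes each row is a dictionary keyed by the column names shown above, and that ERRORS may come back either as a list or as JSON text such as "[]"; both are assumptions:

```python
import json


def detail_summary(row: dict) -> dict:
    """Extract the health-related fields from one api.get_data_stream() row.

    Assumes `row` is a dict keyed by the column names in the output above.
    """
    errors = row.get("ERRORS") or []
    if isinstance(errors, str):
        # Assumption: ERRORS may be returned as JSON text such as "[]".
        errors = json.loads(errors)
    return {
        "status": row.get("STATUS"),
        "data_sync_status": row.get("DATA_SYNC_STATUS"),
        "pending_batches": int(row.get("PENDING_BATCHES_COUNT") or 0),
        "next_batch_status": row.get("NEXT_BATCH_STATUS"),
        "has_errors": bool(errors),
    }


row = {
    "STATUS": "ACTIVE",
    "DATA_SYNC_STATUS": "SYNCED",
    "PENDING_BATCHES_COUNT": 0,
    "NEXT_BATCH_STATUS": None,
    "ERRORS": "[]",
}
print(detail_summary(row))
# {'status': 'ACTIVE', 'data_sync_status': 'SYNCED', 'pending_batches': 0, 'next_batch_status': None, 'has_errors': False}
```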

Suspend a data stream

Suspend a stream when you want to intentionally pause change processing for one source object without deleting the stream.

Follow these steps to suspend a data stream.

  1. Suspend the stream

    Call relationalai.api.suspend_data_stream() with the fully qualified name of the source object you want to suspend:

    -- Suspend a data stream. Replace the placeholders with your database,
    -- schema, and table or view name.
    CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
  2. Verify that the stream is suspended

    Run relationalai.api.get_data_stream() and confirm that STATUS is SUSPENDED:

    -- Get details about the data stream. Replace the placeholders with your
    -- database, schema, and table or view name.
    CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
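
If you script these steps, a small helper can build the CALL statements from the procedure names used in this guide and check the result. The helper names are hypothetical, restricting them to three procedures is a choice of this sketch, and how you execute the statement (for example, with a cursor from the Snowflake Connector for Python) is up to you:

```python
ALLOWED = {"get_data_stream", "suspend_data_stream", "resume_data_stream"}


def data_stream_call(procedure: str, fq_object_name: str) -> str:
    """Build a CALL statement for one of the data stream procedures in this guide."""
    if procedure not in ALLOWED:
        raise ValueError(f"unsupported procedure: {procedure}")
    # Double any single quotes so the name stays a valid SQL string literal.
    literal = fq_object_name.replace("'", "''")
    return f"CALL relationalai.api.{procedure}('{literal}');"


def is_suspended(detail_row: dict) -> bool:
    """True when an api.get_data_stream() row reports STATUS = SUSPENDED."""
    return detail_row.get("STATUS") == "SUSPENDED"


print(data_stream_call("suspend_data_stream", "example_db.hr.employees"))
# CALL relationalai.api.suspend_data_stream('example_db.hr.employees');
```

If your driver supports bind parameters, prefer them over building the statement as a string.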

Resume a suspended data stream

Resume a stream when you want a suspended stream to start processing source changes again.

  1. Resume the stream

    Call relationalai.api.resume_data_stream() with the fully qualified name of the suspended source object:

    -- Resume a data stream. Replace the placeholders with your database,
    -- schema, and table or view name.
    CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
  2. Verify that the stream is active again

    Run relationalai.api.get_data_stream() and confirm that the STATUS is ACTIVE:

    -- Get details about the data stream. Replace the placeholders with your
    -- database, schema, and table or view name.
    CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
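
When scripting a resume, you may prefer to poll get_data_stream() until STATUS reaches the expected value rather than checking once. In this sketch, fetch_detail is a hypothetical callable you supply that runs the CALL shown above and returns the result row as a dictionary; the timeout and polling interval defaults are arbitrary:

```python
import time
from typing import Callable


def wait_for_status(
    fetch_detail: Callable[[], dict],
    expected: str,
    timeout_s: float = 300.0,
    interval_s: float = 10.0,
) -> dict:
    """Poll until STATUS equals `expected`, then return that row.

    `fetch_detail` is a hypothetical callable you supply: it should run
    CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>')
    and return the single result row as a dict.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        row = fetch_detail()
        if row.get("STATUS") == expected:
            return row
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"STATUS is {row.get('STATUS')!r}, expected {expected!r}"
            )
        time.sleep(interval_s)


# Usage with a stand-in fetcher that returns canned rows.
responses = iter([{"STATUS": "SUSPENDED"}, {"STATUS": "ACTIVE"}])
row = wait_for_status(lambda: next(responses), "ACTIVE", interval_s=0)
print(row["STATUS"])  # ACTIVE
```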

Use the detail view to verify whether a stream is advancing normally.

Run api.get_data_stream(), then inspect the stream status fields:

-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
Output
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
  • The stream is active and fully synced when STATUS is ACTIVE, DATA_SYNC_STATUS is SYNCED, PENDING_BATCHES_COUNT is 0, and CDC_STATUS is STARTED.
  • The stream is active but still syncing when STATUS is ACTIVE and PENDING_BATCHES_COUNT is greater than 0.
  • If STATUS is SUSPENDED, the stream is paused and not syncing changes. See Suspend a data stream and Resume a suspended data stream for more info.
  • If the ERRORS column is not empty, the stream is in an abnormal state and may require troubleshooting. See Fix data stream issues for more info.
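
These rules can be encoded as a small classifier for monitoring scripts. This is a sketch under assumptions: rows are dictionaries keyed by the column names above, ERRORS may arrive as a list or as JSON text, checking ERRORS first is a design choice of this sketch, and combinations that the rules above do not cover are reported as UNKNOWN:

```python
import json


def classify_stream(row: dict) -> str:
    """Classify an api.get_data_stream() row using the rules listed above.

    Returns "ERROR", "SUSPENDED", "SYNCED", "SYNCING", or "UNKNOWN".
    """
    errors = row.get("ERRORS") or []
    if isinstance(errors, str):
        errors = json.loads(errors)  # assumption: may arrive as JSON text
    if errors:
        return "ERROR"  # abnormal state; may need troubleshooting
    status = row.get("STATUS")
    if status == "SUSPENDED":
        return "SUSPENDED"  # paused, not syncing changes
    if status == "ACTIVE":
        pending = int(row.get("PENDING_BATCHES_COUNT") or 0)
        if pending > 0:
            return "SYNCING"  # active, batches still waiting
        if row.get("DATA_SYNC_STATUS") == "SYNCED" and row.get("CDC_STATUS") == "STARTED":
            return "SYNCED"  # active and fully synced
    return "UNKNOWN"


healthy = {
    "STATUS": "ACTIVE",
    "DATA_SYNC_STATUS": "SYNCED",
    "PENDING_BATCHES_COUNT": 0,
    "ERRORS": "[]",
    "CDC_STATUS": "STARTED",
}
print(classify_stream(healthy))                                   # SYNCED
print(classify_stream({**healthy, "PENDING_BATCHES_COUNT": 3}))   # SYNCING
print(classify_stream({**healthy, "STATUS": "SUSPENDED"}))        # SUSPENDED
```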