Manage Data Shared With the RAI Native App

RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.

Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.

You can manage the CDC service using SQL, Python, or the RAI CLI.

To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:

-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+ */

To disable the CDC service, use the app.suspend_cdc() procedure:

-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+ */

Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.

To view the status of the CDC service, use the app.cdc_status() procedure:

-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| CDC_ENABLED | CDC_ENGINE_NAME | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE | CDC_TASK_STATUS | CDC_TASK_INFO |
|--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE | CDC_MANAGED_ENGINE | READY | HIGHMEM_X64_S | started | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------| */

Refer to the reference docs for more details on the output of the cdc_status() procedure.

When the RAI Native App is installed, the CDC engine is configured with the HIGHMEM_X64_S size. If you encounter performance issues with the CDC engine or need to process large data streams, you can change the CDC engine size to a larger instance family. See Engine Sizes for more information on available engine sizes.

To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:

-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+ */

If a batch of data stream changes is currently being processed, it completes on the previously configured engine. An engine of the new size is created when the next batch of changes is processed, at which point the old engine is dropped.
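The swap timing can be sketched with a toy model (illustrative only; the class and method names here are made up, not RAI internals): a batch already in flight completes on the old engine, and the new size takes effect only at the next batch boundary.

```python
# Toy model of the resize behavior described above (not RAI code).
class CdcEngineManager:
    def __init__(self, size):
        self.size = size          # size of the currently running engine
        self.pending_size = None  # size requested via alter_cdc_engine_size

    def alter_size(self, new_size):
        # Takes effect at the next batch boundary, never mid-batch.
        self.pending_size = new_size

    def process_batch(self, batch):
        # Swap engines only when a new batch starts; an in-flight batch
        # has already passed this point and finishes on the old engine.
        if self.pending_size is not None:
            self.size = self.pending_size  # old engine dropped, new one created
            self.pending_size = None
        return f"processed {len(batch)} changes on {self.size}"

mgr = CdcEngineManager("HIGHMEM_X64_S")
mgr.alter_size("HIGHMEM_X64_M")
print(mgr.process_batch([1, 2, 3]))  # processed 3 changes on HIGHMEM_X64_M
```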

Data streams track changes to tables and views in your Snowflake account, ensuring that queries from RAI Python models use the latest data. Changes are batched and processed by the CDC Service every minute.

The RAI Python API automatically creates data streams. Python users can populate an RAI Type from rows in a source table or view they have access to. When a query is executed for the first time, data streams are created for the source tables or views if they do not already exist.

Although data streams can be created manually using SQL or the RAI CLI, the recommended approach is to use the Python API, which handles stream creation automatically.

Data streams can be created on Snowflake tables and views, with some limitations:

  • The user creating a data stream must have SELECT privileges on the source table or view.
  • The source table or view must have change tracking enabled.
  • Data streams cannot be created on:
    • Temporary or transient tables
    • Dynamic tables
    • External tables and views

Refer to the Snowflake documentation for more details on stream requirements and limitations.

Data streams support only certain Snowflake column types. See Supported Column Types for the complete list.

Tables or views with unsupported column types cannot be streamed into the RAI Native App.

Snowflake tables or views that contain VARIANT columns cannot be streamed directly into the RAI Native App. To use JSON data in your models, expand it into supported column types before creating a data stream. See Supported Column Types for the full list of supported types.

Two approaches are:

  1. Flattening into a single view by converting JSON fields into scalar columns.
  2. Normalizing into child tables when a JSON array needs to be represented as multiple rows.
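The two reshapings can be sketched in plain Python on a single sample record (illustrative only; in practice the transformation is done in SQL, as the examples that follow show):

```python
import json

# One sample record with the same shape as the VIEW_DATA column below.
record = json.loads(
    '{"url": "https://example.com/pricing", "referrer": "https://google.com",'
    ' "device": "desktop", "events": ["scroll", "click"]}'
)

# Approach 1: flatten into one row of scalar columns; the array is
# collapsed into a comma-separated string (like ARRAY_TO_STRING).
flat_row = {
    "url": record["url"],
    "referrer": record["referrer"],
    "device": record["device"],
    "events": ",".join(record["events"]),
}

# Approach 2: normalize the array into child rows, one row per event
# (like LATERAL FLATTEN).
event_rows = [{"url": record["url"], "event": e} for e in record["events"]]

print(flat_row["events"])  # scroll,click
print(len(event_rows))     # 2
```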

The following example shows how to expand a JSON column in a web traffic dataset into a flat view. This method is useful when you want to keep a one-to-one mapping between rows in the raw table and rows in the view.

-- Raw table with a VARIANT column
CREATE OR REPLACE TABLE VIEWS_RAW (
view_id NUMBER,
session_id NUMBER,
user_id NUMBER,
occurred_at TIMESTAMP_NTZ,
VIEW_DATA VARIANT
);
-- Sample Data
INSERT INTO VIEWS_RAW
SELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ,
PARSE_JSON('{"url":"https://example.com/pricing","referrer":"https://google.com","device":"desktop","events":["scroll","click"]}')
UNION ALL
SELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ,
PARSE_JSON('{"url":"https://example.com/blog/post-1","referrer":null,"device":"mobile","events":["view"]}');
-- Flat view with supported column types
CREATE OR REPLACE VIEW VIEWS_CLEAN AS
SELECT
view_id,
session_id,
user_id,
occurred_at,
VIEW_DATA:url::STRING AS url,
VIEW_DATA:referrer::STRING AS referrer,
VIEW_DATA:device::STRING AS device,
ARRAY_TO_STRING(VIEW_DATA:events, ',') AS events
FROM VIEWS_RAW;
-- Enable change tracking so that the RAI Native App can create data streams
-- from this table. Note that the underlying table for the view must also
-- have change tracking enabled.
ALTER TABLE VIEWS_RAW SET CHANGE_TRACKING = TRUE;
ALTER VIEW VIEWS_CLEAN SET CHANGE_TRACKING = TRUE;
-- Query the view.
SELECT * FROM VIEWS_CLEAN;
/* +--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------+
| VIEW_ID| SESSION_ID| USER_ID| OCCURRED_AT | URL | REFERRER | DEVICE | EVENTS |
|--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------|
| 1 | 501 | 10001 | 2025-07-01 10:15:00 | https://example.com/pricing | https://google.com | desktop | scroll,click |
| 2 | 502 | 10002 | 2025-07-01 10:20:00 | https://example.com/blog/post-1 | NULL | mobile | view |
+--------+-----------+--------+---------------------+------------------------------------+--------------------+---------+---------------+ */

This approach works well when arrays can be safely collapsed into comma-separated strings.

If your JSON data contains arrays that need to be represented as multiple rows, you can create a normalized child table with the expanded JSON data:

-- Raw table with a VARIANT column
CREATE OR REPLACE TABLE VIEWS_RAW (
view_id NUMBER,
session_id NUMBER,
user_id NUMBER,
occurred_at TIMESTAMP_NTZ,
VIEW_DATA VARIANT
);
-- Sample data
INSERT INTO VIEWS_RAW
SELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ,
PARSE_JSON('{"url":"https://example.com/pricing","referrer":"https://google.com","device":"desktop","events":["scroll","click"]}')
UNION ALL
SELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ,
PARSE_JSON('{"url":"https://example.com/blog/post-1","referrer":null,"device":"mobile","events":["view"]}');
-- Child table with one row per event from the parent VIEWS_RAW table.
CREATE OR REPLACE TABLE VIEW_EVENTS AS
SELECT
v.view_id,
v.session_id,
v.user_id,
v.occurred_at,
e.value::STRING AS event
FROM VIEWS_RAW AS v,
LATERAL FLATTEN(INPUT => v.VIEW_DATA:events) AS e;
-- Enable change tracking so that the RAI Native App can create data streams
-- from this table.
ALTER TABLE VIEW_EVENTS SET CHANGE_TRACKING = TRUE;
-- Query the child table.
SELECT * FROM VIEW_EVENTS;
/*
+--------+-----------+--------+---------------------+---------+
| VIEW_ID| SESSION_ID| USER_ID| OCCURRED_AT | EVENT |
|--------+-----------+--------+---------------------+---------|
| 1 | 501 | 10001 | 2025-07-01 10:15:00 | scroll |
| 1 | 501 | 10001 | 2025-07-01 10:15:00 | click |
| 2 | 502 | 10002 | 2025-07-01 10:20:00 | view |
+--------+-----------+--------+---------------------+---------+
*/

VIEW_EVENTS uses supported column types and can be streamed into the RAI Native App.

Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. You must enable change tracking on a source table or view before you can create a data stream.

Here’s how it works:

  • Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
  • Data streams read these changes and temporarily store them in an internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the CDC Service is disabled.
  • When the CDC Service is enabled, the staged data is processed in batches once every minute and made available to RAI models.
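A toy model makes the staging-versus-processing split concrete (illustrative only, not the CDC Service’s actual implementation): changes accumulate in the stage regardless of CDC state, and are drained in batches only while the service is enabled.

```python
class ToyCdc:
    """Toy model of the staging/processing behavior described above."""
    def __init__(self):
        self.enabled = True
        self.stage = []      # changes consumed from change tracking
        self.processed = []  # changes made available to models

    def capture(self, change):
        # Data streams stage changes even while CDC is suspended.
        self.stage.append(change)

    def tick(self):
        # Runs once per minute; drains the stage only when CDC is enabled.
        if self.enabled and self.stage:
            self.processed.extend(self.stage)
            self.stage.clear()

cdc = ToyCdc()
cdc.enabled = False          # suspend_cdc()
cdc.capture("INSERT row 1")  # still staged while suspended
cdc.tick()                   # nothing is processed
cdc.enabled = True           # resume_cdc()
cdc.tick()                   # staged changes are now processed
print(cdc.processed)         # ['INSERT row 1']
```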

When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.

If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.

If you encounter data inconsistencies due to staleness, you can delete the stream and recreate it to start fresh.

A stream is quarantined when the CDC Service stops syncing data from its source because too many errors occurred.

When a stream is quarantined:

  • The stream is automatically suspended to prevent further issues.
  • Change tracking data is still collected from your source.
  • The CDC engine does not process new data until you fix the errors and resume the stream.
  • Common causes include repeated failures in batch processing or scheduled tasks.

The troubleshooting steps later on this page can help you diagnose and recover a quarantined stream.

The RAI Native App uses Snowflake’s security features to manage access to data streams and the underlying source tables and views.

RAI supports Snowflake’s Role-Based Access Control (RBAC) model, which manages access to data and resources through roles and privileges. In Snowflake, privileges like SELECT or USAGE are granted to roles, and roles are assigned to users. This allows organizations to control who can access specific tables, views, and other objects, and what actions they can perform.

RAI enforces Snowflake RBAC as follows:

  • All app operations use the privileges of the active roles in your Snowflake session, as set in your configuration.
  • When you create or use a data stream, Snowflake checks that your role has the required privileges on the object and its parent database and schema.
  • If privileges are missing, access is denied.
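As a rough sketch, the check behaves like set containment over granted privileges (a simplification: the role names and grants below are made up, and Snowflake’s real model also includes role hierarchies and ownership):

```python
# Hypothetical grants for illustration only.
GRANTS = {
    "analyst": {
        ("USAGE", "example_db"),
        ("USAGE", "example_db.sales"),
        ("SELECT", "example_db.sales.orders"),
    },
}

def can_use_data_stream(role, db, schema, obj):
    # Using a data stream requires SELECT on the object plus USAGE on
    # its parent database and schema; any missing privilege means denial.
    required = {
        ("USAGE", db),
        ("USAGE", f"{db}.{schema}"),
        ("SELECT", f"{db}.{schema}.{obj}"),
    }
    return required <= GRANTS.get(role, set())

print(can_use_data_stream("analyst", "example_db", "sales", "orders"))    # True
print(can_use_data_stream("analyst", "example_db", "sales", "payments"))  # False
```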

Users with the Enterprise Edition of Snowflake (including Business Critical and Virtual Private Snowflake) can use row access policies for controlling which rows are visible and masking policies for hiding sensitive column data.

When the RAI Native App executes SQL commands, all row access and masking policies are evaluated against a role named RELATIONALAI, not the role defined in the user’s configuration.

This means that:

  • RAI currently does not support user-based policies at the row and column level.
  • You can create policies that apply to the RELATIONALAI role, but these policies impact all users of the app.

Consider the following masking policy that masks sensitive employee passwords when the user does not have the FULL_ACCESS role:

CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
CASE
WHEN CURRENT_ROLE() IN ('FULL_ACCESS') THEN val
ELSE '******'
END;

Because the app executes all queries using the RELATIONALAI role, this masking policy applies to every user of the app. All users see the masked value, even those who have the FULL_ACCESS role.

Now suppose you add RELATIONALAI to the list of roles that can see the original value:

CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
CASE
WHEN CURRENT_ROLE() IN ('FULL_ACCESS', 'RELATIONALAI') THEN val
ELSE '******'
END;

In this case, the app will see the original value of the password and so will any app user with SELECT privileges on the relevant table, even if they do not have the FULL_ACCESS role.
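The effect of both policies can be emulated in a few lines of Python (illustrative only): because the app always evaluates policies as the RELATIONALAI role, the outcome is identical for every app user.

```python
def employee_pwd_mask(val, current_role, allowed_roles):
    # Mirrors the CASE expression in the masking policy above.
    return val if current_role in allowed_roles else "******"

APP_ROLE = "RELATIONALAI"  # the app evaluates policies against this role

# First policy: only FULL_ACCESS sees the value. The app's role is not
# listed, so every app user sees the masked value.
assert employee_pwd_mask("hunter2", APP_ROLE, {"FULL_ACCESS"}) == "******"

# Second policy: RELATIONALAI is added, so every app user with SELECT
# privileges sees the original value, FULL_ACCESS or not.
assert employee_pwd_mask("hunter2", APP_ROLE, {"FULL_ACCESS", "RELATIONALAI"}) == "hunter2"
```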

For data access policies to work effectively with the RAI Native App, consider the following best practices:

  • Start with least privilege: Deny access by default, then allow only what the app needs.
  • Write policies that check the app’s role, not individual users.
  • If you use mapping tables for access control, map to the app’s role, not to individual users.

You can manage data streams using SQL, Python, or the RAI CLI.

Requires ownership privileges on the table or view.

To enable change tracking on a table or view, use the ALTER TABLE statement:

-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;
-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;
-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';

See Change Tracking for details on what operations are tracked.

To create a data stream, use the api.create_data_stream() procedure:

-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);
-- Enable change tracking on the table or view
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;
-- Create a data stream named 'my_stream' for the object reference
CALL relationalai.api.create_data_stream($obj_ref, 'my_stream', TRUE);
/*+----------------------------------+
| Datastream created successfully. |
+----------------------------------+ */

Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.

To suspend a data stream, use the api.suspend_data_stream() procedure:

-- Suspend a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------+
| Data stream suspended |
+-----------------------+ */

While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Resume suspended streams before the source’s change tracking retention period elapses so they do not become stale.

To resume a suspended data stream, use the api.resume_data_stream() procedure:

-- Resume a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
/*+---------------------+
| Data stream resumed |
+---------------------+ */

To delete a data stream, use the api.delete_data_stream() procedure:

-- Delete a data stream. Replace the placeholders with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+ */

To list all data streams, query the api.data_streams view:

SELECT * FROM relationalai.api.data_streams;
/*+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+
| ID                                  | CREATED_AT              | CREATED_BY               | STATUS   | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME            | RAI_DATABASE | RAI_RELATION              |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com     | CREATED  | DATA_STREAM_VIEW  | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1    | SalesModel   | example_db.sales.view1    |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED  | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees   | HRModel      | example_db.hr.employees   |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com   | DELETING | DATA_STREAM_VIEW  | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+ */

To get details about a specific data stream, pass the fully qualified name of the stream’s source table or view to the api.get_data_stream() procedure:

-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+ */

To view quarantined streams and their errors, use the api.data_stream_batches view:

SELECT
data_stream_id,
fq_object_name,
status,
error.VALUE::string AS processing_error
FROM
relationalai.api.data_stream_batches,
LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
status = 'QUARANTINED';

Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from their source, but these changes are not processed by the CDC engine until the stream is resumed.

If your data stream is quarantined, the system will attempt to recover it automatically once, 15 minutes after quarantine begins. You do not need to wait for this automatic recovery. You can troubleshoot and fix the issue at any time. If automatic recovery fails, manual intervention is required.

Follow these steps to diagnose and fix a quarantined stream.

  1. Check Stream Status and Find the Error.

    First, get your stream status and ID:

    CALL relationalai.api.get_data_stream('<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>');

    Look at the errors column to see why your stream is quarantined. The error message will help you choose the right troubleshooting path.

  2. Diagnose the Problem.

    How you diagnose the issue depends on the error message.

    Match the quarantine message to the recommended diagnostic path:

      • "Task <task_name> failed too many times and has been quarantined due to: <error_details>" → Task Failure Diagnosis
      • "Data stream has been quarantined after batch processing failed 3 times. <error_details>" → Batch Processing Diagnosis
      • Any other error message → General Diagnosis

    Based on the error message, select the matching troubleshooting steps below.

    Requires the cdc_admin application role.

    1. Check batch details:
      SELECT * FROM relationalai.api.data_stream_batches WHERE fq_object_name ILIKE '<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>';
    2. Find the batch with status = 'QUARANTINED'.
    3. Review the processing_details for failed transactions or errors.
    4. Check the error log:
      SELECT * FROM relationalai.api.data_stream_errors WHERE id = 'rai.cdc.<MY_DATA_STREAM_ID>';
  3. Fix and Resume

    If you find the root cause in Step 2, fix it. The system will attempt to recover the stream automatically the next time a program using the stream is run.

    If you can’t determine the cause, or the stream remains quarantined after your fix, you can delete the stream manually. This forces the stream to be re-created the next time a program using the stream is run.

    If you continue to encounter issues, please contact support.