Skip to content

Manage Data Shared With the RAI Native App

RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.

Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.

You can manage the CDC service using SQL, Python, or the RAI CLI.

To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:

-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+ */

To disable the CDC service, use the app.suspend_cdc() procedure:

-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+ */

Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.

To view the status of the CDC service, use the app.cdc_status() procedure:

-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| CDC_ENABLED | CDC_ENGINE_NAME | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE | CDC_TASK_STATUS | CDC_TASK_INFO |
|--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE | CDC_MANAGED_ENGINE | READY | HIGHMEM_X64_S | started | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------| */

Refer to the reference docs for more details on the output of the cdc_status() procedure.

When the RAI Native App is installed, the CDC engine is configured with the HIGHMEM_X64_S size. If you encounter performance issues with the CDC engine or need to process large data streams, you can change the CDC engine size to a larger instance family. See Engine Sizes for more information on available engine sizes.

To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:

-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+ */

If a batch of data stream changes is currently being processed, it is completed using the previously configured engine. A new engine with the new size is created whenever the next batch of changes is processed, at which point the old engine is deleted.

Data streams track changes to tables and views in your Snowflake account, ensuring that queries from RAI Python models use the latest data. Changes are batched and processed by the CDC Service every minute.

The RAI Python API automatically creates data streams. Python users can populate an RAI Type from rows in a source table or view they have access to. When a query is executed for the first time, data streams are created for the source tables or views if they do not already exist.

Although data streams can be created manually using SQL or the RAI CLI, the recommended approach is to use the Python API, which handles stream creation automatically.

Data streams can be created on Snowflake tables and views, with some limitations:

  • The user creating a data stream must have SELECT privileges on the source table or view.
  • The source table or view must have change tracking enabled.
  • Data streams cannot be created on temporary or transient tables.
  • Data streams are not supported on external tables or views.

Refer to the Snowflake documentation for more details on stream requirements and limitations.

Data streams support the following Snowflake column types:

Tables or views with unsupported column types cannot be streamed into the RAI Native App.

Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. You must enable change tracking on a source table or view before you can create a data stream.

Here’s how it works:

  • Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
  • Data streams read these changes and temporarily store them in a internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the CDC Service is disabled.
  • When the CDC Service is enabled, the staged data is processed in batches once every minute and made available to RAI models.

When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.

If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.

If you encounter data inconsistencies due to staleness, you can delete the stream and recreate it to start fresh.

If the CDC Service encounters too many failures while attempting to process a data stream, the stream is given a QUARANTINED status. Change tracking data for quarantined streams is consumed, but not processed. Resolve the errors causing the stream to be quarantined, then resume the data stream to process the pending changes. See View Quarantined Streams for details on viewing errors for quarantined streams.

The RAI Native App uses Snowflake’s security features to manage access to data streams and the underlying source tables and views.

RAI supports Snowflake’s Role-Based Access Control (RBAC) model, which manages access to data and resources through roles and privileges. In Snowflake, privileges like SELECT or USAGE are granted to roles, and roles are assigned to users. This allows organizations to control who can access specific tables, views, and other objects, and what actions they can perform.

RAI enforces Snowflake RBAC as follows:

  • All app operations use the privileges of the active roles in your Snowflake session, as set in your configuration.
  • When you create or use a data stream, Snowflake checks that your role has the required privileges on the object and its parent database and schema.
  • If privileges are missing, access is denied.

Users with the Enterprise Edition of Snowflake (including Business Critical and Virtual Private Snowflake) can use row access policies for controlling which rows are visible and masking policies for hiding sensitive column data.

When the RAI Native App executes SQL commands, all row and masking policies are evaluated against a role named RELATIONALAInot the role defined in the user’s configuration .

This means that:

  • RAI currently does not support user-based policies at the row and column level.
  • You can create policies that apply to the RELATIONALAI role, but these policies impact all users of the app.

Consider the following masking policy that masks sensitive employee passwords when the user does not have the FULL_ACCESS role:

CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
CASE
WHEN CURRENT_ROLE() IN ('FULL_ACCESS') THEN val
ELSE '******'
END;

The app executes all queries using the RELATIONALAI role, so this masking policy will apply to all users of the app. All users will see the masked value, even if the user has the FULL_ACCESS role.

Now suppose you add RELATIONALAI to the list of roles that can see the original value:

CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
CASE
WHEN CURRENT_ROLE() IN ('FULL_ACCESS', 'RELATIONALAI') THEN val
ELSE '******'
END;

In this case, the app will see the original value of the password and so will any app user with SELECT privileges on the relevant table, even if they do not have the FULL_ACCESS role.

For data access policies to work effectively with the RAI Native App, consider the following best practices:

  • Start with least privilege: Deny access by default, then allow only what the app needs.
  • Write policies that check the app’s role, not individual users.
  • If you use mapping tables for access control, map to the app’s role—not individual users.

You can manage data streams using SQL, Python, or the RAI CLI.

Requires ownership privileges on the table or view.

To enable change tracking on a table or view, use the ALTER TABLE statement:

-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;
-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;
-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';

See Change Tracking for details on what operations are tracked.

To create a data stream, use the api.create_data_stream() procedure:

-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);
-- Enable change tracking on the table or view
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;
-- Create a data stream named 'my_stream' for the object reference
CALL relationalai.api.create_data_stream($obj_ref, 'my_stream', TRUE);
/*+----------------------------------+
| Datastream created successfully. |
+----------------------------------+ */

Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.

To suspend a data stream, use the api.suspend_data_stream() procedure:

-- Suspend a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------+
| Data stream suspended |
+-----------------------+ */

While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.

To resume a suspended data stream, use the api.resume_data_stream() procedure:

-- Resume a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
/*+---------------------+
| Data stream resumed |
+---------------------+ */

To delete a data stream, use the api.delete_data_stream() procedure:

-- Delete a data stream. Replace the placeholders with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+ */

To view a list all data streams, query the api.data_streams view:

SELECT * FROM relationalai.api.data_streams;
/*+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+------------------------- -+--------------+--------------------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com | CREATED | DATA_STREAM_VIEW | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1 | SalesModel | example_db.sales.view1 |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees | HRModel | example_db.hr.employees |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com | DELETING | DATA_STREAM_VIEW | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget|
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+ */

To get details about a specific data stream, pass the stream name and model name to the api.get_data_stream() procedure:

-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
/*+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+ */

To view quarantined streams and their errors, use the api.data_stream_batches view:

SELECT
data_stream_id,
fq_object_name,
status,
error.VALUE::string AS processing_error
FROM
relationalai.api.data_stream_batches,
LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
status = 'QUARANTINED';

Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source data, but these changes will not be processed by the CDC engine until the stream is resumed.