Manage Data Shared With the RAI Native App
RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.
The CDC Service
Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.
You can manage the CDC service using SQL, Python, or the RAI CLI.
Enable CDC
Requires the cdc_admin application role.
To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:
-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*
+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+
*/
To enable the CDC service, or to resume the service after suspending it, create a Provider instance and use its .sql() method to execute the app.resume_cdc() SQL procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Enable the CDC service.
app.sql("CALL relationalai.app.resume_cdc()")
To enable the CDC service, or to resume the service after suspending it, use the imports:setup command’s --resume flag:
# Enable the CDC service.
rai imports:setup --resume
Disable CDC
Requires the cdc_admin application role.
To disable the CDC service, use the app.suspend_cdc() procedure:
-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*
+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+
*/
To disable the CDC service, create a Provider instance and use its .sql() method to execute the app.suspend_cdc() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Disable the CDC service.
app.sql("CALL relationalai.app.suspend_cdc()")
To disable the CDC service, use the imports:setup command’s --suspend flag:
# Disable the CDC service.
rai imports:setup --suspend
Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.
View CDC Service Status
Requires the app_user application role.
To view the status of the CDC service, use the app.cdc_status() procedure:
-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| CDC_ENABLED  | CDC_ENGINE_NAME    | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE  | CDC_TASK_STATUS | CDC_TASK_INFO                                                                                                            |
|--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE         | CDC_MANAGED_ENGINE | READY             | HIGHMEM_X64_S    | started         | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
*/
Refer to the reference docs for more details on the output of the cdc_status() procedure.
To view the status of the CDC service, create a Provider instance and use its .sql() method to execute the app.cdc_status() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Get the CDC service status.
cdc_status = app.sql("CALL relationalai.app.cdc_status()")
print(cdc_status)
# [Row(CDC_ENABLED=True, CDC_ENGINE_NAME='CDC_MANAGED_ENGINE', CDC_ENGINE_STATUS='READY', CDC_ENGINE_SIZE='M', CDC_TASK_STATUS='started', CDC_TASK_INFO='{\n "createdOn": "2024-10-24 22:53:15.200 -0700",\n "lastSuspendedOn": null,\n "lastSuspendedReason": null,\n "state": "started"\n}')]
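The CDC_TASK_INFO value in the status output is a JSON string, so it usually needs to be parsed before it can be inspected. The following is a minimal sketch of that step. It assumes the status row has already been converted to a plain dict keyed by the column names shown above; the summarize_cdc_status helper is hypothetical and is not part of the relationalai package.

```python
import json


def summarize_cdc_status(row: dict) -> dict:
    """Flatten one cdc_status() row into a small summary dict.

    Hypothetical helper: `row` is assumed to be a dict that uses the column
    names from the sample output, with CDC_TASK_INFO holding a JSON string.
    """
    task_info = json.loads(row["CDC_TASK_INFO"])
    return {
        "enabled": bool(row["CDC_ENABLED"]),
        "engine": row["CDC_ENGINE_NAME"],
        "engine_ready": row["CDC_ENGINE_STATUS"] == "READY",
        "task_state": task_info.get("state"),
        "last_suspended_on": task_info.get("lastSuspendedOn"),
    }


# Sample row built from the values in the output shown above.
sample_row = {
    "CDC_ENABLED": True,
    "CDC_ENGINE_NAME": "CDC_MANAGED_ENGINE",
    "CDC_ENGINE_STATUS": "READY",
    "CDC_ENGINE_SIZE": "M",
    "CDC_TASK_STATUS": "started",
    "CDC_TASK_INFO": (
        '{"createdOn": "2024-10-24 22:53:15.200 -0700", '
        '"lastSuspendedOn": null, "lastSuspendedReason": null, '
        '"state": "started"}'
    ),
}

summary = summarize_cdc_status(sample_row)
print(summary)
```

How a row returned by .sql() is converted to a dict depends on the row type your session returns, so that step is left out of the sketch.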
To view the status of the CDC service, use the imports:setup command:
$ rai imports:setup
---------------------------------------------------
▰▰▰▰ Imports setup fetched
To suspend imports, use 'rai imports:setup --suspend'
Field                 Value
───────────────────────────────────────────
engine                CDC_MANAGED_ENGINE
engine_size           M
engine_status         READY
status                STARTED
enabled               True
createdOn             2024-10-24 22:53:15
lastSuspendedOn       N/A
lastSuspendedReason   N/A
---------------------------------------------------
Configure CDC Engine Size
Requires the cdc_admin application role.
When the RAI Native App is installed, the CDC engine is configured with the HIGHMEM_X64_S size.
If you encounter performance issues with the CDC engine or need to process large data streams, you can change the CDC engine size to a larger instance family.
See Engine Sizes for more information on available engine sizes.
To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:
-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*
+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+
*/
To change the size of the CDC engine, create a Provider instance and use its .sql() method to execute the app.alter_cdc_engine_size() procedure:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Change the size of the CDC engine to HIGHMEM_X64_M.
app.sql("CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M')")
To change the size of the CDC engine, pass the new engine size to the imports:setup command’s --engine_size flag:
# Change the size of the CDC engine to HIGHMEM_X64_M.
rai imports:setup --engine_size HIGHMEM_X64_M
If a batch of data stream changes is currently being processed, it is completed using the previously configured engine. A new engine with the new size is created whenever the next batch of changes is processed, at which point the old engine is deleted.
Data Streams
Data streams track changes to tables and views in your Snowflake account, ensuring that queries from RAI Python models use the latest data. Changes are batched and processed by the CDC Service every minute.
The RAI Python API automatically creates data streams. Python users can populate an RAI Type from rows in a source table or view they have access to. When a query is executed for the first time, data streams are created for the source tables or views if they do not already exist.
Although data streams can be created manually using SQL or the RAI CLI, the recommended approach is to use the Python API, which handles stream creation automatically.
Supported Source Types
Data streams can be created on Snowflake tables and views, with some limitations:
- The user creating a data stream must have SELECT privileges on the source table or view.
- The source table or view must have change tracking enabled.
- Data streams cannot be created on temporary or transient tables.
- Data streams are not supported on external tables or views.
Refer to the Snowflake documentation for more details on stream requirements and limitations.
Supported Column Types
Data streams support the following Snowflake column types:
Tables or views with unsupported column types cannot be streamed into the RAI Native App.
Change Tracking
Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. You must enable change tracking on a source table or view before you can create a data stream.
Here’s how it works:
- Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.
- Data streams read these changes and temporarily store them in an internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the CDC Service is disabled.
- When the CDC Service is enabled, the staged data is processed in batches once every minute and made available to RAI models.
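The split between staging and processing described in the list above can be pictured with a small toy model. This is a conceptual sketch only: the ToyDataStream class and its methods are invented for illustration and say nothing about how the RAI Native App is implemented.

```python
from dataclasses import dataclass, field


@dataclass
class ToyDataStream:
    """Conceptual model of a data stream; not the app's implementation."""

    staged: list = field(default_factory=list)     # changes read but not yet processed
    processed: list = field(default_factory=list)  # changes available to models

    def capture(self, change: str) -> None:
        # Changes are staged whether or not the CDC service is enabled.
        self.staged.append(change)

    def process_batch(self, cdc_enabled: bool) -> int:
        # Staged changes are processed only while the CDC service is enabled.
        if not cdc_enabled:
            return 0
        batch, self.staged = self.staged, []
        self.processed.extend(batch)
        return len(batch)


stream = ToyDataStream()
stream.capture("INSERT row 1")
stream.capture("UPDATE row 1")

skipped = stream.process_batch(cdc_enabled=False)  # CDC disabled: data stays staged
done = stream.process_batch(cdc_enabled=True)      # CDC enabled: the batch is processed
print(skipped, done, stream.processed)
```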
Staleness
When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.
If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.
If you encounter data inconsistencies due to staleness, you can delete the stream and recreate it to start fresh.
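A simple way to reason about staleness is to compare how long a stream has been suspended with the retention window of its source. The sketch below assumes you already know both values; the may_be_stale helper is hypothetical, and the 14-day window is a placeholder for illustration rather than a documented setting.

```python
from datetime import datetime, timedelta


def may_be_stale(suspended_at: datetime, now: datetime, retention_days: int) -> bool:
    """Return True if the stream has been suspended longer than the retention window."""
    return now - suspended_at > timedelta(days=retention_days)


suspended_at = datetime(2024, 10, 1, 9, 0)
assumed_retention_days = 14  # placeholder; check the actual setting for your source object

# Suspended for 9 days: still inside the assumed window.
recent = may_be_stale(suspended_at, datetime(2024, 10, 10, 9, 0), assumed_retention_days)
# Suspended for 19 days: past the assumed window.
overdue = may_be_stale(suspended_at, datetime(2024, 10, 20, 9, 0), assumed_retention_days)
print(recent, overdue)
```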
Quarantined Streams
If the CDC Service encounters too many failures while attempting to process a data stream, the stream is given a QUARANTINED status.
Change tracking data for quarantined streams is consumed, but not processed.
Resolve the errors causing the stream to be quarantined, then resume the data stream to process the pending changes.
See View Quarantined Streams for details on viewing errors for quarantined streams.
Security and Access Control
The RAI Native App uses Snowflake’s security features to manage access to data streams and the underlying source tables and views.
Role-Based Access Control (RBAC)
RAI supports Snowflake’s Role-Based Access Control (RBAC) model, which manages access to data and resources through roles and privileges.
In Snowflake, privileges like SELECT or USAGE are granted to roles, and roles are assigned to users.
This allows organizations to control who can access specific tables, views, and other objects, and what actions they can perform.
RAI enforces Snowflake RBAC as follows:
- All app operations use the privileges of the active roles in your Snowflake session, as set in your configuration.
- When you create or use a data stream, Snowflake checks that your role has the required privileges on the object and its parent database and schema.
- If privileges are missing, access is denied.
Fine-Grained Access Control
Users with the Enterprise Edition of Snowflake (including Business Critical and Virtual Private Snowflake) can use row access policies to control which rows are visible and masking policies to hide sensitive column data.
When the RAI Native App executes SQL commands, all row and masking policies are evaluated against a role named RELATIONALAI, not the role defined in the user’s configuration.
This means that:
- RAI currently does not support user-based policies at the row and column level.
- You can create policies that apply to the RELATIONALAI role, but these policies impact all users of the app.
Practical Example
Consider the following masking policy that masks sensitive employee passwords when the user does not have the FULL_ACCESS role:
CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
  CASE
    WHEN CURRENT_ROLE() IN ('FULL_ACCESS') THEN val
    ELSE '******'
  END;
The app executes all queries using the RELATIONALAI role, so this masking policy will apply to all users of the app.
All users will see the masked value, even if the user has the FULL_ACCESS role.
Now suppose you add RELATIONALAI to the list of roles that can see the original value:
CREATE MASKING POLICY employee_pwd_mask AS (val string) RETURNS string ->
  CASE
    WHEN CURRENT_ROLE() IN ('FULL_ACCESS', 'RELATIONALAI') THEN val
    ELSE '******'
  END;
In this case, the app will see the original value of the password and so will any app user with SELECT privileges on the relevant table, even if they do not have the FULL_ACCESS role.
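The effect of evaluating policies against the app’s role can be mimicked in a few lines of Python. This is an illustrative stand-in for the CASE expression, not something Snowflake or RAI runs: the function names, the ANALYST role, and the sample password are all made up for the example.

```python
APP_ROLE = "RELATIONALAI"  # role that policies are evaluated against, per the text above


def employee_pwd_mask(val: str, current_role: str, allowed_roles: set) -> str:
    """Python stand-in for the CASE expression in the masking policy."""
    return val if current_role in allowed_roles else "******"


def value_seen_through_app(val: str, user_role: str, allowed_roles: set) -> str:
    # The user's own role is ignored: the policy only ever sees APP_ROLE.
    return employee_pwd_mask(val, APP_ROLE, allowed_roles)


first_policy = {"FULL_ACCESS"}
second_policy = {"FULL_ACCESS", "RELATIONALAI"}

# A FULL_ACCESS user still gets the masked value under the first policy.
masked = value_seen_through_app("hunter2", "FULL_ACCESS", first_policy)
# A user without FULL_ACCESS gets the original value under the second policy.
unmasked = value_seen_through_app("hunter2", "ANALYST", second_policy)
print(masked, unmasked)
```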
Best Practices
For data access policies to work effectively with the RAI Native App, consider the following best practices:
- Start with least privilege: Deny access by default, then allow only what the app needs.
- Write policies that check the app’s role, not individual users.
- If you use mapping tables for access control, map to the app’s role, not to individual users.
Data Stream Management
You can manage data streams using SQL, Python, or the RAI CLI.
Enable Change Tracking on a Table or View
Requires ownership privileges on the table or view.
To enable change tracking on a table or view, use the ALTER TABLE or ALTER VIEW statement:
-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;
-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;
-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';
To enable change tracking on a table or view, create a Provider instance and use its .sql() method to execute the ALTER TABLE or ALTER VIEW statement:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Enable change tracking on a table.
app.sql("ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE")
# Enable change tracking on a view.
app.sql("ALTER VIEW MyView SET CHANGE_TRACKING = TRUE")
# Verify that change tracking is enabled. If change tracking is enabled, the
# CHANGE_TRACKING column will be set to ON.
print(app.sql("SHOW TABLES LIKE 'MyTable'"))
See Change Tracking for details on what operations are tracked.
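The verification step can also be done programmatically. The sketch below assumes the SHOW TABLES result has been converted to a list of dicts with name and change_tracking keys; the change_tracking_enabled helper and the sample rows are hypothetical, and the conversion from your client’s row type is left out.

```python
def change_tracking_enabled(rows: list, table_name: str) -> bool:
    """Return True if the SHOW TABLES rows report change tracking ON for table_name.

    Assumes each row is a dict with "name" and "change_tracking" keys.
    Names are compared case-insensitively.
    """
    for row in rows:
        if row["name"].upper() == table_name.upper():
            return row["change_tracking"].upper() == "ON"
    return False  # table not found in the result


# Made-up rows shaped like a SHOW TABLES result.
sample_rows = [
    {"name": "MYTABLE", "change_tracking": "ON"},
    {"name": "OTHERTABLE", "change_tracking": "OFF"},
]

print(change_tracking_enabled(sample_rows, "MyTable"))
print(change_tracking_enabled(sample_rows, "OtherTable"))
```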
Create a Data Stream
Requires the cdc_admin application role.
To create a data stream, use the api.create_data_stream() procedure:
-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);
-- Enable change tracking on the table or view (use ALTER VIEW for a view).
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;
-- Create a data stream named 'my_stream' for the object reference.
CALL relationalai.api.create_data_stream($obj_ref, 'my_stream', TRUE);
/*
+----------------------------------+
| Datastream created successfully. |
+----------------------------------+
*/
To create a data stream, create a Provider instance and use its .create_streams() method:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
table_or_view = "<db>.<schema>.<table_or_view>"
# Enable change tracking on the table or view (use ALTER VIEW for a view).
app.sql(f"ALTER TABLE IDENTIFIER('{table_or_view}') SET CHANGE_TRACKING = TRUE;")
# Create a data stream for the table or view in the model 'MyModel'.
app.create_streams([table_or_view], 'MyModel')
To create a data stream, pass the fully-qualified table or view name and the model name to the imports:stream command’s --source and --model flags:
rai imports:stream --source <db>.<schema>.<table_or_view> --model MyModel
Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.
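Because change tracking is enabled with ALTER TABLE for tables and ALTER VIEW for views, it can help to generate the SQL sequence from the object type. The following is a minimal sketch that only builds the statement strings used in the SQL example above; the data_stream_setup_sql helper is hypothetical, and the name checks are a simplification that accepts only unquoted identifiers.

```python
import re

# Unquoted three-part name: <db>.<schema>.<object>.
_FQ_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*){2}$")


def data_stream_setup_sql(obj_name: str, obj_type: str, stream_name: str) -> list:
    """Build the statements from the SQL example for a table or a view."""
    obj_type = obj_type.upper()
    if obj_type not in ("TABLE", "VIEW"):
        raise ValueError("obj_type must be 'TABLE' or 'VIEW'")
    if not _FQ_NAME.match(obj_name):
        raise ValueError("expected an unquoted <db>.<schema>.<object> name")
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", stream_name):
        raise ValueError("stream_name must be a simple identifier")
    return [
        f"SET obj_name = '{obj_name}'",
        f"SET obj_type = '{obj_type}'",
        "SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name)",
        # ALTER TABLE for tables, ALTER VIEW for views.
        f"ALTER {obj_type} IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE",
        f"CALL relationalai.api.create_data_stream($obj_ref, '{stream_name}', TRUE)",
    ]


statements = data_stream_setup_sql("example_db.sales.view1", "view", "my_stream")
for statement in statements:
    print(statement + ";")
```

The statements rely on session variables, so they would need to be executed in order within a single Snowflake session.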
Suspend or Resume a Data Stream
Requires the cdc_admin application role.
To suspend a data stream, use the api.suspend_data_stream() procedure:
-- Suspend a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
/*
+-----------------------+
| Data stream suspended |
+-----------------------+
*/
While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.
To resume a suspended data stream, use the api.resume_data_stream() procedure:
-- Resume a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
/*
+---------------------+
| Data stream resumed |
+---------------------+
*/
To suspend or resume a data stream, create a Provider instance and use its .sql() method to execute the api.suspend_data_stream() or api.resume_data_stream() SQL procedure. To suspend a data stream:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Suspend a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.sql("CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>')")
While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.
To resume a suspended data stream, use the .sql() method to execute the api.resume_data_stream() procedure:
# Resume a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.sql("CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>')")
Delete a Data Stream
Requires the cdc_admin application role.
To delete a data stream, use the api.delete_data_stream() procedure:
-- Delete a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>');
/*
+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+
*/
To delete a data stream, create a Provider instance and use its .delete_stream() method:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Delete a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.delete_stream('<db>.<schema>.<table_or_view>')
To delete a data stream, pass the fully-qualified table or view name and the model name to the imports:delete command’s --object and --model flags:
rai imports:delete --object <db>.<schema>.<table_or_view> --model MyModel
List Data Streams
Requires the cdc_admin application role.
To view a list of all data streams, query the api.data_streams view:
SELECT * FROM relationalai.api.data_streams;
/*
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+
| ID                                  | CREATED_AT              | CREATED_BY               | STATUS   | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME            | RAI_DATABASE | RAI_RELATION              |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com     | CREATED  | DATA_STREAM_VIEW  | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1    | SalesModel   | example_db.sales.view1    |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED  | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees   | HRModel      | example_db.hr.employees   |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com   | DELETING | DATA_STREAM_VIEW  | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+
*/
To view a list of all data streams, create a Provider instance and use its .list_streams() method:
from pprint import pprint
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# List all data streams.
streams = app.list_streams()
pprint(streams)
To view a list of all data streams, use the imports:list command:
rai imports:list
To list data streams for a specific model, pass the model name to the --model flag:
rai imports:list --model MyModel
Get Data Stream Details
Requires the cdc_admin application role.
To get details about a specific data stream, pass the fully-qualified name of the stream’s source table or view to the api.get_data_stream() procedure:
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
/*
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
*/
To get details about a specific data stream, create a Provider instance and use its .sql() method to execute the api.get_data_stream() SQL procedure with the name of the data stream:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# Get details about a data stream. Replace the placeholders with your database,
# schema, and table or view name.
stream_name = '<db>.<schema>.<table_or_view>'
# The name is passed as a SQL string literal, so it must be wrapped in single quotes.
stream_details = app.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')")
print(stream_details)
To get details about a specific data stream, pass the stream ID to the --id option of the imports:get command:
rai imports:get --id <stream_id>
The stream ID is the unique identifier for the data stream, as returned in the output of the imports:list command.
View Quarantined Streams
Requires the cdc_admin application role.
To view quarantined streams and their errors, use the api.data_stream_batches view:
SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE status = 'QUARANTINED';
To view quarantined streams and their errors, create a Provider instance and use its .sql() method to query the api.data_stream_batches view:
import relationalai as rai
# Get a Provider instance.
app = rai.Provider()
# View quarantined streams and their errors.
quarantined_streams = app.sql("""
SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE status = 'QUARANTINED';
""")
print(quarantined_streams)
Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source data, but these changes will not be processed by the CDC engine until the stream is resumed.
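The quarantine query returns one row per error, so a stream with several errors appears several times. A small sketch of grouping those rows by source object follows. It assumes the rows have been converted to dicts keyed by the lowercase column aliases used in the query; the errors_by_stream helper and the sample error messages are hypothetical.

```python
from collections import defaultdict


def errors_by_stream(rows: list) -> dict:
    """Group processing errors by the fully-qualified source object name.

    Assumes each row is a dict with "fq_object_name" and "processing_error" keys.
    """
    grouped = defaultdict(list)
    for row in rows:
        grouped[row["fq_object_name"]].append(row["processing_error"])
    return dict(grouped)


# Made-up rows shaped like the query result; the messages are placeholders.
sample_rows = [
    {"data_stream_id": "ds_1", "fq_object_name": "example_db.hr.employees",
     "status": "QUARANTINED", "processing_error": "placeholder error A"},
    {"data_stream_id": "ds_1", "fq_object_name": "example_db.hr.employees",
     "status": "QUARANTINED", "processing_error": "placeholder error B"},
    {"data_stream_id": "ds_2", "fq_object_name": "example_db.sales.view1",
     "status": "QUARANTINED", "processing_error": "placeholder error C"},
]

grouped_errors = errors_by_stream(sample_rows)
for name, errors in grouped_errors.items():
    print(name, len(errors))
```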