Manage Data Shared With the RAI Native App

RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.

Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.

You can manage the CDC service using SQL, Python, or the RAI CLI.

To enable the CDC service, or to resume the service after suspending it, use the app.resume_cdc() procedure:

-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+ */

To disable the CDC service, use the app.suspend_cdc() procedure:

-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+ */

Disabling the CDC service suspends processing and, as the output above shows, drops the associated CDC engine. Change tracking data is still consumed by data streams, but it is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.

To view the status of the CDC service, use the app.cdc_status() procedure:

-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| CDC_ENABLED | CDC_ENGINE_NAME | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE | CDC_TASK_STATUS | CDC_TASK_INFO |
|--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE | CDC_MANAGED_ENGINE | READY | HIGHMEM_X64_S | started | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+--------------+--------------------+-------------------+------------------+-----------------+--------------------------------------------------------------------------------------------------------------------------| */

Refer to the reference docs for more details on the output of the cdc_status() procedure.
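When the status row is read from Python, the CDC_TASK_INFO column can be parsed as JSON. The following is a minimal sketch of a health check built on the sample output above. The `cdc_is_running` helper and the dict-shaped row are assumptions made for illustration and are not part of the RAI API; treating a READY engine as healthy is also an assumption.

```python
import json


def cdc_is_running(row: dict) -> bool:
    """Return True if a cdc_status() row describes an enabled, started service.

    `row` is assumed to map column names to values, as in the sample output.
    This helper is hypothetical and not part of the RAI API.
    """
    task_info = json.loads(row["CDC_TASK_INFO"])
    return (
        bool(row["CDC_ENABLED"])
        and row["CDC_ENGINE_STATUS"] == "READY"
        and task_info.get("state") == "started"
    )


# Row copied from the sample cdc_status() output above.
sample_row = {
    "CDC_ENABLED": True,
    "CDC_ENGINE_NAME": "CDC_MANAGED_ENGINE",
    "CDC_ENGINE_STATUS": "READY",
    "CDC_ENGINE_SIZE": "HIGHMEM_X64_S",
    "CDC_TASK_STATUS": "started",
    "CDC_TASK_INFO": '{"createdOn": "2024-10-15 21:58:11.291 -0700", '
    '"lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"}',
}

print(cdc_is_running(sample_row))
```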

To change the size of the CDC engine, use the app.alter_cdc_engine_size() procedure:

-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+ */

If a batch of data stream changes is currently being processed, it is completed using the previously configured engine. A new engine with the new size is created whenever the next batch of changes is processed, at which point the old engine is deleted.
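The deferred resize can be pictured with a toy model. This sketch only illustrates the behavior described above under simplifying assumptions; the `CdcEngineModel` class is hypothetical and does not reflect how the RAI Native App is implemented.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CdcEngineModel:
    """Toy model of the deferred resize described above (not RAI code)."""

    active_size: str
    pending_size: Optional[str] = None

    def alter_size(self, new_size: str) -> None:
        # The request is only recorded; a batch in flight keeps its engine.
        self.pending_size = new_size

    def start_next_batch(self) -> str:
        # The engine with the requested size replaces the old one here.
        if self.pending_size is not None:
            self.active_size = self.pending_size
            self.pending_size = None
        return self.active_size


engine = CdcEngineModel(active_size="HIGHMEM_X64_S")
engine.alter_size("HIGHMEM_X64_M")
size_during_current_batch = engine.active_size
size_for_next_batch = engine.start_next_batch()
print(size_during_current_batch, size_for_next_batch)
```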

Data streams track changes to tables and views in your Snowflake account, ensuring that queries from RAI Python models use the latest data. Changes are batched and processed by the CDC Service every minute.

The RAI Python API automatically creates data streams. Python users can populate an RAI Type from rows in a source table or view they have access to. When a query is executed for the first time, data streams are created for the source tables or views if they do not already exist.

Although data streams can be created manually using SQL or the RAI CLI, the recommended approach is to use the Python API, which handles stream creation automatically.

Data streams support the following Snowflake column types:

Tables or views with unsupported column types cannot be streamed into the RAI Native App.

Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. Change tracking stores information about all Data Manipulation Language (DML) statements committed to a table or view, including INSERT, UPDATE, and DELETE operations.

Change tracking data consumed by data streams is temporarily stored in a stage in the app’s Snowflake database. This data is staged even if the CDC Service is disabled. When the CDC Service is enabled, the staged data is processed in batches once every minute and made available to RAI models.

When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.

If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.

If you encounter data inconsistencies due to staleness, you can delete the stream and recreate it to start fresh.
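To decide when a suspended stream needs attention, you can compare the time it has been suspended with the retention period of its source. The sketch below assumes you track the suspension time yourself; the `is_stale` helper and the one-day retention value are illustrative assumptions, so check the actual retention setting of your table or view.

```python
from datetime import datetime, timedelta


def is_stale(suspended_at: datetime, now: datetime, retention: timedelta) -> bool:
    """Return True if the stream has been suspended longer than `retention`."""
    return now - suspended_at > retention


retention = timedelta(days=1)  # Assumed value; check your source's actual setting.
suspended_at = datetime(2024, 10, 20, 9, 0)

# Suspended for 12 hours: still within the retention period.
print(is_stale(suspended_at, datetime(2024, 10, 20, 21, 0), retention))
# Suspended for 2 days: past the retention period.
print(is_stale(suspended_at, datetime(2024, 10, 22, 9, 0), retention))
```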

If the CDC Service encounters too many failures while attempting to process a data stream, the stream is given a QUARANTINED status. Change tracking data for quarantined streams is consumed, but not processed. Resolve the errors causing the stream to be quarantined, then resume the data stream to process the pending changes. See View Quarantined Streams for details on viewing errors for quarantined streams.

You can manage data streams using SQL, Python, or the RAI CLI.

Enabling change tracking requires ownership privileges on the table or view.

To enable change tracking, use the ALTER TABLE statement for a table or the ALTER VIEW statement for a view:

-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;
-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;
-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';

See Change Tracking for details on what operations are tracked.

To create a data stream, use the api.create_data_stream() procedure:

-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);
-- Enable change tracking on the table. For a view, use ALTER VIEW instead.
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;
-- Stream the table or view into the model 'MyModel' with the name 'my_stream'.
CALL relationalai.api.create_data_stream($obj_ref, 'MyModel', 'my_stream', TRUE);
/*+----------------------------------+
| Datastream created successfully. |
+----------------------------------+ */

Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.
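If you generate this script from Python, the main detail to get right is choosing ALTER TABLE or ALTER VIEW to match the object type. The following is a minimal sketch; `stream_setup_statements` is a hypothetical helper, not part of the RAI API. It only builds the SQL text shown above and does not execute it. Names are interpolated without escaping, so it is suitable only for trusted input.

```python
from typing import List


def stream_setup_statements(
    fq_name: str, obj_type: str, model: str, stream: str
) -> List[str]:
    """Build the SQL statements that create a data stream for a table or view.

    Hypothetical helper: it mirrors the script above and returns plain strings.
    """
    obj_type = obj_type.upper()
    if obj_type not in ("TABLE", "VIEW"):
        raise ValueError(f"obj_type must be 'TABLE' or 'VIEW', got {obj_type!r}")
    return [
        f"SET obj_ref = relationalai.api.object_reference('{obj_type}', '{fq_name}');",
        # Change tracking must be enabled before the stream is created.
        f"ALTER {obj_type} {fq_name} SET CHANGE_TRACKING = TRUE;",
        f"CALL relationalai.api.create_data_stream($obj_ref, '{model}', '{stream}', TRUE);",
    ]


for statement in stream_setup_statements(
    "<db>.<schema>.<table_or_view>", "view", "MyModel", "my_stream"
):
    print(statement)
```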

To suspend a data stream, use the api.suspend_data_stream() procedure:

-- Suspend a data stream for the model named MyModel.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------+
| Data stream suspended |
+-----------------------+ */

While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.

To resume a suspended data stream, use the api.resume_data_stream() procedure:

-- Resume a data stream for the model named MyModel.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+---------------------+
| Data stream resumed |
+---------------------+ */

To delete a data stream, use the api.delete_data_stream() procedure:

-- Delete a data stream for the model named MyModel. Replace the placeholders
-- with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+ */

To view a list of all data streams, query the api.data_streams view:

SELECT * FROM relationalai.api.data_streams;
/*+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com | CREATED | DATA_STREAM_VIEW | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1 | SalesModel | example_db.sales.view1 |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees | HRModel | example_db.hr.employees |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com | DELETING | DATA_STREAM_VIEW | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+--------------------------+ */

To get details about a specific data stream, pass the fully qualified name of its source table or view and the model name to the api.get_data_stream() procedure:

-- Get details about the data stream for the model named MyModel. Replace the
-- placeholders with your database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>', 'MyModel');
/*+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+
| ID | CREATED_AT | CREATED_BY | STATUS | REFERENCE_NAME | REFERENCE_ALIAS | FQ_OBJECT_NAME | RAI_DATABASE | RAI_RELATION | DATA_SYNC_STATUS | PENDING_BATCHES_COUNT | NEXT_BATCH_STATUS | NEXT_BATCH_UNLOADED_TIMESTAMP | NEXT_BATCH_DETAILS | LAST_BATCH_DETAILS | LAST_BATCH_UNLOADED_TIMESTAMP | LAST_TRANSACTION_ID | ERRORS | CDC_STATUS |
|-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------|
| ds_abcd1234_ef56_7890_abcd_1234ef567890 | 2024-10-23 10:12:34.567 | jane.doe@example.com | ACTIVE | DATA_STREAM_TABLE | a1bcdef2-3456-7890-1234-b567c890d123 | <db>.<schema>.<table_or_view> | MyModel | <db>.<schema>.<table_or_view> | SYNCED | 0 | NULL | NULL | NULL | {"rows": 10, "size": 512, ... } | 2024-10-23 10:50:00.456 | 02a1b234-5678-1234-abcdef-0123456789ab | [] | STARTED |
+-----------------------------------------+-------------------------+-----------------------+--------+-------------------+--------------------------------------+-------------------------------+--------------+-------------------------------+------------------+------------------------+-------------------+-------------------------------+---------------------------------+-------------------------------+-------------------------------+----------------------------------------+--------+------------+ */

To view quarantined streams and their errors, use the api.data_stream_batches view:

SELECT
data_stream_id,
fq_object_name,
status,
error.VALUE::string AS processing_error
FROM
relationalai.api.data_stream_batches,
LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE
status = 'QUARANTINED';

Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source data, but these changes will not be processed by the CDC engine until the stream is resumed.
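Because the query above returns one row per error, it can help to group the rows by source object before deciding which streams to fix and resume. The sketch below assumes the rows have already been fetched as dicts keyed by upper-case column names; `errors_by_stream` is a hypothetical helper, and the sample rows are placeholders rather than real output.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


def errors_by_stream(rows: Iterable[dict]) -> Dict[str, List[str]]:
    """Group processing errors by the fully qualified source object name."""
    grouped: Dict[str, List[str]] = defaultdict(list)
    for row in rows:
        grouped[row["FQ_OBJECT_NAME"]].append(row["PROCESSING_ERROR"])
    return dict(grouped)


# Placeholder rows shaped like the query result; the values are made up.
rows = [
    {"FQ_OBJECT_NAME": "db.schema.table_a", "PROCESSING_ERROR": "<error 1>"},
    {"FQ_OBJECT_NAME": "db.schema.table_a", "PROCESSING_ERROR": "<error 2>"},
    {"FQ_OBJECT_NAME": "db.schema.view_b", "PROCESSING_ERROR": "<error 3>"},
]

for name, errors in errors_by_stream(rows).items():
    print(name, len(errors))
```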