Manage Data Shared With the RAI Native App
RelationalAI (RAI) Python users build models on top of data stored in Snowflake tables and views. Before models can be queried, the data must be shared with the RAI Native App using a data stream. These data streams are maintained by the RAI Native App’s CDC Service.
The CDC Service
Data streams use change data capture (CDC) to stream updates from Snowflake tables and views to the RAI Native App. The CDC service processes the change tracking data consumed by data streams to keep RAI models synchronized with their source data.
You can manage the CDC service using SQL, Python, or the RAI CLI.
Enable CDC
Requires the `cdc_admin` application role.
To enable the CDC service, or to resume the service after suspending it, use the `app.resume_cdc()` procedure:
```sql
-- Enable the CDC service.
CALL relationalai.app.resume_cdc();
/*
+--------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been resumed |
+--------------------------------------------------------------------+
*/
```
To enable the CDC service, or to resume the service after suspending it, create a `Provider` instance and use its `.sql()` method to execute the `app.resume_cdc()` SQL procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Enable the CDC service.
app.sql("CALL relationalai.app.resume_cdc()")
```
To enable the CDC service, or to resume the service after suspending it, use the `imports:setup` command’s `--resume` flag:
```shell
# Enable the CDC service.
rai imports:setup --resume
```
Disable CDC
Requires the `cdc_admin` application role.
To disable the CDC service, use the `app.suspend_cdc()` procedure:
```sql
-- Disable the CDC service.
CALL relationalai.app.suspend_cdc();
/*
+-----------------------------------------------------------------------------------------------------------------+
| CDC functionality on the RelationalAI application has been suspended and its associated engine has been dropped |
+-----------------------------------------------------------------------------------------------------------------+
*/
```
To disable the CDC service, create a `Provider` instance and use its `.sql()` method to execute the `app.suspend_cdc()` procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Disable the CDC service.
app.sql("CALL relationalai.app.suspend_cdc()")
```
To disable the CDC service, use the `imports:setup` command’s `--suspend` flag:
```shell
# Disable the CDC service.
rai imports:setup --suspend
```
Disabling CDC suspends the CDC engine. Change tracking data is still consumed by data streams, but is not processed until the service is resumed. Data streams cannot be created while CDC is disabled.
View CDC Service Status
Requires the `app_user` application role.
To view the status of the CDC service, use the `app.cdc_status()` procedure:
```sql
-- Get the CDC service status.
CALL relationalai.app.cdc_status();
/*
+-------------+--------------------+-------------------+-----------------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
| CDC_ENABLED | CDC_ENGINE_NAME    | CDC_ENGINE_STATUS | CDC_ENGINE_SIZE | CDC_TASK_STATUS | CDC_TASK_INFO                                                                                                            |
|-------------+--------------------+-------------------+-----------------+-----------------+--------------------------------------------------------------------------------------------------------------------------|
| TRUE        | CDC_MANAGED_ENGINE | READY             | HIGHMEM_X64_S   | started         | {"createdOn": "2024-10-15 21:58:11.291 -0700", "lastSuspendedOn": null, "lastSuspendedReason": null, "state": "started"} |
+-------------+--------------------+-------------------+-----------------+-----------------+--------------------------------------------------------------------------------------------------------------------------+
*/
```
Refer to the reference docs for more details on the output of the `cdc_status()` procedure.
To view the status of the CDC service, create a `Provider` instance and use its `.sql()` method to execute the `app.cdc_status()` procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Get the CDC service status.
cdc_status = app.sql("CALL relationalai.app.cdc_status()")
print(cdc_status)
# [Row(CDC_ENABLED=True, CDC_ENGINE_NAME='CDC_MANAGED_ENGINE', CDC_ENGINE_STATUS='READY',
#      CDC_ENGINE_SIZE='M', CDC_TASK_STATUS='started',
#      CDC_TASK_INFO='{\n  "createdOn": "2024-10-24 22:53:15.200 -0700",\n  "lastSuspendedOn": null,\n  "lastSuspendedReason": null,\n  "state": "started"\n}')]
```
To view the status of the CDC service, use the `imports:setup` command:
```shell
$ rai imports:setup
---------------------------------------------------
▰▰▰▰ Imports setup fetched

To suspend imports, use 'rai imports:setup --suspend'

 Field                 Value
 ───────────────────────────────────────
 engine                CDC_MANAGED_ENGINE
 engine_size           M
 engine_status         READY
 status                STARTED
 enabled               True
 createdOn             2024-10-24 22:53:15
 lastSuspendedOn       N/A
 lastSuspendedReason   N/A
---------------------------------------------------
```
Configure CDC Engine Size
Requires the `cdc_admin` application role.
When the RAI Native App is installed, the CDC engine is configured with the `HIGHMEM_X64_S` size.
If you encounter performance issues with the CDC engine or need to process large data streams, you can change the CDC engine size to a larger instance family.
See Engine Sizes for more information on available engine sizes.
To change the size of the CDC engine, use the `app.alter_cdc_engine_size()` procedure:
```sql
-- Change the size of the CDC engine to HIGHMEM_X64_M.
CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M');
/*
+--------------------------------------+
| CDC engine size set to HIGHMEM_X64_M |
+--------------------------------------+
*/
```
To change the size of the CDC engine, create a `Provider` instance and use its `.sql()` method to execute the `app.alter_cdc_engine_size()` procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Change the size of the CDC engine to HIGHMEM_X64_M.
app.sql("CALL relationalai.app.alter_cdc_engine_size('HIGHMEM_X64_M')")
```
To change the size of the CDC engine, pass the new engine size to the `imports:setup` command’s `--engine_size` flag:
```shell
# Change the size of the CDC engine to HIGHMEM_X64_M.
rai imports:setup --engine_size HIGHMEM_X64_M
```
If a batch of data stream changes is being processed when you change the size, that batch completes on the previously configured engine. An engine with the new size is created when the next batch of changes is processed, at which point the old engine is deleted.
Data Streams
Data streams track changes to tables and views in your Snowflake account, ensuring that queries from RAI Python models use the latest data. Changes are batched and processed by the CDC Service every minute.
The RAI Python API automatically creates data streams. Python users can populate an RAI `Type` from rows in a source table or view they have access to. When a query is executed for the first time, data streams are created for the source tables or views if they do not already exist.
Although data streams can be created manually using SQL or the RAI CLI, the recommended approach is to use the Python API, which handles stream creation automatically.
Supported Source Types
Data streams can be created on Snowflake tables and views, with some limitations:
- The user creating a data stream must have `SELECT` privileges on the source table or view.
- The source table or view must have change tracking enabled.
- Data streams cannot be created on:
  - Temporary or transient tables
  - Dynamic tables
  - External tables and views
Refer to the Snowflake documentation for more details on stream requirements and limitations.
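The restrictions above can be expressed as a small pre-flight check. This is an illustrative sketch only: the source-type labels and the helper itself are assumptions for the example, not values or functions provided by the RAI Native App or Snowflake.

```python
# Source kinds that cannot back a data stream, per the limitations above.
# These labels are invented for this sketch.
UNSUPPORTED_SOURCE_TYPES = {"TEMPORARY", "TRANSIENT", "DYNAMIC", "EXTERNAL"}


def can_stream(source_type: str, change_tracking_enabled: bool) -> bool:
    """Return True if a table or view of this kind is eligible for a data stream."""
    if source_type.upper() in UNSUPPORTED_SOURCE_TYPES:
        return False
    # Even a supported source must have change tracking enabled.
    return change_tracking_enabled


print(can_stream("PERMANENT", True))   # regular table with change tracking -> True
print(can_stream("DYNAMIC", True))     # dynamic tables cannot be streamed -> False
print(can_stream("PERMANENT", False))  # change tracking disabled -> False
```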
Supported Column Types
Data streams support the following Snowflake column types:
Tables or views with unsupported column types cannot be streamed into the RAI Native App.
JSON (VARIANT) Data
Snowflake tables or views that contain `VARIANT` columns cannot be streamed directly into the RAI Native App.
To use JSON data in your models, expand it into supported column types before creating a data stream.
See Supported Column Types for the full list of supported types.
Two approaches are:
- Flattening into a single view by converting JSON fields into scalar columns.
- Normalizing into child tables when a JSON array needs to be represented as multiple rows.
Flatten into a View
The following example shows how to expand a JSON column in a web traffic dataset into a flat view. This method is useful when you want to keep a one-to-one mapping between rows in the raw table and rows in the view.
```sql
-- Raw table with a VARIANT column.
CREATE OR REPLACE TABLE VIEWS_RAW (
    view_id NUMBER,
    session_id NUMBER,
    user_id NUMBER,
    occurred_at TIMESTAMP_NTZ,
    VIEW_DATA VARIANT
);

-- Sample data.
INSERT INTO VIEWS_RAW
SELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ,
       PARSE_JSON('{"url":"https://example.com/pricing","referrer":"https://google.com","device":"desktop","events":["scroll","click"]}')
UNION ALL
SELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ,
       PARSE_JSON('{"url":"https://example.com/blog/post-1","referrer":null,"device":"mobile","events":["view"]}');

-- Flat view with supported column types.
CREATE OR REPLACE VIEW VIEWS_CLEAN AS
SELECT
    view_id,
    session_id,
    user_id,
    occurred_at,
    VIEW_DATA:url::STRING AS url,
    VIEW_DATA:referrer::STRING AS referrer,
    VIEW_DATA:device::STRING AS device,
    ARRAY_TO_STRING(VIEW_DATA:events, ',') AS events
FROM VIEWS_RAW;

-- Enable change tracking so that the RAI Native App can create data streams
-- from this view. Note that the underlying table for the view must also
-- have change tracking enabled.
ALTER TABLE VIEWS_RAW SET CHANGE_TRACKING = TRUE;
ALTER VIEW VIEWS_CLEAN SET CHANGE_TRACKING = TRUE;

-- Query the view.
SELECT * FROM VIEWS_CLEAN;
/*
+---------+------------+---------+---------------------+---------------------------------+--------------------+---------+--------------+
| VIEW_ID | SESSION_ID | USER_ID | OCCURRED_AT         | URL                             | REFERRER           | DEVICE  | EVENTS       |
|---------+------------+---------+---------------------+---------------------------------+--------------------+---------+--------------|
| 1       | 501        | 10001   | 2025-07-01 10:15:00 | https://example.com/pricing     | https://google.com | desktop | scroll,click |
| 2       | 502        | 10002   | 2025-07-01 10:20:00 | https://example.com/blog/post-1 | NULL               | mobile  | view         |
+---------+------------+---------+---------------------+---------------------------------+--------------------+---------+--------------+
*/
```
This approach works well when arrays can be safely collapsed into comma-separated strings.
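For reference, the same flattening can be sketched in plain Python. The record below mirrors the `VIEWS_RAW` sample above, and `",".join` plays the role of `ARRAY_TO_STRING`; the `flatten` helper is illustrative, not part of any API.

```python
import json

# One raw row, mirroring the VIEWS_RAW sample above: scalar columns plus a
# JSON payload in VIEW_DATA.
raw_row = {
    "view_id": 1,
    "session_id": 501,
    "user_id": 10001,
    "occurred_at": "2025-07-01 10:15:00",
    "VIEW_DATA": json.dumps({
        "url": "https://example.com/pricing",
        "referrer": "https://google.com",
        "device": "desktop",
        "events": ["scroll", "click"],
    }),
}


def flatten(row: dict) -> dict:
    """Expand the VIEW_DATA JSON into scalar columns, like the VIEWS_CLEAN view."""
    data = json.loads(row["VIEW_DATA"])
    return {
        "view_id": row["view_id"],
        "session_id": row["session_id"],
        "user_id": row["user_id"],
        "occurred_at": row["occurred_at"],
        "url": data["url"],
        "referrer": data["referrer"],
        "device": data["device"],
        # Equivalent of ARRAY_TO_STRING(VIEW_DATA:events, ',').
        "events": ",".join(data["events"]),
    }


print(flatten(raw_row)["events"])  # scroll,click
```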
Normalize into a Child Table
If your JSON data contains arrays that need to be represented as multiple rows, you can create a normalized child table with the expanded JSON data:
```sql
-- Raw table with a VARIANT column.
CREATE OR REPLACE TABLE VIEWS_RAW (
    view_id NUMBER,
    session_id NUMBER,
    user_id NUMBER,
    occurred_at TIMESTAMP_NTZ,
    VIEW_DATA VARIANT
);

-- Sample data.
INSERT INTO VIEWS_RAW
SELECT 1, 501, 10001, '2025-07-01 10:15:00'::TIMESTAMP_NTZ,
       PARSE_JSON('{"url":"https://example.com/pricing","referrer":"https://google.com","device":"desktop","events":["scroll","click"]}')
UNION ALL
SELECT 2, 502, 10002, '2025-07-01 10:20:00'::TIMESTAMP_NTZ,
       PARSE_JSON('{"url":"https://example.com/blog/post-1","referrer":null,"device":"mobile","events":["view"]}');

-- Child table with one row per event from the parent VIEWS_RAW table.
CREATE OR REPLACE TABLE VIEW_EVENTS AS
SELECT
    v.view_id,
    v.session_id,
    v.user_id,
    v.occurred_at,
    e.value::STRING AS event
FROM VIEWS_RAW AS v,
    LATERAL FLATTEN(INPUT => v.VIEW_DATA:events) AS e;

-- Enable change tracking so that the RAI Native App can create data streams
-- from this table.
ALTER TABLE VIEW_EVENTS SET CHANGE_TRACKING = TRUE;

-- Query the child table.
SELECT * FROM VIEW_EVENTS;
/*
+---------+------------+---------+---------------------+--------+
| VIEW_ID | SESSION_ID | USER_ID | OCCURRED_AT         | EVENT  |
|---------+------------+---------+---------------------+--------|
| 1       | 501        | 10001   | 2025-07-01 10:15:00 | scroll |
| 1       | 501        | 10001   | 2025-07-01 10:15:00 | click  |
| 2       | 502        | 10002   | 2025-07-01 10:20:00 | view   |
+---------+------------+---------+---------------------+--------+
*/
```
`VIEW_EVENTS` uses supported column types and can be streamed into the RAI Native App.
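A plain-Python sketch of the same normalization, with one output row per array element, mirrors what `LATERAL FLATTEN` does over `VIEW_DATA:events`. The `normalize` helper is illustrative only.

```python
import json

# Two raw rows, mirroring the VIEWS_RAW sample (only the events array matters here).
raw_rows = [
    {"view_id": 1, "session_id": 501, "user_id": 10001,
     "occurred_at": "2025-07-01 10:15:00",
     "VIEW_DATA": json.dumps({"events": ["scroll", "click"]})},
    {"view_id": 2, "session_id": 502, "user_id": 10002,
     "occurred_at": "2025-07-01 10:20:00",
     "VIEW_DATA": json.dumps({"events": ["view"]})},
]


def normalize(rows: list) -> list:
    """Emit one child row per event, like LATERAL FLATTEN over VIEW_DATA:events."""
    out = []
    for row in rows:
        for event in json.loads(row["VIEW_DATA"])["events"]:
            out.append({
                "view_id": row["view_id"],
                "session_id": row["session_id"],
                "user_id": row["user_id"],
                "occurred_at": row["occurred_at"],
                "event": event,
            })
    return out


events = normalize(raw_rows)
print([r["event"] for r in events])  # ['scroll', 'click', 'view']
```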
Change Tracking
Data streams use Snowflake’s native change tracking feature to capture changes to tables and views. You must enable change tracking on a source table or view before you can create a data stream.
Here’s how it works:
- Change tracking captures information about all Data Manipulation Language (DML) statements committed to a table or view, including `INSERT`, `UPDATE`, and `DELETE` operations.
- Data streams read these changes and temporarily store them in an internal stage inside the RAI Native App’s Snowflake database. This data is staged even if the CDC Service is disabled.
- When the CDC Service is enabled, the staged data is processed in batches once every minute and made available to RAI models.
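Conceptually, applying a batch of change records looks like the following toy sketch. This is not how the CDC engine is implemented; it only illustrates how ordered `INSERT`, `UPDATE`, and `DELETE` records keep a replica in sync with its source.

```python
def apply_changes(replica: dict, changes: list) -> dict:
    """Apply a batch of change records (op, key, value) to a keyed replica."""
    for op, key, value in changes:
        if op in ("INSERT", "UPDATE"):
            replica[key] = value
        elif op == "DELETE":
            replica.pop(key, None)
    return replica


replica = {}
batch = [
    ("INSERT", 1, {"name": "alice"}),
    ("INSERT", 2, {"name": "bob"}),
    ("UPDATE", 1, {"name": "alicia"}),
    ("DELETE", 2, None),
]
print(apply_changes(replica, batch))  # {1: {'name': 'alicia'}}
```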
Staleness
When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.
If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.
If you encounter data inconsistencies due to staleness, you can delete the stream and recreate it to start fresh.
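The staleness window can be sketched as a simple time comparison. The one-day retention period used here is an assumption for illustration; the actual retention period depends on your source’s Snowflake configuration.

```python
from datetime import datetime, timedelta

# Assumed retention window for this sketch; check your source's actual
# data retention configuration in Snowflake.
RETENTION = timedelta(days=1)


def is_stale(suspended_at: datetime, now: datetime,
             retention: timedelta = RETENTION) -> bool:
    """A suspended stream risks staleness once the retention window elapses."""
    return now - suspended_at > retention


suspended = datetime(2025, 7, 1, 10, 0)
print(is_stale(suspended, datetime(2025, 7, 1, 18, 0)))  # False: 8 hours elapsed
print(is_stale(suspended, datetime(2025, 7, 3, 10, 0)))  # True: 2 days elapsed
```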
Quarantined Streams
A stream is quarantined when the CDC Service stops syncing data from its source because too many errors occurred.
When a stream is quarantined:
- The stream is automatically suspended to prevent further issues.
- Change tracking data is still collected from your source.
- The CDC engine does not process new data until you fix the errors and resume the stream.
- Common causes include repeated failures in batch processing or scheduled tasks.
The following resources can help you address a quarantined stream:
Security and Access Control
The RAI Native App uses Snowflake’s security features to manage access to data streams and the underlying source tables and views.
Role-Based Access Control (RBAC)
RAI supports Snowflake’s Role-Based Access Control (RBAC) model, which manages access to data and resources through roles and privileges.
In Snowflake, privileges like `SELECT` or `USAGE` are granted to roles, and roles are assigned to users.
This allows organizations to control who can access specific tables, views, and other objects, and what actions they can perform.
RAI enforces Snowflake RBAC as follows:
- All app operations use the privileges of the active roles in your Snowflake session, as set in your configuration.
- When you create or use a data stream, Snowflake checks that your role has the required privileges on the object and its parent database and schema.
- If privileges are missing, access is denied.
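The check described above can be modeled as a toy grants table: privileges are granted to roles, and an operation succeeds only if the active role holds every required privilege on the object and its parent database and schema. All role, object, and privilege names below are invented for the example.

```python
# Toy grants table: (role, object) -> set of privileges. Invented names.
GRANTS = {
    ("ANALYST", "sales_db"): {"USAGE"},
    ("ANALYST", "sales_db.public"): {"USAGE"},
    ("ANALYST", "sales_db.public.orders"): {"SELECT"},
}


def can_access(role: str, obj: str, required: set) -> bool:
    """True if the role holds every required privilege on the object."""
    return required <= GRANTS.get((role, obj), set())


# Creating a data stream needs SELECT on the object plus USAGE on its
# parent database and schema.
ok = (can_access("ANALYST", "sales_db", {"USAGE"})
      and can_access("ANALYST", "sales_db.public", {"USAGE"})
      and can_access("ANALYST", "sales_db.public.orders", {"SELECT"}))
print(ok)  # True

# A role with no grants is denied.
print(can_access("INTERN", "sales_db.public.orders", {"SELECT"}))  # False
```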
Fine-Grained Access Control
Users with the Enterprise Edition of Snowflake (including Business Critical and Virtual Private Snowflake) can use row access policies for controlling which rows are visible and masking policies for hiding sensitive column data.
When the RAI Native App executes SQL commands, all row and masking policies are evaluated against a role named `RELATIONALAI`, not the role defined in the user’s configuration.
This means that:
- RAI currently does not support user-based policies at the row and column level.
- You can create policies that apply to the `RELATIONALAI` role, but these policies impact all users of the app.
Practical Example
Consider the following masking policy, which masks sensitive employee passwords when the user does not have the `FULL_ACCESS` role:
```sql
CREATE MASKING POLICY employee_pwd_mask AS (val string)
RETURNS string ->
  CASE
    WHEN CURRENT_ROLE() IN ('FULL_ACCESS') THEN val
    ELSE '******'
  END;
```
The app executes all queries using the `RELATIONALAI` role, so this masking policy applies to all users of the app. All users see the masked value, even users who have the `FULL_ACCESS` role.
Now suppose you add `RELATIONALAI` to the list of roles that can see the original value:
```sql
CREATE MASKING POLICY employee_pwd_mask AS (val string)
RETURNS string ->
  CASE
    WHEN CURRENT_ROLE() IN ('FULL_ACCESS', 'RELATIONALAI') THEN val
    ELSE '******'
  END;
```
In this case, the app will see the original value of the password, and so will any app user with `SELECT` privileges on the relevant table, even if they do not have the `FULL_ACCESS` role.
Best Practices
For data access policies to work effectively with the RAI Native App, consider the following best practices:
- Start with least privilege: Deny access by default, then allow only what the app needs.
- Write policies that check the app’s role, not individual users.
- If you use mapping tables for access control, map to the app’s role—not individual users.
Data Stream Management
You can manage data streams using SQL, Python, or the RAI CLI.
Enable Change Tracking on a Table or View
Requires ownership privileges on the table or view.
To enable change tracking on a table or view, use the `ALTER TABLE` or `ALTER VIEW` statement:
```sql
-- Enable change tracking on a table.
ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE;

-- Enable change tracking on a view.
ALTER VIEW MyView SET CHANGE_TRACKING = TRUE;

-- Verify that change tracking is enabled. If change tracking is enabled, the
-- CHANGE_TRACKING column will be set to ON.
SHOW TABLES LIKE 'MyTable';
```
To enable change tracking on a table or view, create a `Provider` instance and use its `.sql()` method to execute the `ALTER TABLE` or `ALTER VIEW` statement:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Enable change tracking on a table.
app.sql("ALTER TABLE MyTable SET CHANGE_TRACKING = TRUE")

# Enable change tracking on a view.
app.sql("ALTER VIEW MyView SET CHANGE_TRACKING = TRUE")

# Verify that change tracking is enabled. If change tracking is enabled, the
# CHANGE_TRACKING column will be set to ON.
print(app.sql("SHOW TABLES LIKE 'MyTable'"))
```
See Change Tracking for details on what operations are tracked.
Create a Data Stream
Requires the `cdc_admin` application role.
To create a data stream, use the `api.create_data_stream()` procedure:
```sql
-- Replace the placeholders with your database, schema, and table/view names.
SET obj_name = '<db>.<schema>.<table_or_view>';
SET obj_type = 'TABLE'; -- Set to 'VIEW' if needed.
SET obj_ref = relationalai.api.object_reference($obj_type, $obj_name);

-- Enable change tracking on the table or view.
ALTER TABLE IDENTIFIER($obj_name) SET CHANGE_TRACKING = TRUE;

-- Create a data stream named 'my_stream' for the object reference.
CALL relationalai.api.create_data_stream($obj_ref, 'my_stream', TRUE);
/*
+----------------------------------+
| Datastream created successfully. |
+----------------------------------+
*/
```
To create a data stream, create a `Provider` instance and use its `.create_streams()` method:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

table_or_view = "<db>.<schema>.<table_or_view>"

# Enable change tracking on the table or view.
app.sql(f"ALTER TABLE IDENTIFIER('{table_or_view}') SET CHANGE_TRACKING = TRUE")

# Create a data stream for the table or view in the 'MyModel' model.
app.create_streams([table_or_view], "MyModel")
```
To create a data stream, pass the fully-qualified table or view name and the model name to the `imports:stream` command’s `--source` and `--model` flags:
```shell
rai imports:stream --source <db>.<schema>.<table_or_view> --model MyModel
```
Note that change tracking must be enabled on the source table or view before creating a data stream. Not all column types are supported by data streams. See Supported Column Types for details.
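Before calling any of the creation paths above, it can help to confirm that the source name is fully qualified. This helper is hypothetical, not part of the RAI API, and it accepts only simple unquoted Snowflake identifiers; quoted identifiers would need additional handling.

```python
import re

# Accepts <db>.<schema>.<table_or_view> built from simple unquoted
# Snowflake identifiers (letters, digits, _, $; not starting with a digit).
FQN = re.compile(r"^[A-Za-z_][\w$]*\.[A-Za-z_][\w$]*\.[A-Za-z_][\w$]*$")


def validate_source(name: str) -> str:
    """Raise ValueError unless name looks like db.schema.table_or_view."""
    if not FQN.match(name):
        raise ValueError(f"not a fully-qualified table or view name: {name!r}")
    return name


print(validate_source("example_db.sales.orders"))  # example_db.sales.orders
```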
Suspend or Resume a Data Stream
Requires the `cdc_admin` application role.
To suspend a data stream, use the `api.suspend_data_stream()` procedure:
```sql
-- Suspend a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>');
/*
+-----------------------+
| Data stream suspended |
+-----------------------+
*/
```
While a data stream is suspended, the RAI Native App no longer consumes change tracking data for the stream’s source table or view. Suspended streams should be resumed at regular intervals to avoid becoming stale.
To resume a suspended data stream, use the `api.resume_data_stream()` procedure:
```sql
-- Resume a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>');
/*
+---------------------+
| Data stream resumed |
+---------------------+
*/
```
To suspend or resume a data stream, create a `Provider` instance and use its `.sql()` method to execute the `api.suspend_data_stream()` or `api.resume_data_stream()` SQL procedure:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Suspend a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.sql("CALL relationalai.api.suspend_data_stream('<db>.<schema>.<table_or_view>')")
```
To resume a suspended data stream, use the `api.resume_data_stream()` procedure:
```python
# Resume a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.sql("CALL relationalai.api.resume_data_stream('<db>.<schema>.<table_or_view>')")
```
Delete a Data Stream
Requires the `cdc_admin` application role.
To delete a data stream, use the `api.delete_data_stream()` procedure:
```sql
-- Delete a data stream. Replace the placeholders with your database, schema,
-- and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>');
/*
+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+
*/
```
To delete a data stream, create a `Provider` instance and use its `.delete_stream()` method:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Delete a data stream. Replace the placeholders with your database, schema,
# and table or view name.
app.delete_stream('<db>.<schema>.<table_or_view>')
```
To delete a data stream, pass the fully-qualified table or view name and the model name to the `imports:delete` command’s `--object` and `--model` flags:
```shell
rai imports:delete --object <db>.<schema>.<table_or_view> --model MyModel
```
List Data Streams
Requires the `cdc_admin` application role.
To list all data streams, query the `api.data_streams` view:
```sql
SELECT * FROM relationalai.api.data_streams;
/*
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+
| ID                                  | CREATED_AT              | CREATED_BY               | STATUS   | REFERENCE_NAME    | REFERENCE_ALIAS                      | FQ_OBJECT_NAME            | RAI_DATABASE | RAI_RELATION              |
|-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------|
| ds_a1b2c3d4_e5f6_7a89_b123_d456e789 | 2024-10-23 12:23:45.250 | john.doe@company.com     | CREATED  | DATA_STREAM_VIEW  | 1234abcd-5678-90ef-ab12-3456cdef7890 | example_db.sales.view1    | SalesModel   | example_db.sales.view1    |
| ds_8e7f6d5c_4a3b_2c1d_0e9f_7b6a8d9f | 2024-10-22 15:37:29.580 | maria.garcia@company.com | CREATED  | DATA_STREAM_TABLE | bcd123ef-4567-890a-bcde-abcdef678901 | example_db.hr.employees   | HRModel      | example_db.hr.employees   |
| ds_9a8b7c6d_5e4f_3d2a_1b0e_f7g6h5i3 | 2024-10-21 17:44:10.300 | mark.jones@company.com   | DELETING | DATA_STREAM_VIEW  | 7890abcd-1234-5678-90ef-bcde4567890f | example_db.finance.budget | FinanceModel | example_db.finance.budget |
+-------------------------------------+-------------------------+--------------------------+----------+-------------------+--------------------------------------+---------------------------+--------------+---------------------------+
*/
```
To list all data streams, create a `Provider` instance and use its `.list_streams()` method:
```python
from pprint import pprint

import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# List all data streams.
streams = app.list_streams()
pprint(streams)
```
To list all data streams, use the `imports:list` command:
```shell
rai imports:list
```
To list data streams for a specific model, pass the model name to the `--model` flag:
```shell
rai imports:list --model MyModel
```
Get Data Stream Details
Requires the `cdc_admin` application role.
To get details about a specific data stream, pass the fully-qualified name of its source table or view to the `api.get_data_stream()` procedure:
```sql
-- Get details about the data stream. Replace the placeholders with your
-- database, schema, and table or view name.
CALL relationalai.api.get_data_stream('<db>.<schema>.<table_or_view>');
/*
ID                            | ds_abcd1234_ef56_7890_abcd_1234ef567890
CREATED_AT                    | 2024-10-23 10:12:34.567
CREATED_BY                    | jane.doe@example.com
STATUS                        | ACTIVE
REFERENCE_NAME                | DATA_STREAM_TABLE
REFERENCE_ALIAS               | a1bcdef2-3456-7890-1234-b567c890d123
FQ_OBJECT_NAME                | <db>.<schema>.<table_or_view>
RAI_DATABASE                  | MyModel
RAI_RELATION                  | <db>.<schema>.<table_or_view>
DATA_SYNC_STATUS              | SYNCED
PENDING_BATCHES_COUNT         | 0
NEXT_BATCH_STATUS             | NULL
NEXT_BATCH_UNLOADED_TIMESTAMP | NULL
NEXT_BATCH_DETAILS            | NULL
LAST_BATCH_DETAILS            | {"rows": 10, "size": 512, ... }
LAST_BATCH_UNLOADED_TIMESTAMP | 2024-10-23 10:50:00.456
LAST_TRANSACTION_ID           | 02a1b234-5678-1234-abcdef-0123456789ab
ERRORS                        | []
CDC_STATUS                    | STARTED
*/
```
To get details about a specific data stream, create a `Provider` instance and use its `.sql()` method to execute the `api.get_data_stream()` SQL procedure with the fully-qualified name of the stream’s source table or view:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# Get details about a data stream. Replace the placeholders with your database,
# schema, and table or view name.
stream_name = "<db>.<schema>.<table_or_view>"
stream_details = app.sql(f"CALL relationalai.api.get_data_stream('{stream_name}')")
print(stream_details)
```
To get details about a specific data stream, pass the stream ID to the `--id` option of the `imports:get` command:
```shell
rai imports:get --id <stream_id>
```
The stream ID is the unique identifier for the data stream, as returned in the output of the `imports:list` command.
View All Quarantined Streams
Requires the `cdc_admin` application role.
To view quarantined streams and their errors, use the `api.data_stream_batches` view:
```sql
SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE status = 'QUARANTINED';
```
To view quarantined streams and their errors, create a `Provider` instance and use its `.sql()` method to query the `api.data_stream_batches` view:
```python
import relationalai as rai

# Get a Provider instance.
app = rai.Provider()

# View quarantined streams and their errors.
quarantined_streams = app.sql("""
    SELECT
      data_stream_id,
      fq_object_name,
      status,
      error.VALUE::string AS processing_error
    FROM relationalai.api.data_stream_batches,
      LATERAL FLATTEN(input => processing_details:processingErrors) AS error
    WHERE status = 'QUARANTINED';
""")

print(quarantined_streams)
```
Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from their source, but these changes are not processed by the CDC engine until the stream is resumed.
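The `LATERAL FLATTEN` query above can be mirrored in Python for a single `processing_details` payload, which can be handy when inspecting results pulled out of the view. The field name `processingErrors` matches the query; the sample error strings are invented.

```python
import json

# A processing_details payload shaped like the one the quarantine query
# flattens. The error messages are invented samples.
processing_details = json.dumps({
    "processingErrors": [
        "schema mismatch: column PRICE dropped from source",
        "batch 42 failed after 3 retries",
    ]
})


def processing_errors(details_json: str) -> list:
    """Python equivalent of LATERAL FLATTEN over processingErrors."""
    return json.loads(details_json).get("processingErrors", [])


for err in processing_errors(processing_details):
    print(err)
```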
Troubleshoot Quarantined Streams
If your data stream is quarantined, the system attempts to recover it automatically once, 15 minutes after the quarantine begins. You do not need to wait for this automatic recovery; you can troubleshoot and fix the issue at any time. If automatic recovery fails, manual intervention is required.
Follow these steps to diagnose and fix a quarantined stream.
1. Check Stream Status and Find the Error.

   First, get your stream status and ID:

   ```sql
   SELECT * FROM relationalai.get_data_stream('<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>');
   ```

   ```python
   import relationalai as rai

   app = rai.Provider()
   stream_status = app.sql("SELECT * FROM relationalai.get_data_stream('<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>')")
   print(stream_status)
   ```

   Look at the `errors` column to see why your stream is quarantined. The error message will help you choose the right troubleshooting path.
2. Diagnose the Problem.

   How you diagnose the issue depends on the error message:

   | Quarantine Message Example | Recommended Diagnostic Path |
   |----------------------------|-----------------------------|
   | Task `<task_name>` failed too many times and has been quarantined due to: `<error_details>` | Task Failure Diagnosis |
   | Data stream has been quarantined after batch processing failed 3 times. `<error_details>` | Batch Processing Diagnosis |
   | Other error messages | General Diagnosis |

   Refer to the table above to select the correct troubleshooting steps below.
   Batch Processing Diagnosis (requires the `cdc_admin` application role):

   - Check batch details:

     ```sql
     SELECT * FROM api.data_stream_batches WHERE fq_object_name ILIKE '<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>';
     ```

   - Find the batch with `status = 'QUARANTINED'`.
   - Review the `processing_details` for failed transactions or errors.
   - Check the error log:

     ```sql
     SELECT * FROM api.data_stream_errors WHERE id = 'rai.cdc.<MY_DATA_STREAM_ID>';
     ```
   Task Failure Diagnosis (requires `ACCOUNTADMIN` privileges):

   - Check task history:

     ```sql
     SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => '<MY_DATA_STREAM_ID>_TASK'));
     ```

   - Look for `status`, `error_code`, and `error_message` in the latest run.
   General Diagnosis:

   - Note these timestamps from your stream status in Step 1:
     - `last_batch_unloaded_timestamp`
     - `quarantined_on`
   - Review any schema changes to your source between these times. If the table schema changed (e.g., columns were added or removed), it could cause the stream to be quarantined.
   - If necessary, delete the stream to force a re-creation the next time a program using it is executed.
3. Fix and Resume.

   If you find the root cause in Step 2, fix it. The system attempts to recover the stream automatically the next time a program using the stream is run.

   If you can’t determine the cause, or the stream remains quarantined after your fix, you can delete the stream manually. This forces the stream to be re-created the next time a program using the stream is run.
If you continue to encounter issues, please contact support.