
Managing RAI Data Streams

This guide explains what RAI data streams are and shows how to create, list, monitor, and delete them using the Snowflake user interface.

You can also manage RAI data streams through the RAI CLI. See Managing RAI Data Streams for more details.

Introduction

What is a RAI Data Stream?

A RAI data stream synchronizes your data from a Snowflake database to a RAI database. It uses change data capture (CDC) to keep track of data changes and synchronize them with RAI.

💡

You can think of a RAI data stream as a materialized view that connects your Snowflake data with RAI.

A RAI data stream originates from a Snowflake object, which can be a table or a view. Only one RAI data stream can originate from a given Snowflake object.

🔎

A RAI data stream is identified by the RAI database link and the fully qualified name of its source Snowflake object, and its identifier has the form <dblink_database>.<dblink_schema>-<obj_database>.<obj_schema>.<obj_name>.

The target data object of the RAI data stream is a base relation located in a RAI database.

🔎

A RAI data stream is associated with and managed by a RAI database link.

Behind the Scenes

A RAI data stream consists of a Snowflake stream and a Snowflake task.

A stream object captures data manipulation language (DML) changes made to tables, such as inserts, updates, and deletes, along with metadata about each change. This process is referred to as change data capture (CDC), and it enables actions to be taken on the modified data.

A table stream (or a “stream”) creates a “change table” that outlines the row-level changes between two transactional points of time in a table. See Change Tracking Using Table Streams for more details.

Meanwhile, the Snowflake task associated with the RAI data stream handles the data synchronization. Tasks are combined with streams for continuous Extract, Load, Transform (ELT) workflows to process recently changed table rows.

The synchronization between RAI and Snowflake occurs at regular intervals — typically once every minute.
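To make the stream-plus-task pattern concrete, here is a minimal sketch in plain Snowflake SQL. It is not the code that RAI generates: the object names (my_sf_edges_stream, my_sf_edges_changes, my_sf_edges_task) and the warehouse my_wh are hypothetical, and the task body only copies change rows into a staging table instead of sending them to RAI.

```sql
-- Illustration only: hypothetical names, not the objects RAI creates.

-- A stream that records row-level changes made to the source table.
CREATE OR REPLACE STREAM my_sf_edges_stream ON TABLE my_sf_edges;

-- A staging table that receives the captured changes.
CREATE OR REPLACE TABLE my_sf_edges_changes (
    x INT,
    y INT,
    change_action STRING,   -- 'INSERT' or 'DELETE'
    is_update BOOLEAN       -- TRUE when the row is part of an UPDATE
);

-- A task that wakes up every minute and runs only if the stream has new data.
CREATE OR REPLACE TASK my_sf_edges_task
    WAREHOUSE = my_wh
    SCHEDULE = '1 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('my_sf_edges_stream')
AS
    INSERT INTO my_sf_edges_changes (x, y, change_action, is_update)
    SELECT x, y, METADATA$ACTION, METADATA$ISUPDATE
    FROM my_sf_edges_stream;

-- Tasks are created in a suspended state and must be resumed to run.
ALTER TASK my_sf_edges_task RESUME;
```

Reading from the stream inside a DML statement advances its offset, so each change is processed once. The WHEN clause skips runs when nothing has changed.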

Create a RAI Data Stream

You can create a RAI data stream using the create_data_stream procedure, which takes a Snowflake data source as input and creates a stream between Snowflake and RAI.

Here is an example that creates a data stream between the my_sf_edges Snowflake table and the my_rai_edges RAI relation in the my_rai_db RAI database:

CREATE OR REPLACE TABLE my_sf_edges(x INT, y INT)
    AS SELECT * FROM VALUES
    (11, 12), (12, 13), (13, 13), (12, 43), (13, 50);
 
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_rai_edges');
/*+------------------------------------------------+
  | { "account": "***",  ..., "state": "CREATED" } |
  +------------------------------------------------+ */
🔎

The name of the RAI data object — a RAI relation — is case-sensitive, unlike the Snowflake objects.

A RAI data stream is identified by the fully qualified name of its source object, 'my_sf_db.my_sf_schema.my_sf_edges'.

Each SQL object can have only one RAI data stream associated with it. Trying to create multiple RAI data streams associated with the same SQL object will result in an error.

Note that you can also use a version of create_data_stream where you don’t need to specify the RAI relation my_rai_edges. In this case, the RAI relation name is identical to the Snowflake table name:

-- Equivalent to `CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_sf_edges')`;
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db');
/*+------------------------------------------------+
  | { "account": "***",  ..., "state": "CREATED" } |
  +------------------------------------------------+ */

Similarly, you can omit the RAI database my_rai_db. In this case, the database must have been set beforehand using use_rai_database:

-- Equivalent to `CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges', 'my_rai_db', 'my_sf_edges')`;
CALL RAI.use_rai_database('my_rai_db');
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_edges');
/*+------------------------------------------------+
  | { "account": "***",  ..., "state": "CREATED" } |
  +------------------------------------------------+ */

If change tracking has not been enabled on the source SQL object by using ALTER TABLE … SET CHANGE_TRACKING = TRUE, then only the object owner (i.e., the role that has the OWNERSHIP privilege on the object) can create a stream on the object.

🔎

Creating a RAI data stream automatically enables change tracking on the stream’s source table or view.
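If a role other than the owner needs to create the data stream, the owner can enable change tracking ahead of time. The following sketch uses standard Snowflake statements on the example table from this guide. It assumes you run it with the role that holds the OWNERSHIP privilege on the table.

```sql
-- Run as the owner of the source table.
ALTER TABLE my_sf_db.my_sf_schema.my_sf_edges SET CHANGE_TRACKING = TRUE;

-- Confirm the setting: check the change_tracking column in the output.
SHOW TABLES LIKE 'my_sf_edges' IN SCHEMA my_sf_db.my_sf_schema;
```

For a view, the corresponding statement is ALTER VIEW … SET CHANGE_TRACKING = TRUE.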

In addition to tables, you can also create RAI data streams on views. Here is an example that creates a view my_sf_view on the my_sf_edges table and then creates a new RAI data stream originating from this view:

CREATE OR REPLACE TABLE my_sf_edges(x INT, y INT)
    AS SELECT * FROM VALUES
    (11, 12), (12, 13), (13, 13), (12, 43), (13, 50);
 
-- Create a view of rows with `x > 12`.
CREATE OR REPLACE VIEW my_sf_view
    AS SELECT x, y FROM my_sf_edges
    WHERE x > 12;
 
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_view', 'my_rai_db', 'my_rai_view');
/*+------------------------------------------------+
  | { "account": "***",  ..., "state": "CREATED" } |
  +------------------------------------------------+ */

After the data stream is created, the procedure returns a JSON object similar to the following:

{
    "account": "************",
    "createdBy": "************",
    "createdOn": "2023-06-26T11:24:09.710Z",
    "dbLink": "my_sf_db.my_sf_schema",
    "id": "************",
    "integration": "myintegration",
    "name": "my_sf_db.my_sf_schema.my_sf_view",
    "rai": {
        "database": "my_rai_db",
        "relation": "my_rai_view"
    },
    "snowflake": {
        "database": "my_sf_db",
        "object": "my_sf_db.my_sf_schema.my_sf_view",
        "schema": "my_sf_schema"
    },
    "state": "CREATED"
}

Monitor a RAI Data Stream

You can view all available RAI data streams, get static information on them, retrieve their status, and view scheduled tasks.

List All RAI Data Streams

You can list all available RAI data streams using the list_data_streams function:

SELECT RAI.list_data_streams();
 

This function returns a JSON array describing the available RAI data streams along with their status:

[
    {
        "account": "************",
        "createdBy": "************",
        "createdOn": "2023-06-26T14:27:19.514Z",
        "dbLink": "my_sf_db.my_sf_schema",
        "id": "************",
        "integration": "myintegration",
        "name": "my_sf_db.my_sf_schema.my_sf_edges",
        "rai": {
            "database": "my_rai_db",
            "relation": "my_rai_edges"
        },
        "snowflake": {
            "database": "my_sf_db",
            "object": "my_sf_db.my_sf_schema.my_sf_edges",
            "schema": "my_sf_schema"
        },
        "state": "CREATED"
    },
    {
        "account": "************",
        "createdBy": "************",
        "createdOn": "2023-06-26T16:22:11.312Z",
        "dbLink": "my_sf_db.my_sf_schema",
        "id": "************",
        "integration": "myintegration",
        "name": "my_sf_db.my_sf_schema.my_sf_view",
        "rai": {
            "database": "my_rai_db",
            "relation": "my_rai_view"
        },
        "snowflake": {
            "database": "my_sf_db",
            "object": "my_sf_db.my_sf_schema.my_sf_view",
            "schema": "my_sf_schema"
        },
        "state": "CREATED"
    }
]
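When many data streams exist, a tabular view can be easier to scan than the raw JSON. The sketch below flattens the array into one row per data stream. It assumes list_data_streams returns a VARIANT holding the JSON array shown above. That return type is an assumption: if the function returns a string instead, wrap the call in PARSE_JSON(...).

```sql
-- Assumption: RAI.list_data_streams() returns a VARIANT containing a JSON array.
-- If it returns a string, use FLATTEN(input => PARSE_JSON(RAI.list_data_streams())).
SELECT
    s.value['name']::STRING            AS stream_name,
    s.value['rai']['database']::STRING AS rai_database,
    s.value['rai']['relation']::STRING AS rai_relation,
    s.value['state']::STRING           AS state
FROM TABLE(FLATTEN(input => RAI.list_data_streams())) AS s
ORDER BY stream_name;
```

Bracket notation is used for the JSON keys because they are case-sensitive and one of them (database) is also a SQL keyword.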

Get Static Information on a RAI Data Stream

You can obtain all static information about a specific RAI data stream by calling the get_data_stream function:

SELECT RAI.get_data_stream('my_sf_db.my_sf_schema.my_sf_edges');

Here, you need to provide the ID of the RAI data stream, my_sf_db.my_sf_schema.my_sf_edges, which is the fully qualified name of its source SQL object.

The function get_data_stream returns only static metadata that remains unchanged over time, such as the creation time and information about the source Snowflake object and the target RAI object. For more details, see the get_data_stream section in the SQL Library Reference.

Note that no status information is returned as the status and health of a RAI data stream can change over time and are not considered static information. For this purpose, use get_data_stream_status, discussed in the next section.

Get the Status of a RAI Data Stream

To retrieve information about a RAI data stream that may change over time — including the state and the health of the RAI data stream — use the get_data_stream_status procedure:

CALL RAI.get_data_stream_status('my_sf_db.my_sf_schema.my_sf_edges');
-- Example output: healthy data stream that is still syncing
/* +-----------------------------------------------------------------------------------------+
   | PROPERTY                                         | VALUE                                |
   |-----------------------------------------------------------------------------------------|
   | Source                                           | "my_sf_db.my_sf_schema.my_sf_edges"  |
   | DB Link                                          | "my_sf_db.my_sf_schema"              |
   | Integration                                      | "myintegration"                      |
   | Data sync Status                                 | "Syncing: pending unloading from SF" |
   | Data stream health                               | "Healthy"                            |
   | Latest changes received from SF - SF unload time | "2023-06-26 19:12:02.483"            |
   | Latest changes received from SF - Total rows     | 1000000                              |
   | Latest changes written to RAI - SF unload time   | "2023-06-26 19:12:02.483"            |
   | Latest changes written to RAI - Load end time    | "2023-06-26 19:12:44.323"            |
   | Latest changes written to RAI - Total rows       | 1000000                              |
   +-----------------------------------------------------------------------------------------+ */

Here, you can see the RAI data stream is “healthy” but still busy synchronizing the data.

Besides some basic static information, which you can also retrieve using get_data_stream, you can find details on the most recent successful synchronization of the data. For more on the health and syncing status of a RAI data stream, see the description of the get_data_stream_status procedure.

When a RAI data stream is created, a task is simultaneously initiated in the background to handle the data synchronization. The synchronization occurs periodically, typically once every minute.

🔎

Right after creating a RAI data stream and checking its status using get_data_stream_status, you may see that the RAI data stream is in a pending state.

This usually indicates that the stream is not ready yet, i.e., the data have not been fully synchronized. Using the target object of the RAI data stream — a base relation in the RAI database — before the RAI data stream has completed its first synchronization will return an error. This is because the base relation does not exist yet.

This typically occurs when you try to create a graph using create_graph immediately after creating the RAI data stream. In such cases, the RAI data stream has not had enough time to synchronize the data even once.
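One way to check whether the first synchronization has finished is to pull the two relevant rows out of the status output. This sketch relies on Snowflake's RESULT_SCAN and LAST_QUERY_ID functions. It assumes the result set of get_data_stream_status exposes columns named PROPERTY and VALUE, as the example output above suggests.

```sql
CALL RAI.get_data_stream_status('my_sf_db.my_sf_schema.my_sf_edges');

-- Assumption: the procedure's result set has columns PROPERTY and VALUE.
-- RESULT_SCAN must run right after the CALL so that LAST_QUERY_ID() points at it.
SELECT "PROPERTY", "VALUE"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "PROPERTY" IN ('Data sync Status', 'Data stream health');
```

Rerun both statements until the sync status no longer reports a pending state, and only then use the target relation.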

View Scheduled Tasks

You can check currently scheduled tasks and their respective schedules by querying the information_schema.task_history table function. The query below shows that there are currently two scheduled tasks, one for the my_sf_edges table and one for the my_sf_view view:

SELECT name, condition_text, scheduled_time
    FROM TABLE(information_schema.task_history())
    WHERE schema_name = 'RAI'
    AND database_name = 'USER_EDUCATION'
    AND state != 'SKIPPED'
    ORDER BY scheduled_time DESC
;
 
/*+---------------------------------------------------------------------------------------------------------------------------------------------------+
  | NAME                                      | CONDITION_TEXT                                                        | SCHEDULED_TIME                |
  +---------------------------------------------------------------------------------------------------------------------------------------------------+
  | MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_VIEW_TASK  | SYSTEM$STREAM_HAS_DATA('MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_VIEW_STREAM')  | 2023-06-26 15:50:30.729 -0700 |
  | MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_EDGES_TASK | SYSTEM$STREAM_HAS_DATA('MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_EDGES_STREAM') | 2023-06-26 14:20:25.338 -0700 |
  +---------------------------------------------------------------------------------------------------------------------------------------------------+ */
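
To investigate a single data stream, you can narrow the history to one task and include its run state and any error message. The sketch below uses the task_name and result_limit arguments of Snowflake's task_history table function. The task name is copied from the example output above and will differ in your account.

```sql
-- Show the ten most recent runs of one synchronization task.
-- The task name is taken from the example output; replace it with your own.
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(information_schema.task_history(
    task_name => 'MY_SF_DB14_MY_SF_SCHEMA3_MY_SF_EDGES_TASK',
    result_limit => 10))
ORDER BY scheduled_time DESC;
```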

Delete a RAI Data Stream

You can delete a RAI data stream by calling the delete_data_stream procedure and specifying the fully qualified name of the RAI data stream in the form <database>.<schema>.<object>.

Here is an example that deletes the my_sf_db.my_sf_schema.my_sf_view RAI data stream that was previously created:

-- Delete the `my_sf_db.my_sf_schema.my_sf_view` RAI data stream.
CALL RAI.delete_data_stream('my_sf_db.my_sf_schema.my_sf_view');
/*+------+
  | "ok" |
  +------+ */
🔎

If you attempt to delete a RAI data stream that does not exist, the return value in Snowflake is null.

You can recreate a previously deleted RAI data stream immediately after executing delete_data_stream.
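
As a short sketch of that delete-and-recreate cycle, the following statements reuse only procedures and example names already shown in this guide. The final query is there to confirm that the data stream is listed again.

```sql
-- Delete the data stream on the view, then recreate it with the same target relation.
CALL RAI.delete_data_stream('my_sf_db.my_sf_schema.my_sf_view');
CALL RAI.create_data_stream('my_sf_db.my_sf_schema.my_sf_view', 'my_rai_db', 'my_rai_view');

-- The recreated data stream should appear in the list.
SELECT RAI.list_data_streams();
```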

🔎

When you delete a RAI data stream, both the Snowflake SQL object and the RAI relation persist, i.e., they are not deleted. Additionally, it’s advisable to delete any graphs that have been created from the RAI data stream before deleting the RAI data stream itself.

See Also

For more information on configuring and managing the RAI integration for Snowflake, check out the RAI Integration guide.
