Fix data stream issues
Sometimes a data stream can get into a bad state where it is not processing changes from the source as expected. This guide explains how to recover from common data stream issues.
Before you start, make sure that:
- The RAI Native App is installed in your Snowflake account.
- You have a data stream that already exists and needs troubleshooting.
Fix stale streams
When a stream is suspended, it stops consuming change tracking data from its source table or view, and that data is no longer stored in the app’s Snowflake database. Suspending a stream can help reduce storage costs, but Snowflake retains change tracking data only for a limited time, after which it becomes inaccessible.
If a stream remains suspended beyond its data retention period, it becomes stale. A stale stream may fail to catch up with recent changes when resumed, leading to data inconsistencies.
If you encounter data inconsistencies due to staleness, you can delete the stream so that PyRel can recreate it with a fresh state.
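The staleness rule above can be sketched as a simple date comparison. This is only an illustration: `is_probably_stale` is a hypothetical helper, and `retention_days` is an assumed input that you would need to look up for your own source object. The sketch does not query Snowflake.

```python
from datetime import datetime, timedelta, timezone


def is_probably_stale(suspended_on, retention_days, now=None):
    """Return True if the stream has been suspended longer than the retention window.

    Assumption: `retention_days` is the retention period that applies to the
    source's change tracking data; this helper does not look it up.
    """
    now = now or datetime.now(timezone.utc)
    return now - suspended_on > timedelta(days=retention_days)


# Hypothetical example values: suspended for 9 days.
suspended_on = datetime(2024, 1, 1, tzinfo=timezone.utc)
check_time = datetime(2024, 1, 10, tzinfo=timezone.utc)

print(is_probably_stale(suspended_on, retention_days=7, now=check_time))   # True
print(is_probably_stale(suspended_on, retention_days=14, now=check_time))  # False
```

A `True` result only suggests that the stream may be stale. Whether to delete it still depends on whether you actually see data inconsistencies.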
Delete a data stream
Delete a stream when it has become stale, when quarantine recovery requires recreation, or when you want PyRel to build a fresh stream the next time the source is used.
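As a rough sketch, the statement that deletes a stream can be built and checked before it is sent to Snowflake. The procedure name `relationalai.api.delete_data_stream` comes from this guide. The helper `build_delete_call` and its validation rule (plain, unquoted identifiers only) are assumptions made for illustration.

```python
import re

# Assumption: only plain, unquoted Snowflake identifiers are accepted here.
_IDENTIFIER = r"[A-Za-z_][A-Za-z0-9_$]*"
_FQ_NAME = re.compile(rf"{_IDENTIFIER}\.{_IDENTIFIER}\.{_IDENTIFIER}")


def build_delete_call(fq_object_name: str) -> str:
    """Return the CALL statement that deletes the stream for one source object."""
    if not _FQ_NAME.fullmatch(fq_object_name):
        raise ValueError(
            f"Expected <db>.<schema>.<table_or_view>, got {fq_object_name!r}"
        )
    return f"CALL relationalai.api.delete_data_stream('{fq_object_name}')"


# Hypothetical source object name.
statement = build_delete_call("MY_DB.MY_SCHEMA.MY_TABLE")
print(statement)
```

To run the statement, pass it to `session.sql(statement).collect()`, where `session` is the Snowpark session returned by `create_config().get_session()`.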
Fix quarantined streams
A quarantined stream means the CDC Service stopped syncing data from your source because too many errors occurred.
When a stream is quarantined:
- The stream is automatically suspended to prevent further issues.
- Change tracking data is still collected from your source.
- The CDC reasoner does not process new data until you fix the errors and resume the stream.
- Common causes include repeated failures in batch processing or scheduled tasks.
View All Quarantined Streams
Requires the cdc_admin application role.
To view quarantined streams and their errors with SQL, use the api.data_stream_batches view:

SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE status = 'QUARANTINED';

To do the same from Python, create a Snowpark session from your default PyRel connection with create_config(), then query the api.data_stream_batches view:

from relationalai.config import create_config

query = """
SELECT
  data_stream_id,
  fq_object_name,
  status,
  error.VALUE::string AS processing_error
FROM relationalai.api.data_stream_batches,
  LATERAL FLATTEN(input => processing_details:processingErrors) AS error
WHERE status = 'QUARANTINED';
"""

session = create_config().get_session()
rows = session.sql(query).collect()
print(rows)

Once the errors have been corrected, resume the data stream to continue processing. Note that quarantined streams continue to consume change tracking data from the source, but these changes will not be processed by the CDC reasoner until the stream is resumed.
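Because the query returns one row per error, a stream with several errors appears several times. A minimal sketch of grouping the results by source object follows. It assumes each row has been converted to a dictionary (for example with Snowpark's `Row.as_dict()`) and that the column names come back in upper case. `group_errors_by_stream` and the sample rows are hypothetical.

```python
from collections import defaultdict


def group_errors_by_stream(rows):
    """Map each source object name to the list of its processing errors."""
    grouped = defaultdict(list)
    for row in rows:
        # Assumption: keys are upper-cased column names from the query.
        grouped[row["FQ_OBJECT_NAME"]].append(row["PROCESSING_ERROR"])
    return dict(grouped)


# Hypothetical rows standing in for [r.as_dict() for r in rows].
sample_rows = [
    {"FQ_OBJECT_NAME": "MY_DB.MY_SCHEMA.ORDERS", "PROCESSING_ERROR": "error A"},
    {"FQ_OBJECT_NAME": "MY_DB.MY_SCHEMA.ORDERS", "PROCESSING_ERROR": "error B"},
    {"FQ_OBJECT_NAME": "MY_DB.MY_SCHEMA.USERS", "PROCESSING_ERROR": "error C"},
]

errors_by_stream = group_errors_by_stream(sample_rows)
for name, errors in errors_by_stream.items():
    print(name, len(errors), "error(s)")
```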
Repair a quarantined stream
If your data stream is quarantined, the system will attempt to recover it automatically once, 15 minutes after quarantine begins. You do not need to wait for this automatic recovery. You can troubleshoot and fix the issue at any time. If automatic recovery fails, manual intervention is required.
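To judge whether the single automatic recovery attempt is still pending, you can compare the current time with the quarantine start time. The sketch below is only arithmetic on the 15-minute delay described above. The function names are hypothetical, and the `quarantined_on` value is assumed to be available to you as a timezone-aware datetime.

```python
from datetime import datetime, timedelta, timezone

# Delay before the one automatic recovery attempt, as described in this guide.
AUTO_RECOVERY_DELAY = timedelta(minutes=15)


def auto_recovery_due(quarantined_on):
    """Return the approximate time of the automatic recovery attempt."""
    return quarantined_on + AUTO_RECOVERY_DELAY


def auto_recovery_pending(quarantined_on, now):
    """Return True if the automatic attempt has probably not run yet."""
    return now < auto_recovery_due(quarantined_on)


# Hypothetical quarantine start time.
quarantined_on = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)

print(auto_recovery_due(quarantined_on).isoformat())  # 2024-01-01T12:15:00+00:00
print(auto_recovery_pending(quarantined_on, quarantined_on + timedelta(minutes=5)))   # True
print(auto_recovery_pending(quarantined_on, quarantined_on + timedelta(minutes=20)))  # False
```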
Follow these steps to diagnose and fix a quarantined stream.
1. Check stream status and find the error.

First, get your stream status and ID with SQL:

CALL relationalai.api.get_data_stream('<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>');

Or from Python:

from relationalai.config import create_config
session = create_config().get_session()
stream_status = session.sql("CALL relationalai.api.get_data_stream('<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>')").collect()
print(stream_status)

Look at the errors column to see why your stream is quarantined. The error message will help you choose the right troubleshooting path.
2. Diagnose and fix the problem.

How you diagnose the issue depends on the error message.

| Quarantine Message Example | Recommended Diagnostic Path |
|---|---|
| Task <task_name> failed too many times and has been quarantined due to: <error_details> | Task Failure Diagnosis |
| Data stream has been quarantined after batch processing failed 3 times. <error_details> | Batch Processing Diagnosis |
| Other error messages | General Diagnosis |

Refer to the table above to select the correct troubleshooting steps below.

Batch Processing Diagnosis:

Requires the cdc_admin application role.

- Check batch details:
SELECT * FROM relationalai.api.data_stream_batches WHERE fq_object_name ILIKE '<MY_DB.MY_SCHEMA.MY_SOURCE_OBJECT>';
- Find the batch with status = 'QUARANTINED'.
- Review the processing_details for failed jobs or errors.
- Check the error log:
SELECT * FROM relationalai.api.data_stream_errors WHERE id = 'rai.cdc.<MY_DATA_STREAM_ID>';

Task Failure Diagnosis:

Requires ACCOUNTADMIN privileges.

- Check task history:
SELECT * FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => '<MY_DATA_STREAM_ID>_TASK'));
- Look for status, error_code, and error_message in the latest run.

General Diagnosis:

- Note these timestamps from your stream status in Step 1:
  - last_batch_unloaded_timestamp
  - quarantined_on
- Review any schema changes to your source between these times. If the table schema changed (e.g., columns were added or removed), it could cause the stream to be quarantined.
3. Recreate the stream.

Call relationalai.api.delete_data_stream() to delete the quarantined stream. This forces the stream to be recreated the next time a program using the stream is run.

To delete a data stream with SQL, use the api.delete_data_stream() procedure:

-- Delete a data stream. Replace the placeholders with your database, schema, and table or view name.
CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>');

Output:

+-----------------------------------+
| Data stream deleted successfully. |
+-----------------------------------+

To delete a data stream from Python, create a Snowpark session from your default PyRel connection with create_config(), then call the api.delete_data_stream() procedure:

from relationalai.config import create_config
session = create_config().get_session()
# Delete a data stream. Replace the placeholders with your database, schema,
# and table or view name.
session.sql("CALL relationalai.api.delete_data_stream('<db>.<schema>.<table_or_view>')").collect()

If you continue to encounter issues, please contact support.
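The message-to-path mapping used in the diagnosis step can be sketched as a small function. It matches only the example message shapes listed in this guide. `diagnostic_path` is a hypothetical helper, and real messages may differ in wording, in which case the function falls back to General Diagnosis.

```python
def diagnostic_path(error_message: str) -> str:
    """Pick a diagnostic path from the text of a quarantine error message."""
    message = error_message.strip()
    # Shape: "Task <task_name> failed too many times and has been quarantined due to: ..."
    if message.startswith("Task ") and "failed too many times" in message:
        return "Task Failure Diagnosis"
    # Shape: "Data stream has been quarantined after batch processing failed 3 times. ..."
    if "batch processing failed" in message:
        return "Batch Processing Diagnosis"
    # Anything else, including an empty message.
    return "General Diagnosis"


# Hypothetical messages modeled on the examples in this guide.
examples = [
    "Task MY_TASK failed too many times and has been quarantined due to: timeout",
    "Data stream has been quarantined after batch processing failed 3 times. bad row",
    "Unexpected error",
]
for text in examples:
    print(diagnostic_path(text))
```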
Troubleshoot common data stream issues
Use this table if the source and stream were set up successfully, but later source changes are not producing the data stream behavior you expect.
| Symptom | Likely cause | Fix |
|---|---|---|
| The source and stream were set up, but no data is flowing. | The source object is not a supported Snowflake table or view, the Snowflake role cannot SELECT from it, or change tracking is not enabled on the source. | Recheck which Snowflake objects can be used as data sources, and enable change tracking on the source. Make sure the role used for the source can read the object. |
| The source looks valid, but no streams are picking up recent changes. | The CDC service is disabled, suspended, or unhealthy enough that streams cannot advance. | Verify that the CDC service is ready. If needed, enable the CDC service or restore it to a healthy state. Then check individual streams again. |
| CDC is enabled, but a stream is not syncing. | That stream is unhealthy, missing, or not syncing normally. | View all data streams to find the stream, then view the details for that data stream to check whether it is suspended or has errors. If the stream is suspended, resume it. If there are errors, fix them or try deleting the stream. PyRel will recreate it the next time it is needed. |
| The stream is in a quarantined state. | The stream hit a condition that stopped processing, such as a source-side issue or a repeated processing failure. | Review the affected stream and follow Fix Quarantined Streams. Resolve the underlying issue before you expect new changes to flow again. |
| The stream looks healthy, but PyRel query results still seem older than expected. | This is usually query-time freshness behavior in PyRel, not a CDC service or stream health issue covered on this page. | Troubleshoot PyRel freshness and data sync behavior in Configure data sync behavior. If you still see outdated or incorrect data, the stream may be stale. Delete the stream so that PyRel can recreate it. |