Data Ingestion

This how-to guide demonstrates how to perform data ingestion procedures in RAI.

Goal

This guide shows you how to carry out data ingestion in a RAI database using the Rel language. In this guide, you’ll learn the steps for common data workflows and transformations, including schema mappings and ways to ensure data integrity.

Note that the provided examples within this guide are focused on data loading from the cloud. These workflows apply to all data formats, not just CSV data.

Introduction

In RAI, following the Datalog nomenclature, relations are divided into two categories:

Relation Type | Description
Base | RAI database objects where all the data are stored and persisted. They are analogous to tables in SQL databases.
Derived | Relations whose content is defined by logical rules. Like base relations, their content is organized in tuples. Unlike base relations, the rules defining a derived relation, rather than its content, are persisted in the database. Their content may or may not be stored in the database (a process known as materialization). Derived relations are analogous to views in SQL databases.

When you define a base or derived relation, RAI’s system tracks the arity and type of the tuples in the relation and generates the schema automatically. Unlike traditional database systems, you do not have to specify this information beforehand as a “database schema.” You can, however, enforce schemas, as you will learn throughout this guide.

Therefore, the ingestion workflows described below mainly rely on how the data are split between base relations and derived relations.

Ingestion Workflows

The data ingestion workflow involves several steps: Extracting data from different sources, Loading them into a data warehouse, and Transforming the data (including cleaning and schema mapping) to prepare them for further consumption. This corresponds to the Extract, Load, and Transform (ELT) paradigm and its variants, such as Extract, Transform, and Load (ETL).

RAI’s Relational Knowledge Graph System (RKGS) supports multiple forms of ELT and ETL workflows. Which workflow to adopt depends on your needs and, in particular, on which data in the process should be considered the “source of truth.” The following table summarizes the main recommended workflows that the RKGS supports:

Workflow | Source of Truth | Description
ELT | Raw data | The loaded data without transformations are stored in the database as base relations.
ETL | Transformed data | The transformed data are stored in the database as base relations.

Moreover, there are other special ELT cases you can consider:

Workflow | Source of Truth | Description
ELT-External | External data | The reference data are stored in the database as derived relations.
ELT-Multiple | Multiple sources | Several data sources are stored in the database as base relations.

ELT

In this scenario, RAI becomes your data warehouse. You can think of the transformed data as a view on top of the loaded raw data. In this way, everything is reactive and up to date as new data come in.

Data | Database status | Source of truth | Updates
Raw | Base relation | Yes | Requires a write query
Transformed | Derived relation | No | Automatically and incrementally maintained

Here’s an example. You start by loading and storing your raw data — directly from some data source — without any transformation:

// write query
 
def insert:raw_elt = load_csv["azure://raidocs.blob.core.windows.net/data-ingestion/raw_data.csv"]
def output = ::std::display::table[raw_elt]

Your raw data become the source of truth.

Then, you install the derived relation transformed_elt containing the transformed data:

// model
 
@outline
def my_transformation[D, C](col, row, value) {
    D(col_in, row, v)
    and
    col = (C:rename[col_in] <++ col_in)
    and
    value = v
    from v, col_in
}
 
@outline
module trans_col[D]
    module rename
        def Name = :Full_name
    end
end

// model
 
def transformed_elt = my_transformation[raw_elt, trans_col[raw_elt]]

// read query
 
def output  = ::std::display::table[transformed_elt]

In this case, the transformation my_transformation is also installed as logic in the database. This is usually done before the data import. In this example, the column “Name” in your dataset is now “Full_name.” You can change this transformation logic in my_transformation after the data import. This is possible because all derived relations will be automatically updated as the derived logic changes (similar to changes in the underlying data).
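The reactive behavior of the ELT workflow can be pictured outside Rel with a minimal Python sketch. This is only an analogy and not how RAI is implemented: the list `raw_elt` stands in for the base relation, the function `transformed_elt` stands in for the derived relation, and the sample rows are made up for illustration.

```python
# Analogy only: a list plays the base relation, a function plays the derived relation.
raw_elt = [{"Name": "Thomas Burke", "Age": 21}]  # source of truth (illustrative row)

RENAME = {"Name": "Full_name"}  # same renaming as trans_col:rename


def transformed_elt():
    """Recompute the view from the raw data on every call, like a derived relation."""
    return [{RENAME.get(col, col): v for col, v in row.items()} for row in raw_elt]


before = transformed_elt()
raw_elt.append({"Name": "Aristidis Akratopoulos", "Age": 35})  # a later "write query"
after = transformed_elt()  # the view reflects the new row with no extra step
```

Because only the raw rows are stored, changing `RENAME` later would also change every subsequent result of `transformed_elt`, which mirrors editing the installed logic after the import.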

ETL

In this approach, you first perform a transformation over the loaded data. After that, the transformed data are persisted in the database as a base relation. This base relation becomes the source of truth.

Data | Database status | Source of truth | Updates
Raw | Not persisted | No | Requires a write query
Transformed | Base relation | Yes | Automatically and incrementally maintained

For instance, consider the following ingestion example:

// write query
 
def raw_data = load_csv["azure://raidocs.blob.core.windows.net/data-ingestion/raw_data.csv"]
def insert:transformed_etl = my_transformation[raw_data, trans_col[raw_data]]
 
def output = ::std::display::table[transformed_etl]

The example above defines the raw_data relation by loading the source dataset as a CSV file from the cloud. Then, it transforms the loaded data through the relation my_transformation. In this example, the column “Name” in your dataset is now “Full_name.” Finally, it stores the transformed data in the database as a base relation. The relation transformed_etl is now your source of truth.

The transformation may or may not be persisted as installed logic, depending on the use case. For example, while you are still developing or debugging, installing it might be inconvenient. This is what the example above looks like when the transformation is not installed as logic:

// write query
 
def raw_data = load_csv["azure://raidocs.blob.core.windows.net/data-ingestion/raw_data.csv"]
 
@outline
def my_transformation[D, C](col, row, value) {
    D(col_in, row, v)
    and
    col = (C:rename[col_in] <++ col_in)
    and
    value = v
    from v, col_in
}
 
@outline
module trans_col[D]
    module rename
        def Name = :Full_name
    end
end
 
def insert:transformed_etl = my_transformation[raw_data, trans_col[raw_data]]
def output = ::std::display::table[transformed_etl]

For more details, see Transformations.

With this approach, when new data come in, the source of truth is always kept up to date since the transformation is automatically applied to the new data through the insert declaration. The transformation itself is either performed on the spot within the same transaction or installed as logic.
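For contrast with ELT, the ETL workflow can be sketched in plain Python as follows. Again, this is an analogy under stated assumptions rather than RAI's implementation: `write_query` is a hypothetical helper, the list `transformed_etl` stands in for the base relation, and the rows are illustrative.

```python
RENAME = {"Name": "Full_name"}

transformed_etl = []  # stands in for the base relation; the only thing persisted


def my_transformation(rows):
    """Rename columns according to RENAME, leaving other columns untouched."""
    return [{RENAME.get(c, c): v for c, v in row.items()} for row in rows]


def write_query(raw_rows):
    """Transform the incoming batch and persist only the result; raw rows are dropped."""
    transformed_etl.extend(my_transformation(raw_rows))


write_query([{"Name": "Thomas Burke", "Age": 21}])
write_query([{"Name": "Aristidis Akratopoulos", "Age": 35}])
```

Since the raw rows are never stored, changing `RENAME` afterwards does not affect rows that were already written.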

Special Cases

ELT-External

The source of truth lies outside RAI. All logic is installed as derived relations through models, and these relations are updated whenever the logic is triggered, which happens with each transaction.

Data | Database status | Source of truth | Updates
Raw | Derived relation | No (external) | Automatically and incrementally maintained
Transformed | Derived relation | No (external) | Automatically and incrementally maintained

Here’s an example:

// model
 
def raw_data_logic = load_csv["azure://raidocs.blob.core.windows.net/data-ingestion/raw_data.csv"]
def transformed_data_logic = my_transformation[raw_data_logic, trans_col[raw_data_logic]]

// read query
 
def output  = ::std::display::table[transformed_data_logic]

There is one potential pitfall when the import fails: Every relation will be “reset” to empty.

With this approach, when new data come in, the only way to update your source of truth is by triggering the installed derived relations above, which can be done with any query request.
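The "reset to empty" pitfall can be illustrated with a small Python analogy. The helpers `load_external`, `ok`, and `broken` are hypothetical, and the choice of `OSError` as the failure signal is an assumption made only for this sketch; it does not describe RAI's actual error handling.

```python
def load_external(fetch):
    """Return the rows produced by `fetch`, or an empty list if the import fails."""
    try:
        return fetch()
    except OSError:
        return []


def transformed_data_logic(fetch):
    """Derived data: always recomputed from whatever the external source returns."""
    return [
        {("Full_name" if c == "Name" else c): v for c, v in row.items()}
        for row in load_external(fetch)
    ]


def ok():
    return [{"Name": "Thomas Burke"}]


def broken():
    raise OSError("source unreachable")


healthy = transformed_data_logic(ok)
after_failure = transformed_data_logic(broken)  # nothing is stored, so nothing survives
```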

ELT-Multiple

There may exist cases where storing raw and transformed data as base relations is convenient. For instance, it may be beneficial for performance purposes.

Data | Database status | Source of truth | Updates
Raw | Base relation | Yes | Requires a write query
Transformed | Base relation | Yes | Requires a write query

Here’s an example:

// write query
 
def insert:raw_source = load_csv["azure://raidocs.blob.core.windows.net/data-ingestion/raw_data.csv"]
def insert:transformed_source = my_transformation[raw_source, trans_col[raw_source]]

However, this approach is not recommended since it creates two sources of truth, which can easily fall out of sync and interfere with the incremental maintenance performed by RAI’s RKGS. When the source data change, you need to execute a write query to update both base relations.
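The out-of-sync risk can be seen in a few lines of Python. This is an analogy only: the two lists stand in for the two base relations, and the rows are illustrative.

```python
RENAME = {"Name": "Full_name"}

raw_source = [{"Name": "Thomas Burke"}]
transformed_source = [
    {RENAME.get(c, c): v for c, v in row.items()} for row in raw_source
]

# A later write touches only the raw copy...
raw_source.append({"Name": "Aristidis Akratopoulos"})

# ...so the two stored copies disagree until a second write refreshes the other one.
in_sync = len(raw_source) == len(transformed_source)
```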

Schema Declarations

Here are some useful practices to ingest data with predefined schema using RAI’s supported data files.

Note that the following cases are focused on the ingestion workflow. You can see the CSV Import and JSON Import guides for more details and examples on how to import data.

JSON-Defined Schema

A common pattern in Rel when ingesting data is to define the schema of the CSV data in a JSON file. You then load the JSON-defined schema using load_json and, finally, ingest the data through the load_csv function. This style of ingestion is widely used because it decouples the code from the schema.

Say you have the following data saved in the CSV file test-ingestion.csv:

ID,Name,Sex,Age,Height,Weight
1724,Aristidis Akratopoulos,M,35,176,74
16616,Thomas Burke,M,21,183,66

Now you can define the schema in the JSON file config_json_1.json:

{
   "columns": [
      {"ID": "int"},
      {"Name": "string"} ,
      {"Sex": "string"},
      {"Age": "int"},
      {"Height": "string"},
      {"Weight": "string"}
   ]
}

Finally, you can ingest the data as follows:

// write query
 
// Load the schema.
def config_json[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/config_json_1.json"
def schema_json = load_json[config_json]
 
 
// Ingest the data.
def config_csv[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/test-ingestion.csv"
def config_csv[:schema] = col, type: schema_json(:columns, :[], _, col, type)
def insert[:ingested_data] = load_csv[config_csv]
def output = ::std::display::table[ingested_data]

Note that if your URL and/or cloud credentials to access the data within the relation :path contain the percent sign, %, you need to escape the character with \% or declare the string as a raw string (raw"..."). This is because % is used for string interpolation within a Rel string.
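The `config_csv[:schema]` rule in the query above flattens the JSON array of single-key objects into (column, type) pairs. A Python sketch of the same flattening, using only the standard `json` module, looks like this. The variable names are chosen for illustration and the schema text is inlined rather than fetched from the cloud.

```python
import json

schema_text = """
{
   "columns": [
      {"ID": "int"},
      {"Name": "string"},
      {"Sex": "string"},
      {"Age": "int"},
      {"Height": "string"},
      {"Weight": "string"}
   ]
}
"""

schema_json = json.loads(schema_text)

# One (column, type) pair per key in each array element; the array index is ignored,
# just as the `_` wildcard ignores it in the Rel rule.
schema = [
    (col, col_type)
    for entry in schema_json["columns"]
    for col, col_type in entry.items()
]
```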

JSON-Defined Schema With Required Columns

As an extension of the previous case, you can specify in the JSON-defined schema which columns in the CSV data file are mandatory.

Consider the following data to be ingested within the CSV file test-ingestion2.csv:

ID,Name,Sex,Age,Height,Weight
NA,Aristidis Akratopoulos,M,NA,NA,NA
16616,Thomas Burke,M,21,183,66

Note that some values in this example are missing and recorded as “NA”. They showcase how loading errors are reported depending on the column category, i.e., whether the column is required or not.

Now you can define the schema in the JSON file config_json_2.json by marking which columns are mandatory. In this example, the first two columns are required:

{
   "columns": [
      {"ID": "int", "required": true},
      {"Name": "string", "required": true } ,
      {"Sex": "string", "required": false},
      {"Age": "int", "required": false},
      {"Height": "string", "required": false},
      {"Weight": "string", "required": false}
   ]
}

Finally, you can ingest the data as follows:

// write query
 
// Load the schema.
def config_json[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/config_json_2.json"
def schema_json = load_json[config_json]
 
// Ingest the data.
def config_csv[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/test-ingestion2.csv"
def config_csv[:schema] = col, type: schema_json(:columns, :[], _, col, type) and col != :required
def insert[:ingested_data_req] = load_csv[config_csv]
def output = ingested_data_req

In the output, you can see that there are some import errors reported within the relation :load_errors. This allows you to check whether the reported errors are related to mandatory columns or not.

Following the example, you can first install the JSON schema:

// model
 
// Install the schema.
def config_json_model[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/config_json_2.json"
def schema_json_model = load_json[config_json_model]

Then, you can install the following relation for checking which rows contain errors related to the required columns in the installed schema:

// model
 
def req_col_missing_rows(row_num, col_num, row_value) =
    ingested_data_req(:load_errors, row_num, col_num, row_value) and
    schema_json_model(:columns, :[], col_num, :required, boolean_true)

Finally, you can query the relation req_col_missing_rows:

// read query
 
def output = req_col_missing_rows

As expected, it points to the first column (“ID”) and the first data row of the ingested CSV file. This is because “ID” is a required column and its value in that row is missing (“NA”).
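The logic of req_col_missing_rows is a join between the reported load errors and the positions of the required columns in the schema. A Python sketch of that join follows. The contents of `load_errors`, including the row number 2, are hypothetical values chosen for illustration and are not actual RAI output.

```python
columns = [
    {"ID": "int", "required": True},
    {"Name": "string", "required": True},
    {"Sex": "string", "required": False},
    {"Age": "int", "required": False},
    {"Height": "string", "required": False},
    {"Weight": "string", "required": False},
]

# Hypothetical load errors as (row_num, col_num, raw_line).
load_errors = [
    (2, 1, "NA,Aristidis Akratopoulos,M,NA,NA,NA"),  # ID is not a valid int
    (2, 4, "NA,Aristidis Akratopoulos,M,NA,NA,NA"),  # Age is not a valid int
]

# 1-based positions of the columns flagged as required in the schema.
required_cols = {i for i, entry in enumerate(columns, start=1) if entry["required"]}

# Keep only the errors that fall on a required column.
req_col_missing_rows = [err for err in load_errors if err[1] in required_cols]
```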

Bulk Ingestion

A frequent use case is the ingestion of several files at once into the same relation.

Same Schema

If all files come from the same data source, their structure should be the same and therefore you only need one schema definition.

Here is an example where you ingest two CSV files with the same JSON-defined schema config_json_1.json into the relation bulk_data:

// read query
 
// Load the schema.
def config_json[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/config_json_1.json"
def schema_json = load_json[config_json]
 
// Load the data.
def path_data = "azure://raidocs.blob.core.windows.net/data-ingestion/bulk_NNN.csv"
def partid = range[1,2,1]
def files = string_replace[path_data, "NNN", string[x]] for x in partid
 
module config[i in partid]
    def path = files[i]
    def schema = col, type: schema_json(:columns, :[], _, col, type)
end
 
def bulk_data(col, i, pos, v) = load_csv[config[i]](col, pos, v)
def output = ::std::display::table[bulk_data]
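The path generation in the query above replaces the placeholder NNN with each part number. In Python, the same step can be sketched as follows. It assumes that `range[1,2,1]` yields the values 1 and 2.

```python
path_data = "azure://raidocs.blob.core.windows.net/data-ingestion/bulk_NNN.csv"

partid = range(1, 3)  # 1 and 2, assumed to match range[1,2,1] in the query above

# Map each part number to its file path, like the `files` relation.
files = {i: path_data.replace("NNN", str(i)) for i in partid}
```

Keeping the part number as the key mirrors the extra `i` argument of bulk_data, which records the file each row came from.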

Different Schemas

On the other hand, you can have several data sources with potentially different file structures. In this case, each file requires its own schema:

// read query
 
// Set the range.
def iter = range[1,2,1]
 
// Define the URLs of schemas and data.
def path_schema = "azure://raidocs.blob.core.windows.net/data-ingestion/bulk_json_NNN.json"
def urls_schema = string_replace[path_schema, "NNN", string[i]] for i in iter
 
def path_data = "azure://raidocs.blob.core.windows.net/data-ingestion/bulk_NNN.csv"
def urls_data = string_replace[path_data, "NNN", string[i]] for i in iter
 
// Load the JSON schema.
module config_json[i in iter]
    def path = urls_schema[i]
end
def schema_json(i, col, type) = load_json[config_json[i]](:columns, :[], _, col, type)
 
// Load the CSV data.
module config_csv[i in iter]
    def path = urls_data[i]
    def schema = schema_json[i]
end
def bulk_data(col, i, pos, v) = load_csv[config_csv[i]](col, pos, v)
 
def output = ::std::display::table[bulk_data]

Transformations

When extracting data, especially from different sources, you need to perform certain actions so that your data match the data platform you are using. This step is essential to build a common and consistent data model across systems.

Transformations in Rel are usually performed through parameterized modules, where the data to be transformed are used as the input:

// Model
 
@inline
module my_transformation[raw_data]
   ...
end

Several common transformations within RAI are discussed in the following sections.

Data Cleaning and Preparation

This entails different tasks such as data type conversions, imputing missing values, renaming fields, adding and removing columns, etc.

Say you have the following data saved in the CSV file transformation.csv:

ID,DOB,Height,Weight,Name
1,1995-02-21,168,69,King Arthur
2,2015-10-30,177,68,Mckinney King
3,2000-11-19,187,77,Ranky Mark
4,2020-12-09,NA,69,Luther Barky
5,2000-01-19,179,NA,April Fuche

You also have the following JSON-defined schema trans_json_1.json:

{
   "columns": [
     {"ID": "int"},
     {"DOB": "string"},
     {"Height": "string"},
     {"Weight": "string"},
     {"Name": "string"}
   ]
}

Now, you want to perform some transformations following the ETL ingestion workflow. That is, the entire ingestion process is done without storing anything within the database. After the transformations are completed, you will insert the result as a base relation, which will become your source of truth.

In the following example, you first load the schema defined in JSON and data in CSV and then apply some data transformations, all in the same write query. Finally, you store the transformed data as the base relation transformed_data_ref:

// write query
 
// Load the schema.
def config_json1[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/trans_json_1.json"
def schema_json1 = load_json[config_json1]
 
// Load the data.
module config
    def path = "azure://raidocs.blob.core.windows.net/data-ingestion/transformation.csv"
    def schema = col, type: schema_json1(:columns, :[], _, col, type)
end
 
// Define transformations.
@ondemand @outline
def transform[D, C](col, row, value) {
    D(col_in, row, v)
    and
    col = (C:rename[col_in] <++ col_in)
    and
    not C:remove(col_in)
    and
    value = (C:transform[col_in, col, v] <++ v)
    from v, col_in
}
 
@inline
module my_col[D]
    module rename
        def DOB = :DOB
        def DOB = :DOB_next_year
        def Height = :Height
        def Name = :Full_name
    end
 
    module remove
        def Weight = true
    end
 
    module transform
        def DOB:DOB(x, y) {parse_date(x, "Y-m-d", y)}
        def DOB:DOB_next_year(x, y) {parse_date[x, "Y-m-d"] + ^Year[1] = y}
        def Height:Height(x, y in Float) { y = (parse_float[x]  <++ mean[ D[:Height] . parse_float])}
    end
end
 
def csv_data = load_csv[config]
 
// Store the source of truth as a base relation.
def insert:transformed_data_ref = transform[csv_data, my_col[csv_data]]
def output = ::std::display::table[transformed_data_ref]

The code above defines the relation transform, which takes some data, named D, and a parameterized module, named C. This parameterized module allows you to define transformations over the columns of the passed data. These transformations can be defined as nested modules; see, for instance, the rename and remove modules. In this case, the argument of the parameterized module is the loaded data, i.e., my_col[csv_data].

Note that some annotations precede the relation and module definitions. See Annotations in the Rel Reference manual for more details.

The following sections describe the transformations performed in the example above.

Data Type Conversion

In order to obtain full performance from the RAI system, it is convenient to convert predefined data types within your dataset to Rel data types.

Note that the second column in the dataset above, “DOB”, represents the date of birth. As an example, you want to convert it to the Rel data type Date, instead of the String type as specified in the previous schema.

For this type conversion, you can use the built-in relation parse_date. In the example above, this corresponds to:

def DOB:DOB(x, y) {parse_date(x, "Y-m-d", y)}
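For readers more familiar with general-purpose languages, the same conversion can be sketched in Python with the standard `datetime` module. The helpers `parse_dob` and `next_year` are hypothetical names, and clamping 29 February to 28 February is a choice made for this sketch.

```python
from datetime import date, datetime


def parse_dob(text: str) -> date:
    """Parse a 'YYYY-MM-DD' string into a date; raises ValueError on bad input."""
    return datetime.strptime(text, "%Y-%m-%d").date()


def next_year(d: date) -> date:
    """Shift a date by one year, clamping 29 February to 28 February."""
    try:
        return d.replace(year=d.year + 1)
    except ValueError:
        return d.replace(year=d.year + 1, day=28)


dob = parse_dob("1995-02-21")
dob_next_year = next_year(dob)
```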

Imputing Missing Values

There are some missing values represented as “NA” in the dataset above. In this case, you want to replace each missing value with the mean of the column it belongs to.

This is done within the transform module using the built-in relations mean and parse_float as follows:

def Height:Height(x, y in Float) { y = (parse_float[x]  <++ mean[ D[:Height] . parse_float])}

Note that imputing missing values is application dependent and, therefore, you may not need to handle them as in the previous example. One advantage of using the Graph Normal Form is that it eliminates the need to store missing values.
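The arithmetic behind the mean imputation can be checked with a short Python sketch on the “Height” column of transformation.csv. The dictionary keys are illustrative row numbers, and `parse_float` here is a local helper that returns None where the Rel relation would simply have no value.

```python
heights = {1: "168", 2: "177", 3: "187", 4: "NA", 5: "179"}


def parse_float(text):
    """Return the float value of `text`, or None if it cannot be parsed."""
    try:
        return float(text)
    except ValueError:
        return None


parsed = {row: parse_float(v) for row, v in heights.items()}

# The mean is taken over the values that parsed successfully.
known = [v for v in parsed.values() if v is not None]
mean_height = sum(known) / len(known)

# Fall back to the mean only where parsing failed, like the `<++` operator.
imputed = {row: (v if v is not None else mean_height) for row, v in parsed.items()}
```

Here the four known heights sum to 711, so the missing value in row 4 becomes 177.75.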

Adding and Removing Columns

The example below adds a new column, i.e., the :DOB_next_year relation, through the module rename. It contains the dates of birth plus one year. Moreover, it renames the column “Name” as “Full_name”:

module rename
    def DOB = :DOB
    def DOB = :DOB_next_year
    def Height = :Height
    def Name = :Full_name
end

Following the same approach, you can remove columns through another nested module. In this case, assume you do not need the “Weight” column:

module remove
    def Weight = true
end
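The effect of the rename and remove modules on (column, row, value) triples can be sketched in Python as follows. Value transformations are deliberately left out, so the new column DOB_next_year simply copies the DOB values here. The function name `reshape` and the single sample row are assumptions for illustration.

```python
# One row of transformation.csv as (column, row, value) triples.
D = {
    ("DOB", 1, "1995-02-21"),
    ("Height", 1, "168"),
    ("Weight", 1, "69"),
    ("Name", 1, "King Arthur"),
}

# A column may map to several output columns, which is how a column is added.
rename = {"DOB": ["DOB", "DOB_next_year"], "Height": ["Height"], "Name": ["Full_name"]}
remove = {"Weight"}


def reshape(data):
    """Drop removed columns and fan each remaining column out to its new names."""
    out = set()
    for col_in, row, value in data:
        if col_in in remove:
            continue
        for col in rename.get(col_in, [col_in]):  # unchanged name if not listed
            out.add((col, row, value))
    return out


result = reshape(D)
columns = sorted({col for col, _, _ in result})
```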

Schema Mappings

A schema mapping defines how data are converted between the schema of an external data source and RAI’s system.

RAI’s system allows users to work with data stored in a highly normalized format called Graph Normal Form (GNF).

Transformation to Domain-Specific GNF

Generally, when importing data, the primary key is automatically defined as the FilePos.

However, when you transform your data into a domain-specific GNF, you usually want to use a different primary key. Specifically, use one that contains meaningful information and is not just a generic numeric identifier. In this way, your relation columns are more recognizable.

For instance, consider the previously stored base relation transformed_data_ref. Now, assume you want to define the column :Full_name as the primary key. You can do that as follows:

// write query
 
@ondemand @outline
def transform[D, T](column, key, value) {
    D(column, row, value)
    and T:primary_key_column(pk_column)
    and key = D[pk_column, row]
    from row, pk_column
}
 
def my_mapping:primary_key_column = :Full_name
 
def data_gnf = transform[transformed_data_ref, my_mapping]
 
def insert:data_gnf_base = data_gnf
def output = data_gnf_base
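The re-keying performed by this transform can be sketched in Python on a handful of triples. The row identifiers 1 and 2 are placeholders for the FilePos values, and the data are a small illustrative subset.

```python
D = {
    ("Full_name", 1, "King Arthur"),
    ("Height", 1, 168.0),
    ("Full_name", 2, "Mckinney King"),
    ("Height", 2, 177.0),
}

pk_column = "Full_name"

# Look up the primary-key value of every row.
key_of_row = {row: value for col, row, value in D if col == pk_column}

# Replace the row identifier with the primary-key value in every triple.
data_gnf = {(col, key_of_row[row], value) for col, row, value in D}
```

Note that this only works as intended when the chosen column has exactly one value per row and no duplicates across rows.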

Defining the Canonical Ontology

The next step after your data are ingested into the system, i.e., loaded, transformed, and stored, is to define a canonical ontology to be used as your common data model.

The ontology describes the domain of interest including the different kinds of elements within your data and how they relate to each other. In Rel, you can mainly use entities and value types for this purpose.

For example, considering the previous base relation data_gnf_base as your source of truth, you can define an entity, named Person, to model all the data contained in that relation.

First, you declare the entity:

// model
 
entity type Person = String

When working with entities, you want to define two other relations. One maps entities to their identifying property like this:

// write query
 
def names(x) = data_gnf_base[:Full_name](pos, x) from pos
 
def insert:person_name = {
    ^Person[name], name from name in names
}

And another contains entities:

// model
 
def Person(p) = person_name(p, _)

Moreover, you can associate some properties to the entity:

// write query
 
def insert:person(prop, e, value) = {
    data_gnf_base(prop, name, value) and person_name(e, name) from name
}
 
def output = ::std::display::table[person]

Similarly, you can also define the following value type to model the Symbol :Height in data_gnf_base, i.e., the “Height” column in the ingested CSV file transformation.csv:

value type Height = Float

See Entities and Value Types for more details.

Data Integrity Checks

RAI’s system allows you to guarantee that performing data ingestions does not affect the integrity of the data. This is done through integrity constraints (ICs).

Ingestion Constraints

You can define ICs that enforce rules on the loaded data. Including ICs in write queries ensures that datasets that do not adhere to the rules will not be inserted.

As an example, consider the dataset from Data Cleaning and Preparation. Now, say you want to ingest the dataset of people only if all people in the dataset are more than 10 years old. For that, you can write the following IC:

// write query
 
def config_json1[:path] = "azure://raidocs.blob.core.windows.net/data-ingestion/trans_json_1.json"
def schema_json1 = load_json[config_json1]
 
module config
    def path = "azure://raidocs.blob.core.windows.net/data-ingestion/transformation.csv"
    def schema = col, type: schema_json1(:columns, :[], _, col, type)
end
 
// Set an integrity constraint to enforce data integrity.
ic dob_violation(pos, date) {
    raw_data_ic(:DOB, pos, date)
    implies
    2021 - date_year[parse_date[date, "Y-m-d"]] > 10
}
 
def insert[:raw_data_ic] = load_csv[config]

As expected, it returns the INTEGRITY_CONSTRAINT_VIOLATION error message since there are two rows within the dataset that violate the defined dob_violation IC. Because of this violation, the whole transaction is aborted. In general, you can define ICs at the time of inserting the data when you want the data to adhere to certain constraints.
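The check and its all-or-nothing effect can be reproduced in a Python sketch using the “DOB” values from transformation.csv. The function `violations` is a hypothetical helper, and the dictionary keys are illustrative row numbers.

```python
from datetime import datetime

dobs = {
    1: "1995-02-21",
    2: "2015-10-30",
    3: "2000-11-19",
    4: "2020-12-09",
    5: "2000-01-19",
}


def violations(rows, reference_year=2021):
    """Return the rows for which `reference_year - birth year > 10` does not hold."""
    return {
        pos: text
        for pos, text in rows.items()
        if not reference_year - datetime.strptime(text, "%Y-%m-%d").year > 10
    }


raw_data_ic = {}  # stands in for the base relation
bad = violations(dobs)

# All or nothing: a single violation prevents the whole batch from being stored.
if not bad:
    raw_data_ic.update(dobs)
```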

Transformations Verification

You can also define ICs that verify previously performed transformations.

Here’s another example with the same dataset. Among the transformations described above, you performed a data type conversion so that the “DOB” column is no longer a string but a Rel Date value.

You can use an IC to confirm this over the stored base relation transformed_data_ref:

// model
 
ic check_DOB(pos, dob){
    transformed_data_ref(:DOB, pos, dob)
    implies
    Date(dob)
}

See Integrity Constraints in the Rel Reference manual for more details.

Summary

You have learned how to ingest data using different approaches. By following the examples in this guide, you are now able to extract, load, and transform data using Rel. You have also become familiar with which workflow to apply based on which data should be your source of truth.

See Also

For more details on how to interact with data, see the rest of the Data Import and Export guides.
