
Get started with a coding agent

Use a coding agent and RelationalAI skills to turn raw CSV files into a working semantic model you can query and refine. This tutorial shows you how to set up PyRel, generate and inspect a PyRel semantic model, and use that model to answer a business question.

Before you begin, make sure that:

  • You have access to a Snowflake account with the RelationalAI (RAI) Native App installed.

    How do I know if the RAI Native App is installed? To check if the RAI Native App is installed in your Snowflake account:

    1. Log in to your Snowflake account.
    2. Run the following SQL command in a worksheet:
      SHOW APPLICATIONS;
    3. Check the output for an application named RELATIONALAI.
      • If you see it listed, the RAI Native App is installed.
      • If not, you need to install it.

    How do I install the RAI Native App? See Install the RAI Native App for Snowflake for instructions.

  • You have a Snowflake user with the RAI_DEVELOPER database role.

    How do I check my roles? To check your roles in Snowflake:

    1. Log in to your Snowflake account.
    2. Run the following SQL command in a worksheet:
      SHOW GRANTS TO USER <your_username>;
    3. Check the output in the name column for the RAI_DEVELOPER role.
      • If you see it listed, you have the required role.
      • If not, contact your Snowflake account administrator to have the role assigned to you. See Set Up User Access for the RAI Native App for more information.

Copy and paste the following prompt into your coding agent to install the RAI agent skills for your preferred coding agent. Replace <MY_CODING_AGENT> with the name of your agent, such as “Claude Code”, “GitHub Copilot”, or “OpenAI Codex”:

Install the RelationalAI Agent Skills (https://github.com/RelationalAI/rai-agent-skills) for <MY_CODING_AGENT>.
Provide me with instructions on how to keep up-to-date and leverage them across projects. Once installed, print the skill inventory given in the README.
  1. Gather your Snowflake connection details.

    You will need:

    • Account: Log in to the Snowflake account that has the RAI Native App installed. Your account identifier can be extracted from the URL once you are logged in.

      For example, if your Snowflake URL is:

      https://app.snowflake.com/org123/account_name/...

      then your account is:

      org123-account_name

    • User: The Snowflake username you will use to connect. This user must have the RAI_DEVELOPER role.
    • Password: The password you use to log in to Snowflake.
    • Warehouse: The name of the Snowflake warehouse you want to use to execute SQL queries. This can be any existing X-SMALL or larger warehouse.
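If you want to script the account lookup, here is a minimal sketch that assumes the app.snowflake.com URL shape shown above (the function name is illustrative):

```python
from urllib.parse import urlparse

def account_from_url(url: str) -> str:
    # "/org123/account_name/..." -> "org123-account_name"
    org, account = urlparse(url).path.strip("/").split("/")[:2]
    return f"{org}-{account}"

print(account_from_url("https://app.snowflake.com/org123/account_name/worksheets"))
# → org123-account_name
```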
  2. Install RAI and set up your project environment.

    Both the agent prompt and manual steps below make use of the uv package manager because it manages project environments and package installation with one tool. This makes it easier to keep your agent’s Python setup isolated from the rest of your machine.

    Copy and paste the following prompt into your coding agent to install RAI and set up your project environment using the rai-setup skill. Replace the placeholders, such as <MY_SNOWFLAKE_USER>, with your Snowflake connection details:

    /rai-setup Use uv (install if missing) to create a new project called my-first-rai-project using a version of Python supported by RelationalAI. Then use uv to install the relationalai package in this new project environment. Use username/password auth with these values:
    - User: <MY_SNOWFLAKE_USER>
    - Account: <MY_SNOWFLAKE_ACCOUNT>
    - Warehouse: <MY_SNOWFLAKE_WAREHOUSE>
    - Role: RAI_DEVELOPER
    - Password: Use a placeholder and tell me how to edit my file with my password.
    • Uses uv to install relationalai and create the project environment if needed.
    • Creates or updates raiconfig.yaml with your Snowflake connection settings.
    • Uses a placeholder for your password and instructs you to edit the file.
  3. Verify your connection with rai connect.

    In your terminal, run:

    uv run rai connect

    You should see a success message confirming that you are connected to Snowflake.

This tutorial uses sample CSV files from the Energy Grid Planning template. The template models an ERCOT-style grid planning scenario for evaluating AI data center interconnection requests against existing capacity and possible infrastructure upgrades.

The sample data includes six CSV files covering:

  • Substations
  • Generators
  • Transmission lines
  • Demand forecasts
  • Pending data center requests
  • Candidate substation upgrades

Follow these steps to download the sample data and add it to your project.

Copy and paste the following prompt into your coding agent to download the sample data and place it in your project:

Download the Energy Grid Planning sample data from https://docs.relational.ai/templates/zips/v1/energy_grid_planning.zip into my current project.
Extract the zip, copy the `data/` directory from the extracted `energy_grid_planning` folder into my project root as `./data`, and then clean up the downloaded zip file and extracted folder.

Now that you have some data, let’s use RelationalAI’s agent skills to build a semantic model and help you answer a complex business question.

RelationalAI semantic models are built using PyRel, a Python DSL for declaratively defining concepts, properties, and relationships and connecting them to your data. Use the rai-build-starter-ontology skill to build a PyRel semantic model based on the sample data.

  1. Invoke the rai-build-starter-ontology skill.

    Copy and paste the following prompt into your coding agent:

    /rai-build-starter-ontology Build an ontology for grid infrastructure planning from the CSVs in data/ covering substations, generators, transmission lines, demand forecasts, data center requests, and substation upgrades.
    Write the code to a file named energy_grid.py in the project root.
  2. Choose one-shot mode.

    The agent will prompt you to choose between one-shot and guided mode. For this tutorial, select one-shot mode so the agent generates the entire model in a single response.

  • Analyzes the CSV files in data/ to infer the domain model.
  • Identifies the concepts, properties, and relationships to include in the model.
  • Writes a PyRel model with validation queries; running them may take 3 to 5 minutes.
  • A complete energy_grid.py model with concepts, relationships, and validation queries.
  • If the agent skips validation, run uv run energy_grid.py yourself.
  • Iterate until all validation queries pass and the counts and linked pairs look correct.

Expand the section below for a sample of the generated energy_grid.py.

Sample output using Claude Sonnet 4.6 (High) in Copilot
energy_grid.py
"""Energy Grid Planning — RAI starter ontology.
Domain: ERCOT-style Texas grid, 12 substations, 10 hyperscaler interconnection
requests totalling 2,930 MW.
Scoped questions (Step 1):
1. Which substations have capacity headroom to absorb new data center load?
2. What are the forecast demand levels at each substation, with and without
DC growth, across planning horizons?
3. Which substation upgrades would enable the most pending DC requests?
In scope: substations, generators, transmission lines, demand forecasts,
data center requests, substation upgrades.
Out of scope (this version): load_history, customers, load_zones,
maintenance_windows, renewable_profiles, train/val/test_forecasts.
Data source: local CSVs (≤ 96 rows) loaded with model.data() — suitable for
this prototype. Promote to model.Table() when data moves to Snowflake.
Design decisions:
- All IDs are String (SUB-xxx, GEN-xxx, TL-xxx, DF-xxx, DC-xxx, UPG-xxx).
- Integer for whole-number MW / hours / cost values; Float for decimals.
- Boolean columns (IS_RENEWABLE, IS_ACTIVE, etc.) are read by pandas as
Boolean dtype (pandas parses lowercase "true"/"false" in CSV as bool).
They are modelled as unary Relationships (is_renewable, is_active,
is_ai_workload, includes_dc_growth, enables_low_carbon) — derived with
`== True` (Python bool, not the string "true").
- TransmissionLine has two Substation FK slots (from_substation /
to_substation). Split into two separate Properties to avoid silent
same-type-slot collapse (see SKILL.md § Step 4).
- All functional FKs (N Generator → 1 Substation, etc.) use Property, not
Relationship, because each source entity maps to exactly one target.
"""
from pathlib import Path
import pandas as pd
from relationalai.semantics import Float, Integer, Model, String
from relationalai.semantics import inspect
from relationalai.semantics.std import aggregates as aggs
# ---------------------------------------------------------------------------
# Model
# ---------------------------------------------------------------------------
model = Model("energy_grid_planning")
DATA_DIR = Path(__file__).parent / "data"
# ---------------------------------------------------------------------------
# Data sources (model.data — prototyping with local CSVs, ≤ hundreds of rows)
# ---------------------------------------------------------------------------
sub_src = model.data(pd.read_csv(DATA_DIR / "substations.csv"))
gen_src = model.data(pd.read_csv(DATA_DIR / "generators.csv"))
tl_src = model.data(pd.read_csv(DATA_DIR / "transmission_lines.csv"))
df_src = model.data(pd.read_csv(DATA_DIR / "demand_forecasts.csv"))
dc_src = model.data(pd.read_csv(DATA_DIR / "data_center_requests.csv"))
upg_src = model.data(pd.read_csv(DATA_DIR / "substation_upgrades.csv"))
# ---------------------------------------------------------------------------
# Concepts
# ---------------------------------------------------------------------------
Substation = model.Concept("Substation", identify_by={"id": String})
Generator = model.Concept("Generator", identify_by={"id": String})
TransmissionLine = model.Concept("TransmissionLine", identify_by={"id": String})
DemandForecast = model.Concept("DemandForecast", identify_by={"id": String})
DataCenterRequest = model.Concept("DataCenterRequest", identify_by={"id": String})
SubstationUpgrade = model.Concept("SubstationUpgrade", identify_by={"id": String})
# ---------------------------------------------------------------------------
# Properties — Substation
# ---------------------------------------------------------------------------
Substation.name = model.Property(f"{Substation} has {String:name}")
Substation.voltage_kv = model.Property(f"{Substation} has {Integer:voltage_kv}")
Substation.max_capacity_mw = model.Property(f"{Substation} has {Integer:max_capacity_mw}")
Substation.current_load_mw = model.Property(f"{Substation} has {Integer:current_load_mw}")
Substation.latitude = model.Property(f"{Substation} has {Float:latitude}")
Substation.longitude = model.Property(f"{Substation} has {Float:longitude}")
# ---------------------------------------------------------------------------
# Properties — Generator
# ---------------------------------------------------------------------------
Generator.name = model.Property(f"{Generator} has {String:name}")
Generator.gen_type = model.Property(f"{Generator} has {String:gen_type}")
Generator.capacity_mw = model.Property(f"{Generator} has {Integer:capacity_mw}")
Generator.min_output_mw = model.Property(f"{Generator} has {Integer:min_output_mw}")
Generator.ramp_rate_mw_per_hr = model.Property(f"{Generator} has {Integer:ramp_rate_mw_per_hr}")
Generator.startup_cost = model.Property(f"{Generator} has {Integer:startup_cost}")
Generator.marginal_cost = model.Property(f"{Generator} has {Float:marginal_cost}")
Generator.min_up_time_hrs = model.Property(f"{Generator} has {Integer:min_up_time_hrs}")
Generator.min_down_time_hrs = model.Property(f"{Generator} has {Integer:min_down_time_hrs}")
Generator.emissions_rate = model.Property(f"{Generator} has {Float:emissions_rate}")
# Functional FK: each Generator connects to exactly one Substation (N:1)
Generator.substation = model.Property(
    f"{Generator} is connected to {Substation:substation}",
    short_name="gen_substation",
)
# Unary flag: Generator is renewable
Generator.is_renewable = model.Relationship(f"{Generator} is renewable")
# ---------------------------------------------------------------------------
# Properties — TransmissionLine
# Two separate Properties for the same-type FK slots (from / to Substation)
# to avoid silent slot collapse on model.define().
# ---------------------------------------------------------------------------
TransmissionLine.capacity_mw = model.Property(f"{TransmissionLine} has {Integer:capacity_mw}")
TransmissionLine.length_km = model.Property(f"{TransmissionLine} has {Integer:length_km}")
TransmissionLine.impedance = model.Property(f"{TransmissionLine} has {Float:impedance}")
TransmissionLine.maintenance_priority = model.Property(f"{TransmissionLine} has {String:maintenance_priority}")
TransmissionLine.from_substation = model.Property(
    f"{TransmissionLine} originates from {Substation:from_substation}",
    short_name="tl_from",
)
TransmissionLine.to_substation = model.Property(
    f"{TransmissionLine} terminates at {Substation:to_substation}",
    short_name="tl_to",
)
# Unary flag: all 18 lines are currently active
TransmissionLine.is_active = model.Relationship(f"{TransmissionLine} is active")
# ---------------------------------------------------------------------------
# Properties — DemandForecast
# ---------------------------------------------------------------------------
DemandForecast.forecast_period = model.Property(f"{DemandForecast} has {Integer:forecast_period}")
DemandForecast.predicted_load_mw = model.Property(f"{DemandForecast} has {Float:predicted_load_mw}")
DemandForecast.confidence = model.Property(f"{DemandForecast} has {Float:confidence}")
# Functional FK: each forecast is for one Substation (N:1)
DemandForecast.substation = model.Property(
    f"{DemandForecast} forecasts load for {Substation:substation}",
    short_name="df_substation",
)
# Unary flag: forecast scenario includes projected DC growth
DemandForecast.includes_dc_growth = model.Relationship(f"{DemandForecast} includes dc growth")
# ---------------------------------------------------------------------------
# Properties — DataCenterRequest
# ---------------------------------------------------------------------------
DataCenterRequest.name = model.Property(f"{DataCenterRequest} has {String:name}")
DataCenterRequest.hyperscaler = model.Property(f"{DataCenterRequest} has {String:hyperscaler}")
DataCenterRequest.requested_mw = model.Property(f"{DataCenterRequest} has {Integer:requested_mw}")
DataCenterRequest.annual_revenue_per_mw = model.Property(f"{DataCenterRequest} has {Integer:annual_revenue_per_mw}")
DataCenterRequest.pue = model.Property(f"{DataCenterRequest} has {Float:pue}")
DataCenterRequest.cooling_type = model.Property(f"{DataCenterRequest} has {String:cooling_type}")
DataCenterRequest.low_carbon_requirement_pct = model.Property(
    f"{DataCenterRequest} has {Float:low_carbon_requirement_pct}"
)
DataCenterRequest.queue_position = model.Property(f"{DataCenterRequest} has {Integer:queue_position}")
DataCenterRequest.status = model.Property(f"{DataCenterRequest} has {String:status}")
# Functional FK: each DC request targets one Substation (N:1)
DataCenterRequest.substation = model.Property(
    f"{DataCenterRequest} is sited at {Substation:substation}",
    short_name="dc_substation",
)
# Unary flag: AI/GPU workload (higher power density, liquid cooling preference)
DataCenterRequest.is_ai_workload = model.Relationship(f"{DataCenterRequest} is ai workload")
# ---------------------------------------------------------------------------
# Properties — SubstationUpgrade
# ---------------------------------------------------------------------------
SubstationUpgrade.capacity_increase_mw = model.Property(
    f"{SubstationUpgrade} has {Integer:capacity_increase_mw}"
)
SubstationUpgrade.cost_million = model.Property(f"{SubstationUpgrade} has {Integer:cost_million}")
SubstationUpgrade.lead_time_months = model.Property(f"{SubstationUpgrade} has {Integer:lead_time_months}")
# Functional FK: each upgrade is for one Substation (N:1)
SubstationUpgrade.substation = model.Property(
    f"{SubstationUpgrade} upgrades {Substation:substation}",
    short_name="upg_substation",
)
# Unary flag: upgrade enables low-carbon interconnection compliance
SubstationUpgrade.enables_low_carbon = model.Relationship(f"{SubstationUpgrade} enables low carbon")
# ---------------------------------------------------------------------------
# Data bindings
# ---------------------------------------------------------------------------
# --- Substations (12 rows) ---
model.define(Substation.new(
    id=sub_src.ID,
    name=sub_src.NAME,
    voltage_kv=sub_src.VOLTAGE_KV,
    max_capacity_mw=sub_src.MAX_CAPACITY_MW,
    current_load_mw=sub_src.CURRENT_LOAD_MW,
    latitude=sub_src.LATITUDE,
    longitude=sub_src.LONGITUDE,
))
# --- Generators (15 rows) ---
model.define(Generator.new(
    id=gen_src.ID,
    name=gen_src.NAME,
    gen_type=gen_src.GEN_TYPE,
    capacity_mw=gen_src.CAPACITY_MW,
    min_output_mw=gen_src.MIN_OUTPUT_MW,
    ramp_rate_mw_per_hr=gen_src.RAMP_RATE_MW_PER_HR,
    startup_cost=gen_src.STARTUP_COST,
    marginal_cost=gen_src.MARGINAL_COST,
    min_up_time_hrs=gen_src.MIN_UP_TIME_HRS,
    min_down_time_hrs=gen_src.MIN_DOWN_TIME_HRS,
    emissions_rate=gen_src.EMISSIONS_RATE,
    substation=Substation.filter_by(id=gen_src.SUBSTATION_ID),
))
model.where(
    Generator.filter_by(id=gen_src.ID),
    gen_src.IS_RENEWABLE == True,
).define(Generator.is_renewable())
# --- Transmission lines (18 rows) ---
# Two filter_by() calls produce independent Substation references, so from ≠ to.
model.define(TransmissionLine.new(
    id=tl_src.ID,
    capacity_mw=tl_src.CAPACITY_MW,
    length_km=tl_src.LENGTH_KM,
    impedance=tl_src.IMPEDANCE,
    maintenance_priority=tl_src.MAINTENANCE_PRIORITY,
    from_substation=Substation.filter_by(id=tl_src.FROM_SUBSTATION_ID),
    to_substation=Substation.filter_by(id=tl_src.TO_SUBSTATION_ID),
))
model.where(
    TransmissionLine.filter_by(id=tl_src.ID),
    tl_src.IS_ACTIVE == True,
).define(TransmissionLine.is_active())
# --- Demand forecasts (96 rows: 12 substations × 4 periods × 2 DC variants) ---
model.define(DemandForecast.new(
    id=df_src.ID,
    forecast_period=df_src.FORECAST_PERIOD,
    predicted_load_mw=df_src.PREDICTED_LOAD_MW,
    confidence=df_src.CONFIDENCE,
    substation=Substation.filter_by(id=df_src.SUBSTATION_ID),
))
model.where(
    DemandForecast.filter_by(id=df_src.ID),
    df_src.INCLUDES_DC_GROWTH == True,
).define(DemandForecast.includes_dc_growth())
# --- Data center requests (10 rows, all status="pending") ---
model.define(DataCenterRequest.new(
    id=dc_src.ID,
    name=dc_src.NAME,
    hyperscaler=dc_src.HYPERSCALER,
    requested_mw=dc_src.REQUESTED_MW,
    annual_revenue_per_mw=dc_src.ANNUAL_REVENUE_PER_MW,
    pue=dc_src.PUE,
    cooling_type=dc_src.COOLING_TYPE,
    low_carbon_requirement_pct=dc_src.LOW_CARBON_REQUIREMENT_PCT,
    queue_position=dc_src.QUEUE_POSITION,
    status=dc_src.STATUS,
    substation=Substation.filter_by(id=dc_src.SUBSTATION_ID),
))
model.where(
    DataCenterRequest.filter_by(id=dc_src.ID),
    dc_src.IS_AI_WORKLOAD == True,
).define(DataCenterRequest.is_ai_workload())
# --- Substation upgrades (10 rows) ---
model.define(SubstationUpgrade.new(
    id=upg_src.ID,
    capacity_increase_mw=upg_src.CAPACITY_INCREASE_MW,
    cost_million=upg_src.COST_MILLION,
    lead_time_months=upg_src.LEAD_TIME_MONTHS,
    substation=Substation.filter_by(id=upg_src.SUBSTATION_ID),
))
model.where(
    SubstationUpgrade.filter_by(id=upg_src.ID),
    upg_src.ENABLES_LOW_CARBON == True,
).define(SubstationUpgrade.enables_low_carbon())
# ---------------------------------------------------------------------------
# Step 7 — Validation queries
# Run: python energy_grid.py
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # 7b — Count instances per concept (zero means data binding failed)
    print("=" * 60)
    print("7b — Instance counts")
    print("=" * 60)
    for concept, expected in [
        (Substation, 12),
        (Generator, 15),
        (TransmissionLine, 18),
        (DemandForecast, 96),
        (DataCenterRequest, 10),
        (SubstationUpgrade, 10),
    ]:
        result = model.select(aggs.count(concept).alias("count")).to_df()
        count = result["count"].iloc[0] if len(result) > 0 else 0
        status = "OK" if count == expected else f"WARN expected {expected}"
        print(f" {concept!s:<22} count={count} {status}")

    # 7c — Verify FK joins resolved
    print()
    print("=" * 60)
    print("7c — FK join verification")
    print("=" * 60)

    # Generator → Substation
    df = model.where(
        Generator.substation(Substation)
    ).select(
        Generator.id.alias("generator_id"),
        Substation.id.alias("substation_id"),
    ).to_df()
    print(f" Generator→Substation linked pairs: {len(df)} (expect 15)")

    # TransmissionLine from/to Substation (same-type-slot check: from ≠ to)
    from_sub = Substation.ref()
    to_sub = Substation.ref()
    df = model.where(
        TransmissionLine.from_substation(from_sub),
        TransmissionLine.to_substation(to_sub),
    ).select(
        TransmissionLine.id.alias("line_id"),
        from_sub.id.alias("from_id"),
        to_sub.id.alias("to_id"),
    ).to_df()
    print(f" TransmissionLine with both endpoints: {len(df)} (expect 18)")
    mismatched = (df["from_id"] == df["to_id"]).sum()
    print(f" Lines where from_id == to_id (should be 0): {mismatched}")

    # DemandForecast → Substation
    df = model.where(
        DemandForecast.substation(Substation)
    ).select(
        DemandForecast.id.alias("forecast_id"),
        Substation.id.alias("substation_id"),
    ).to_df()
    print(f" DemandForecast→Substation linked pairs: {len(df)} (expect 96)")

    # DataCenterRequest → Substation
    df = model.where(
        DataCenterRequest.substation(Substation)
    ).select(
        DataCenterRequest.id.alias("dc_id"),
        Substation.id.alias("substation_id"),
    ).to_df()
    print(f" DataCenterRequest→Substation linked pairs: {len(df)} (expect 10)")

    # SubstationUpgrade → Substation
    df = model.where(
        SubstationUpgrade.substation(Substation)
    ).select(
        SubstationUpgrade.id.alias("upgrade_id"),
        Substation.id.alias("substation_id"),
    ).to_df()
    print(f" SubstationUpgrade→Substation linked pairs: {len(df)} (expect 10)")

    # Unary flag counts
    print()
    df = model.select(aggs.count(Generator).alias("n")).where(Generator.is_renewable()).to_df()
    print(f" Renewable generators: {df['n'].iloc[0] if len(df) else 0} (expect 7)")
    df = model.select(aggs.count(DemandForecast).alias("n")).where(DemandForecast.includes_dc_growth()).to_df()
    print(f" DC-growth forecasts: {df['n'].iloc[0] if len(df) else 0} (expect 48)")
    df = model.select(aggs.count(DataCenterRequest).alias("n")).where(DataCenterRequest.is_ai_workload()).to_df()
    print(f" AI-workload DC requests: {df['n'].iloc[0] if len(df) else 0} (expect 7)")
    df = model.select(aggs.count(SubstationUpgrade).alias("n")).where(SubstationUpgrade.enables_low_carbon()).to_df()
    print(f" Low-carbon upgrades: {df['n'].iloc[0] if len(df) else 0}")

    # 7d — Scoped question 1: substation capacity headroom vs pending DC demand
    print()
    print("=" * 60)
    print("Q1 — Substation capacity headroom vs pending DC requests")
    print("=" * 60)
    df = model.where(
        DataCenterRequest.substation(Substation),
        DataCenterRequest.status == "pending",
    ).select(
        Substation.id.alias("substation_id"),
        Substation.name.alias("substation_name"),
        Substation.max_capacity_mw.alias("max_mw"),
        Substation.current_load_mw.alias("current_mw"),
        aggs.sum(DataCenterRequest.requested_mw).per(Substation).alias("pending_dc_mw"),
    ).to_df()
    print(df.to_string(index=False))

    # 7d — Scoped question 2: 24-month demand forecasts, with vs without DC growth
    print()
    print("=" * 60)
    print("Q2 — 24-month demand forecasts (without DC growth)")
    print("=" * 60)
    df = model.where(
        DemandForecast.substation(Substation),
        DemandForecast.forecast_period == 24,
    ).select(
        Substation.id.alias("substation_id"),
        DemandForecast.predicted_load_mw.alias("predicted_mw"),
        DemandForecast.confidence.alias("confidence"),
    ).to_df()
    # exclude the includes_dc_growth=true rows
    print(df.to_string(index=False))

    # 7d — Scoped question 3: upgrades per substation, sorted by capacity added
    print()
    print("=" * 60)
    print("Q3 — Available substation upgrades (capacity + cost + lead time)")
    print("=" * 60)
    df = model.where(
        SubstationUpgrade.substation(Substation),
    ).select(
        Substation.id.alias("substation_id"),
        Substation.name.alias("substation_name"),
        SubstationUpgrade.id.alias("upgrade_id"),
        SubstationUpgrade.capacity_increase_mw.alias("capacity_increase_mw"),
        SubstationUpgrade.cost_million.alias("cost_million"),
        SubstationUpgrade.lead_time_months.alias("lead_time_months"),
    ).to_df()
    print(df.to_string(index=False))

    # 7e — inspect.schema() — what actually registered
    print()
    print("=" * 60)
    print("7e — Registered model schema (inspect.schema)")
    print("=" * 60)
    schema = inspect.schema(model)
    scoped_concepts = [
        "Substation", "Generator", "TransmissionLine",
        "DemandForecast", "DataCenterRequest", "SubstationUpgrade",
    ]
    for concept_name in scoped_concepts:
        c = schema[concept_name]
        idents = ", ".join(f"{f.name}:{f.type_name}" for f in c.identify_by)
        print(f"\n{concept_name} [id: {idents}]")
        for prop in c.properties:
            print(f" .{prop.name}: {prop.type_name}")
        for rel in c.relationships:
            print(f" ~{rel.name}: {rel.reading}")
    print(f"\nData sources: {[t.name for t in schema.tables]}")

To help you understand the structure of the model you just built, use the rai-querying skill to visualize the ontology as a concept-relationship diagram and report row counts per concept.

Copy and paste the following prompt into your coding agent to invoke the skill:

/rai-querying Show the ontology as a concept-relationship diagram and report row counts per concept.
  • Generates a concept-relationship diagram of your model so you can inspect its structure.
  • A diagram with concepts as boxes and relationships as labeled arrows.
  • Some agents render it directly; others return Mermaid you can render yourself.

Expand the section below for a sample output of what the generated diagram might look like.

Sample output from Claude Sonnet 4.6 High in Copilot
erDiagram
    Substation {
        String id PK
        String name
        Integer voltage_kv
        Integer max_capacity_mw
        Integer current_load_mw
        Float latitude
        Float longitude
    }

    Generator {
        String id PK
        String name
        String gen_type
        Integer capacity_mw
        Integer min_output_mw
        Integer ramp_rate_mw_per_hr
        Integer startup_cost
        Float marginal_cost
        Integer min_up_time_hrs
        Integer min_down_time_hrs
        Float emissions_rate
        Boolean is_renewable
    }

    TransmissionLine {
        String id PK
        Integer capacity_mw
        Integer length_km
        Float impedance
        String maintenance_priority
        Boolean is_active
    }

    DemandForecast {
        String id PK
        Integer forecast_period
        Float predicted_load_mw
        Float confidence
        Boolean includes_dc_growth
    }

    DataCenterRequest {
        String id PK
        String name
        String hyperscaler
        Integer requested_mw
        Integer annual_revenue_per_mw
        Float pue
        String cooling_type
        Float low_carbon_requirement_pct
        Integer queue_position
        String status
        Boolean is_ai_workload
    }

    SubstationUpgrade {
        String id PK
        Integer capacity_increase_mw
        Integer cost_million
        Integer lead_time_months
        Boolean enables_low_carbon
    }

    Substation ||--o{ Generator : "connected to (gen_substation)"
    Substation ||--o{ TransmissionLine : "originates from (tl_from)"
    Substation ||--o{ TransmissionLine : "terminates at (tl_to)"
    Substation ||--o{ DemandForecast : "forecasts load for (df_substation)"
    Substation ||--o{ DataCenterRequest : "is sited at (dc_substation)"
    Substation ||--o{ SubstationUpgrade : "upgrades (upg_substation)"

Use the rai-discovery skill to refine your questions and choose the right skills and prompts. In this grid planning scenario, the goal is to decide which requests to approve and upgrades to fund.

Copy and paste the following prompt into your coding agent to invoke the skill:

/rai-discovery I need to determine which datacenter requests to approve and which upgrades to fund at what budget level. What combination of skills/prompts can I use to answer these questions? Give me options as:
"[/skill1 [+ /skill2 [+ /skill3 ...]]] <prompt>"
  • Evaluates your question against the model and suggests relevant skills and prompts.
  • Checks which concepts and properties matter and whether you need more data.
  • Returns prompt options, what each reveals, and a recommended path forward.

The output will vary, but you can expect several options that combine skills such as rai-rules-authoring, rai-prescriptive-problem-formulation, and rai-graph-analysis to approach the problem in different ways.

Sample output from Claude Sonnet 4.6 High in Copilot

Based on the ontology — DataCenterRequest (with requested_mw, annual_revenue_per_mw, queue_position, FK to Substation) and SubstationUpgrade (with cost_million, capacity_increase_mw, enables_low_carbon, FK to Substation) — both questions are READY (all needed data is in the model). The “at what budget level” phrasing is a classic scenario parameter signal.

Here are three distinct paths:

Option 1 — Joint optimization (approve requests + fund upgrades simultaneously)

/rai-prescriptive-problem-formulation + /rai-prescriptive-solver-management + /rai-prescriptive-results-interpretation Jointly select which DataCenterRequests to approve and which SubstationUpgrades to fund to maximize total annual revenue (requested_mw × annual_revenue_per_mw), subject to per-substation capacity constraints (current_load_mw + approved MW ≤ max_capacity_mw + funded capacity_increase_mw) and a total upgrade budget — run as scenarios at $100M and $150M.

Option 2 — Rules pre-screen, then optimize the hard cases

/rai-rules-authoring + /rai-prescriptive-problem-formulation + /rai-prescriptive-solver-management + /rai-prescriptive-results-interpretation First classify which DataCenterRequests can be approved immediately at current substation headroom (max_capacity_mw − current_load_mw ≥ requested_mw) using rules, then optimize approval of the remaining requests alongside upgrade funding within budget — gives an explainable “fast-track vs needs-upgrade” split before optimization runs.

Option 3 — Identify critical substations first, then optimize weighted by network importance

/rai-graph-analysis + /rai-prescriptive-problem-formulation + /rai-prescriptive-solver-management + /rai-prescriptive-results-interpretation Run centrality on the TransmissionLine-connected Substation network to identify which substations are critical connectors, then select SubstationUpgrades and approve DataCenterRequests weighted by substation centrality score — prioritizes upgrades at high-impact nodes alongside revenue.

Recommendation: Start with Option 1 — it’s the most direct answer to both questions together, the scenario budget parameter makes the “at what level” question explicit, and all data is already in the model. Use Option 2 if stakeholders need a clear “which requests need no upgrades” cut before seeing optimization results. Use Option 3 if grid resilience is a factor alongside revenue.

Pick one of the options from the previous step. Then paste its prompt into your coding agent to run the required skills.

For example, this walkthrough uses Option 3 from the previous step's sample output because it combines graph analysis with prescriptive optimization:

`/rai-graph-analysis + /rai-prescriptive-problem-formulation + /rai-prescriptive-solver-management + /rai-prescriptive-results-interpretation` Run centrality on the `TransmissionLine`-connected `Substation` network to identify which substations are critical connectors, then select `SubstationUpgrade`s and approve `DataCenterRequest`s weighted by substation centrality score — prioritizes upgrades at high-impact nodes alongside revenue.
This prompt:
  • Runs graph analysis to find critical substations in the transmission network.
  • Uses those results to optimize which requests and upgrades to approve.
  • Generates a Python script, such as centrality_upgrade_selection.py.
  • Produces a results summary showing approved requests, selected upgrades, and the reasoning behind each.

Expand the section below for a sample output of what the generated code and results might look like.

Sample output from Claude Sonnet 4.6 High in Copilot
"""centrality_upgrade_selection.py — Energy Grid Planning
======================================================================
Centrality-driven substation upgrade selection and DC request approval.

Phase 1 — Graph Analysis
  • Nodes: Substations (12)
  • Edges: Active TransmissionLines (undirected — physical bidirectional)
  • Algorithm: betweenness_centrality — identifies critical connector
    substations (nodes that lie on the most shortest paths; structural bottlenecks)
  • betweenness requires weighted=False

Phase 2 — Coefficient Computation (pandas)
  • DC weighted revenue = requested_mw × annual_revenue_per_mw × betweenness
  • Upgrade capacity bonus = capacity_increase_mw × 100,000 $/MW × betweenness
    (100k $/MW is midpoint of DC revenue range; keeps bonus on comparable scale)

Phase 3 — Enrichment (load coefficients back into model)

Phase 4 — Optimization (MILP via HiGHS)
  • x_approve ∈ {0,1} per DataCenterRequest — approve or reject
  • x_select ∈ {0,1} per SubstationUpgrade — build or skip
  • Objective (maximize):
      Σ centrality_weighted_revenue × x_approve  (approved DC revenue)
    + Σ centrality_capacity_bonus × x_select     (capacity unlocked at bottlenecks)
  • Constraints:
      (1) Upgrade budget ≤ $300M
      (2) Per substation: Σ approved DC load ≤ headroom + Σ selected upgrade capacity
      (3) At most one upgrade option per substation
======================================================================
"""
import pandas as pd
from relationalai.semantics import Float, Integer, String
from relationalai.semantics.std import aggregates as aggs
from relationalai.semantics.reasoners.graph import Graph
from relationalai.semantics.reasoners.prescriptive import Problem
# Import the base ontology — all concept definitions and data loading are
# already executed at module level; importing avoids duplicate Model creation.
from energy_grid import (
    model,
    Substation,
    TransmissionLine,
    DataCenterRequest,
    SubstationUpgrade,
    substations_df,
    dc_requests_df,
    upgrades_df,
)
# =====================================================================
# Phase 1 — Graph: Betweenness Centrality
# =====================================================================
# Undirected (physical lines are bidirectional), unweighted (betweenness
# requires weighted=False). node_concept=Substation binds graph.Node
# directly to Substation, so the centrality assignment is shorthand —
# no explicit model.where(graph.Node == Substation).define(...) needed.
# aggregator="sum" collapses parallel edges between the same substation pair
# (TL-009 and TL-014 both connect SUB-010↔SUB-002; undirected graph treats them
# as one topological edge — aggregator="sum" suppresses the multi-edge warning).
graph = Graph(model, directed=False, weighted=False, node_concept=Substation, aggregator="sum")
tl = TransmissionLine.ref()
sub_from = Substation.ref()
sub_to = Substation.ref()
model.where(
    tl.is_active(),
    tl.from_substation(sub_from),
    tl.to_substation(sub_to),
).define(graph.Edge.new(src=sub_from, dst=sub_to))
# Validate before running the algorithm
print("── Graph validation ──────────────────────────────────────────")
graph.num_nodes().inspect() # expect 12
graph.num_edges().inspect() # expect 18 (all lines are active)
# Compute betweenness — graph.Node IS Substation (node_concept shorthand)
graph.Node.betweenness = graph.betweenness_centrality()
# Extract centrality scores to pandas for coefficient computation
centrality_df = (
    model.select(
        Substation.id.alias("SUBSTATION_ID"),
        Substation.betweenness.alias("betweenness"),
    )
    .to_df()
    .sort_values("betweenness", ascending=False)
    .reset_index(drop=True)
)
print("\n── Betweenness centrality (all substations) ──────────────────")
print(centrality_df.to_string(index=False))
# =====================================================================
# Phase 2 — Compute Centrality-Weighted Coefficients (pandas)
# =====================================================================
# DC revenue weighted by substation centrality
dc_enriched = dc_requests_df.merge(centrality_df, on="SUBSTATION_ID", how="left")
dc_enriched["betweenness"] = dc_enriched["betweenness"].fillna(0.0)
dc_enriched["CENTRALITY_WEIGHTED_REVENUE"] = (
    dc_enriched["REQUESTED_MW"].astype(float)
    * dc_enriched["ANNUAL_REVENUE_PER_MW"].astype(float)
    * dc_enriched["betweenness"]
)
# Upgrade capacity bonus weighted by substation centrality.
# Scale: 100,000 $/MW is the mid-range DC revenue rate — makes the
# bonus comparable to the DC revenue term so neither dominates.
AVG_REVENUE_PER_MW = 100_000.0
upg_enriched = upgrades_df.merge(centrality_df, on="SUBSTATION_ID", how="left")
upg_enriched["betweenness"] = upg_enriched["betweenness"].fillna(0.0)
upg_enriched["CENTRALITY_CAPACITY_BONUS"] = (
    upg_enriched["CAPACITY_INCREASE_MW"].astype(float)
    * AVG_REVENUE_PER_MW
    * upg_enriched["betweenness"]
)
# Substation headroom — available capacity before any upgrades
headroom_df = substations_df[["ID"]].copy()
headroom_df["HEADROOM_MW"] = (
    substations_df["MAX_CAPACITY_MW"] - substations_df["CURRENT_LOAD_MW"]
)
# =====================================================================
# Phase 3 — Load Enriched Properties into Model
# =====================================================================
# DataCenterRequest.new(id=...) is idempotent when the ID already exists
# (same identify_by key) — it matches the existing instance and adds
# the new property binding without creating duplicates.
headroom_src = model.data(headroom_df)
Substation.headroom_mw = model.Property(f"{Substation} has {Integer:headroom_mw}")
model.define(
    sub := Substation.new(id=headroom_src["ID"]),
    sub.headroom_mw(headroom_src["HEADROOM_MW"]),
)
dc_coeff_src = model.data(dc_enriched[["ID", "CENTRALITY_WEIGHTED_REVENUE"]])
DataCenterRequest.centrality_weighted_revenue = model.Property(
    f"{DataCenterRequest} has {Float:centrality_weighted_revenue}"
)
model.define(
    dc := DataCenterRequest.new(id=dc_coeff_src["ID"]),
    dc.centrality_weighted_revenue(dc_coeff_src["CENTRALITY_WEIGHTED_REVENUE"]),
)
upg_coeff_src = model.data(upg_enriched[["ID", "CENTRALITY_CAPACITY_BONUS"]])
SubstationUpgrade.centrality_capacity_bonus = model.Property(
    f"{SubstationUpgrade} has {Float:centrality_capacity_bonus}"
)
model.define(
    upg := SubstationUpgrade.new(id=upg_coeff_src["ID"]),
    upg.centrality_capacity_bonus(upg_coeff_src["CENTRALITY_CAPACITY_BONUS"]),
)
# =====================================================================
# Phase 4 — Optimization: MILP via HiGHS
# =====================================================================
BUDGET_MILLION = 300 # $300M upgrade budget — allows ~3-5 upgrades
problem = Problem(model, Float)
# ── Decision Variables ─────────────────────────────────────────────
# Binary: approve each DC request (1) or reject (0)
DataCenterRequest.x_approve = model.Property(
    f"{DataCenterRequest} has {Float:x_approve}"
)
problem.solve_for(DataCenterRequest.x_approve, type="bin", name=["dc_id"])
# Binary: select each upgrade option (1) or skip (0)
SubstationUpgrade.x_select = model.Property(
    f"{SubstationUpgrade} has {Float:x_select}"
)
problem.solve_for(SubstationUpgrade.x_select, type="bin", name=["upg_id"])
# ── Constraint 1 — Upgrade budget ──────────────────────────────────
# Total cost of selected upgrades must not exceed $300M
problem.satisfy(
    model.require(
        aggs.sum(SubstationUpgrade.cost_million * SubstationUpgrade.x_select)
        <= float(BUDGET_MILLION)
    ),
    name="upgrade_budget",
)
# ── Constraint 2 — Per-substation capacity ─────────────────────────
# Approved DC load at each substation ≤ current headroom
# + capacity added by any selected upgrade at that substation
problem.satisfy(
    model.require(
        aggs.sum(DataCenterRequest.requested_mw * DataCenterRequest.x_approve)
        .per(Substation)
        .where(DataCenterRequest.substation(Substation))
        <= Substation.headroom_mw
        + aggs.sum(SubstationUpgrade.capacity_increase_mw * SubstationUpgrade.x_select)
        .per(Substation)
        .where(SubstationUpgrade.substation(Substation))
    ),
    name="per_substation_capacity",
)
# ── Constraint 3 — At most one upgrade per substation ──────────────
# Each substation may select at most one of its available upgrade options
problem.satisfy(
    model.require(
        aggs.sum(SubstationUpgrade.x_select)
        .per(Substation)
        .where(SubstationUpgrade.substation(Substation))
        <= 1.0
    ),
    name="one_upgrade_per_substation",
)
# ── Objective — maximize centrality-weighted value ─────────────────
# Two terms combined via model.union() (different concept scopes):
# 1. Revenue from approved DCs, weighted by substation centrality
# 2. Capacity bonus from selected upgrades, weighted by substation centrality
problem.maximize(
    aggs.sum(
        model.union(
            DataCenterRequest.centrality_weighted_revenue * DataCenterRequest.x_approve,
            SubstationUpgrade.centrality_capacity_bonus * SubstationUpgrade.x_select,
        )
    )
)
# =====================================================================
# Phase 5 — Solve and Report Results
# =====================================================================
problem.display()
problem.solve("highs", time_limit_sec=120)
si = problem.solve_info()
si.display()
if si.termination_status not in ("OPTIMAL", "LOCALLY_SOLVED"):
    print(f"\nSolve failed with status: {si.termination_status}")
    print("Possible causes: budget too tight, or per-substation capacity infeasible.")
else:
    print(f"\nObjective value: {si.objective_value:,.0f}")

    # ── Extract decision variable values ───────────────────────────
    dc_sol = (
        model.select(
            DataCenterRequest.id.alias("ID"),
            DataCenterRequest.x_approve.alias("approved"),
        )
        .to_df()
    )
    dc_sol["approved"] = dc_sol["approved"].round(0).astype(int)
    approved_ids = set(dc_sol.loc[dc_sol["approved"] == 1, "ID"])
    upg_sol = (
        model.select(
            SubstationUpgrade.id.alias("ID"),
            SubstationUpgrade.x_select.alias("selected"),
        )
        .to_df()
    )
    upg_sol["selected"] = upg_sol["selected"].round(0).astype(int)
    selected_ids = set(upg_sol.loc[upg_sol["selected"] == 1, "ID"])

    # ── Approved DC Requests ───────────────────────────────────────
    approved_detail = dc_enriched.loc[
        dc_enriched["ID"].isin(approved_ids),
        ["ID", "SUBSTATION_ID", "REQUESTED_MW", "ANNUAL_REVENUE_PER_MW", "betweenness"],
    ].copy().reset_index(drop=True)
    approved_detail["annual_revenue_M"] = (
        approved_detail["REQUESTED_MW"] * approved_detail["ANNUAL_REVENUE_PER_MW"] / 1e6
    )
    approved_detail = approved_detail.sort_values("betweenness", ascending=False)
    print("\n── Approved DataCenterRequests ───────────────────────────────")
    print(
        approved_detail[
            ["ID", "SUBSTATION_ID", "betweenness", "REQUESTED_MW", "annual_revenue_M"]
        ].to_string(index=False, float_format="{:.4f}".format)
    )

    # ── Selected Substation Upgrades ───────────────────────────────
    selected_detail = upg_enriched.loc[
        upg_enriched["ID"].isin(selected_ids),
        ["ID", "SUBSTATION_ID", "CAPACITY_INCREASE_MW", "COST_MILLION", "betweenness"],
    ].copy().reset_index(drop=True)
    selected_detail = selected_detail.sort_values("betweenness", ascending=False)
    print("\n── Selected SubstationUpgrades ───────────────────────────────")
    if len(selected_detail) > 0:
        print(
            selected_detail[
                ["ID", "SUBSTATION_ID", "betweenness", "CAPACITY_INCREASE_MW", "COST_MILLION"]
            ].to_string(index=False, float_format="{:.4f}".format)
        )
    else:
        print("  No upgrades selected within budget.")

    # ── Summary ────────────────────────────────────────────────────
    total_rev_M = approved_detail["annual_revenue_M"].sum()
    total_cost_M = selected_detail["COST_MILLION"].sum() if len(selected_detail) > 0 else 0
    print("\n── Summary ───────────────────────────────────────────────────")
    print(f"  DC requests approved : {len(approved_ids)} / {len(dc_requests_df)}")
    print(f"  Upgrades selected    : {len(selected_ids)} / {len(upgrades_df)}")
    print(f"  Upgrade spend        : ${int(total_cost_M)}M / ${BUDGET_MILLION}M budget")
    print(f"  Annual DC revenue    : ${total_rev_M:.1f}M")
    print(f"  Objective value      : {si.objective_value:,.0f}")

    # ── Centrality context for interpretation ──────────────────────
    print("\n── Substation centrality context ─────────────────────────────")
    context = centrality_df.copy()
    context["has_approved_dc"] = context["SUBSTATION_ID"].isin(
        approved_detail["SUBSTATION_ID"]
    )
    context["has_selected_upg"] = context["SUBSTATION_ID"].isin(
        selected_detail["SUBSTATION_ID"] if len(selected_detail) > 0 else []
    )
    print(context.to_string(index=False))
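
For intuition about the betweenness scores that drive the generated script's weighting, the same metric can be computed on a toy network with `networkx`. The topology below is invented for illustration and unrelated to the tutorial's data:

```python
import networkx as nx

# A hypothetical 5-substation network where "HUB" bridges every other node.
G = nx.Graph()
G.add_edges_from([("SUB-A", "HUB"), ("SUB-B", "HUB"),
                  ("SUB-C", "HUB"), ("SUB-D", "HUB")])

# Normalized betweenness: the fraction of shortest paths between other
# node pairs that pass through each node.
bc = nx.betweenness_centrality(G)
for node, score in sorted(bc.items(), key=lambda kv: -kv[1]):
    print(f"{node:6s} {score:.2f}")  # HUB scores 1.00; leaf substations score 0.00
```

Because every path between leaf substations must cross HUB, it gets the maximum score — the same "structural bottleneck" signal the script uses to weight data-center revenue and upgrade bonuses.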