Skip to content

This feature is currently in Preview.

Working with RelationalAI Solver Models

Model real-world decision problems—like routing, scheduling, or resource allocation—using the SolverModel API. These prescriptive models let you to define variables, constraints, and objective functions, then solve them using RelationalAI’s (RAI) solver engine and a supported solver backend.

The SolverModel class is the main entry point for working with solver models in RelationalAI. It provides methods for defining variables, constraints, and objective functions, as well as solving the model.

To create a SolverModel instance, import the solvers module from the relationalai.experimental package and pass your RAI model object to the SolverModel constructor:

support_ticket_routing.py
import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
31 collapsed lines
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

Variables represent the decision points in your model, such as flow amounts, assignments, or resource allocations and are defined from entities in your RAI model.

To define variables:

  1. Call the solver model’s .variable() method in a rule block.

    Inside a model.rule() block, pass an Instance of an entity type to your SolverModel object’s .variable() method to declare a variable for each entity produced by the Instance:

    support_ticket_routing.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Define a RAI model.
    model = rai.Model("SupportTicketRouting")
    41 collapsed lines
    # Declare support pipeline stage type.
    Stage = model.Type("Stage")
    # Declare route type to connect stages.
    Route = model.Type("Route")
    Route.source.declare()
    Route.destination.declare()
    Route.capacity.declare()
    # Define sample data for the model.
    # Define support teams and endpoints.
    for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
    ]:
    with model.rule():
    Stage.add(name=name)
    # Define routes with capacities.
    for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
    ]:
    with model.rule():
    src = Stage(name=src_name)
    dst = Stage(name=dst_name)
    Route.add(source=src, destination=dst).set(capacity=cap)
    # Create a SolverModel instance from the model.
    solver_model = solvers.SolverModel(model)
    # Specify variables, constraints, and the objective.
    # Define solver variables for each route.
    with model.rule():
    route = Route()
    solver_model.variable(
    route,
    name_args=["route", route.source.name, route.destination.name],
    type="integer",
    lower=0,
    upper=route.capacity,
  2. Specify names for the variables.

    Use the name_args parameter to specify variable names:

    support_ticket_routing.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Define a RAI model.
    model = rai.Model("SupportTicketRouting")
    41 collapsed lines
    # Declare support pipeline stage type.
    Stage = model.Type("Stage")
    # Declare route type to connect stages.
    Route = model.Type("Route")
    Route.source.declare()
    Route.destination.declare()
    Route.capacity.declare()
    # Define sample data for the model.
    # Define support teams and endpoints.
    for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
    ]:
    with model.rule():
    Stage.add(name=name)
    # Define routes with capacities.
    for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
    ]:
    with model.rule():
    src = Stage(name=src_name)
    dst = Stage(name=dst_name)
    Route.add(source=src, destination=dst).set(capacity=cap)
    # Create a SolverModel instance from the model.
    solver_model = solvers.SolverModel(model)
    # Specify variables, constraints, and the objective.
    # Define solver variables for each route.
    with model.rule():
    route = Route()
    solver_model.variable(
    route,
    name_args=["route", route.source.name, route.destination.name],
    type="integer",
    lower=0,
    upper=route.capacity,

    See Variable Names for more information on setting and retrieving variable names.

  3. Specify optional constraints.

    By default, variables are continuous. You can add common constraints using parameters—like type, lower, and upper—to make them integers or limit their range:

    support_ticket_routing.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Define a RAI model.
    model = rai.Model("SupportTicketRouting")
    41 collapsed lines
    # Declare support pipeline stage type.
    Stage = model.Type("Stage")
    # Declare route type to connect stages.
    Route = model.Type("Route")
    Route.source.declare()
    Route.destination.declare()
    Route.capacity.declare()
    # Define sample data for the model.
    # Define support teams and endpoints.
    for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
    ]:
    with model.rule():
    Stage.add(name=name)
    # Define routes with capacities.
    for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
    ]:
    with model.rule():
    src = Stage(name=src_name)
    dst = Stage(name=dst_name)
    Route.add(source=src, destination=dst).set(capacity=cap)
    # Create a SolverModel instance from the model.
    solver_model = solvers.SolverModel(model)
    # Specify variables, constraints, and the objective.
    # Define solver variables for each route.
    with model.rule():
    route = Route()
    solver_model.variable(
    route,
    name_args=["route", route.source.name, route.destination.name],
    type="integer",
    lower=0,
    upper=route.capacity,

    Variables are continuous by default but may be constrained to integer or binary variables using the type parameter. See Upper and Lower Bounds and Fixed Values to learn how to constrain values to a range or a specific value.

Continuous variables have type Float64 and are declared by default when you call the .variable() method without specifying the type parameter:

regional_water_allocation.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
67 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)

You can also set a fixed value for a continuous variable using the fixed parameter. See the Fixed Values section for details.

Variables can be restricted to Int64 values by setting the type parameter to "integer":

support_ticket_routing.py
import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
41 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,

Integer variables can be used to represent discrete quantities, such as the number of items, assignments, or flows that must be whole numbers.

To restrict variables to binary values (0 or 1), set the type parameter to "zero_one" when defining the variable:

shift_assignments.py
import relationalai as rai
from relationalai.experimental import solvers
# Create the model.
model = rai.Model("ShiftAssignment")
24 collapsed lines
# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare() # Max people allowed per shift
# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()
# Define sample data.
with model.rule():
Worker.add(name="Alice")
Worker.add(name="Bob")
Worker.add(name="Carlos")
with model.rule():
Shift.add(name="Morning", capacity=1)
Shift.add(name="Afternoon", capacity=2)
Shift.add(name="Night", capacity=1)
# Define all possible assignments.
with model.rule():
Assignment.add(worker=Worker(), shift=Shift())
# Create the solver model.
solver_model = solvers.SolverModel(model)
# Define binary assignment variable.
with model.rule():
assignment = Assignment()
solver_model.variable(
assignment,
name_args=["assigned", assignment.worker.name, assignment.shift.name],
type="zero_one"
)

Binary variables are often used for decision-making problems, such as whether to assign a worker to a shift or include an item in a bundle.

Use the .variable() method’s lower and upper parameters to set bounds on the variable values. These bounds are inclusive and can be literal values or RAI expressions:

support_ticket_routing.py
import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
41 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,

Omit one of the bounds to restrict the variable only in the specified direction. For example, setting only upper allows any value up to that limit.

To constrain a variable to a specific value, use the fixed parameter when defining the variable:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)

Use fixed values to enforce mandatory choices or run what-if scenarios.

Variable names are always generated when you define a variable, but you can use the .variable() method’s name_args parameter to customize the names:

shift_assignments.py
import relationalai as rai
from relationalai.experimental import solvers
# Create the model.
model = rai.Model("ShiftAssignment")
24 collapsed lines
# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare() # Max people allowed per shift
# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()
# Define sample data.
with model.rule():
Worker.add(name="Alice")
Worker.add(name="Bob")
Worker.add(name="Carlos")
with model.rule():
Shift.add(name="Morning", capacity=1)
Shift.add(name="Afternoon", capacity=2)
Shift.add(name="Night", capacity=1)
# Define all possible assignments.
with model.rule():
Assignment.add(worker=Worker(), shift=Shift())
# Create the solver model.
solver_model = solvers.SolverModel(model)
# Define binary assignment variable.
with model.rule():
assignment = Assignment()
solver_model.variable(
assignment,
name_args=["assigned", assignment.worker.name, assignment.shift.name],
type="zero_one"
)

Names are generated by joining the strings specified in the name_args list with underscores.

Use the .variable_name() method to get the names of variables after they’ve been declared:

shift_assignments.py
import relationalai as rai
from relationalai.experimental import solvers
# Create the model.
model = rai.Model("ShiftAssignment")
24 collapsed lines
# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare() # Max people allowed per shift
# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()
# Define sample data.
with model.rule():
Worker.add(name="Alice")
Worker.add(name="Bob")
Worker.add(name="Carlos")
with model.rule():
Shift.add(name="Morning", capacity=1)
Shift.add(name="Afternoon", capacity=2)
Shift.add(name="Night", capacity=1)
# Define all possible assignments.
with model.rule():
Assignment.add(worker=Worker(), shift=Shift())
# Create the solver model.
solver_model = solvers.SolverModel(model)
# Define binary assignment variable.
with model.rule():
assignment = Assignment()
solver_model.variable(
assignment,
name_args=["assigned", assignment.worker.name, assignment.shift.name],
type="zero_one"
)
# Get the variable names for each assignment.
with model.query() as select:
assignment = Assignment()
response = select(solver_model.variable_name(assignment))
print(response.results)
output
name
0 assigned_Alice_Afternoon
1 assigned_Alice_Morning
2 assigned_Alice_Night
3 assigned_Bob_Afternoon
4 assigned_Bob_Morning
5 assigned_Bob_Night
6 assigned_Carlos_Afternoon
7 assigned_Carlos_Morning
8 assigned_Carlos_Night

Simple constraints on individual variables, like setting upper and lower bounds or fixing values, can be applied when defining variables.

To specify more complex constraints:

  1. Write a rule to define the constraint.

    Use the model.rule() or solvers.operators() context managers to define the constraint within a rule block:

    shift_assignments.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create the model.
    model = rai.Model("ShiftAssignment")
    # Declare entity types.
    Worker = model.Type("Worker")
    Shift = model.Type("Shift")
    Shift.capacity.declare() # Max people allowed per shift
    # Declare relationship types
    Assignment = model.Type("Assignment")
    Assignment.worker.declare()
    Assignment.shift.declare()
    # Define sample data.
    13 collapsed lines
    with model.rule():
    Worker.add(name="Alice")
    Worker.add(name="Bob")
    Worker.add(name="Carlos")
    with model.rule():
    Shift.add(name="Morning", capacity=1)
    Shift.add(name="Afternoon", capacity=2)
    Shift.add(name="Night", capacity=1)
    # Define all possible assignments.
    with model.rule():
    Assignment.add(worker=Worker(), shift=Shift())
    # Create the solver model.
    solver_model = solvers.SolverModel(model)
    # Define binary assignment variable.
    with model.rule():
    assignment = Assignment()
    solver_model.variable(
    assignment,
    name_args=["assigned", assignment.worker.name, assignment.shift.name],
    type="zero_one"
    )
    # Define a constraint to limit the number of workers per shift.
    # NOTE: `solvers.operators()` overrides infix operators like `<=` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    assignment = Assignment()
    shift = assignment.shift
    solver_model.constraint(
    solvers.count(assignment, per=[shift]) <= shift.capacity,
    name_args=["max_workers", shift.name]
    )
  2. Pass a solver expression to the .constraint() method.

    Use the functions like solvers.count() and infix operators like <= to build the constraint expression. Then pass the expression to the SolverModel instance’s .constraint() method:

    shift_assignments.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create the model.
    model = rai.Model("ShiftAssignment")
    35 collapsed lines
    # Declare entity types.
    Worker = model.Type("Worker")
    Shift = model.Type("Shift")
    Shift.capacity.declare() # Max people allowed per shift
    # Declare relationship types.
    Assignment = model.Type("Assignment")
    Assignment.worker.declare()
    Assignment.shift.declare()
    # Define sample data.
    with model.rule():
    Worker.add(name="Alice")
    Worker.add(name="Bob")
    Worker.add(name="Carlos")
    Shift.add(name="Morning", capacity=1)
    Shift.add(name="Afternoon", capacity=2)
    Shift.add(name="Night", capacity=1)
    # Define all possible assignments.
    with model.rule():
    Assignment.add(worker=Worker(), shift=Shift())
    # Create the solver model.
    solver_model = solvers.SolverModel(model)
    # Define binary assignment variable.
    with model.rule():
    assignment = Assignment()
    solver_model.variable(
    assignment,
    name_args=["assigned", assignment.worker.name, assignment.shift.name],
    type="zero_one"
    )
    # Limit the number of workers assigned to each shift to it's capacity.
    with solvers.operators():
    assignment = Assignment()
    shift = assignment.shift
    # Use solver_mondel.constraint() to define the constraint.
    solver_model.constraint(
    # Pass in a solver expression.
    solvers.count(assignment, per=[shift]) <= shift.capacity,
    name_args=["max_workers", shift.name]
    )

    Complex expressions can be built using arithmetic and comparison operators and functions for conditional logic and setting bounds for values.

  3. Give the constraint a name.

    Use the name_args parameter to specify the constraint’s name:

    shift_assignments.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create the model.
    model = rai.Model("ShiftAssignment")
    35 collapsed lines
    # Declare entity types.
    Worker = model.Type("Worker")
    Shift = model.Type("Shift")
    Shift.capacity.declare() # Max people allowed per shift
    # Declare relationship types.
    Assignment = model.Type("Assignment")
    Assignment.worker.declare()
    Assignment.shift.declare()
    # Define sample data.
    with model.rule():
    Worker.add(name="Alice")
    Worker.add(name="Bob")
    Worker.add(name="Carlos")
    Shift.add(name="Morning", capacity=1)
    Shift.add(name="Afternoon", capacity=2)
    Shift.add(name="Night", capacity=1)
    # Define all possible assignments.
    with model.rule():
    Assignment.add(worker=Worker(), shift=Shift())
    # Create the solver model.
    solver_model = solvers.SolverModel(model)
    # Define binary assignment variable.
    with model.rule():
    assignment = Assignment()
    solver_model.variable(
    assignment,
    name_args=["assigned", assignment.worker.name, assignment.shift.name],
    type="zero_one"
    )
    # Limit the number of workers assigned to each shift to it's capacity.
    with solvers.operators():
    assignment = Assignment()
    shift = assignment.shift
    solver_model.constraint(
    solvers.count(assignment, per=[shift]) <= shift.capacity,
    # Use name_args to give the constraint a meaningful name. This
    # creates names like "max_workers_Morning" and "max_workers_Afternoon".
    name_args=["max_workers", shift.name]
    )

    Names are generated by joining the strings specified in the name_args list with underscores. See Constraint Names for more details.

Use Python’s infix operators, like +, -, <=, and >= to build arithmetic and comparison expressions in solver constraints. See the Supported Infix Operators section for a complete list of supported operators.

In addition, the following functions are also available:

FunctionDescription
solvers.abs()Takes the absolute value of an expression.
solvers.exp()Raises to the power of an expression.
solvers.log()Takes the natural logarithm of an expression.
solvers.sum()Sums values across variables.
solvers.product()Multiplies values across variables.
solvers.count()Counts values.
solvers.min()Finds the minimum value across variables.
solvers.max()Finds the maximum value across variables.
solvers.all_different()Ensures all variables have different values.

For example, the following defines a constraint that ensures the total flow into a region is the same as the total flow out of the region:

regional_water_allocation.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
67 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)

In a solvers.operators() context, all infix operators are overridden to build solver expressions instead of filtering entities. If you need to filter entities, do so in a model.rule() context and then define the constraint in a nested solvers.operators() context:

regional_water_allocation.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
91 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)

Python’s logical operators, like and, or, and not aren’t supported in solver expressions. Instead, use the solvers module’s logical functions to build logical expressions:

FunctionDescription
[solvers.and_()](/api/python/experimental/solvers/and_)Logical AND operation.
solvers.or_()Logical OR operation.
solvers.xor()Logical XOR operation (exclusive OR).
solvers.not_()Logical NOT operation.
solvers.if_then_else()Conditional expression.
solvers.implies()Logical implication.
solvers.iff()Logical equivalence.

For example, the following defines a constraint in a product bundling problem that ensures that if a laptop is included in the bundle, then a charger must also be included:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
35 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)

The following functions can be used to constrain expressions to specific types or to set bounds on their values:

FunctionDescription
solvers.integer()Constrains an expression to integer values.
solvers.zero_one()Constrains an expression to binary values (0 or 1).
solvers.interval()Constrains an expression between two values, inclusive.
solvers.integer_interval()Constrains an expression to integers between two values, inclusive.

For example, the following model uses solvers.integer_interval() to define a constraint that ensures the combined number of tickets routed to two support teams is between 10 and 30:

support_ticket_routing.py
import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
55 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,
)
# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
us_tier_1 = Route(source=Stage(name="US Tier 1"))
us_tier_2 = Route(source=Stage(name="US Tier 2"))
solver_model.constraint(
solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
name_args=["us_tier_total"]

Pass a list of string literals or entity properties to the name_args parameter of a SolverModel object’s .constraint() method to create a unique name generated by joining the provided arguments with underscores:

shift_assignments.py
import relationalai as rai
from relationalai.experimental import solvers
# Create the model.
model = rai.Model("ShiftAssignment")
36 collapsed lines
# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare() # Max people allowed per shift
# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()
# Define sample data.
with model.rule():
Worker.add(name="Alice")
Worker.add(name="Bob")
Worker.add(name="Carlos")
with model.rule():
Shift.add(name="Morning", capacity=1)
Shift.add(name="Afternoon", capacity=2)
Shift.add(name="Night", capacity=1)
# Define all possible assignments.
with model.rule():
Assignment.add(worker=Worker(), shift=Shift())
# Create the solver model.
solver_model = solvers.SolverModel(model)
# Define binary assignment variable.
with model.rule():
assignment = Assignment()
solver_model.variable(
assignment,
name_args=["assigned", assignment.worker.name, assignment.shift.name],
type="zero_one"
)
# Define a constraint to limit the number of workers per shift.
with solvers.operators():
assignment = Assignment()
shift = assignment.shift
solver_model.constraint(
solvers.count(assignment, per=[shift]) <= shift.capacity,
name_args=["max_workers", shift.name]
)

Names are generated by joining the strings specified in the name_args list with underscores.

The following infix operators are supported in solvers.operators() contexts to build solver expressions:

OperatorDescription
+Add expressions.
-Subtract expressions.
*Multiply expressions.
/Divide expressions.
==Require equality.
!=Require inequality.
<Require less-than.
<=Require at-most.
>Require greater-than.
>=Require at-least.

All solvers require numeric input. RelationalAI’s solver engine only supports values of type Float64 or Int64 in solver variables, constraint expressions, and objective terms. Any values that do not evaluate to one of these types—such as strings, booleans, or missing values—are silently excluded from the model.

For example, consider the following table definition:

CREATE TABLE Pipeline (
source_id NUMBER,
destination_id NUMBER,
capacity NUMBER(38, 2)
);

The capacity column has type NUMBER(38, 2), a fixed-point decimal. If you use it directly in a solver constraint or objective without converting it to a supported type, its values will be filtered out and result in empty rule outputs:

# This will not work as expected if `pipe.capacity` is not a supported type.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity, # Filtered out. No variables are created!
)

To avoid these issues, create Snowflake views of your tables with necessary type conversions for the columns you plan to use in solver constraints and objectives.

For example, you can create a view that converts the capacity column to FLOAT:

CREATE OR REPLACE VIEW Pipeline_float_view AS
SELECT
source_id,
destination_id,
CAST(capacity AS FLOAT) AS capacity
FROM Pipeline;

When casting types:

  • Use FLOAT for decimal numbers, like capacity in the preceding example.
  • Use NUMBER(18) for integer columns, as Snowflake does not have an INT type.

Refer to the Snowflake docs for more information about Snowflake’s number types.

Solver objectives define the goal of the optimization problem, such as maximizing throughput in a support ticket routing model or minimizing unmet demand in a water allocation model.

To define an objective:

  1. Write a rule to define the objective.

    Use the model.rule() or solvers.operators() context managers to define the objective within a rule block:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    # Specify variables and constraints.
    40 collapsed lines
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
  2. Pass a solver expression to the .min_objective() or .max_objective() method.

    Use infix operators and functions like solvers.sum() to build the objective expression. Then pass the expression to the SolverModel instance’s .min_objective() or .max_objective() method:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    112 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))

Use a SolverModel object’s .max_objective() method to define the objective of a maximization problem. For example, the following defines an objective to maximize total throughput in a support ticket routing model:

support_ticket_routing.py
import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
41 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables and constraints.
33 collapsed lines
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,
)
# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
us_tier_1 = Route(source=Stage(name="US Tier 1"))
us_tier_2 = Route(source=Stage(name="US Tier 2"))
solver_model.constraint(
solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
name_args=["us_tier_total"]
)
# Define a constraint to ensure the total number of tickets routed in
# and out of each stage is balanced.
with solvers.operators():
stage = Stage()
stage.name != "Incoming Tickets" # Exclude the source stage
stage.name != "Resolved Tickets" # Exclude the sink stage
incoming_tickets = solvers.sum(Route(destination=stage), per=[stage])
outgoing_tickets = solvers.sum(Route(source=stage), per=[stage])
solver_model.constraint(
incoming_tickets == outgoing_tickets,
name_args=["max_outgoing_tickets", stage.name]
)
# Define an objective to maximize the total number of tickets resolved.
with solvers.operators():
resolved = Stage(name="Resolved Tickets")
total_throughput = solvers.sum(Route(destination=resolved))
solver_model.max_objective(total_throughput)

Use a SolverModel object’s .min_objective() method to define the objective of a minimization problem. For example, the following defines an objective to minimize the cost of a bundle in a product bundling problem:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables and constraints.
39 collapsed lines
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost

To inspect a SolverModel instance to see its variables, constraints, and objective, use the SolverModel object’s .print() method:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
62 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Inspect the solver model.
solver_model.print()

Information about the model is printed in a canonical expression format:

output
variables:
include_Charger
include_Headphones
include_Keyboard
include_Laptop
include_Mouse
minimization objectives:
SUM(MULTIPLY(12.0,include_Charger),MULTIPLY(25.0,include_Keyboard),MULTIPLY(30.0,include_Headphones),MULTIPLY(650.0,include_Laptop),MULTIPLY(8.0,include_Mouse))
constraints:
AND(EQ(include_Laptop,1),ZEROONE(include_Laptop))
EQ(SUM(include_Charger,include_Headphones,include_Keyboard,include_Laptop,include_Mouse),3)
ZEROONE(include_Charger)
ZEROONE(include_Headphones)
ZEROONE(include_Keyboard)
ZEROONE(include_Mouse)

Note that a SolverModel does not have to be fully defined to be printed. You can print as you specify the model to inspect its state at each step and debug any issues with the model definition before sending it to a solver.

Solver models can be solved using one of the supported backends, like HiGHS or Gurobi.

To solve a solver model:

  1. Create a Solver object.

    Use the solvers.Solver class to create a solver object for the target solver:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    48 collapsed lines
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
    # Create a HiGHS Solver instance.
    solver = solvers.Solver("highs")
  2. Pass the Solver to the SolverModel object’s .solve() method.

    Call the SolverModel object’s .solve() method and pass to it the Solver instance you created in the previous step:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    48 collapsed lines
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
    # Create a HiGHS Solver instance.
    solver = solvers.Solver("highs")
    # Solve the model.
    solver_model.solve(solver)

    When you call the .solve() method, a solver engine and the model’s configured logic engine are invoked and will be resumed or created if needed, even if you don’t use Model.query() to query your RAI model.

When you call the .solve() method, the solver will log its progress and results to the console. For example, the HiGHS solver will output logs like the following:

Terminal window
Running HiGHS 1.7.1 (git hash: 43329e528): Copyright (c) 2024 HiGHS under MIT licence terms
Coefficient ranges:
Matrix [1e+00, 1e+00]
Cost [1e+00, 1e+00]
Bound [3e+01, 3e+02]
RHS [1e+03, 1e+03]
Presolving model
2 rows, 13 cols, 7 nonzeros 0s
0 rows, 2 cols, 0 nonzeros 0s
0 rows, 0 cols, 0 nonzeros 0s
Presolve : Reductions: rows 0(-5); columns 0(-15); elements 0(-19) - Reduced to empty
Solving the original LP from the solution after postsolve
Model status : Optimal
Objective value : 1.2000000000e+02
HiGHS run time : 0.00

Optional keyword arguments can be passed to a SolverModel object’s .solve() method to configure the solver behavior. The following options are supported:

OptionTypeDescription
silentboolIf True, don’t emit solver logs. Defaults to False.
time_limit_secint, floatMaximum time in seconds to allow the solver to run. Defaults to 300.
solution_limitintMaximum number of solutions to return. Ignored by backends that only produce single solutions. Defaults to None, which means no limit.
relative_gap_tolerancefloatRelative gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value.
absolute_gap_tolerancefloatAbsolute gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value.
print_formatstr, NoneOptional output format of string representation of the solver model. Must be one of "moi", "latex", "mof", "lp", "mps", or "nl". Defaults to None. See Model Output String to learn how to retrieve this string.
print_onlyboolIf True, only print the solver model using the format specified by print_format without solving it. Defaults to False.

For example, the following solves a model with a time limit of 30 seconds and logs suppressed:

regional_water_allocation.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
67 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
48 collapsed lines
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)
# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
solver_model.constraint(
total_flow <= 1000.0, # Max flow from the reservoir
name_args=["max_reservoir_flow"]
)
# Define the objective to minimize unmet demand.
with solvers.operators():
consumer = Consumer()
pipe = Pipeline(source=consumer.region, destination=consumer)
unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")
# Solve the model.
solver_model.solve(solver, time_limit_sec=30, silent=True)

Solver engines are compute resources that run the solver backend and process the model. They are distinct from the logic engines that process the logic defined in a RAI model’s rules.

The logic engine materializes the solver model and sends it to the solver engine, which solves the model asynchronously as a background task. If the logic engine has no other tasks, and the time it takes for the solver engine to solve the model exceeds the auto-suspend time limit configured for the logic engine, then the logic engine will suspend itself and be resumed when the solver engine completes the task.

There are two ways to specify which solver engine to use when solving a model:

Pass a string with the engine’s name to the engine_name parameter of the Solver() constructor:

solver = solvers.Solver("ipopt", engine_name="<my_solver_engine_name>")

Which solver engine a model should use is determined as follows:

  • Use the Solver() constructor’s engine_name parameter if specified.
  • If not, use the experimental.solvers.engine configuration key in your raiconfig.toml file.
  • If that’s missing, use the configured Snowflake username as the engine name.

Solver engines automatically suspend after 60 minutes of inactivity. If the solver engine is suspended when the logic engine sends the model, it automatically resumes and solves the model.

Use the solvers.Provider.create_solver() method and specify the auto_suspend_mins parameter to manually create an engine with a custom auto-suspend timeout:

from relationalai.experimental.solvers import Provider
# Create a solver engine with a 30-minute auto-suspend timeout.
Provider().create_solver(
name="<my_solver_engine_name>",
auto_suspend_mins=30
)

Once a solver model is solved, you can access the results and other information about the solution using attributes and methods of the SolverModel object:

  1. Solve the solver model.

    First, use a Solver and the SolverModel object’s .solve() method to solve the model:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    48 collapsed lines
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
    # Create a HiGHS Solver instance.
    solver = solvers.Solver("highs")
    # Solve the model.
    solver_model.solve(solver)

    See Solve a Solver Model for details on the .solve() method and its parameters.

  2. Get the optimized objective value.

    Use the SolverModel object’s .objective_value attribute view the objective value of the solution:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    48 collapsed lines
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
    # Create a HiGHS Solver instance.
    solver = solvers.Solver("highs")
    # Solve the model.
    solver_model.solve(solver)
    # View the solution's objective value.
    with model.query() as select:
    response = select(solver_model.objective_value)
    print(f"\n\nUnmet demand (optimized): {response.results.iloc[0, 0]}")
    output
    Unmet demand (optimized): 120.0
  3. Get the solution values.

    Pass entities to the SolverModel object’s .value() method to get the value of its corresponding variable in the solution:

    regional_water_allocation.py
    import relationalai as rai
    from relationalai.std import alias
    from relationalai.experimental import solvers
    # Create a RAI model.
    model = rai.Model("RegionalWaterAllocation")
    67 collapsed lines
    # Declare water distribution stage type.
    WaterSource = model.Type("WaterSource")
    Region = model.Type("Region")
    Consumer = model.Type("Consumer")
    Consumer.region.declare() # Region where the consumer is located.
    Consumer.demand.declare() # Water demand of the consumer.
    # Declare Pipeline type to connect stages.
    Pipeline = model.Type("Pipeline")
    Pipeline.source.declare()
    Pipeline.destination.declare()
    Pipeline.capacity.declare()
    # Define water sources.
    with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")
    # Define regions.
    for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
    ]:
    with model.rule():
    Region.add(id=region["id"]).set(name=region["name"])
    # Define consumers.
    for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
    ]:
    with model.rule():
    region = Region(id=consumer["region_id"])
    Consumer.add(id=consumer["id"]).set(
    name=consumer["name"],
    region=region,
    demand=consumer["demand"]
    )
    # Define pipelines from the reservoir to each region.
    for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
    source = WaterSource(name="Reservoir")
    region = Region(id=region_id)
    Pipeline.add(source=source, destination=region).set(capacity=capacity)
    # Define pipelines between consumers and their regions.
    for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0), # North
    (4, 80.0), (5, 140.0), (6, 50.0), # South
    (7, 90.0), (8, 180.0), # East
    (9, 40.0), (10, 30.0), (11, 200.0) # West
    ]:
    with model.rule():
    consumer = Consumer(id=consumer_id)
    region = consumer.region
    Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
    # Create a solver model.
    solver_model = solvers.SolverModel(model)
    48 collapsed lines
    # Specify variables, constraints, and objective.
    # Define continuous variables for each pipeline to represent it's flow.
    with model.rule():
    pipe = Pipeline()
    solver_model.variable(
    pipe,
    name_args=["pipe", pipe.source.name, pipe.destination.name],
    lower=0,
    upper=pipe.capacity,
    )
    # Define a constraint to ensure flow conservation in each region.
    # NOTE: `solvers.operators()` overrides infix operators like `==` to build
    # solver expressions instead of filter expressions.
    with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
    flow_in == flow_out,
    name_args=["preserve_flow", region.name]
    )
    # Define a constraint satisfy at least half of high-demand consumers' demand.
    with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0 # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
    solver_model.constraint(
    pipe >= .5 * consumer.demand,
    name_args=["satisfy_consumer", consumer.id]
    )
    # Define a constraint to limit the total flow from the reservoir.
    with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
    total_flow <= 1000.0, # Max flow from the reservoir
    name_args=["max_reservoir_flow"]
    )
    # Define the objective to minimize unmet demand.
    with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
    # Create a HiGHS Solver instance.
    solver = solvers.Solver("highs")
    # Solve the model.
    solver_model.solve(solver)
    # View the optimized flow for each pipeline.
    with model.query() as select:
    pipe = Pipeline()
    response = select(
    alias(pipe.source.name, "source_name"),
    alias(pipe.destination.name, "destination_name"),
    solver_model.value(pipe)
    )
    print("\n\nOptimized pipeline flows:")
    print(response.results)
    output
    Optimized pipeline flows:
    0 East Industrial 180.0
    1 East Residential 20.0
    2 North Farms 40.0
    3 North Industrial 60.0
    4 North Residential 150.0
    5 Reservoir East 200.0
    6 Reservoir North 250.0
    7 Reservoir South 270.0
    8 Reservoir West 220.0
    9 South Farms 50.0
    10 South Industrial 140.0
    11 South Residential 80.0
    12 West Farms 150.0
    13 West Industrial 30.0
    14 West Residential 40.0

    In addition to the objective and solution values, you may also view execution information and inspect a printed version of the model.

The following attributes of the SolverModel object provide information about the execution of the solver:

AttributeTypeDescription
.termination_statusstrThe termination status of the solver, such as "OPTIMAL", "INFEASIBLE", or "TIME_LIMIT". See the MathOptInterface docs for a full list of possible statuses.
.solve_time_secfloatThe time taken to solve the model, in seconds. Note that this time is for the solver only and does not include network latency or time spent by the RAI engine processing the transaction.
.solver_versionstrThe version of the solver used to solve the model.
.errorstrIf the solver encountered an error, this attribute contains the error message. Otherwise, it is None.

For example, to get the termination status and solve time:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
46 collapsed lines
# Specify variables, constraints, and the objective.
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Create a Solver object.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# Get the termination status and solve time.
with model.query() as select:
response = select(
solver_model.termination_status,
solver_model.solve_time_sec
)
print(response.results)
output
termination_status solve_time_sec
0 OPTIMAL 0.057283

If you set the print_format option in the SolverModel object’s .solve() method, then you can use .printed_model attribute to get a stringified version of the model:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
46 collapsed lines
# Specify variables, constraints, and the objective.
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Create a Solver object.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(
solver,
log_to_console=False,
print_format="moi",
)
# View the string representation of the model.
with model.query() as select:
response = select(solver_model.printed_model)
print(response.results.iloc[0, 0])
output
Minimize: (8) include_Mouse + (12) include_Charger + (25) include_Keyboard + (30) include_Headphones + (650) include_Laptop
Subject to:
ifelse(==(include_Headphones, (1)), ==(include_Charger, (1)), ==(include_Charger, (0))) == (1) (headphones_require_charger)
(1) include_Laptop == (1)
(1) include_Keyboard + (1) include_Mouse + (1) include_Laptop + (1) include_Headphones + (1) include_Charger == (3) (bundle_size)
include_Keyboard ∈ {0, 1}
include_Mouse ∈ {0, 1}
include_Laptop ∈ {0, 1}
include_Headphones ∈ {0, 1}
include_Charger ∈ {0, 1}

To print the model without solving it, set the print_only option to True when solving the model. The string representation can still be accessed using .printed_model, but other attributes like .objective_value and .termination_status will not be available since the model was not solved:

product_bundling.py
import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
65 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Create a Solver object.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(
solver,
log_to_console=False,
print_format="moi",
print_only=True # Print the model without solving it
)
# View the string representation of the model.
with model.query() as select:
response = select(solver_model.printed_model)
print(response.results.iloc[0, 0])
# Attributes like objective_value are empty since the model wasn't solved.
with model.query() as select:
response = select(solver_model.objective_value)
# The following errors because the results are empty.
print("\n\nObjective value:", response.results.iloc[0, 0])
output
Minimize: (8) include_Mouse + (12) include_Charger + (25) include_Keyboard + (30) include_Headphones + (650) include_Laptop
Subject to:
ifelse(==(include_Headphones, (1)), ==(include_Charger, (1)), ==(include_Charger, (0))) == (1) (headphones_require_charger)
(1) include_Laptop == (1)
(1) include_Keyboard + (1) include_Mouse + (1) include_Laptop + (1) include_Headphones + (1) include_Charger == (3) (bundle_size)
include_Keyboard ∈ {0, 1}
include_Mouse ∈ {0, 1}
include_Laptop ∈ {0, 1}
include_Headphones ∈ {0, 1}
include_Charger ∈ {0, 1}
Traceback (most recent call last):
22 collapsed lines
File "/Users/jlpicard/make_it_solve/.venv/lib/python3.11/site-packages/pandas/core/indexes/range.py", line 1018, in __getitem__
return self._range[new_key]
~~~~~~~~~~~^^^^^^^^^
IndexError: range object index out of range
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/Users/jlpicard/make_it_solve/print_model.py", line 91, in <module>
print("\n\nObjective value:", response.results.iloc[0, 0])
~~~~~~~~~~~~~~~~~~~~~^^^^^^
File "/Users/jlpicard/make_it_solve/.venv/lib/python3.11/site-packages/pandas/core/indexing.py", line 1183, in __getitem__
return self.obj._get_value(*key, takeable=self._takeable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jlpicard/make_it_solve/.venv/lib/python3.11/site-packages/pandas/core/frame.py", line 4216, in _get_value
series = self._ixs(col, axis=1)
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jlpicard/make_it_solve/.venv/lib/python3.11/site-packages/pandas/core/frame.py", line 4013, in _ixs
label = self.columns[i]
~~~~~~~~~~~~^^^
File "/Users/jlpicard/make_it_solve/.venv/lib/python3.11/site-packages/pandas/core/indexes/range.py", line 1020, in __getitem__
raise IndexError(
IndexError: index 0 is out of bounds for axis 0 with size 0