Skip to content

This feature is currently in Preview.

relationalai.experimental.solvers.SolverModel

Signature
class SolverModel

Represents a prescriptive optimization model that can be defined, inspected, and solved using RelationalAI’s Prescriptive Reasoner. SolverModel objects have methods for defining decision variables, constraints, and objective functions, and to invoke a solver on your model.

To create a SolverModel object, create a RAI model and pass it to the SolverModel constructor:

import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
31 collapsed lines
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

The SolverModel class provides methods to define variables, constraints, and objectives, as well as to solve the model and retrieve results.

Signature
SolverModel.__init__(model: dsl.Model) -> None

Create a new SolverModel object from an existing RAI model.

Signature
SolverModel.variable(
instance: dsl.Instance,
name_args=list[str|dslProducer]|None = None,
type=str|None = None,
lower=int|float|dsl.Producer|None = None,
upper=int|float|dsl.Producer|None = None,
fixed=int|float|dsl.Producer|None = None
) -> None

Declare a solver variable for each entity produced by instance. By default, variables are continuous and unbounded. Use the optional type, lower, upper, and fixed parameters to constrain the to integer or binary values and to set bounds or fixed values. If name_args is provided, it will be used to construct the variable name.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
41 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,
)
Signature
SolverModel.constraint(
expr: SolverExpression,
name_args: list[str|dsl.Producer]|None = None
) -> None

Declare a constraint for each expression produced by expr. If name_args is provided, it will be used to construct a constraint name that appears in the output of SolverModel.print() and SolverModel.printed_model.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
56 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,
)
# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
us_tier_1 = Route(source=Stage(name="US Tier 1"))
us_tier_2 = Route(source=Stage(name="US Tier 2"))
solver_model.constraint(
solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
name_args=["us_tier_total"]
)
# Define a constraint to ensure the total number of tickets routed in
# and out of each stage is balanced.
with solvers.operators():
stage = Stage()
stage.name != "Incoming Tickets" # Exclude the source stage
stage.name != "Resolved Tickets" # Exclude the sink stage
incoming_tickets = solvers.sum(Route(destination=stage), per=[stage])
outgoing_tickets = solvers.sum(Route(source=stage), per=[stage])
solver_model.constraint(
incoming_tickets == outgoing_tickets,
name_args=["max_outgoing_tickets", stage.name]
)
Signature
SolverModel.min_objective(expr: SolverExpression) -> None

Define a minimization objective for the solver model. A solver model can have at most one objective function, so the expr parameter should produce exactly one expression.

For example:

import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
112 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)
# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
solver_model.constraint(
total_flow <= 1000.0, # Max flow from the reservoir
name_args=["max_reservoir_flow"]
)
# Define the objective to minimize unmet demand.
with solvers.operators():
consumer = Consumer()
pipe = Pipeline(source=consumer.region, destination=consumer)
unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
Signature
SolverModel.max_objective(expr: SolverExpression) -> None

Define a maximization objective for the solver model. A solver model can have at most one objective function, so the expr parameter should produce exactly one expression.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Define a RAI model.
model = rai.Model("SupportTicketRouting")
79 collapsed lines
# Declare support pipeline stage type.
Stage = model.Type("Stage")
# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()
# Define sample data for the model.
# Define support teams and endpoints.
for name in [
"Incoming Tickets",
"US Tier 1",
"EU Tier 1",
"US Tier 2",
"EU Tier 2",
"Escalations",
"Resolved Tickets",
]:
with model.rule():
Stage.add(name=name)
# Define routes with capacities.
for src_name, dst_name, cap in [
("Incoming Tickets", "US Tier 1", 50),
("Incoming Tickets", "EU Tier 1", 40),
("US Tier 1", "Resolved Tickets", 20),
("US Tier 1", "US Tier 2", 30),
("EU Tier 1", "Resolved Tickets", 10),
("EU Tier 1", "EU Tier 2", 25),
("US Tier 2", "Resolved Tickets", 15),
("US Tier 2", "Escalations", 20),
("EU Tier 2", "Resolved Tickets", 15),
("EU Tier 2", "Escalations", 20),
("Escalations", "Resolved Tickets", 25),
]:
with model.rule():
src = Stage(name=src_name)
dst = Stage(name=dst_name)
Route.add(source=src, destination=dst).set(capacity=cap)
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
route = Route()
solver_model.variable(
route,
name_args=["route", route.source.name, route.destination.name],
type="integer",
lower=0,
upper=route.capacity,
)
# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
us_tier_1 = Route(source=Stage(name="US Tier 1"))
us_tier_2 = Route(source=Stage(name="US Tier 2"))
solver_model.constraint(
solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
name_args=["us_tier_total"]
)
# Define a constraint to ensure the total number of tickets routed in
# and out of each stage is balanced.
with solvers.operators():
stage = Stage()
stage.name != "Incoming Tickets" # Exclude the source stage
stage.name != "Resolved Tickets" # Exclude the sink stage
incoming_tickets = solvers.sum(Route(destination=stage), per=[stage])
outgoing_tickets = solvers.sum(Route(source=stage), per=[stage])
solver_model.constraint(
incoming_tickets == outgoing_tickets,
name_args=["max_outgoing_tickets", stage.name]
)
# Define an objective to maximize the total number of tickets resolved.
with solvers.operators():
resolved = Stage(name="Resolved Tickets")
total_throughput = solvers.sum(Route(destination=resolved))
solver_model.max_objective(total_throughput)
Signature
SolverModel.solve(solver: Solver, **options) -> None

Use the specified solver backend to solve the solver model. The solver parameter should be an instance of the Solver class.

For example:

import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
119 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)
# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
solver_model.constraint(
total_flow <= 1000.0, # Max flow from the reservoir
name_args=["max_reservoir_flow"]
)
# Define the objective to minimize unmet demand.
with solvers.operators():
consumer = Consumer()
pipe = Pipeline(source=consumer.region, destination=consumer)
unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")
# Solve the model.
solver_model.solve(solver)

Keyword arguments in **options can be used to configure the solver backend, such as setting time limits or disabling logging. The following standard options are available for all backends:

OptionTypeDescription
silentboolIf True, don’t emit solver logs. Defaults to False.
time_limit_secint, floatMaximum time in seconds to allow the solver to run. Defaults to 300.
solution_limitintMaximum number of solutions to return. Ignored by backends that only produce single solutions. Defaults to None, which means no limit.
relative_gap_tolerancefloatRelative gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value.
absolute_gap_tolerancefloatAbsolute gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value.
print_formatstr, NoneOptional output format of string representation of the solver model. Must be one of "moi", "latex", "mof", "lp", "mps", or "nl". Defaults to None. See Model Output String to learn how to retrieve this string.
print_onlyboolIf True, only print the solver model using the format specified by print_format without solving it. Defaults to False.

For instance, to set a time limit of 10 seconds:

import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers
125 collapsed lines
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)
# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
solver_model.constraint(
total_flow <= 1000.0, # Max flow from the reservoir
name_args=["max_reservoir_flow"]
)
# Define the objective to minimize unmet demand.
with solvers.operators():
consumer = Consumer()
pipe = Pipeline(source=consumer.region, destination=consumer)
unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")
# Solve the model.
solver_model.solve(solver, time_limit_sec=10)
Signature
SolverModel.value(entity: dsl.Instance) -> dsl.Expression

Returns a dsl.Expression that produces the optimal value of the solver variable associated with the given entity. Requires the .solve() method to have been called first, otherwise .value() will return an empty expression.

For example:

import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")
132 collapsed lines
# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare() # Region where the consumer is located.
Consumer.demand.declare() # Water demand of the consumer.
# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()
# Define water sources.
with model.rule(dynamic=True):
WaterSource.add(name="Reservoir")
# Define regions.
for region in [
{"id": 1, "name": "North"},
{"id": 2, "name": "South"},
{"id": 3, "name": "East"},
{"id": 4, "name": "West"},
]:
with model.rule():
Region.add(id=region["id"]).set(name=region["name"])
# Define consumers.
for consumer in [
{"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
{"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
{"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
{"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
{"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
{"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
{"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
{"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
{"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
{"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
{"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
with model.rule():
region = Region(id=consumer["region_id"])
Consumer.add(id=consumer["id"]).set(
name=consumer["name"],
region=region,
demand=consumer["demand"]
)
# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
with model.rule():
source = WaterSource(name="Reservoir")
region = Region(id=region_id)
Pipeline.add(source=source, destination=region).set(capacity=capacity)
# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
(1, 150.0), (2, 60.0), (3, 40.0), # North
(4, 80.0), (5, 140.0), (6, 50.0), # South
(7, 90.0), (8, 180.0), # East
(9, 40.0), (10, 30.0), (11, 200.0) # West
]:
with model.rule():
consumer = Consumer(id=consumer_id)
region = consumer.region
Pipeline.add(source=region, destination=consumer).set(capacity=capacity)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent it's flow.
with model.rule():
pipe = Pipeline()
solver_model.variable(
pipe,
name_args=["pipe", pipe.source.name, pipe.destination.name],
lower=0,
upper=pipe.capacity,
)
# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
region = Region()
flow_in = solvers.sum(Pipeline(destination=region), per=[region])
flow_out = solvers.sum(Pipeline(source=region), per=[region])
solver_model.constraint(
flow_in == flow_out,
name_args=["preserve_flow", region.name]
)
# Define a constraint satisfy at least half of high-demand consumers' demand.
with model.rule():
consumer = Consumer()
consumer.demand >= 100.0 # Filter for high-demand consumers.
pipe = Pipeline(source=consumer.region, destination=consumer)
with solvers.operators():
solver_model.constraint(
pipe >= .5 * consumer.demand,
name_args=["satisfy_consumer", consumer.id]
)
# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
solver_model.constraint(
total_flow <= 1000.0, # Max flow from the reservoir
name_args=["max_reservoir_flow"]
)
# Define the objective to minimize unmet demand.
with solvers.operators():
consumer = Consumer()
pipe = Pipeline(source=consumer.region, destination=consumer)
unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")
# Solve the model.
solver_model.solve(solver)
# View the solution's objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"\n\nUnmet demand (optimized): {response.results.iloc[0, 0]}")
# Output:
# Unmet demand (optimized): 120.0
# View the optimized flow for each pipeline.
with model.query() as select:
pipe = Pipeline()
response = select(
alias(pipe.source.name, "source_name"),
alias(pipe.destination.name, "destination_name"),
solver_model.value(pipe)
)
print("\n\nOptimized pipeline flows:")
print(response.results)
output
Optimized pipeline flows:
0 East Industrial 180.0
1 East Residential 20.0
2 North Farms 40.0
3 North Industrial 60.0
4 North Residential 150.0
5 Reservoir East 200.0
6 Reservoir North 250.0
7 Reservoir South 270.0
8 Reservoir West 220.0
9 South Farms 50.0
10 South Industrial 140.0
11 South Residential 80.0
12 West Farms 150.0
13 West Industrial 30.0
14 West Residential 40.0
Signature
SolverModel.variable_name(entity: dsl.Instance) -> dsl.Expression

Returns a dsl.Expression that produces the names assigned to the solver variables associated with entity.

import relationalai as rai
from relationalai.experimental import solvers
# Create the model.
model = rai.Model("ShiftAssignment")
36 collapsed lines
# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare() # Max people allowed per shift
# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()
# Define sample data.
with model.rule():
Worker.add(name="Alice")
Worker.add(name="Bob")
Worker.add(name="Carlos")
with model.rule():
Shift.add(name="Morning", capacity=1)
Shift.add(name="Afternoon", capacity=2)
Shift.add(name="Night", capacity=1)
# Define all possible assignments.
with model.rule():
Assignment.add(worker=Worker(), shift=Shift())
# Create the solver model.
solver_model = solvers.SolverModel(model)
# Define binary assignment variable.
with model.rule():
assignment = Assignment()
solver_model.variable(
assignment,
name_args=["assigned", assignment.worker.name, assignment.shift.name],
type="zero_one"
)
# Get the variable names for each assignment.
with model.query() as select:
assignment = Assignment()
response = select(solver_model.variable_name(assignment))
print(response.results)
output
name
0 assigned_Alice_Afternoon
1 assigned_Alice_Morning
2 assigned_Alice_Night
3 assigned_Bob_Afternoon
4 assigned_Bob_Morning
5 assigned_Bob_Night
6 assigned_Carlos_Afternoon
7 assigned_Carlos_Morning
8 assigned_Carlos_Night
Signature
SolverModel.print() -> None

Print a canonical representation of the model to stdout.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
45 collapsed lines
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Inspect the solver model.
solver_model.print()
output
variables:
include_Charger
include_Headphones
include_Keyboard
include_Laptop
include_Mouse
minimization objectives:
SUM(MULTIPLY(12.0,include_Charger),MULTIPLY(25.0,include_Keyboard),MULTIPLY(30.0,include_Headphones),MULTIPLY(650.0,include_Laptop),MULTIPLY(8.0,include_Mouse))
constraints:
AND(EQ(include_Laptop,1),ZEROONE(include_Laptop))
EQ(SUM(include_Charger,include_Headphones,include_Keyboard,include_Laptop,include_Mouse),3)
ZEROONE(include_Charger)
ZEROONE(include_Headphones)
ZEROONE(include_Keyboard)
ZEROONE(include_Mouse)

Alternatively, set the print_format option when calling SolverModel.solve(). The result is available in the SolverModel.printed_model attribute. print_format offers a wider range of output format options, but requires a solver engine, even if print_only=True.

The SolverModel class provides several attributes to access the results and metadata of the model after it has been solved or printed.

Signature
SolverModel.printed_model: dsl.Expression

A dsl.Expression that produces a string containing a human-readable representation of the model in the format specified by the SoverModel.solve() method’s print_format option. Requires the model to have been solved or printed with print_only=True option.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
45 collapsed lines
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Create a Solver object.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(
solver,
log_to_console=False,
print_format="moi",
print_only=True # Print the model without solving it
)
# View the string representation of the model.
with model.query() as select:
response = select(solver_model.printed_model)
print(response.results.iloc[0, 0])
output
Minimize: (8) include_Mouse + (12) include_Charger + (25) include_Keyboard + (30) include_Headphones + (650) include_Laptop
Subject to:
ifelse(==(include_Headphones, (1)), ==(include_Charger, (1)), ==(include_Charger, (0))) == (1) (headphones_require_charger)
(1) include_Laptop == (1)
(1) include_Keyboard + (1) include_Mouse + (1) include_Laptop + (1) include_Headphones + (1) include_Charger == (3) (bundle_size)
include_Keyboard ∈ {0, 1}
include_Mouse ∈ {0, 1}
include_Laptop ∈ {0, 1}
include_Headphones ∈ {0, 1}
include_Charger ∈ {0, 1}
Signature
SolverModel.objective_value: dsl.Expression

A dsl.Expression that produces the optimal value of the objective function. Requires the model to have been solved with SolverModel.solve() with print_only=False.

For example:

import relationalai as rai
from relationalai.experimental import solvers
# Create a RAI model.
model = rai.Model("ProductBundling")
12 collapsed lines
# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()
# Define sample products with price and cost properties.
with model.rule():
Product.add(name="Laptop").set(price=999.0, cost=650.0)
Product.add(name="Mouse").set(price=25.0, cost=8.0)
Product.add(name="Keyboard").set(price=70.0, cost=25.0)
Product.add(name="Charger").set(price=35.0, cost=12.0)
Product.add(name="Headphones").set(price=85.0, cost=30.0)
# Create a solver model.
solver_model = solvers.SolverModel(model)
# Specify variables, constraints, and the objective.
63 collapsed lines
# Define binary decision variables to include or exclude each product.
with model.rule():
product = Product()
with model.match():
with product.name == "Laptop":
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one",
fixed=1 # Bundle must always include the laptop
)
# Other products are optional.
with model.case():
solver_model.variable(
product,
name_args=["include", product.name],
type="zero_one"
)
# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
headphones = Product(name="Headphones")
charger = Product(name="Charger")
solver_model.constraint(
solvers.if_then_else(
headphones == 1, # If the headphones are included
charger == 1, # Then the charger must be included
charger == 0, # Otherwise, the charger must not be included
),
name_args=["headphones_require_charger"]
)
# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
product = Product()
solver_model.constraint(
solvers.sum(product) == 3,
name_args=["bundle_size"]
)
# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
product = Product()
total_cost = solvers.sum(product.cost * product)
solver_model.min_objective(total_cost) # Minimize total bundle cost
# Inspect the solver model.
solver_model.print()
# variables:
# include_Charger
# include_Headphones
# include_Keyboard
# include_Laptop
# include_Mouse
# minimization objectives:
# SUM(MULTIPLY(12.0,include_Charger),MULTIPLY(25.0,include_Keyboard),MULTIPLY(30.0,include_Headphones),MULTIPLY(650.0,include_Laptop),MULTIPLY(8.0,include_Mouse))
# constraints:
# AND(EQ(include_Laptop,1),ZEROONE(include_Laptop))
# EQ(SUM(include_Charger,include_Headphones,include_Keyboard,include_Laptop,include_Mouse),3)
# ZEROONE(include_Charger)
# ZEROONE(include_Headphones)
# ZEROONE(include_Keyboard)
# ZEROONE(include_Mouse)
# Create a Solver object.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the solution's objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"\n\nMinimum cost: {response.results.iloc[0, 0]}")
output
Minimum cost: 683
Signature
SolverModel.termination_status: dsl.Expression

A dsl.Expression that produces the termination status of the solver. This status indicates whether the solver found an optimal solution, was infeasible, or encountered another issue. Requires the model to have been solved with SolverModel.solve() with print_only=False.

For example:

import relationalai as rai
from relationalai.experimental import solvers
208 collapsed lines
# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")
# Declare entity types and properties.
# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario") # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment") # A Scenario that is assigned.
# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()
# Define sample data.
# Employees.
with model.rule(dynamic=True):
for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
Employee.add(name=name)
# Shifts.
with model.rule():
Shift.add(name="Morning").set(capacity=2)
Shift.add(name="Evening").set(capacity=3)
# Days of the week.
with model.rule(dynamic=True):
for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
Day.add(name=name)
# Employee-day availability.
with model.rule(dynamic=True):
# Alice works weekdays only
for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
Available.add(employee=Employee(name="Alice"), day=Day(name=name))
# Bob works all days
Available.add(employee=Employee(name="Bob"), day=Day())
# Carol works weekends only
for name in ["Sat", "Sun"]:
Available.add(employee=Employee(name="Carol"), day=Day(name=name))
# Dave works Mon/Wed/Fri
for name in ["Mon", "Wed", "Fri"]:
Available.add(employee=Employee(name="Dave"), day=Day(name=name))
# Eve works Tue/Thu/Sat
for name in ["Tue", "Thu", "Sat"]:
Available.add(employee=Employee(name="Eve"), day=Day(name=name))
# All possible employee–shift–day combinations
with model.rule():
Scenario.add(employee=Employee(), shift=Shift(), day=Day())
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Define solver variables.
# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
scenario = Scenario()
solver_model.variable(
scenario,
type="zero_one",
name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
)
# Define Solver Constraints
# Unavailable employee–day pairs cannot be assigned.
with model.rule():
scenario = Scenario()
with model.not_found():
Available(employee=scenario.employee, day=scenario.day)
solver_model.constraint(
solvers.not_(scenario),
name_args=["unavailable", scenario.employee.name, scenario.day.name]
)
# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
works_morning = Scenario(shift=Shift(name="Morning"))
works_evening = Scenario(
shift=Shift(name="Evening"),
employee=works_morning.employee,
day=works_morning.day
)
solver_model.constraint(
solvers.implies(works_morning, solvers.not_(works_evening))
)
# At least one of Alice or Bob works Morning on Monday.
with model.rule():
morning, monday = Shift(name="Morning"), Day(name="Mon")
alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
solver_model.constraint(solvers.or_(alice_works, bob_works))
# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
alice = Employee(name="Alice")
day = Available(employee=alice).day
works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
solver_model.constraint(
solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
name_args=["alice_shift", day.name]
)
# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
scenario = Scenario()
shift, day = scenario.shift, scenario.day
num_shifts_per_day = solvers.count(scenario, per=[shift, day])
solver_model.constraint(
num_shifts_per_day <= shift.capacity,
name_args=["shift_cap", shift.name, day.name]
)
# Define solver objective.
# Maximize total assignments across the week.
with model.rule():
scenario = Scenario()
solver_model.max_objective(solvers.sum(scenario))
# Solve the solver model.
# Create a Solver instance.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20
# View the solution values for each scenario variable.
with model.query() as select:
scenario = Scenario()
response = select(
scenario.employee.name,
scenario.shift.name,
scenario.day.name,
solver_model.value(scenario)
)
print("Solution:")
print(response.results)
# Output:
# Solution:
# name name2 name3 value
# 0 Alice Evening Fri 1
# 1 Alice Evening Mon 0
# 2 Alice Evening Sat 0
# 3 Alice Evening Sun 0
# 4 Alice Evening Thu 1
# .. ... ... ... ...
# 65 Eve Morning Sat 1
# 66 Eve Morning Sun 0
# 67 Eve Morning Thu 0
# 68 Eve Morning Tue 0
# 69 Eve Morning Wed 0
# [70 rows x 4 columns]
# Use the solution to define Assignments.
# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
scenario = Scenario()
solver_model.value(scenario) == 1
scenario.set(Assignment)
# View Dave's assignments.
with model.query() as select:
assignment = Assignment(employee=Employee(name="Dave"))
response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
# name name2
# 0 Fri Morning
# 1 Mon Morning
# 2 Wed Evening
# View solution metadata.
with model.query() as select:
response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
output
Solver termination status: OPTIMAL
Signature
SolverModel.solve_time_sec: dsl.Expression

A dsl.Expression that produces the time taken by the solver to solve the model, in seconds. Requires the model to have been solved with SolverModel.solve() with print_only=False.

For example:

import relationalai as rai
from relationalai.experimental import solvers
215 collapsed lines
# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")
# Declare entity types and properties.
# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario") # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment") # A Scenario that is assigned.
# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()
# Define sample data.
# Employees.
with model.rule(dynamic=True):
for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
Employee.add(name=name)
# Shifts.
with model.rule():
Shift.add(name="Morning").set(capacity=2)
Shift.add(name="Evening").set(capacity=3)
# Days of the week.
with model.rule(dynamic=True):
for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
Day.add(name=name)
# Employee-day availability.
with model.rule(dynamic=True):
# Alice works weekdays only
for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
Available.add(employee=Employee(name="Alice"), day=Day(name=name))
# Bob works all days
Available.add(employee=Employee(name="Bob"), day=Day())
# Carol works weekends only
for name in ["Sat", "Sun"]:
Available.add(employee=Employee(name="Carol"), day=Day(name=name))
# Dave works Mon/Wed/Fri
for name in ["Mon", "Wed", "Fri"]:
Available.add(employee=Employee(name="Dave"), day=Day(name=name))
# Eve works Tue/Thu/Sat
for name in ["Tue", "Thu", "Sat"]:
Available.add(employee=Employee(name="Eve"), day=Day(name=name))
# All possible employee–shift–day combinations
with model.rule():
Scenario.add(employee=Employee(), shift=Shift(), day=Day())
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Define solver variables.
# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
scenario = Scenario()
solver_model.variable(
scenario,
type="zero_one",
name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
)
# Define Solver Constraints
# Unavailable employee–day pairs cannot be assigned.
with model.rule():
scenario = Scenario()
with model.not_found():
Available(employee=scenario.employee, day=scenario.day)
solver_model.constraint(
solvers.not_(scenario),
name_args=["unavailable", scenario.employee.name, scenario.day.name]
)
# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
works_morning = Scenario(shift=Shift(name="Morning"))
works_evening = Scenario(
shift=Shift(name="Evening"),
employee=works_morning.employee,
day=works_morning.day
)
solver_model.constraint(
solvers.implies(works_morning, solvers.not_(works_evening))
)
# At least one of Alice or Bob works Morning on Monday.
with model.rule():
morning, monday = Shift(name="Morning"), Day(name="Mon")
alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
solver_model.constraint(solvers.or_(alice_works, bob_works))
# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
alice = Employee(name="Alice")
day = Available(employee=alice).day
works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
solver_model.constraint(
solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
name_args=["alice_shift", day.name]
)
# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
scenario = Scenario()
shift, day = scenario.shift, scenario.day
num_shifts_per_day = solvers.count(scenario, per=[shift, day])
solver_model.constraint(
num_shifts_per_day <= shift.capacity,
name_args=["shift_cap", shift.name, day.name]
)
# Define solver objective.
# Maximize total assignments across the week.
with model.rule():
scenario = Scenario()
solver_model.max_objective(solvers.sum(scenario))
# Solve the solver model.
# Create a Solver instance.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20
# View the solution values for each scenario variable.
with model.query() as select:
scenario = Scenario()
response = select(
scenario.employee.name,
scenario.shift.name,
scenario.day.name,
solver_model.value(scenario)
)
print("Solution:")
print(response.results)
# Output:
# Solution:
# name name2 name3 value
# 0 Alice Evening Fri 1
# 1 Alice Evening Mon 0
# 2 Alice Evening Sat 0
# 3 Alice Evening Sun 0
# 4 Alice Evening Thu 1
# .. ... ... ... ...
# 65 Eve Morning Sat 1
# 66 Eve Morning Sun 0
# 67 Eve Morning Thu 0
# 68 Eve Morning Tue 0
# 69 Eve Morning Wed 0
# [70 rows x 4 columns]
# Use the solution to define Assignments.
# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
scenario = Scenario()
solver_model.value(scenario) == 1
scenario.set(Assignment)
# View Dave's assignments.
with model.query() as select:
assignment = Assignment(employee=Employee(name="Dave"))
response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
# name name2
# 0 Fri Morning
# 1 Mon Morning
# 2 Wed Evening
# View solution metadata.
# Termination status.
with model.query() as select:
response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL
with model.query() as select:
response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")
output
Solver time: 1.5687661170959473 seconds
Signature
SolverModel.solver_version: dsl.Expression

A dsl.Expression that produces the version of the solver backend used to solve the model. Requires the model to have been solved with SolverModel.solve().

For example:

import relationalai as rai
from relationalai.experimental import solvers
222 collapsed lines
# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")
# Declare entity types and properties.
# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario") # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment") # A Scenario that is assigned.
# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()
# Define sample data.
# Employees.
with model.rule(dynamic=True):
for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
Employee.add(name=name)
# Shifts.
with model.rule():
Shift.add(name="Morning").set(capacity=2)
Shift.add(name="Evening").set(capacity=3)
# Days of the week.
with model.rule(dynamic=True):
for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
Day.add(name=name)
# Employee-day availability.
with model.rule(dynamic=True):
# Alice works weekdays only
for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
Available.add(employee=Employee(name="Alice"), day=Day(name=name))
# Bob works all days
Available.add(employee=Employee(name="Bob"), day=Day())
# Carol works weekends only
for name in ["Sat", "Sun"]:
Available.add(employee=Employee(name="Carol"), day=Day(name=name))
# Dave works Mon/Wed/Fri
for name in ["Mon", "Wed", "Fri"]:
Available.add(employee=Employee(name="Dave"), day=Day(name=name))
# Eve works Tue/Thu/Sat
for name in ["Tue", "Thu", "Sat"]:
Available.add(employee=Employee(name="Eve"), day=Day(name=name))
# All possible employee–shift–day combinations
with model.rule():
Scenario.add(employee=Employee(), shift=Shift(), day=Day())
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Define solver variables.
# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
scenario = Scenario()
solver_model.variable(
scenario,
type="zero_one",
name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
)
# Define Solver Constraints
# Unavailable employee–day pairs cannot be assigned.
with model.rule():
scenario = Scenario()
with model.not_found():
Available(employee=scenario.employee, day=scenario.day)
solver_model.constraint(
solvers.not_(scenario),
name_args=["unavailable", scenario.employee.name, scenario.day.name]
)
# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
works_morning = Scenario(shift=Shift(name="Morning"))
works_evening = Scenario(
shift=Shift(name="Evening"),
employee=works_morning.employee,
day=works_morning.day
)
solver_model.constraint(
solvers.implies(works_morning, solvers.not_(works_evening))
)
# At least one of Alice or Bob works Morning on Monday.
with model.rule():
morning, monday = Shift(name="Morning"), Day(name="Mon")
alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
solver_model.constraint(solvers.or_(alice_works, bob_works))
# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
alice = Employee(name="Alice")
day = Available(employee=alice).day
works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
solver_model.constraint(
solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
name_args=["alice_shift", day.name]
)
# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
scenario = Scenario()
shift, day = scenario.shift, scenario.day
num_shifts_per_day = solvers.count(scenario, per=[shift, day])
solver_model.constraint(
num_shifts_per_day <= shift.capacity,
name_args=["shift_cap", shift.name, day.name]
)
# Define solver objective.
# Maximize total assignments across the week.
with model.rule():
scenario = Scenario()
solver_model.max_objective(solvers.sum(scenario))
# Solve the solver model.
# Create a Solver instance.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20
# View the solution values for each scenario variable.
with model.query() as select:
scenario = Scenario()
response = select(
scenario.employee.name,
scenario.shift.name,
scenario.day.name,
solver_model.value(scenario)
)
print("Solution:")
print(response.results)
# Output:
# Solution:
# name name2 name3 value
# 0 Alice Evening Fri 1
# 1 Alice Evening Mon 0
# 2 Alice Evening Sat 0
# 3 Alice Evening Sun 0
# 4 Alice Evening Thu 1
# .. ... ... ... ...
# 65 Eve Morning Sat 1
# 66 Eve Morning Sun 0
# 67 Eve Morning Thu 0
# 68 Eve Morning Tue 0
# 69 Eve Morning Wed 0
# [70 rows x 4 columns]
# Use the solution to define Assignments.
# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
scenario = Scenario()
solver_model.value(scenario) == 1
scenario.set(Assignment)
# View Dave's assignments.
with model.query() as select:
assignment = Assignment(employee=Employee(name="Dave"))
response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
# name name2
# 0 Fri Morning
# 1 Mon Morning
# 2 Wed Evening
# View solution metadata.
# Termination status.
with model.query() as select:
response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL
# Solver time.
with model.query() as select:
response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")
# Output:
# Solver time: 1.5687661170959473 seconds
with model.query() as select:
response = select(solver_model.solver_version)
print(f"Solver version: {response.results.iloc[0, 0]}")
output
Solver version: MiniZinc_nothing
Signature
SolverModel.error: dsl.Expression

A dsl.Expression that produces an error message if the solver encountered an issue during execution. Requires the model to have been solved with SolverModel.solve() with print_only=False.

For example:

import relationalai as rai
from relationalai.experimental import solvers
229 collapsed lines
# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")
# Declare entity types and properties.
# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario") # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment") # A Scenario that is assigned.
# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()
# Define sample data.
# Employees.
with model.rule(dynamic=True):
for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
Employee.add(name=name)
# Shifts.
with model.rule():
Shift.add(name="Morning").set(capacity=2)
Shift.add(name="Evening").set(capacity=3)
# Days of the week.
with model.rule(dynamic=True):
for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
Day.add(name=name)
# Employee-day availability.
with model.rule(dynamic=True):
# Alice works weekdays only
for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
Available.add(employee=Employee(name="Alice"), day=Day(name=name))
# Bob works all days
Available.add(employee=Employee(name="Bob"), day=Day())
# Carol works weekends only
for name in ["Sat", "Sun"]:
Available.add(employee=Employee(name="Carol"), day=Day(name=name))
# Dave works Mon/Wed/Fri
for name in ["Mon", "Wed", "Fri"]:
Available.add(employee=Employee(name="Dave"), day=Day(name=name))
# Eve works Tue/Thu/Sat
for name in ["Tue", "Thu", "Sat"]:
Available.add(employee=Employee(name="Eve"), day=Day(name=name))
# All possible employee–shift–day combinations
with model.rule():
Scenario.add(employee=Employee(), shift=Shift(), day=Day())
# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
# Define solver variables.
# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
scenario = Scenario()
solver_model.variable(
scenario,
type="zero_one",
name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
)
# Define Solver Constraints
# Unavailable employee–day pairs cannot be assigned.
with model.rule():
scenario = Scenario()
with model.not_found():
Available(employee=scenario.employee, day=scenario.day)
solver_model.constraint(
solvers.not_(scenario),
name_args=["unavailable", scenario.employee.name, scenario.day.name]
)
# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
works_morning = Scenario(shift=Shift(name="Morning"))
works_evening = Scenario(
shift=Shift(name="Evening"),
employee=works_morning.employee,
day=works_morning.day
)
solver_model.constraint(
solvers.implies(works_morning, solvers.not_(works_evening))
)
# At least one of Alice or Bob works Morning on Monday.
with model.rule():
morning, monday = Shift(name="Morning"), Day(name="Mon")
alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
solver_model.constraint(solvers.or_(alice_works, bob_works))
# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
alice = Employee(name="Alice")
day = Available(employee=alice).day
works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
solver_model.constraint(
solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
name_args=["alice_shift", day.name]
)
# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
scenario = Scenario()
shift, day = scenario.shift, scenario.day
num_shifts_per_day = solvers.count(scenario, per=[shift, day])
solver_model.constraint(
num_shifts_per_day <= shift.capacity,
name_args=["shift_cap", shift.name, day.name]
)
# Define solver objective.
# Maximize total assignments across the week.
with model.rule():
scenario = Scenario()
solver_model.max_objective(solvers.sum(scenario))
# Solve the solver model.
# Create a Solver instance.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the objective value.
with model.query() as select:
response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20
# View the solution values for each scenario variable.
with model.query() as select:
scenario = Scenario()
response = select(
scenario.employee.name,
scenario.shift.name,
scenario.day.name,
solver_model.value(scenario)
)
print("Solution:")
print(response.results)
# Output:
# Solution:
# name name2 name3 value
# 0 Alice Evening Fri 1
# 1 Alice Evening Mon 0
# 2 Alice Evening Sat 0
# 3 Alice Evening Sun 0
# 4 Alice Evening Thu 1
# .. ... ... ... ...
# 65 Eve Morning Sat 1
# 66 Eve Morning Sun 0
# 67 Eve Morning Thu 0
# 68 Eve Morning Tue 0
# 69 Eve Morning Wed 0
# [70 rows x 4 columns]
# Use the solution to define Assignments.
# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
scenario = Scenario()
solver_model.value(scenario) == 1
scenario.set(Assignment)
# View Dave's assignments.
with model.query() as select:
assignment = Assignment(employee=Employee(name="Dave"))
response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
# name name2
# 0 Fri Morning
# 1 Mon Morning
# 2 Wed Evening
# View solution metadata.
# Termination status.
with model.query() as select:
response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL
# Solver time.
with model.query() as select:
response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")
# Output:
# Solver time: 1.5687661170959473 seconds
# Solver version.
with model.query() as select:
response = select(solver_model.solver_version)
print(f"Solver version: {response.results.iloc[0, 0]}")
# Output:
# Solver version: MiniZinc_nothing
with model.query() as select:
response = select(solver_model.error)
print(f"Solver errors:\n{response.results}")