relationalai.experimental.solvers.SolverModel
class SolverModel
Represents a prescriptive optimization model that can be defined, inspected, and solved using RelationalAI’s Prescriptive Reasoner.
SolverModel objects have methods for defining decision variables, constraints, and objective functions, and for invoking a solver on your model.
Example
To create a SolverModel object, create a RAI model and pass it to the SolverModel constructor:
import relationalai as rai
from relationalai.experimental import solvers

# Define a RAI model.
model = rai.Model("SupportTicketRouting")

# Declare support pipeline stage type.
Stage = model.Type("Stage")

# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()

# Define sample data for the model.
# Define support teams and endpoints.
for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
]:
    with model.rule():
        Stage.add(name=name)

# Define routes with capacities.
for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
]:
    with model.rule():
        src = Stage(name=src_name)
        dst = Stage(name=dst_name)
        Route.add(source=src, destination=dst).set(capacity=cap)

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)
Methods
The SolverModel class provides methods to define variables, constraints, and objectives, as well as to solve the model and retrieve results.
.__init__()
SolverModel.__init__(model: dsl.Model) -> None
Create a new SolverModel object from an existing RAI model.
.variable()
SolverModel.variable(
    instance: dsl.Instance,
    name_args: list[str|dsl.Producer]|None = None,
    type: str|None = None,
    lower: int|float|dsl.Producer|None = None,
    upper: int|float|dsl.Producer|None = None,
    fixed: int|float|dsl.Producer|None = None
) -> None
Declare a solver variable for each entity produced by instance.
By default, variables are continuous and unbounded.
Use the optional type, lower, upper, and fixed parameters to constrain the variables to integer or binary values (for example, type="integer" or type="zero_one") and to set bounds or fixed values.
If name_args is provided, it will be used to construct the variable name.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Define a RAI model.
model = rai.Model("SupportTicketRouting")

# Declare support pipeline stage type.
Stage = model.Type("Stage")

# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()

# Define sample data for the model.
# Define support teams and endpoints.
for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
]:
    with model.rule():
        Stage.add(name=name)

# Define routes with capacities.
for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
]:
    with model.rule():
        src = Stage(name=src_name)
        dst = Stage(name=dst_name)
        Route.add(source=src, destination=dst).set(capacity=cap)

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
    route = Route()
    solver_model.variable(
        route,
        name_args=["route", route.source.name, route.destination.name],
        type="integer",
        lower=0,
        upper=route.capacity,
    )
.constraint()
SolverModel.constraint(
    expr: SolverExpression,
    name_args: list[str|dsl.Producer]|None = None
) -> None
Declare a constraint for each expression produced by expr.
If name_args is provided, it will be used to construct a constraint name that appears in the output of SolverModel.print() and SolverModel.printed_model.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Define a RAI model.
model = rai.Model("SupportTicketRouting")

# Declare support pipeline stage type.
Stage = model.Type("Stage")

# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()

# Define sample data for the model.
# Define support teams and endpoints.
for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
]:
    with model.rule():
        Stage.add(name=name)

# Define routes with capacities.
for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
]:
    with model.rule():
        src = Stage(name=src_name)
        dst = Stage(name=dst_name)
        Route.add(source=src, destination=dst).set(capacity=cap)

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
    route = Route()
    solver_model.variable(
        route,
        name_args=["route", route.source.name, route.destination.name],
        type="integer",
        lower=0,
        upper=route.capacity,
    )

# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
    us_tier_1 = Route(source=Stage(name="US Tier 1"))
    us_tier_2 = Route(source=Stage(name="US Tier 2"))
    solver_model.constraint(
        solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
        name_args=["us_tier_total"]
    )

# Define a constraint to ensure the total number of tickets routed in
# and out of each stage is balanced.
with solvers.operators():
    stage = Stage()
    stage.name != "Incoming Tickets"  # Exclude the source stage
    stage.name != "Resolved Tickets"  # Exclude the sink stage
    incoming_tickets = solvers.sum(Route(destination=stage), per=[stage])
    outgoing_tickets = solvers.sum(Route(source=stage), per=[stage])
    solver_model.constraint(
        incoming_tickets == outgoing_tickets,
        name_args=["max_outgoing_tickets", stage.name]
    )
.min_objective()
SolverModel.min_objective(expr: SolverExpression) -> None
Define a minimization objective for the solver model.
A solver model can have at most one objective function, so the expr parameter should produce exactly one expression.
For example:
import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")

# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare()  # Region where the consumer is located.
Consumer.demand.declare()  # Water demand of the consumer.

# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()

# Define water sources.
with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")

# Define regions.
for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
]:
    with model.rule():
        Region.add(id=region["id"]).set(name=region["name"])

# Define consumers.
for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
    with model.rule():
        region = Region(id=consumer["region_id"])
        Consumer.add(id=consumer["id"]).set(
            name=consumer["name"],
            region=region,
            demand=consumer["demand"]
        )

# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
        source = WaterSource(name="Reservoir")
        region = Region(id=region_id)
        Pipeline.add(source=source, destination=region).set(capacity=capacity)

# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0),  # North
    (4, 80.0), (5, 140.0), (6, 50.0),  # South
    (7, 90.0), (8, 180.0),  # East
    (9, 40.0), (10, 30.0), (11, 200.0)  # West
]:
    with model.rule():
        consumer = Consumer(id=consumer_id)
        region = consumer.region
        Pipeline.add(source=region, destination=consumer).set(capacity=capacity)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent its flow.
with model.rule():
    pipe = Pipeline()
    solver_model.variable(
        pipe,
        name_args=["pipe", pipe.source.name, pipe.destination.name],
        lower=0,
        upper=pipe.capacity,
    )

# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
        flow_in == flow_out,
        name_args=["preserve_flow", region.name]
    )

# Define a constraint to satisfy at least half of high-demand consumers' demand.
with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0  # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
        solver_model.constraint(
            pipe >= .5 * consumer.demand,
            name_args=["satisfy_consumer", consumer.id]
        )

# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
        total_flow <= 1000.0,  # Max flow from the reservoir
        name_args=["max_reservoir_flow"]
    )

# Define the objective to minimize unmet demand.
with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))
.max_objective()
SolverModel.max_objective(expr: SolverExpression) -> None
Define a maximization objective for the solver model.
A solver model can have at most one objective function, so the expr parameter should produce exactly one expression.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Define a RAI model.
model = rai.Model("SupportTicketRouting")

# Declare support pipeline stage type.
Stage = model.Type("Stage")

# Declare route type to connect stages.
Route = model.Type("Route")
Route.source.declare()
Route.destination.declare()
Route.capacity.declare()

# Define sample data for the model.
# Define support teams and endpoints.
for name in [
    "Incoming Tickets",
    "US Tier 1",
    "EU Tier 1",
    "US Tier 2",
    "EU Tier 2",
    "Escalations",
    "Resolved Tickets",
]:
    with model.rule():
        Stage.add(name=name)

# Define routes with capacities.
for src_name, dst_name, cap in [
    ("Incoming Tickets", "US Tier 1", 50),
    ("Incoming Tickets", "EU Tier 1", 40),
    ("US Tier 1", "Resolved Tickets", 20),
    ("US Tier 1", "US Tier 2", 30),
    ("EU Tier 1", "Resolved Tickets", 10),
    ("EU Tier 1", "EU Tier 2", 25),
    ("US Tier 2", "Resolved Tickets", 15),
    ("US Tier 2", "Escalations", 20),
    ("EU Tier 2", "Resolved Tickets", 15),
    ("EU Tier 2", "Escalations", 20),
    ("Escalations", "Resolved Tickets", 25),
]:
    with model.rule():
        src = Stage(name=src_name)
        dst = Stage(name=dst_name)
        Route.add(source=src, destination=dst).set(capacity=cap)

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.
# Define solver variables for each route.
with model.rule():
    route = Route()
    solver_model.variable(
        route,
        name_args=["route", route.source.name, route.destination.name],
        type="integer",
        lower=0,
        upper=route.capacity,
    )

# Define a constraint to ensure the total number of tickets routed to
# US Tier 1 and US Tier 2 is between 10 and 30.
with solvers.operators():
    us_tier_1 = Route(source=Stage(name="US Tier 1"))
    us_tier_2 = Route(source=Stage(name="US Tier 2"))
    solver_model.constraint(
        solvers.integer_interval(us_tier_1 + us_tier_2, 10, 30),
        name_args=["us_tier_total"]
    )

# Define a constraint to ensure the total number of tickets routed in
# and out of each stage is balanced.
with solvers.operators():
    stage = Stage()
    stage.name != "Incoming Tickets"  # Exclude the source stage
    stage.name != "Resolved Tickets"  # Exclude the sink stage
    incoming_tickets = solvers.sum(Route(destination=stage), per=[stage])
    outgoing_tickets = solvers.sum(Route(source=stage), per=[stage])
    solver_model.constraint(
        incoming_tickets == outgoing_tickets,
        name_args=["max_outgoing_tickets", stage.name]
    )

# Define an objective to maximize the total number of tickets resolved.
with solvers.operators():
    resolved = Stage(name="Resolved Tickets")
    total_throughput = solvers.sum(Route(destination=resolved))
    solver_model.max_objective(total_throughput)
.solve()
SolverModel.solve(solver: Solver, **options) -> None
Use the specified solver backend to solve the solver model.
The solver parameter should be an instance of the Solver class.
For example:
import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")

# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare()  # Region where the consumer is located.
Consumer.demand.declare()  # Water demand of the consumer.

# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()

# Define water sources.
with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")

# Define regions.
for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
]:
    with model.rule():
        Region.add(id=region["id"]).set(name=region["name"])

# Define consumers.
for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
    with model.rule():
        region = Region(id=consumer["region_id"])
        Consumer.add(id=consumer["id"]).set(
            name=consumer["name"],
            region=region,
            demand=consumer["demand"]
        )

# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
        source = WaterSource(name="Reservoir")
        region = Region(id=region_id)
        Pipeline.add(source=source, destination=region).set(capacity=capacity)

# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0),  # North
    (4, 80.0), (5, 140.0), (6, 50.0),  # South
    (7, 90.0), (8, 180.0),  # East
    (9, 40.0), (10, 30.0), (11, 200.0)  # West
]:
    with model.rule():
        consumer = Consumer(id=consumer_id)
        region = consumer.region
        Pipeline.add(source=region, destination=consumer).set(capacity=capacity)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent its flow.
with model.rule():
    pipe = Pipeline()
    solver_model.variable(
        pipe,
        name_args=["pipe", pipe.source.name, pipe.destination.name],
        lower=0,
        upper=pipe.capacity,
    )

# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
        flow_in == flow_out,
        name_args=["preserve_flow", region.name]
    )

# Define a constraint to satisfy at least half of high-demand consumers' demand.
with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0  # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
        solver_model.constraint(
            pipe >= .5 * consumer.demand,
            name_args=["satisfy_consumer", consumer.id]
        )

# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
        total_flow <= 1000.0,  # Max flow from the reservoir
        name_args=["max_reservoir_flow"]
    )

# Define the objective to minimize unmet demand.
with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))

# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")

# Solve the model.
solver_model.solve(solver)
Keyword arguments in **options can be used to configure the solver backend, such as setting time limits or disabling logging.
The following standard options are available for all backends:
Option | Type | Description |
---|---|---|
silent | bool | If True, don’t emit solver logs. Defaults to False. |
time_limit_sec | int, float | Maximum time in seconds to allow the solver to run. Defaults to 300. |
solution_limit | int | Maximum number of solutions to return. Ignored by backends that only produce single solutions. Defaults to None, which means no limit. |
relative_gap_tolerance | float | Relative gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value. |
absolute_gap_tolerance | float | Absolute gap tolerance for the solver. Refer to the docs for the chosen solver backend for the default value. |
print_format | str, None | Optional output format of the string representation of the solver model. Must be one of "moi", "latex", "mof", "lp", "mps", or "nl". Defaults to None. See Model Output String to learn how to retrieve this string. |
print_only | bool | If True, only print the solver model using the format specified by print_format without solving it. Defaults to False. |
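As a rough illustration of how the standard options fit together, the sketch below collects a few of them in a plain dictionary and checks names, types, and print_format values before they would be passed as **options. The check_options helper and the STANDARD_OPTIONS and PRINT_FORMATS names are hypothetical and not part of the RelationalAI API; they only mirror the table above.

```python
# Hypothetical helper, NOT part of relationalai: it mirrors the options table.
STANDARD_OPTIONS = {
    "silent": (bool,),
    "time_limit_sec": (int, float),
    "solution_limit": (int, type(None)),
    "relative_gap_tolerance": (float,),
    "absolute_gap_tolerance": (float,),
    "print_format": (str, type(None)),
    "print_only": (bool,),
}
PRINT_FORMATS = {"moi", "latex", "mof", "lp", "mps", "nl"}


def check_options(options: dict) -> dict:
    """Return `options` unchanged if every standard option looks well-formed."""
    for name, value in options.items():
        allowed = STANDARD_OPTIONS.get(name)
        if allowed is None:
            # Assumption: names outside the table are left alone here,
            # since they might be backend-specific.
            continue
        # bool is a subclass of int in Python, so reject it for numeric options.
        if isinstance(value, bool) and bool not in allowed:
            raise TypeError(f"{name} does not accept a bool")
        if not isinstance(value, allowed):
            raise TypeError(f"{name} got unexpected type {type(value).__name__}")
    fmt = options.get("print_format")
    if fmt is not None and fmt not in PRINT_FORMATS:
        raise ValueError(f"unknown print_format: {fmt!r}")
    return options


options = check_options({"silent": True, "time_limit_sec": 10, "print_format": "lp"})
# With a real solver model and solver, these would be passed as keyword arguments:
# solver_model.solve(solver, **options)
```

Checking options on the client side like this is optional; it simply surfaces typos before a solve is submitted.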
For instance, to set a time limit of 10 seconds:
import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")

# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare()  # Region where the consumer is located.
Consumer.demand.declare()  # Water demand of the consumer.

# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()

# Define water sources.
with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")

# Define regions.
for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
]:
    with model.rule():
        Region.add(id=region["id"]).set(name=region["name"])

# Define consumers.
for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
    with model.rule():
        region = Region(id=consumer["region_id"])
        Consumer.add(id=consumer["id"]).set(
            name=consumer["name"],
            region=region,
            demand=consumer["demand"]
        )

# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
        source = WaterSource(name="Reservoir")
        region = Region(id=region_id)
        Pipeline.add(source=source, destination=region).set(capacity=capacity)

# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0),  # North
    (4, 80.0), (5, 140.0), (6, 50.0),  # South
    (7, 90.0), (8, 180.0),  # East
    (9, 40.0), (10, 30.0), (11, 200.0)  # West
]:
    with model.rule():
        consumer = Consumer(id=consumer_id)
        region = consumer.region
        Pipeline.add(source=region, destination=consumer).set(capacity=capacity)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent its flow.
with model.rule():
    pipe = Pipeline()
    solver_model.variable(
        pipe,
        name_args=["pipe", pipe.source.name, pipe.destination.name],
        lower=0,
        upper=pipe.capacity,
    )

# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
        flow_in == flow_out,
        name_args=["preserve_flow", region.name]
    )

# Define a constraint to satisfy at least half of high-demand consumers' demand.
with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0  # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
        solver_model.constraint(
            pipe >= .5 * consumer.demand,
            name_args=["satisfy_consumer", consumer.id]
        )

# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
        total_flow <= 1000.0,  # Max flow from the reservoir
        name_args=["max_reservoir_flow"]
    )

# Define the objective to minimize unmet demand.
with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))

# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")

# Solve the model.
solver_model.solve(solver, time_limit_sec=10)
.value()
SolverModel.value(entity: dsl.Instance) -> dsl.Expression
Returns a dsl.Expression that produces the optimal value of the solver variable associated with the given entity.
Requires the .solve() method to have been called first; otherwise, .value() returns an empty expression.
For example:
import relationalai as rai
from relationalai.std import alias
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("RegionalWaterAllocation")

# Declare water distribution stage type.
WaterSource = model.Type("WaterSource")
Region = model.Type("Region")
Consumer = model.Type("Consumer")
Consumer.region.declare()  # Region where the consumer is located.
Consumer.demand.declare()  # Water demand of the consumer.

# Declare Pipeline type to connect stages.
Pipeline = model.Type("Pipeline")
Pipeline.source.declare()
Pipeline.destination.declare()
Pipeline.capacity.declare()

# Define water sources.
with model.rule(dynamic=True):
    WaterSource.add(name="Reservoir")

# Define regions.
for region in [
    {"id": 1, "name": "North"},
    {"id": 2, "name": "South"},
    {"id": 3, "name": "East"},
    {"id": 4, "name": "West"},
]:
    with model.rule():
        Region.add(id=region["id"]).set(name=region["name"])

# Define consumers.
for consumer in [
    {"id": 1, "region_id": 1, "name": "Residential", "demand": 150.0},
    {"id": 2, "region_id": 1, "name": "Industrial", "demand": 60.0},
    {"id": 3, "region_id": 1, "name": "Farms", "demand": 40.0},
    {"id": 4, "region_id": 2, "name": "Residential", "demand": 80.0},
    {"id": 5, "region_id": 2, "name": "Industrial", "demand": 140.0},
    {"id": 6, "region_id": 2, "name": "Farms", "demand": 50.0},
    {"id": 7, "region_id": 3, "name": "Residential", "demand": 90.0},
    {"id": 8, "region_id": 3, "name": "Industrial", "demand": 180.0},
    {"id": 9, "region_id": 4, "name": "Residential", "demand": 40.0},
    {"id": 10, "region_id": 4, "name": "Industrial", "demand": 30.0},
    {"id": 11, "region_id": 4, "name": "Farms", "demand": 200.0},
]:
    with model.rule():
        region = Region(id=consumer["region_id"])
        Consumer.add(id=consumer["id"]).set(
            name=consumer["name"],
            region=region,
            demand=consumer["demand"]
        )

# Define pipelines from the reservoir to each region.
for region_id, capacity in [(1, 260.0), (2, 300.0), (3, 200.0), (4, 220.0)]:
    with model.rule():
        source = WaterSource(name="Reservoir")
        region = Region(id=region_id)
        Pipeline.add(source=source, destination=region).set(capacity=capacity)

# Define pipelines between consumers and their regions.
for consumer_id, capacity in [
    (1, 150.0), (2, 60.0), (3, 40.0),  # North
    (4, 80.0), (5, 140.0), (6, 50.0),  # South
    (7, 90.0), (8, 180.0),  # East
    (9, 40.0), (10, 30.0), (11, 200.0)  # West
]:
    with model.rule():
        consumer = Consumer(id=consumer_id)
        region = consumer.region
        Pipeline.add(source=region, destination=consumer).set(capacity=capacity)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and objective.
# Define continuous variables for each pipeline to represent its flow.
with model.rule():
    pipe = Pipeline()
    solver_model.variable(
        pipe,
        name_args=["pipe", pipe.source.name, pipe.destination.name],
        lower=0,
        upper=pipe.capacity,
    )

# Define a constraint to ensure flow conservation in each region.
# NOTE: `solvers.operators()` overrides infix operators like `==` to build
# solver expressions instead of filter expressions.
with solvers.operators():
    region = Region()
    flow_in = solvers.sum(Pipeline(destination=region), per=[region])
    flow_out = solvers.sum(Pipeline(source=region), per=[region])
    solver_model.constraint(
        flow_in == flow_out,
        name_args=["preserve_flow", region.name]
    )

# Define a constraint to satisfy at least half of high-demand consumers' demand.
with model.rule():
    consumer = Consumer()
    consumer.demand >= 100.0  # Filter for high-demand consumers.
    pipe = Pipeline(source=consumer.region, destination=consumer)
    with solvers.operators():
        solver_model.constraint(
            pipe >= .5 * consumer.demand,
            name_args=["satisfy_consumer", consumer.id]
        )

# Define a constraint to limit the total flow from the reservoir.
with solvers.operators():
    total_flow = solvers.sum(Pipeline(source=WaterSource(name="Reservoir")))
    solver_model.constraint(
        total_flow <= 1000.0,  # Max flow from the reservoir
        name_args=["max_reservoir_flow"]
    )

# Define the objective to minimize unmet demand.
with solvers.operators():
    consumer = Consumer()
    pipe = Pipeline(source=consumer.region, destination=consumer)
    unmet_demand_per_consumer = consumer.demand - solvers.sum(pipe, per=[consumer])
    solver_model.min_objective(solvers.sum(unmet_demand_per_consumer))

# Create a HiGHS Solver instance.
solver = solvers.Solver("highs")

# Solve the model.
solver_model.solve(solver)

# View the solution's objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"\n\nUnmet demand (optimized): {response.results.iloc[0, 0]}")
# Output:
# Unmet demand (optimized): 120.0

# View the optimized flow for each pipeline.
with model.query() as select:
    pipe = Pipeline()
    response = select(
        alias(pipe.source.name, "source_name"),
        alias(pipe.destination.name, "destination_name"),
        solver_model.value(pipe)
    )
print("\n\nOptimized pipeline flows:")
print(response.results)

Optimized pipeline flows:
0        East   Industrial  180.0
1        East  Residential   20.0
2       North        Farms   40.0
3       North   Industrial   60.0
4       North  Residential  150.0
5   Reservoir         East  200.0
6   Reservoir        North  250.0
7   Reservoir        South  270.0
8   Reservoir         West  220.0
9       South        Farms   50.0
10      South   Industrial  140.0
11      South  Residential   80.0
12       West        Farms  150.0
13       West   Industrial   30.0
14       West  Residential   40.0
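The reported objective value can be cross-checked by hand against the printed flows. The following plain-Python sketch does not call the solver. It copies the consumer demands from the sample data and the flows from the output above, then recomputes total unmet demand and re-checks flow conservation per region. The dictionary names are illustrative only.

```python
# Consumer demands from the sample data, keyed by (region, consumer name).
demand = {
    ("North", "Residential"): 150.0, ("North", "Industrial"): 60.0,
    ("North", "Farms"): 40.0, ("South", "Residential"): 80.0,
    ("South", "Industrial"): 140.0, ("South", "Farms"): 50.0,
    ("East", "Residential"): 90.0, ("East", "Industrial"): 180.0,
    ("West", "Residential"): 40.0, ("West", "Industrial"): 30.0,
    ("West", "Farms"): 200.0,
}

# Region-to-consumer flows copied from the printed solution.
delivered = {
    ("North", "Residential"): 150.0, ("North", "Industrial"): 60.0,
    ("North", "Farms"): 40.0, ("South", "Residential"): 80.0,
    ("South", "Industrial"): 140.0, ("South", "Farms"): 50.0,
    ("East", "Residential"): 20.0, ("East", "Industrial"): 180.0,
    ("West", "Residential"): 40.0, ("West", "Industrial"): 30.0,
    ("West", "Farms"): 150.0,
}

# Reservoir-to-region flows copied from the printed solution.
inflow = {"North": 250.0, "South": 270.0, "East": 200.0, "West": 220.0}

# Total unmet demand is the objective that the model minimizes.
unmet = sum(demand[key] - delivered[key] for key in demand)
print(unmet)  # 120.0

# Flow conservation: each region passes on exactly what it receives.
for region, flow_in in inflow.items():
    flow_out = sum(v for (r, _), v in delivered.items() if r == region)
    assert flow_in == flow_out, region

# High-demand consumers (demand >= 100) receive at least half their demand.
assert all(delivered[k] >= 0.5 * d for k, d in demand.items() if d >= 100.0)

# Total flow leaving the reservoir stays under the 1000.0 cap.
total_from_reservoir = sum(inflow.values())
print(total_from_reservoir)  # 940.0
```

The only shortfalls are East Residential (70.0) and West Farms (50.0), which add up to the 120.0 reported by the solver.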
.variable_name()
SolverModel.variable_name(entity: dsl.Instance) -> dsl.Expression
Returns a dsl.Expression that produces the names assigned to the solver variables associated with entity.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create the model.
model = rai.Model("ShiftAssignment")

# Declare entity types.
Worker = model.Type("Worker")
Shift = model.Type("Shift")
Shift.capacity.declare()  # Max people allowed per shift

# Declare relationship types
Assignment = model.Type("Assignment")
Assignment.worker.declare()
Assignment.shift.declare()

# Define sample data.
with model.rule():
    Worker.add(name="Alice")
    Worker.add(name="Bob")
    Worker.add(name="Carlos")

with model.rule():
    Shift.add(name="Morning", capacity=1)
    Shift.add(name="Afternoon", capacity=2)
    Shift.add(name="Night", capacity=1)

# Define all possible assignments.
with model.rule():
    Assignment.add(worker=Worker(), shift=Shift())

# Create the solver model.
solver_model = solvers.SolverModel(model)

# Define binary assignment variable.
with model.rule():
    assignment = Assignment()
    solver_model.variable(
        assignment,
        name_args=["assigned", assignment.worker.name, assignment.shift.name],
        type="zero_one"
    )

# Get the variable names for each assignment.
with model.query() as select:
    assignment = Assignment()
    response = select(solver_model.variable_name(assignment))
print(response.results)

                        name
0   assigned_Alice_Afternoon
1     assigned_Alice_Morning
2       assigned_Alice_Night
3     assigned_Bob_Afternoon
4       assigned_Bob_Morning
5         assigned_Bob_Night
6  assigned_Carlos_Afternoon
7    assigned_Carlos_Morning
8      assigned_Carlos_Night
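Judging from the names in the output above, each variable name appears to be the name_args values joined with underscores. The plain-Python sketch below reproduces that convention, assuming the joining rule holds in general. The build_name helper is hypothetical and not part of the RelationalAI API.

```python
from itertools import product


def build_name(name_args):
    """Join name parts with underscores, mimicking the names shown above."""
    return "_".join(str(part) for part in name_args)


workers = ["Alice", "Bob", "Carlos"]
shifts = ["Afternoon", "Morning", "Night"]

# One name per (worker, shift) pair, matching one variable per Assignment entity.
names = [build_name(["assigned", w, s]) for w, s in product(workers, shifts)]

print(names[0])    # assigned_Alice_Afternoon
print(len(names))  # 9
```

Including entity properties such as the worker and shift names in name_args keeps every generated name distinct, which makes the printed model easier to read.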
.print()
SolverModel.print() -> None
Prints a canonical representation of the model to stdout.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("ProductBundling")

# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()

# Define sample products with price and cost properties.
with model.rule():
    Product.add(name="Laptop").set(price=999.0, cost=650.0)
    Product.add(name="Mouse").set(price=25.0, cost=8.0)
    Product.add(name="Keyboard").set(price=70.0, cost=25.0)
    Product.add(name="Charger").set(price=35.0, cost=12.0)
    Product.add(name="Headphones").set(price=85.0, cost=30.0)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.

# Define binary decision variables to include or exclude each product.
with model.rule():
    product = Product()
    with model.match():
        with product.name == "Laptop":
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one",
                fixed=1  # Bundle must always include the laptop
            )
        # Other products are optional.
        with model.case():
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one"
            )

# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
    headphones = Product(name="Headphones")
    charger = Product(name="Charger")
    solver_model.constraint(
        solvers.if_then_else(
            headphones == 1,  # If the headphones are included
            charger == 1,  # Then the charger must be included
            charger == 0,  # Otherwise, the charger must not be included
        ),
        name_args=["headphones_require_charger"]
    )

# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
    product = Product()
    solver_model.constraint(
        solvers.sum(product) == 3,
        name_args=["bundle_size"]
    )

# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
    product = Product()
    total_cost = solvers.sum(product.cost * product)
    solver_model.min_objective(total_cost)  # Minimize total bundle cost

# Inspect the solver model.
solver_model.print()

variables:
  include_Charger
  include_Headphones
  include_Keyboard
  include_Laptop
  include_Mouse
minimization objectives:
  SUM(MULTIPLY(12.0,include_Charger),MULTIPLY(25.0,include_Keyboard),MULTIPLY(30.0,include_Headphones),MULTIPLY(650.0,include_Laptop),MULTIPLY(8.0,include_Mouse))
constraints:
  AND(EQ(include_Laptop,1),ZEROONE(include_Laptop))
  EQ(SUM(include_Charger,include_Headphones,include_Keyboard,include_Laptop,include_Mouse),3)
  ZEROONE(include_Charger)
  ZEROONE(include_Headphones)
  ZEROONE(include_Keyboard)
  ZEROONE(include_Mouse)
Alternatively, set the print_format option when calling SolverModel.solve(). The printed model is then available in the SolverModel.printed_model attribute.
The print_format option offers a wider range of output formats, but it requires a solver engine, even when print_only=True.
Attributes
The SolverModel class provides several attributes to access the results and metadata of the model after it has been solved or printed.
.printed_model
SolverModel.printed_model: dsl.Expression
A dsl.Expression that produces a string containing a human-readable representation of the model, in the format specified by the print_format option of SolverModel.solve().
Requires the model to have been solved, or printed with the print_only=True option.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("ProductBundling")

# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()

# Define sample products with price and cost properties.
with model.rule():
    Product.add(name="Laptop").set(price=999.0, cost=650.0)
    Product.add(name="Mouse").set(price=25.0, cost=8.0)
    Product.add(name="Keyboard").set(price=70.0, cost=25.0)
    Product.add(name="Charger").set(price=35.0, cost=12.0)
    Product.add(name="Headphones").set(price=85.0, cost=30.0)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.

# Define binary decision variables to include or exclude each product.
with model.rule():
    product = Product()
    with model.match():
        with product.name == "Laptop":
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one",
                fixed=1  # Bundle must always include the laptop
            )
        # Other products are optional.
        with model.case():
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one"
            )

# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
    headphones = Product(name="Headphones")
    charger = Product(name="Charger")
    solver_model.constraint(
        solvers.if_then_else(
            headphones == 1,  # If the headphones are included
            charger == 1,  # Then the charger must be included
            charger == 0,  # Otherwise, the charger must not be included
        ),
        name_args=["headphones_require_charger"]
    )

# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
    product = Product()
    solver_model.constraint(
        solvers.sum(product) == 3,
        name_args=["bundle_size"]
    )

# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
    product = Product()
    total_cost = solvers.sum(product.cost * product)
    solver_model.min_objective(total_cost)  # Minimize total bundle cost

# Create a Solver object.
solver = solvers.Solver("minizinc")

# Solve the model.
solver_model.solve(
    solver,
    log_to_console=False,
    print_format="moi",
    print_only=True  # Print the model without solving it
)

# View the string representation of the model.
with model.query() as select:
    response = select(solver_model.printed_model)
print(response.results.iloc[0, 0])

Minimize: (8) include_Mouse + (12) include_Charger + (25) include_Keyboard + (30) include_Headphones + (650) include_Laptop

Subject to:
 ifelse(==(include_Headphones, (1)), ==(include_Charger, (1)), ==(include_Charger, (0))) == (1) (headphones_require_charger)
 (1) include_Laptop == (1)
 (1) include_Keyboard + (1) include_Mouse + (1) include_Laptop + (1) include_Headphones + (1) include_Charger == (3) (bundle_size)
 include_Keyboard ∈ {0, 1}
 include_Mouse ∈ {0, 1}
 include_Laptop ∈ {0, 1}
 include_Headphones ∈ {0, 1}
 include_Charger ∈ {0, 1}
.objective_value
SolverModel.objective_value: dsl.Expression
A dsl.Expression that produces the optimal value of the objective function.
Requires the model to have been solved by calling SolverModel.solve() with print_only=False.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("ProductBundling")

# Declare a Product entity type.
Product = model.Type("Product")
Product.price.declare()
Product.cost.declare()

# Define sample products with price and cost properties.
with model.rule():
    Product.add(name="Laptop").set(price=999.0, cost=650.0)
    Product.add(name="Mouse").set(price=25.0, cost=8.0)
    Product.add(name="Keyboard").set(price=70.0, cost=25.0)
    Product.add(name="Charger").set(price=35.0, cost=12.0)
    Product.add(name="Headphones").set(price=85.0, cost=30.0)

# Create a solver model.
solver_model = solvers.SolverModel(model)

# Specify variables, constraints, and the objective.

# Define binary decision variables to include or exclude each product.
with model.rule():
    product = Product()
    with model.match():
        with product.name == "Laptop":
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one",
                fixed=1  # Bundle must always include the laptop
            )
        # Other products are optional.
        with model.case():
            solver_model.variable(
                product,
                name_args=["include", product.name],
                type="zero_one"
            )

# Define a conditional constraint that requires the charger if headphones are included.
with solvers.operators():
    headphones = Product(name="Headphones")
    charger = Product(name="Charger")
    solver_model.constraint(
        solvers.if_then_else(
            headphones == 1,  # If the headphones are included
            charger == 1,  # Then the charger must be included
            charger == 0,  # Otherwise, the charger must not be included
        ),
        name_args=["headphones_require_charger"]
    )

# Define a constraint to limit the bundle size to 3 products.
with solvers.operators():
    product = Product()
    solver_model.constraint(
        solvers.sum(product) == 3,
        name_args=["bundle_size"]
    )

# Define the objective to minimize the total cost of the bundle.
with solvers.operators():
    product = Product()
    total_cost = solvers.sum(product.cost * product)
    solver_model.min_objective(total_cost)  # Minimize total bundle cost

# Inspect the solver model.
solver_model.print()
# variables:
#   include_Charger
#   include_Headphones
#   include_Keyboard
#   include_Laptop
#   include_Mouse
# minimization objectives:
#   SUM(MULTIPLY(12.0,include_Charger),MULTIPLY(25.0,include_Keyboard),MULTIPLY(30.0,include_Headphones),MULTIPLY(650.0,include_Laptop),MULTIPLY(8.0,include_Mouse))
# constraints:
#   AND(EQ(include_Laptop,1),ZEROONE(include_Laptop))
#   EQ(SUM(include_Charger,include_Headphones,include_Keyboard,include_Laptop,include_Mouse),3)
#   ZEROONE(include_Charger)
#   ZEROONE(include_Headphones)
#   ZEROONE(include_Keyboard)
#   ZEROONE(include_Mouse)

# Create a Solver object.
solver = solvers.Solver("minizinc")

# Solve the model.
solver_model.solve(solver)

# View the solution's objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"\n\nMinimum cost: {response.results.iloc[0, 0]}")

Minimum cost: 683
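The reported minimum of 683 can be cross-checked without a solver, because the bundling model has only five binary variables. The sketch below enumerates all 32 combinations in plain Python and applies the same three constraints as the example. It is an independent sanity check and does not use the relationalai package.

```python
from itertools import product as cartesian

# Product costs copied from the sample data in the example.
costs = {"Laptop": 650.0, "Mouse": 8.0, "Keyboard": 25.0, "Charger": 12.0, "Headphones": 30.0}
names = list(costs)

best_cost, best_bundle = None, None
for bits in cartesian([0, 1], repeat=len(names)):
    include = dict(zip(names, bits))
    # The laptop variable is fixed to 1.
    if include["Laptop"] != 1:
        continue
    # if_then_else: headphones in -> charger in; headphones out -> charger out.
    if include["Charger"] != include["Headphones"]:
        continue
    # The bundle must contain exactly 3 products.
    if sum(bits) != 3:
        continue
    cost = sum(costs[n] * include[n] for n in names)
    if best_cost is None or cost < best_cost:
        best_cost = cost
        best_bundle = sorted(n for n in names if include[n])

print(best_cost, best_bundle)
```

Only two bundles satisfy every constraint: laptop with mouse and keyboard (683), and laptop with charger and headphones (692).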
.termination_status
SolverModel.termination_status: dsl.Expression
A dsl.Expression that produces the termination status of the solver.
This status indicates whether the solver found an optimal solution, found the model to be infeasible, or encountered another issue.
Requires the model to have been solved by calling SolverModel.solve() with print_only=False.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")

# Declare entity types and properties.

# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario")  # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment")  # A Scenario that is assigned.

# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()

# Define sample data.

# Employees.
with model.rule(dynamic=True):
    for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
        Employee.add(name=name)

# Shifts.
with model.rule():
    Shift.add(name="Morning").set(capacity=2)
    Shift.add(name="Evening").set(capacity=3)

# Days of the week.
with model.rule(dynamic=True):
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
        Day.add(name=name)

# Employee-day availability.
with model.rule(dynamic=True):
    # Alice works weekdays only
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
        Available.add(employee=Employee(name="Alice"), day=Day(name=name))
    # Bob works all days
    Available.add(employee=Employee(name="Bob"), day=Day())
    # Carol works weekends only
    for name in ["Sat", "Sun"]:
        Available.add(employee=Employee(name="Carol"), day=Day(name=name))
    # Dave works Mon/Wed/Fri
    for name in ["Mon", "Wed", "Fri"]:
        Available.add(employee=Employee(name="Dave"), day=Day(name=name))
    # Eve works Tue/Thu/Sat
    for name in ["Tue", "Thu", "Sat"]:
        Available.add(employee=Employee(name="Eve"), day=Day(name=name))

# All possible employee–shift–day combinations
with model.rule():
    Scenario.add(employee=Employee(), shift=Shift(), day=Day())

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Define solver variables.

# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
    scenario = Scenario()
    solver_model.variable(
        scenario,
        type="zero_one",
        name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
    )

# Define Solver Constraints

# Unavailable employee–day pairs cannot be assigned.
with model.rule():
    scenario = Scenario()
    with model.not_found():
        Available(employee=scenario.employee, day=scenario.day)
    solver_model.constraint(
        solvers.not_(scenario),
        name_args=["unavailable", scenario.employee.name, scenario.day.name]
    )

# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
    works_morning = Scenario(shift=Shift(name="Morning"))
    works_evening = Scenario(
        shift=Shift(name="Evening"),
        employee=works_morning.employee,
        day=works_morning.day
    )
    solver_model.constraint(
        solvers.implies(works_morning, solvers.not_(works_evening))
    )

# At least one of Alice or Bob works Morning on Monday.
with model.rule():
    morning, monday = Shift(name="Morning"), Day(name="Mon")
    alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
    bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
    solver_model.constraint(solvers.or_(alice_works, bob_works))

# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
    alice = Employee(name="Alice")
    day = Available(employee=alice).day
    works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
    works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
    solver_model.constraint(
        solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
        name_args=["alice_shift", day.name]
    )

# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
    scenario = Scenario()
    shift, day = scenario.shift, scenario.day
    num_shifts_per_day = solvers.count(scenario, per=[shift, day])
    solver_model.constraint(
        num_shifts_per_day <= shift.capacity,
        name_args=["shift_cap", shift.name, day.name]
    )

# Define solver objective.

# Maximize total assignments across the week.
with model.rule():
    scenario = Scenario()
    solver_model.max_objective(solvers.sum(scenario))

# Solve the solver model.

# Create a Solver instance.
solver = solvers.Solver("minizinc")

# Solve the model.
solver_model.solve(solver)

# View the objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20

# View the solution values for each scenario variable.
with model.query() as select:
    scenario = Scenario()
    response = select(
        scenario.employee.name,
        scenario.shift.name,
        scenario.day.name,
        solver_model.value(scenario)
    )
print("Solution:")
print(response.results)
# Output:
# Solution:
#      name    name2 name3  value
# 0   Alice  Evening   Fri      1
# 1   Alice  Evening   Mon      0
# 2   Alice  Evening   Sat      0
# 3   Alice  Evening   Sun      0
# 4   Alice  Evening   Thu      1
# ..    ...      ...   ...    ...
# 65    Eve  Morning   Sat      1
# 66    Eve  Morning   Sun      0
# 67    Eve  Morning   Thu      0
# 68    Eve  Morning   Tue      0
# 69    Eve  Morning   Wed      0
#
# [70 rows x 4 columns]

# Use the solution to define Assignments.

# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
    scenario = Scenario()
    solver_model.value(scenario) == 1
    scenario.set(Assignment)

# View Dave's assignments.
with model.query() as select:
    assignment = Assignment(employee=Employee(name="Dave"))
    response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
#   name    name2
# 0  Fri  Morning
# 1  Mon  Morning
# 2  Wed  Evening

# View solution metadata.

with model.query() as select:
    response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")

Solver termination status: OPTIMAL
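An OPTIMAL status means that no feasible schedule has a higher objective than the reported 20 assignments. Because none of the constraints in the example link different days, that claim can be checked by enumerating each day separately in plain Python. The sketch below is an independent cross-check, not a use of the relationalai package, and it assumes that the capacity constraint counts the employees actually assigned to a shift on a given day.

```python
from itertools import product

employees = ["Alice", "Bob", "Carol", "Dave", "Eve"]
days = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
capacity = {"Morning": 2, "Evening": 3}
available = {
    "Alice": {"Mon", "Tue", "Wed", "Thu", "Fri"},
    "Bob": set(days),
    "Carol": {"Sat", "Sun"},
    "Dave": {"Mon", "Wed", "Fri"},
    "Eve": {"Tue", "Thu", "Sat"},
}

def feasible(day, morning, evening):
    """Check one day's 0/1 assignment against the constraints of the example."""
    for e in employees:
        # Unavailable employee-day pairs cannot be assigned.
        if day not in available[e] and (morning[e] or evening[e]):
            return False
        # Working the morning shift rules out the evening shift.
        if morning[e] and evening[e]:
            return False
    # At least one of Alice or Bob works Morning on Monday.
    if day == "Mon" and not (morning["Alice"] or morning["Bob"]):
        return False
    # On her available days, Alice works evening whenever she does not work morning.
    if day in available["Alice"] and not morning["Alice"] and not evening["Alice"]:
        return False
    # Shift capacities (assumption: the count is over assigned employees).
    if sum(morning.values()) > capacity["Morning"]:
        return False
    if sum(evening.values()) > capacity["Evening"]:
        return False
    return True

def best_for_day(day):
    """Return the maximum number of assignments on a single day."""
    best = -1
    n = len(employees)
    for bits in product([0, 1], repeat=2 * n):
        morning = dict(zip(employees, bits[:n]))
        evening = dict(zip(employees, bits[n:]))
        if feasible(day, morning, evening):
            best = max(best, sum(bits))
    return best

total = sum(best_for_day(d) for d in days)
print(f"Maximum total assignments: {total}")
```

Each day has 1,024 candidate assignments, so the full check runs almost instantly. Enumeration like this stops being practical once the model has more than a few dozen binary variables that cannot be separated.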
.solve_time_sec
SolverModel.solve_time_sec: dsl.Expression
A dsl.Expression that produces the time taken by the solver to solve the model, in seconds.
Requires the model to have been solved by calling SolverModel.solve() with print_only=False.
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")

# Declare entity types and properties.

# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario")  # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment")  # A Scenario that is assigned.

# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()

# Define sample data.

# Employees.
with model.rule(dynamic=True):
    for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
        Employee.add(name=name)

# Shifts.
with model.rule():
    Shift.add(name="Morning").set(capacity=2)
    Shift.add(name="Evening").set(capacity=3)

# Days of the week.
with model.rule(dynamic=True):
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
        Day.add(name=name)

# Employee-day availability.
with model.rule(dynamic=True):
    # Alice works weekdays only
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
        Available.add(employee=Employee(name="Alice"), day=Day(name=name))
    # Bob works all days
    Available.add(employee=Employee(name="Bob"), day=Day())
    # Carol works weekends only
    for name in ["Sat", "Sun"]:
        Available.add(employee=Employee(name="Carol"), day=Day(name=name))
    # Dave works Mon/Wed/Fri
    for name in ["Mon", "Wed", "Fri"]:
        Available.add(employee=Employee(name="Dave"), day=Day(name=name))
    # Eve works Tue/Thu/Sat
    for name in ["Tue", "Thu", "Sat"]:
        Available.add(employee=Employee(name="Eve"), day=Day(name=name))

# All possible employee–shift–day combinations
with model.rule():
    Scenario.add(employee=Employee(), shift=Shift(), day=Day())

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Define solver variables.

# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
    scenario = Scenario()
    solver_model.variable(
        scenario,
        type="zero_one",
        name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
    )

# Define Solver Constraints

# Unavailable employee–day pairs cannot be assigned.
with model.rule():
    scenario = Scenario()
    with model.not_found():
        Available(employee=scenario.employee, day=scenario.day)
    solver_model.constraint(
        solvers.not_(scenario),
        name_args=["unavailable", scenario.employee.name, scenario.day.name]
    )

# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
    works_morning = Scenario(shift=Shift(name="Morning"))
    works_evening = Scenario(
        shift=Shift(name="Evening"),
        employee=works_morning.employee,
        day=works_morning.day
    )
    solver_model.constraint(
        solvers.implies(works_morning, solvers.not_(works_evening))
    )

# At least one of Alice or Bob works Morning on Monday.
with model.rule():
    morning, monday = Shift(name="Morning"), Day(name="Mon")
    alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
    bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
    solver_model.constraint(solvers.or_(alice_works, bob_works))

# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
    alice = Employee(name="Alice")
    day = Available(employee=alice).day
    works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
    works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
    solver_model.constraint(
        solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
        name_args=["alice_shift", day.name]
    )

# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
    scenario = Scenario()
    shift, day = scenario.shift, scenario.day
    num_shifts_per_day = solvers.count(scenario, per=[shift, day])
    solver_model.constraint(
        num_shifts_per_day <= shift.capacity,
        name_args=["shift_cap", shift.name, day.name]
    )

# Define solver objective.

# Maximize total assignments across the week.
with model.rule():
    scenario = Scenario()
    solver_model.max_objective(solvers.sum(scenario))

# Solve the solver model.

# Create a Solver instance.
solver = solvers.Solver("minizinc")

# Solve the model.
solver_model.solve(solver)

# View the objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20

# View the solution values for each scenario variable.
with model.query() as select:
    scenario = Scenario()
    response = select(
        scenario.employee.name,
        scenario.shift.name,
        scenario.day.name,
        solver_model.value(scenario)
    )
print("Solution:")
print(response.results)
# Output:
# Solution:
#      name    name2 name3  value
# 0   Alice  Evening   Fri      1
# 1   Alice  Evening   Mon      0
# 2   Alice  Evening   Sat      0
# 3   Alice  Evening   Sun      0
# 4   Alice  Evening   Thu      1
# ..    ...      ...   ...    ...
# 65    Eve  Morning   Sat      1
# 66    Eve  Morning   Sun      0
# 67    Eve  Morning   Thu      0
# 68    Eve  Morning   Tue      0
# 69    Eve  Morning   Wed      0
#
# [70 rows x 4 columns]

# Use the solution to define Assignments.

# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
    scenario = Scenario()
    solver_model.value(scenario) == 1
    scenario.set(Assignment)

# View Dave's assignments.
with model.query() as select:
    assignment = Assignment(employee=Employee(name="Dave"))
    response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
#   name    name2
# 0  Fri  Morning
# 1  Mon  Morning
# 2  Wed  Evening

# View solution metadata.

# Termination status.
with model.query() as select:
    response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL

with model.query() as select:
    response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")

Solver time: 1.5687661170959473 seconds
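The raw value is a float with many digits. For logs or reports it is usually rounded first. A minimal sketch in plain Python, using the sample value from the output above (in practice the value would come from response.results.iloc[0, 0]):

```python
# Sample value copied from the example output; not a measured result of this snippet.
solve_time_sec = 1.5687661170959473

# Round to two decimals and also report whole milliseconds.
formatted = f"Solver time: {solve_time_sec:.2f} s ({solve_time_sec * 1000:.0f} ms)"
print(formatted)
```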
.solver_version
SolverModel.solver_version: dsl.Expression
A dsl.Expression that produces the version of the solver backend used to solve the model.
Requires the model to have been solved with SolverModel.solve().
For example:
import relationalai as rai
from relationalai.experimental import solvers

# Create a RAI model.
model = rai.Model("WeeklyShiftAssignment")

# Declare entity types and properties.

# Entity types.
Employee = model.Type("Employee")
Shift = model.Type("Shift")
Day = model.Type("Day")
Available = model.Type("Available")
Scenario = model.Type("Scenario")  # All possible employee–shift–day combinations.
Assignment = model.Type("Assignment")  # A Scenario that is assigned.

# Properties.
Employee.name.declare()
Shift.name.declare()
Shift.capacity.declare()
Available.employee.declare()
Available.day.declare()
Scenario.employee.declare()
Scenario.shift.declare()
Scenario.day.declare()

# Define sample data.

# Employees.
with model.rule(dynamic=True):
    for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]:
        Employee.add(name=name)

# Shifts.
with model.rule():
    Shift.add(name="Morning").set(capacity=2)
    Shift.add(name="Evening").set(capacity=3)

# Days of the week.
with model.rule(dynamic=True):
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]:
        Day.add(name=name)

# Employee-day availability.
with model.rule(dynamic=True):
    # Alice works weekdays only
    for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]:
        Available.add(employee=Employee(name="Alice"), day=Day(name=name))
    # Bob works all days
    Available.add(employee=Employee(name="Bob"), day=Day())
    # Carol works weekends only
    for name in ["Sat", "Sun"]:
        Available.add(employee=Employee(name="Carol"), day=Day(name=name))
    # Dave works Mon/Wed/Fri
    for name in ["Mon", "Wed", "Fri"]:
        Available.add(employee=Employee(name="Dave"), day=Day(name=name))
    # Eve works Tue/Thu/Sat
    for name in ["Tue", "Thu", "Sat"]:
        Available.add(employee=Employee(name="Eve"), day=Day(name=name))

# All possible employee–shift–day combinations
with model.rule():
    Scenario.add(employee=Employee(), shift=Shift(), day=Day())

# Create a SolverModel instance from the model.
solver_model = solvers.SolverModel(model)

# Define solver variables.

# Scenario assignment: binary variable for each employee–shift–day combination.
with model.rule():
    scenario = Scenario()
    solver_model.variable(
        scenario,
        type="zero_one",
        name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name],
    )

# Define Solver Constraints

# Unavailable employee–day pairs cannot be assigned.
with model.rule():
    scenario = Scenario()
    with model.not_found():
        Available(employee=scenario.employee, day=scenario.day)
    solver_model.constraint(
        solvers.not_(scenario),
        name_args=["unavailable", scenario.employee.name, scenario.day.name]
    )

# If an employee works the morning shift, they can't work the evening shift.
with model.rule():
    works_morning = Scenario(shift=Shift(name="Morning"))
    works_evening = Scenario(
        shift=Shift(name="Evening"),
        employee=works_morning.employee,
        day=works_morning.day
    )
    solver_model.constraint(
        solvers.implies(works_morning, solvers.not_(works_evening))
    )

# At least one of Alice or Bob works Morning on Monday.
with model.rule():
    morning, monday = Shift(name="Morning"), Day(name="Mon")
    alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday)
    bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday)
    solver_model.constraint(solvers.or_(alice_works, bob_works))

# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
    alice = Employee(name="Alice")
    day = Available(employee=alice).day
    works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
    works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
    solver_model.constraint(
        solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
        name_args=["alice_shift", day.name]
    )

# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
    scenario = Scenario()
    shift, day = scenario.shift, scenario.day
    num_shifts_per_day = solvers.count(scenario, per=[shift, day])
    solver_model.constraint(
        num_shifts_per_day <= shift.capacity,
        name_args=["shift_cap", shift.name, day.name]
    )

# Define solver objective.

# Maximize total assignments across the week.
with model.rule():
    scenario = Scenario()
    solver_model.max_objective(solvers.sum(scenario))

# Solve the solver model.

# Create a Solver instance.
solver = solvers.Solver("minizinc")

# Solve the model.
solver_model.solve(solver)

# View the objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20

# View the solution values for each scenario variable.
with model.query() as select:
    scenario = Scenario()
    response = select(
        scenario.employee.name,
        scenario.shift.name,
        scenario.day.name,
        solver_model.value(scenario)
    )
print("Solution:")
print(response.results)
# Output:
# Solution:
#      name    name2 name3  value
# 0   Alice  Evening   Fri      1
# 1   Alice  Evening   Mon      0
# 2   Alice  Evening   Sat      0
# 3   Alice  Evening   Sun      0
# 4   Alice  Evening   Thu      1
# ..    ...      ...   ...    ...
# 65    Eve  Morning   Sat      1
# 66    Eve  Morning   Sun      0
# 67    Eve  Morning   Thu      0
# 68    Eve  Morning   Tue      0
# 69    Eve  Morning   Wed      0
#
# [70 rows x 4 columns]

# Use the solution to define Assignments.

# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
    scenario = Scenario()
    solver_model.value(scenario) == 1
    scenario.set(Assignment)

# View Dave's assignments.
with model.query() as select:
    assignment = Assignment(employee=Employee(name="Dave"))
    response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
#   name    name2
# 0  Fri  Morning
# 1  Mon  Morning
# 2  Wed  Evening

# View solution metadata.

# Termination status.
with model.query() as select:
    response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL

# Solver time.
with model.query() as select:
    response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")
# Output:
# Solver time: 1.5687661170959473 seconds

with model.query() as select:
    response = select(solver_model.solver_version)
print(f"Solver version: {response.results.iloc[0, 0]}")

Solver version: MiniZinc_nothing
.error
SolverModel.error: dsl.Expression
A dsl.Expression that produces an error message if the solver encountered an issue during execution.
Requires the model to have been solved by calling SolverModel.solve() with print_only=False.
For example:
import relationalai as raifrom relationalai.experimental import solvers
229 collapsed lines
# Create a RAI model.model = rai.Model("WeeklyShiftAssignment")
# Declare entity types and properties.
# Entity types.Employee = model.Type("Employee")Shift = model.Type("Shift")Day = model.Type("Day")Available = model.Type("Available")Scenario = model.Type("Scenario") # All possible employee–shift–day combinations.Assignment = model.Type("Assignment") # A Scenario that is assigned.
# Properties.Employee.name.declare()Shift.name.declare()Shift.capacity.declare()Available.employee.declare()Available.day.declare()Scenario.employee.declare()Scenario.shift.declare()Scenario.day.declare()
# Define sample data.
# Employees.with model.rule(dynamic=True): for name in ["Alice", "Bob", "Carol", "Dave", "Eve"]: Employee.add(name=name)
# Shifts.with model.rule(): Shift.add(name="Morning").set(capacity=2) Shift.add(name="Evening").set(capacity=3)
# Days of the week.with model.rule(dynamic=True): for name in ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]: Day.add(name=name)
# Employee-day availability.with model.rule(dynamic=True): # Alice works weekdays only for name in ["Mon", "Tue", "Wed", "Thu", "Fri"]: Available.add(employee=Employee(name="Alice"), day=Day(name=name)) # Bob works all days Available.add(employee=Employee(name="Bob"), day=Day()) # Carol works weekends only for name in ["Sat", "Sun"]: Available.add(employee=Employee(name="Carol"), day=Day(name=name)) # Dave works Mon/Wed/Fri for name in ["Mon", "Wed", "Fri"]: Available.add(employee=Employee(name="Dave"), day=Day(name=name)) # Eve works Tue/Thu/Sat for name in ["Tue", "Thu", "Sat"]: Available.add(employee=Employee(name="Eve"), day=Day(name=name))
# All possible employee–shift–day combinationswith model.rule(): Scenario.add(employee=Employee(), shift=Shift(), day=Day())
# Create a SolverModel instance from the model.solver_model = solvers.SolverModel(model)
# Define solver variables.
# Scenario assignment: binary variable for each employee–shift–day combination.with model.rule(): scenario = Scenario() solver_model.variable( scenario, type="zero_one", name_args=["assigned", scenario.employee.name, scenario.day.name, scenario.shift.name], )
# Define Solver Constraints
# Unavailable employee–day pairs cannot be assigned.with model.rule(): scenario = Scenario() with model.not_found(): Available(employee=scenario.employee, day=scenario.day) solver_model.constraint( solvers.not_(scenario), name_args=["unavailable", scenario.employee.name, scenario.day.name] )
# If an employee works the morning shift, they can't work the evening shift.with model.rule(): works_morning = Scenario(shift=Shift(name="Morning")) works_evening = Scenario( shift=Shift(name="Evening"), employee=works_morning.employee, day=works_morning.day ) solver_model.constraint( solvers.implies(works_morning, solvers.not_(works_evening)) )
# At least one of Alice or Bob works Morning on Monday.with model.rule(): morning, monday = Shift(name="Morning"), Day(name="Mon") alice_works = Scenario(employee=Employee(name="Alice"), shift=morning, day=monday) bob_works = Scenario(employee=Employee(name="Bob"), shift=morning, day=monday) solver_model.constraint(solvers.or_(alice_works, bob_works))
# If Alice works in the morning, she doesn't work in the evening. Otherwise, she does work in the evening.
with model.rule():
    alice = Employee(name="Alice")
    day = Available(employee=alice).day
    works_morning = Scenario(employee=alice, shift=Shift(name="Morning"), day=day)
    works_evening = Scenario(employee=alice, shift=Shift(name="Evening"), day=day)
    solver_model.constraint(
        solvers.if_then_else(works_morning, solvers.not_(works_evening), works_evening),
        name_args=["alice_shift", day.name]
    )
# Limit the number of employees assigned to each shift to its capacity.
with solvers.operators():
    scenario = Scenario()
    shift, day = scenario.shift, scenario.day
    num_shifts_per_day = solvers.count(scenario, per=[shift, day])
    solver_model.constraint(
        num_shifts_per_day <= shift.capacity,
        name_args=["shift_cap", shift.name, day.name]
    )
# Define solver objective.
# Maximize total assignments across the week.
with model.rule():
    scenario = Scenario()
    solver_model.max_objective(solvers.sum(scenario))
# Solve the solver model.
# Create a Solver instance.
solver = solvers.Solver("minizinc")
# Solve the model.
solver_model.solve(solver)
# View the objective value.
with model.query() as select:
    response = select(solver_model.objective_value)
print(f"Total assignments: {response.results.iloc[0, 0]}")
# Output:
# Total assignments: 20
# View the solution values for each scenario variable.
with model.query() as select:
    scenario = Scenario()
    response = select(
        scenario.employee.name,
        scenario.shift.name,
        scenario.day.name,
        solver_model.value(scenario)
    )
print("Solution:")
print(response.results)
# Output:
# Solution:
#      name    name2 name3  value
# 0   Alice  Evening   Fri      1
# 1   Alice  Evening   Mon      0
# 2   Alice  Evening   Sat      0
# 3   Alice  Evening   Sun      0
# 4   Alice  Evening   Thu      1
# ..    ...      ...   ...    ...
# 65    Eve  Morning   Sat      1
# 66    Eve  Morning   Sun      0
# 67    Eve  Morning   Thu      0
# 68    Eve  Morning   Tue      0
# 69    Eve  Morning   Wed      0
# [70 rows x 4 columns]
# Use the solution to define Assignments.
# Define Assignments from Scenarios whose solution value is 1.
with model.rule():
    scenario = Scenario()
    solver_model.value(scenario) == 1
    scenario.set(Assignment)
# View Dave's assignments.
with model.query() as select:
    assignment = Assignment(employee=Employee(name="Dave"))
    response = select(assignment.day.name, assignment.shift.name)
print("Dave's assignments:")
print(response.results)
# Output:
# Dave's assignments:
#   name    name2
# 0  Fri  Morning
# 1  Mon  Morning
# 2  Wed  Evening
# View solution metadata.
# Termination status.
with model.query() as select:
    response = select(solver_model.termination_status)
print(f"Solver termination status: {response.results.iloc[0, 0]}")
# Output:
# Solver termination status: OPTIMAL
# Solver time.
with model.query() as select:
    response = select(solver_model.solve_time_sec)
print(f"Solver time: {response.results.iloc[0, 0]} seconds")
# Output:
# Solver time: 1.5687661170959473 seconds
# Solver version.
with model.query() as select:
    response = select(solver_model.solver_version)
print(f"Solver version: {response.results.iloc[0, 0]}")
# Output:
# Solver version: MiniZinc_nothing
# Solver errors.
with model.query() as select:
    response = select(solver_model.error)
print(f"Solver errors:\n{response.results}")
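The two logical constraints in the example above can be easier to read as truth tables. The following is a minimal plain-Python sketch, assuming that `solvers.implies` and `solvers.if_then_else` follow the usual propositional meanings, which this page does not spell out. The `implies` and `if_then_else` functions below are hypothetical stand-ins written for illustration, not RelationalAI APIs.

```python
from itertools import product


def implies(p: bool, q: bool) -> bool:
    """Standard material implication: false only when p holds and q does not."""
    return (not p) or q


def if_then_else(cond: bool, then: bool, otherwise: bool) -> bool:
    """Require `then` when cond is true, and `otherwise` when it is false."""
    return then if cond else otherwise


# Every (works_morning, works_evening) combination for one employee and day.
combos = list(product([False, True], repeat=2))

# General rule: working the morning shift rules out the evening shift.
no_double_shift = [(m, e) for m, e in combos if implies(m, not e)]

# Alice's rule: morning means no evening; otherwise evening is required.
alice_rule = [(m, e) for m, e in combos if if_then_else(m, not e, e)]

print(no_double_shift)  # [(False, False), (False, True), (True, False)]
print(alice_rule)       # [(False, True), (True, False)]
```

Under these assumptions, the general rule only excludes working both shifts, while Alice's rule additionally excludes working neither, so she works exactly one of the two shifts on each day she is available.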