Solve a decision problem
Solve a decision problem by calling Problem.solve() with a solver backend.
This guide covers solving for feasibility (no objective) and solving optimally (with an objective).
What happens when you solve a problem
When you call Problem.solve(), PyRel runs a solve job in the RelationalAI Native App in Snowflake.
Even though you call solve() from Python, the backend solve happens in Snowflake.
At a high level, a solve looks like this:
- PyRel translates your decision variables, constraints, and objective into a solve job.
- PyRel submits the solve job to the RelationalAI (RAI) Native App for Snowflake.
- The Native App runs the prescriptive reasoner service, which calls the backend solver.
- PyRel imports the results and exposes them as attributes on the Problem.
[Diagram: the reasoner architecture combined with the solve job workflow.]
Validate before you solve
Most beginner issues are formulation issues. Use the checks below as a lightweight pre-solve checklist:
- Check you actually created variables

  Problem.num_variables should be greater than 0:

      from relationalai.semantics import Float, Integer, Model
      from relationalai.semantics.reasoners.prescriptive import Problem
      from relationalai.semantics.std import aggregates as agg

      m = Model("ShiftAssignment")

      # Declare the model's schema
      Worker = m.Concept("Worker", identify_by={"id": Integer})
      Shift = m.Concept("Shift", identify_by={"id": Integer})

      Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
      Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
      Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")

      # Define base facts.
      workers = m.data([
          {"id": 1, "name": "Alice"},
          {"id": 2, "name": "Bob"},
          {"id": 3, "name": "Chen"},
      ])
      shifts = m.data([
          {"id": 10, "name": "Morning"},
          {"id": 20, "name": "Evening"},
      ])
      availability = m.data([
          {"worker_id": 1, "shift_id": 10},
          {"worker_id": 2, "shift_id": 10},
          {"worker_id": 2, "shift_id": 20},
          {"worker_id": 3, "shift_id": 20},
      ])
      costs = m.data([
          {"worker_id": 1, "shift_id": 10, "cost": 9.0},
          {"worker_id": 2, "shift_id": 10, "cost": 10.0},
          {"worker_id": 2, "shift_id": 20, "cost": 8.0},
          {"worker_id": 3, "shift_id": 20, "cost": 11.0},
      ])
      required = m.data([
          {"shift_id": 10, "required_workers": 1},
          {"shift_id": 20, "required_workers": 1},
      ])

      m.define(
          Worker.new(workers.to_schema()),
          Shift.new(shifts.to_schema()),
      )

      worker = Worker.filter_by(id=availability.worker_id)
      shift = Shift.filter_by(id=availability.shift_id)
      m.define(worker.available_shifts(shift))

      worker = Worker.filter_by(id=costs.worker_id)
      shift = Shift.filter_by(id=costs.shift_id)
      m.define(worker.cost_for_shift(shift, costs.cost))

      shift = Shift.filter_by(id=required.shift_id)
      m.define(shift.required_workers(required.required_workers))

      p = Problem(m, Float)

      # Declare a decision-variable relationship.
      Worker.x_assign = m.Relationship(
          f"{Worker} is assigned to {Shift} if {Float:assigned}"
      )

      # Define `Worker.x_assign` as a decision variable to be solved for,
      # scoped to available worker-shift pairs, and with binary values of 0 or 1.
      x = Float.ref("x")
      p.solve_for(
          Worker.x_assign(Shift, x),
          populate=True,
          name=["assign", Worker.id, Shift.id],
          where=[Worker.available_shifts(Shift)],
          type="bin",
          lower=0,
          upper=1,
      )

      # Each shift must be covered by the required number of workers.
      assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
      p.satisfy(m.require(assigned_per_shift == Shift.required_workers))

      # Worker capacity: each worker is assigned to at most one shift.
      assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
      p.satisfy(m.require(assigned_per_worker <= 1))

      print(p.num_variables)

- Check that constraints have been added

  If your problem has constraints, then Problem.num_constraints should be greater than 0:

      print(p.num_constraints)

- Spot check the entire problem formulation

  Use Problem.display() to get a complete description of the decision problem:

      p.display()
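The checklist above can be wrapped in a small helper. The following is a minimal sketch, not part of PyRel: `presolve_warnings` is a hypothetical name, and it reads only the `num_variables` and `num_constraints` attributes described above, so it works on any object that exposes them. Stand-in objects are used here so the sketch runs without a Snowflake connection.

```python
from types import SimpleNamespace


def presolve_warnings(problem, expect_constraints=True):
    """Return a list of human-readable warnings for a Problem-like object.

    Hypothetical helper: it relies only on the `num_variables` and
    `num_constraints` attributes mentioned in the checklist.
    """
    warnings = []
    if problem.num_variables <= 0:
        warnings.append("no decision variables: check solve_for() and its where=[...] scope")
    if expect_constraints and problem.num_constraints <= 0:
        warnings.append("no constraints: check that each requirement was passed to satisfy()")
    return warnings


# Stand-ins for a Problem; the counts are illustrative, not solver output.
empty = SimpleNamespace(num_variables=0, num_constraints=0)
ready = SimpleNamespace(num_variables=4, num_constraints=5)

print(presolve_warnings(empty))
print(presolve_warnings(ready))
```

An empty list means both checks passed; it says nothing about whether the formulation is correct, so p.display() remains the final spot check.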
Solve for feasibility
If you don’t add an objective to the Problem, the solver treats the problem as a feasibility problem.
It finds any solution that satisfies all constraints, without optimizing for anything in particular.
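To make this concrete, the sketch below checks candidate assignments for the ShiftAssignment data by hand in plain Python. It does not call PyRel or a solver; `is_feasible` is a hypothetical helper introduced for illustration, and the data is copied from the example in this guide.

```python
# Data copied from the ShiftAssignment example.
AVAILABLE = {(1, 10), (2, 10), (2, 20), (3, 20)}  # (worker_id, shift_id)
REQUIRED = {10: 1, 20: 1}                         # shift_id -> required workers
WORKERS = {1, 2, 3}


def is_feasible(assignment):
    """Check a set of (worker_id, shift_id) pairs against both constraints."""
    # Variables exist only for available pairs.
    if not assignment <= AVAILABLE:
        return False
    # Each shift must be covered by exactly the required number of workers.
    for shift, needed in REQUIRED.items():
        if sum(1 for _, s in assignment if s == shift) != needed:
            return False
    # Each worker is assigned to at most one shift.
    for worker in WORKERS:
        if sum(1 for w, _ in assignment if w == worker) > 1:
            return False
    return True


print(is_feasible({(1, 10), (3, 20)}))  # True: both shifts covered, no worker reused
print(is_feasible({(2, 10), (2, 20)}))  # False: worker 2 would work two shifts
print(is_feasible({(1, 10)}))           # False: shift 20 is not covered
```

A feasibility solve may return any assignment for which a check like this passes; which one you get depends on the solver.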
Pass a backend name to Problem.solve() to solve the problem with a specific solver:
    # Build the Problem `p` exactly as in the "Validate before you solve" example:
    # the same model, base facts, decision variable, and constraints, with no objective.
    p.solve("highs")

    print(p.termination_status)

- No objective is added to the Problem, so this is a feasibility solve.
- p.solve("highs") calls the HiGHS solver to solve the problem.
- p.termination_status reports the solver’s termination status, which indicates whether a feasible solution was found.

- Calling solve() before declaring any variables is an error.
- Use "minizinc" when you have a pure discrete feasibility problem and you want a constraint-programming solver. In that case, create the problem with Problem(m, Integer).
Solve optimally
Solve an optimization problem by adding an objective and then calling Problem.solve():
- Define an objective

  Add an objective expression involving at least one decision variable and use Problem.minimize() or Problem.maximize() to set the optimization direction:

      from relationalai.semantics import Float, Integer, Model
      from relationalai.semantics.reasoners.prescriptive import Problem
      from relationalai.semantics.std import aggregates as agg

      m = Model("ShiftAssignment")

      # Declare the model's schema
      Worker = m.Concept("Worker", identify_by={"id": Integer})
      Shift = m.Concept("Shift", identify_by={"id": Integer})

      Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
      Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
      Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")

      # Define base facts.
      workers = m.data([
          {"id": 1, "name": "Alice"},
          {"id": 2, "name": "Bob"},
          {"id": 3, "name": "Chen"},
      ])
      shifts = m.data([
          {"id": 10, "name": "Morning"},
          {"id": 20, "name": "Evening"},
      ])
      availability = m.data([
          {"worker_id": 1, "shift_id": 10},
          {"worker_id": 2, "shift_id": 10},
          {"worker_id": 2, "shift_id": 20},
          {"worker_id": 3, "shift_id": 20},
      ])
      costs = m.data([
          {"worker_id": 1, "shift_id": 10, "cost": 9.0},
          {"worker_id": 2, "shift_id": 10, "cost": 10.0},
          {"worker_id": 2, "shift_id": 20, "cost": 8.0},
          {"worker_id": 3, "shift_id": 20, "cost": 11.0},
      ])
      required = m.data([
          {"shift_id": 10, "required_workers": 1},
          {"shift_id": 20, "required_workers": 1},
      ])

      m.define(
          Worker.new(workers.to_schema()),
          Shift.new(shifts.to_schema()),
      )

      worker = Worker.filter_by(id=availability.worker_id)
      shift = Shift.filter_by(id=availability.shift_id)
      m.define(worker.available_shifts(shift))

      worker = Worker.filter_by(id=costs.worker_id)
      shift = Shift.filter_by(id=costs.shift_id)
      m.define(worker.cost_for_shift(shift, costs.cost))

      shift = Shift.filter_by(id=required.shift_id)
      m.define(shift.required_workers(required.required_workers))

      # Create a decision problem.
      p = Problem(m, Float)

      # Declare a decision-variable relationship.
      Worker.x_assign = m.Relationship(
          f"{Worker} is assigned to {Shift} if {Float:assigned}"
      )

      # Define `Worker.x_assign` as a decision variable to be solved for,
      # scoped to available worker-shift pairs, and with binary values of 0 or 1.
      x = Float.ref("x")
      p.solve_for(
          Worker.x_assign(Shift, x),
          populate=True,
          name=["assign", Worker.id, Shift.id],
          where=[Worker.available_shifts(Shift)],
          type="bin",
          lower=0,
          upper=1,
      )

      # Each shift must be covered by the required number of workers.
      assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
      p.satisfy(m.require(assigned_per_shift == Shift.required_workers))

      # Worker capacity: each worker is assigned to at most one shift.
      assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
      p.satisfy(m.require(assigned_per_worker <= 1))

      # Minimize the total cost of assigned shifts.
      cost = Float.ref("cost")
      p.minimize(
          agg.sum(cost * x).where(
              Worker.x_assign(Shift, x),
              Worker.cost_for_shift(Shift, cost),
          )
      )

- Solve and check the result

  Call Problem.solve() and inspect the termination status before you treat the solution as optimal:

      p.solve("highs")
      print(p.termination_status)
      print(p.objective_value)

- p.minimize(...) adds a minimization objective.
- agg.sum(cost * x).where(...) defines the expression to minimize.
- p.solve("highs") calls the HiGHS solver to solve the problem.
- p.termination_status tells you whether the solver proved optimality (or stopped for another reason).
- p.objective_value reports the objective value for the returned solution.

- Objective expressions must reference at least one decision variable. If the objective is constant, it is rejected.
- You can also add a maximization objective with p.maximize(...).
- If you set a time limit, a feasible (but not proven optimal) solution can still be useful. Check p.termination_status before treating the result as optimal.
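For an instance this small, you can cross-check the solver by enumeration. The following plain-Python sketch (no PyRel, no solver; all names are illustrative) tries every subset of the available worker-shift pairs, keeps those that satisfy both constraints, and picks the cheapest. It is only practical for tiny instances.

```python
from itertools import combinations

# (worker_id, shift_id) -> cost, copied from the example data.
COST = {(1, 10): 9.0, (2, 10): 10.0, (2, 20): 8.0, (3, 20): 11.0}
REQUIRED = {10: 1, 20: 1}


def feasible(pairs):
    """Coverage holds for every shift and no worker appears twice."""
    covered = all(
        sum(1 for _, s in pairs if s == shift) == needed
        for shift, needed in REQUIRED.items()
    )
    workers = [w for w, _ in pairs]
    return covered and len(workers) == len(set(workers))


def brute_force_minimum():
    """Enumerate all subsets of the binary variables and return the cheapest feasible one."""
    best_cost, best_pairs = None, None
    keys = sorted(COST)
    for size in range(len(keys) + 1):
        for pairs in combinations(keys, size):
            if not feasible(pairs):
                continue
            total = sum(COST[p] for p in pairs)
            if best_cost is None or total < best_cost:
                best_cost, best_pairs = total, pairs
    return best_cost, best_pairs


print(brute_force_minimum())  # (17.0, ((1, 10), (2, 20)))
```

By this arithmetic the minimum cost is 9.0 + 8.0 = 17.0, so a solve that reports OPTIMAL on the same data would be expected to return an objective value that agrees with it.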
Inspect solve metadata
After a solve, you can inspect additional solver metadata on the Problem.
This is useful when you need to report solve time, confirm which solver version ran, or quickly summarize the result.
Use p.display_solve_info() to print a short, human-readable summary:
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve("highs")

    # Display a summary of the solve results
    p.display_solve_info()

    # Or inspect specific metadata attributes
    print("status:", p.termination_status)
    print("objective:", p.objective_value)  # None for feasibility solves
    print("solve time (sec):", p.solve_time_sec)
    print("solver version:", p.solver_version)

Set a time limit
To limit how long the solver runs, pass time_limit_sec with a value in seconds to Problem.solve():
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve(
        "highs",
        time_limit_sec=60,
    )

    print("status:", p.termination_status)
    print("objective:", p.objective_value)

- time_limit_sec=60 limits the solver to run for at most 60 seconds.
- p.termination_status indicates whether the solve completed, hit the time limit, or failed.
- When a time limit is reached, the returned solution can be feasible but not proven optimal.

- time_limit_sec is a solver-independent option that works with any backend.
Accept a near-optimal solution
You can set optimality gap tolerances to accept a near-optimal solution and stop the solver earlier. Use:
- relative_gap_tolerance to stop when the relative optimality gap is below a threshold.
- absolute_gap_tolerance to stop when the absolute optimality gap is below a threshold.
Pass these as arguments to Problem.solve():
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve(
        "highs",
        relative_gap_tolerance=0.01,
    )

    print("status:", p.termination_status)
    print("objective:", p.objective_value)

- relative_gap_tolerance=0.01 asks the solver to stop once the relative optimality gap is below 1%.
- absolute_gap_tolerance works similarly, but with an absolute gap threshold.
- p.termination_status is printed to check the status before accepting the solution.

- Optimality gap tolerances are supported by some solvers, but not all. If a solver does not support the specified tolerance, it is ignored.
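For reference, one common way to define the two gaps is sketched below. Exact definitions vary by solver (for example, in the denominator of the relative gap), so treat this as an illustration of the idea rather than what any particular backend computes. The names `incumbent` (best solution found so far), `best_bound` (best proven bound), and the three functions are introduced here for illustration only.

```python
def absolute_gap(incumbent, best_bound):
    """Distance between the best solution found and the best proven bound."""
    return abs(incumbent - best_bound)


def relative_gap(incumbent, best_bound):
    """Absolute gap scaled by the incumbent objective (one common convention)."""
    if incumbent == 0:
        return 0.0 if best_bound == 0 else float("inf")
    return abs(incumbent - best_bound) / abs(incumbent)


def may_stop(incumbent, best_bound, relative_gap_tolerance=None, absolute_gap_tolerance=None):
    """True when either configured tolerance is already met."""
    if relative_gap_tolerance is not None and relative_gap(incumbent, best_bound) <= relative_gap_tolerance:
        return True
    if absolute_gap_tolerance is not None and absolute_gap(incumbent, best_bound) <= absolute_gap_tolerance:
        return True
    return False


# Minimization: incumbent cost 100.0, proven lower bound 99.5 -> relative gap 0.005.
print(may_stop(100.0, 99.5, relative_gap_tolerance=0.01))  # True
print(may_stop(100.0, 95.0, relative_gap_tolerance=0.01))  # False
```

With a 1% relative tolerance, the first case could stop early because the incumbent is provably within 0.5% of the best possible value; the second case is still 5% away.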
Tune solver backend behavior
Solver backends have many options that you can tune to improve solve time or influence the returned solution.
You can pass backend-specific options as keyword arguments to Problem.solve() and they will be passed through to the solver.
For example, you can pass Gurobi parameters as keyword arguments when you solve with "gurobi":
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve(
        "gurobi",
        MIPFocus=1,
        Presolve=2,
        Threads=0,
    )

- Raw solver parameters must be int, float, str, or bool values. This means they must be plain scalar configuration values, not PyRel objects like concept types or expression values. It also means you cannot pass structured values like lists or dictionaries.
- PyRel emits a warning when you pass any backend-specific parameters. It does this because raw solver parameters may not be portable across solvers and can change meaning across backends.
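Because structured values are not accepted, it can help to check keyword arguments before they reach Problem.solve(). The following is a minimal sketch that assumes only the scalar-type rule stated above; `non_scalar_params` is a hypothetical helper, `some_list` and `some_dict` are made-up parameter names, and PyRel's own validation may differ.

```python
def non_scalar_params(**params):
    """Return the names of keyword arguments that are not int, float, str, or bool."""
    allowed = (int, float, str, bool)
    return sorted(name for name, value in params.items() if not isinstance(value, allowed))


print(non_scalar_params(MIPFocus=1, Presolve=2, Threads=0))                  # []
print(non_scalar_params(MIPFocus=1, some_list=[1, 2], some_dict={"a": 1}))   # ['some_dict', 'some_list']
```

An empty result means every value is a plain scalar; it does not confirm that the backend recognizes the parameter names.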
Print the translated solver model
When you need to debug a formulation or share a solver-native model with someone, request a text representation during solve.
To request a text representation, pass print_format to Problem.solve() and read the result from p.printed_model:
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve(
        "highs",
        print_format="lp",  # Request a printed model representation in LP format
    )

    print(p.printed_model)

You can also set print_only=True to print the model without solving:
    # Build the Problem `p` exactly as in the "Solve optimally" example:
    # the same model, base facts, decision variable, constraints, and cost objective.
    p.solve(
        "highs",
        print_only=True,  # Do not run the solver, just print the model
        print_format="lp",
    )

    print(p.printed_model)

Supported print_format values are:
- "moi"
- "latex"
- "mof"
- "lp"
- "mps"
- "nl"
Check error details when termination status is not OPTIMAL
If the solver does not prove optimality, p.termination_status tells you what happened.
When results are imported, p.error can include backend-provided diagnostic text that helps you decide what to try next.
Use the following pattern after p.solve(...) to check the status and print p.error when it is available:
    from relationalai.semantics import Float, Integer, Model
    from relationalai.semantics.reasoners.prescriptive import Problem
    from relationalai.semantics.std import aggregates as agg

    m = Model("ShiftAssignment")

    Worker = m.Concept("Worker", identify_by={"id": Integer})
    Shift = m.Concept("Shift", identify_by={"id": Integer})

    Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
    Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
    Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")

    workers = m.data([
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ])
    shifts = m.data([
        {"id": 10, "name": "Morning"},
    ])
    availability = m.data([
        {"worker_id": 1, "shift_id": 10},
        {"worker_id": 2, "shift_id": 10},
    ])
    costs = m.data([
        {"worker_id": 1, "shift_id": 10, "cost": 9.0},
        {"worker_id": 2, "shift_id": 10, "cost": 10.0},
    ])
    required = m.data([
        {"shift_id": 10, "required_workers": 1},
    ])

    m.define(
        Worker.new(workers.to_schema()),
        Shift.new(shifts.to_schema()),
    )

    worker = Worker.filter_by(id=availability.worker_id)
    shift = Shift.filter_by(id=availability.shift_id)
    m.define(worker.available_shifts(shift))

    worker = Worker.filter_by(id=costs.worker_id)
    shift = Shift.filter_by(id=costs.shift_id)
    m.define(worker.cost_for_shift(shift, costs.cost))

    shift = Shift.filter_by(id=required.shift_id)
    m.define(shift.required_workers(required.required_workers))

    p = Problem(m, Float)

    Worker.x_assign = m.Relationship(
        f"{Worker} is assigned to {Shift} if {Float:assigned}"
    )

    x = Float.ref("x")
    p.solve_for(
        Worker.x_assign(Shift, x),
        populate=True,
        name=["assign", Worker.id, Shift.id],
        where=[Worker.available_shifts(Shift)],
        type="bin",
        lower=0,
        upper=1,
    )

    assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
    p.satisfy(m.require(assigned_per_shift == Shift.required_workers))

    cost = Float.ref("cost")
    p.minimize(
        agg.sum(cost * x).where(
            Worker.x_assign(Shift, x),
            Worker.cost_for_shift(Shift, cost),
        )
    )

    p.solve("highs")

    print("termination status:", p.termination_status)

    if p.termination_status != "OPTIMAL":
        print("error:", p.error)

- Only treat the solution as optimal when p.termination_status == "OPTIMAL".
- Use p.error as best-effort diagnostic text when the solve returns a non-OPTIMAL status.

- p.error is only available after a solve where results were successfully imported into the Problem. Accessing it before that can raise AttributeError.
- p.error can be None even after a solve returns.
- If Problem.solve() raises before results import completes, p.error is typically not available. Use the raised exception details and retry with log_to_console=True to capture solver logs.
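The notes above suggest reading p.error defensively. The sketch below is one way to do that, assuming only the `termination_status` and `error` attributes described in this section; `summarize_solve` is a hypothetical helper, and the stand-in objects let it run without a solve. The string "SOME_OTHER_STATUS" is a placeholder, not a documented status value.

```python
from types import SimpleNamespace


def summarize_solve(problem):
    """Return (status, is_optimal, error_text) for a solved Problem-like object."""
    status = problem.termination_status
    # getattr with a default avoids AttributeError when results were never imported.
    error_text = getattr(problem, "error", None)
    return status, status == "OPTIMAL", error_text


# Stand-ins for a Problem after a solve.
solved = SimpleNamespace(termination_status="OPTIMAL", error=None)
not_imported = SimpleNamespace(termination_status="SOME_OTHER_STATUS")

print(summarize_solve(solved))        # ('OPTIMAL', True, None)
print(summarize_solve(not_imported))  # ('SOME_OTHER_STATUS', False, None)
```

Returning None for a missing error keeps the calling code uniform, at the cost of hiding the difference between "no error text" and "results were never imported".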
Handle solve failures
When Problem.solve() raises an exception, the most useful thing you can do first is capture logs.
To stream solver logs to your console, pass log_to_console=True.
In particular, Problem.solve() can raise:
- ValueError if you call solve() before declaring any decision variables.
- RuntimeError if the solve job fails, or if the Problem is in a degraded state.
- TimeoutError if the solve job doesn’t complete within the timeout.
Here’s an example of how to capture logs and retry on failure:
    from relationalai.semantics import Float, Model
    from relationalai.semantics.reasoners.prescriptive import Problem

    def build_problem(model: Model) -> Problem:
        p = Problem(model, Float)

        # Declare decision variables, add requirements,
        # and optionally add an objective.

        return p

    m = Model("MyDecisionProblem")
    p = build_problem(m)

    try:
        p.solve("highs", log_to_console=True)
    except (RuntimeError, TimeoutError):
        # Rebuild and retry so you don't reuse a partially-failed Problem instance.
        p = build_problem(m)
        p.solve("highs", log_to_console=True)

- If you only want logs on failure, solve once without logs and then retry with log_to_console=True.
- If a solve completes but you suspect it terminated early, inspect p.termination_status and use the metadata fields described in “Inspect solve metadata”.
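The rebuild-and-retry pattern can be generalized into a small wrapper. This is a sketch under stated assumptions, not a PyRel utility: `solve_with_retry` and the `FlakyProblem` test double are hypothetical, and the wrapper assumes only that the object returned by `build_problem()` has a `solve(backend, **kwargs)` method. ValueError is deliberately not retried, since a missing decision variable is a formulation error that a retry cannot fix.

```python
def solve_with_retry(build_problem, backend, attempts=2, **solve_kwargs):
    """Build a fresh problem for each attempt; re-raise after the last failure."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for _ in range(attempts):
        problem = build_problem()  # never reuse an instance that failed mid-solve
        try:
            problem.solve(backend, **solve_kwargs)
            return problem
        except (RuntimeError, TimeoutError) as exc:
            last_error = exc
    raise last_error


class FlakyProblem:
    """Test double: the first instance fails, later instances succeed."""

    instances = 0

    def __init__(self):
        FlakyProblem.instances += 1
        self.number = FlakyProblem.instances
        self.termination_status = None

    def solve(self, backend, **kwargs):
        if self.number == 1:
            raise RuntimeError("simulated solve job failure")
        self.termination_status = "OPTIMAL"


p = solve_with_retry(FlakyProblem, "highs", log_to_console=True)
print(p.number, p.termination_status)  # 2 OPTIMAL
```

With a real problem you would pass a zero-argument builder, for example `lambda: build_problem(m)`, so that each attempt starts from a new Problem instance.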
Avoid common pitfalls
Use this table to troubleshoot common issues when solving a decision problem:
| Symptom | Likely cause | What to try |
|---|---|---|
| Solve fails because no decision variables exist | You didn’t call solve_for() or your where=[...] scope matches nothing. | Declare at least one variable and verify p.num_variables > 0. |
| Solution is OPTIMAL but everything is 0 | You added an objective but no forcing constraint, so assigning nothing can be optimal. | Add a coverage or demand constraint like sum(x) == required or sum(x) >= demand. |
| The solve is slow or very large | Variable scope is too broad or missing a join. | Tighten where=[...] and avoid Cartesian products. |
| The solve is infeasible | Requirements conflict, bounds are too tight, or joins don’t match real data. | Inspect constraints with p.display() and verify your input facts match the intended scope. |
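The "OPTIMAL but everything is 0" row is easy to reproduce by hand. The plain-Python sketch below (no PyRel, no solver; names are illustrative) minimizes total cost over all 0/1 assignments for the example cost data, first without and then with the coverage constraint. The worker-capacity constraint is left out for brevity.

```python
from itertools import product

# (worker_id, shift_id) -> cost, copied from the example data.
COST = {(1, 10): 9.0, (2, 10): 10.0, (2, 20): 8.0, (3, 20): 11.0}
REQUIRED = {10: 1, 20: 1}
PAIRS = sorted(COST)


def cheapest(require_coverage):
    """Minimize total cost over all 0/1 assignments, optionally enforcing coverage."""
    best = None
    for values in product((0, 1), repeat=len(PAIRS)):
        x = dict(zip(PAIRS, values))
        if require_coverage and any(
            sum(v for (_, s), v in x.items() if s == shift) != needed
            for shift, needed in REQUIRED.items()
        ):
            continue
        total = sum(COST[p] * v for p, v in x.items())
        if best is None or total < best[0]:
            best = (total, x)
    return best


print(cheapest(require_coverage=False)[0])  # 0.0: the all-zero assignment wins
print(cheapest(require_coverage=True)[0])   # 17.0: coverage forces real assignments
```

Without a forcing constraint, a cost-minimizing objective with nonnegative costs is satisfied by assigning nothing, which is why the solve can be both OPTIMAL and useless.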