Skip to content

This feature is currently in Preview.

Solve a decision problem

Solve a decision problem by calling Problem.solve() with a solver backend. This guide covers solving for feasibility (no objective) and solving optimally (with an objective).

When you call Problem.solve(), PyRel runs a solve job in the RelationalAI Native App in Snowflake. Even though you call solve() from Python, the backend solve happens in Snowflake.

At a high level, a solve looks like this:

  1. PyRel translates your decision variables, constraints, and objective into a solve job.
  2. PyRel submits the solve job to the RelationalAI (RAI) Native App for Snowflake.
  3. The Native App runs the prescriptive reasoner service, which calls the backend solver.
  4. PyRel imports the results and exposes them as attributes on the Problem.

The following diagram combines the reasoner architecture with the job workflow:

Local Python processRAI Native App in SnowflakeModel objectProblem objectSolve jobPrescriptive reasoner serviceBackend solver(HiGHS / MiniZinc / ...) submit solve jobrun job(provisions a reasonerif needed)translate + solvesolution + logsupdate job statusimport resultsand metadata

Most beginner issues are formulation issues. Use these checks below as a lightweight pre-solve checklist:

  1. Check you actually created variables

    Problem.num_variables should be greater than 0:

    from relationalai.semantics import Float, Integer, Model
    from relationalai.semantics.reasoners.prescriptive import Problem
    from relationalai.semantics.std import aggregates as agg
    m = Model("ShiftAssignment")
    # Declare the model's schema
    Worker = m.Concept("Worker", identify_by={"id": Integer})
    Shift = m.Concept("Shift", identify_by={"id": Integer})
    Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
    Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
    Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
    42 collapsed lines
    # Define base facts.
    workers = m.data([
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Chen"},
    ])
    shifts = m.data([
    {"id": 10, "name": "Morning"},
    {"id": 20, "name": "Evening"},
    ])
    availability = m.data([
    {"worker_id": 1, "shift_id": 10},
    {"worker_id": 2, "shift_id": 10},
    {"worker_id": 2, "shift_id": 20},
    {"worker_id": 3, "shift_id": 20},
    ])
    costs = m.data([
    {"worker_id": 1, "shift_id": 10, "cost": 9.0},
    {"worker_id": 2, "shift_id": 10, "cost": 10.0},
    {"worker_id": 2, "shift_id": 20, "cost": 8.0},
    {"worker_id": 3, "shift_id": 20, "cost": 11.0},
    ])
    required = m.data([
    {"shift_id": 10, "required_workers": 1},
    {"shift_id": 20, "required_workers": 1},
    ])
    m.define(
    Worker.new(workers.to_schema()),
    Shift.new(shifts.to_schema()),
    )
    worker = Worker.filter_by(id=availability.worker_id)
    shift = Shift.filter_by(id=availability.shift_id)
    m.define(worker.available_shifts(shift))
    worker = Worker.filter_by(id=costs.worker_id)
    shift = Shift.filter_by(id=costs.shift_id)
    m.define(worker.cost_for_shift(shift, costs.cost))
    shift = Shift.filter_by(id=required.shift_id)
    m.define(shift.required_workers(required.required_workers))
    p = Problem(m, Float)
    25 collapsed lines
    # Declare a decision-variable relationship.
    Worker.x_assign = m.Relationship(
    f"{Worker} is assigned to {Shift} if {Float:assigned}"
    )
    # Define `Worker.x_assign` as a decision variable to be solved for,
    # scoped to available worker-shift pairs, and with binary values of 0 or 1.
    x = Float.ref("x")
    p.solve_for(
    Worker.x_assign(Shift, x),
    populate=True,
    name=["assign", Worker.id, Shift.id],
    where=[Worker.available_shifts(Shift)],
    type="bin",
    lower=0,
    upper=1,
    )
    # Each shift must be covered by the required number of workers.
    assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
    p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
    # Worker capacity: each worker is assigned to at most one shift.
    assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
    p.satisfy(m.require(assigned_per_worker <= 1))
    print(p.num_variables)
  2. Check that constraints have been added

    If your problem has constraints, then Problem.num_constraints should be greater than 0:

    print(p.num_constraints)
  3. Spot check the entire problem formulation

    Use Problem.display() to get a complete description of the decision problem:

    p.display()

If you don’t add an objective to Problem, the solver treats the problem as a feasibility problem. It finds any solution that satisfies all solution constraints without optimizing for anything in particular.

Pass a backend name to Problem.solve() to solve the problem with a specific solver:

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
61 collapsed lines
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
# Declare a decision-variable relationship.
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
# Define `Worker.x_assign` as a decision variable to be solved for,
# scoped to available worker-shift pairs, and with binary values of 0 or 1.
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
# Each shift must be covered by the required number of workers.
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
# Worker capacity: each worker is assigned to at most one shift.
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
p.solve("highs")
print(p.termination_status)
  • No objective is added to the Problem, so this is a feasibility solve.
  • p.solve("highs") calls the HiGHS solver to solve the problem.
  • p.termination_status reports the solver’s termination status, which indicates whether a feasible solution was found or not.
  • Calling solve() before declaring any variables is an error.
  • Use "minizinc" when you have a pure discrete feasibility problem and you want a constraint-programming solver. In that case, create the problem with Problem(m, Integer).

Solve an optimization problem by adding an objective and then calling Problem.solve():

  1. Define an objective

    Add an objective expression involving at least one decision variable and use Problem.minimize() or Problem.maximize() to set the optimization direction:

    from relationalai.semantics import Float, Integer, Model
    from relationalai.semantics.reasoners.prescriptive import Problem
    from relationalai.semantics.std import aggregates as agg
    m = Model("ShiftAssignment")
    # Declare the model's schema
    Worker = m.Concept("Worker", identify_by={"id": Integer})
    Shift = m.Concept("Shift", identify_by={"id": Integer})
    Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
    Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
    Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
    42 collapsed lines
    # Define base facts.
    workers = m.data([
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Chen"},
    ])
    shifts = m.data([
    {"id": 10, "name": "Morning"},
    {"id": 20, "name": "Evening"},
    ])
    availability = m.data([
    {"worker_id": 1, "shift_id": 10},
    {"worker_id": 2, "shift_id": 10},
    {"worker_id": 2, "shift_id": 20},
    {"worker_id": 3, "shift_id": 20},
    ])
    costs = m.data([
    {"worker_id": 1, "shift_id": 10, "cost": 9.0},
    {"worker_id": 2, "shift_id": 10, "cost": 10.0},
    {"worker_id": 2, "shift_id": 20, "cost": 8.0},
    {"worker_id": 3, "shift_id": 20, "cost": 11.0},
    ])
    required = m.data([
    {"shift_id": 10, "required_workers": 1},
    {"shift_id": 20, "required_workers": 1},
    ])
    m.define(
    Worker.new(workers.to_schema()),
    Shift.new(shifts.to_schema()),
    )
    worker = Worker.filter_by(id=availability.worker_id)
    shift = Shift.filter_by(id=availability.shift_id)
    m.define(worker.available_shifts(shift))
    worker = Worker.filter_by(id=costs.worker_id)
    shift = Shift.filter_by(id=costs.shift_id)
    m.define(worker.cost_for_shift(shift, costs.cost))
    shift = Shift.filter_by(id=required.shift_id)
    m.define(shift.required_workers(required.required_workers))
    # Create a decision problem.
    p = Problem(m, Float)
    25 collapsed lines
    # Declare a decision-variable relationship.
    Worker.x_assign = m.Relationship(
    f"{Worker} is assigned to {Shift} if {Float:assigned}"
    )
    # Define `Worker.x_assign` as a decision variable to be solved for,
    # scoped to available worker-shift pairs, and with binary values of 0 or 1.
    x = Float.ref("x")
    p.solve_for(
    Worker.x_assign(Shift, x),
    populate=True,
    name=["assign", Worker.id, Shift.id],
    where=[Worker.available_shifts(Shift)],
    type="bin",
    lower=0,
    upper=1,
    )
    # Each shift must be covered by the required number of workers.
    assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
    p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
    # Worker capacity: each worker is assigned to at most one shift.
    assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
    p.satisfy(m.require(assigned_per_worker <= 1))
    # Minimize the total cost of assigned shifts.
    cost = Float.ref("cost")
    p.minimize(
    agg.sum(cost * x).where(
    Worker.x_assign(Shift, x),
    Worker.cost_for_shift(Shift, cost),
    )
    )
  2. Solve and check the result

    Call Problem.solve() and inspect the termination status before you treat the solution as optimal:

    p.solve("highs")
    print(p.termination_status)
    print(p.objective_value)
  • p.minimize(...) adds a minimization objective.
  • agg.sum(cost * x).where(...) defines the expression to minimize.
  • p.solve("highs") calls the HiGHS solver to solve the problem.
  • p.termination_status tells you whether the solver proved optimality (or stopped for another reason).
  • p.objective_value reports the objective value for the returned solution.
  • Objective expressions must reference at least one decision variable. If the objective is constant, it is rejected.
  • You can also add a maximization objective with p.maximize(...).
  • If you set a time limit, a feasible (but not proven optimal) solution can still be useful. Check p.termination_status before treating the result as optimal.

After a solve, you can inspect additional solver metadata on the Problem. This is useful when you need to report solve time, confirm which solver version ran, or quickly summarize the result.

Use p.display_solve_info() to print a short, human-readable summary:

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
70 collapsed lines
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
# Create a decision problem.
p = Problem(m, Float)
# Declare a decision-variable relationship.
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
# Define `Worker.x_assign` as a decision variable to be solved for,
# scoped to available worker-shift pairs, and with binary values of 0 or 1.
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
# Each shift must be covered by the required number of workers.
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
# Worker capacity: each worker is assigned to at most one shift.
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
# Minimize the total cost of assigned shifts.
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve("highs")
# Display a summary of the solve results
p.display_solve_info()
# Or inspect specific metadata attributes
print("status:", p.termination_status)
print("objective:", p.objective_value) # None for feasibility solves
print("solve time (sec):", p.solve_time_sec)
print("solver version:", p.solver_version)

To limit how long the solver runs, pass time_limit_sec with a value in seconds to Problem.solve():

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
81 collapsed lines
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve(
"highs",
time_limit_sec=60,
)
print("status:", p.termination_status)
print("objective:", p.objective_value)
  • time_limit_sec=60 limits the solver to run for at most 60 seconds.
  • p.termination_status indicates whether the solve completed, hit the time limit, or failed.
  • When a time limit is reached, the returned solution can be feasible but not proven optimal.
  • time_limit_sec is a solver-independent option that works with any backend.

You can set optimality gap tolerances to accept a near-optimal solution and stop the solver earlier. Use:

  • relative_gap_tolerance to stop when the relative optimality gap is below a threshold.
  • absolute_gap_tolerance to stop when the absolute optimality gap is below a threshold.

Pass these as arguments to Problem.solve():

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
81 collapsed lines
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve(
"highs",
relative_gap_tolerance=0.01,
)
print("status:", p.termination_status)
print("objective:", p.objective_value)
  • relative_gap_tolerance=0.01 asks the solver to stop once the relative optimality gap is below 1%. absolute_gap_tolerance works similarly, but with an absolute gap threshold.
  • p.termination_status is printed to check the status before accepting the solution.
  • Optimality gap tolerances are supported by some solvers, but not all. If a solver does not support the specified tolerance, it is ignored.

Solver backends have many options that you can tune to improve solve time or influence the returned solution. You can pass backend-specific options as keyword arguments to Problem.solve() and they will be passed through to the solver.

For example, you can pass Gurobi parameters as keyword arguments when you solve with "gurobi":

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
81 collapsed lines
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve(
"gurobi",
MIPFocus=1,
Presolve=2,
Threads=0,
)
  • Raw solver parameters must be int, float, str, or bool values. This means they must be plain scalar configuration values, not PyRel objects like concept types or expression values. It also means you cannot pass structured values like lists or dictionaries.
  • PyRel emits a warning when you pass any backend-specific parameters. It does this because raw solver parameters may not be portable across solvers and can change meaning across backends.

When you need to debug a formulation or share a solver-native model with someone, request a text representation during solve. To request a text representation, pass print_format to Problem.solve() and read the result from p.printed_model:

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
81 collapsed lines
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve(
"highs",
print_format="lp", # Request a printed model representation in LP format
)
print(p.printed_model)

You can also set print_only=True to print the model without solving:

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
81 collapsed lines
# Declare the model's schema
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
# Define base facts.
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Chen"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
{"id": 20, "name": "Evening"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
{"worker_id": 2, "shift_id": 20},
{"worker_id": 3, "shift_id": 20},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
{"worker_id": 2, "shift_id": 20, "cost": 8.0},
{"worker_id": 3, "shift_id": 20, "cost": 11.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
{"shift_id": 20, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
assigned_per_worker = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Worker)
p.satisfy(m.require(assigned_per_worker <= 1))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve(
"highs",
print_only=True, # Do not run the solver, just print the model
print_format="lp",
)
print(p.printed_model)

Supported print_format values are:

  • "moi"
  • "latex"
  • "mof"
  • "lp"
  • "mps"
  • "nl"

Check error details when termination status is not OPTIMAL

Section titled “Check error details when termination status is not OPTIMAL”

If the solver does not prove optimality, p.termination_status tells you what happened. When results are imported, p.error can include backend-provided diagnostic text that helps you decide what to try next.

Use the following pattern after p.solve(...) to check the status and print p.error when it is available:

from relationalai.semantics import Float, Integer, Model
from relationalai.semantics.reasoners.prescriptive import Problem
from relationalai.semantics.std import aggregates as agg
m = Model("ShiftAssignment")
69 collapsed lines
Worker = m.Concept("Worker", identify_by={"id": Integer})
Shift = m.Concept("Shift", identify_by={"id": Integer})
Worker.available_shifts = m.Relationship(f"{Worker} is available for {Shift}")
Worker.cost_for_shift = m.Relationship(f"{Worker} working {Shift} has cost {Float:cost}")
Shift.required_workers = m.Property(f"{Shift} requires {Integer:n} workers")
workers = m.data([
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
])
shifts = m.data([
{"id": 10, "name": "Morning"},
])
availability = m.data([
{"worker_id": 1, "shift_id": 10},
{"worker_id": 2, "shift_id": 10},
])
costs = m.data([
{"worker_id": 1, "shift_id": 10, "cost": 9.0},
{"worker_id": 2, "shift_id": 10, "cost": 10.0},
])
required = m.data([
{"shift_id": 10, "required_workers": 1},
])
m.define(
Worker.new(workers.to_schema()),
Shift.new(shifts.to_schema()),
)
worker = Worker.filter_by(id=availability.worker_id)
shift = Shift.filter_by(id=availability.shift_id)
m.define(worker.available_shifts(shift))
worker = Worker.filter_by(id=costs.worker_id)
shift = Shift.filter_by(id=costs.shift_id)
m.define(worker.cost_for_shift(shift, costs.cost))
shift = Shift.filter_by(id=required.shift_id)
m.define(shift.required_workers(required.required_workers))
p = Problem(m, Float)
Worker.x_assign = m.Relationship(
f"{Worker} is assigned to {Shift} if {Float:assigned}"
)
x = Float.ref("x")
p.solve_for(
Worker.x_assign(Shift, x),
populate=True,
name=["assign", Worker.id, Shift.id],
where=[Worker.available_shifts(Shift)],
type="bin",
lower=0,
upper=1,
)
assigned_per_shift = agg.sum(x).where(Worker.x_assign(Shift, x)).per(Shift)
p.satisfy(m.require(assigned_per_shift == Shift.required_workers))
cost = Float.ref("cost")
p.minimize(
agg.sum(cost * x).where(
Worker.x_assign(Shift, x),
Worker.cost_for_shift(Shift, cost),
)
)
p.solve("highs")
print("termination status:", p.termination_status)
if p.termination_status != "OPTIMAL":
print("error:", p.error)
  • Only treat the solution as optimal when p.termination_status == "OPTIMAL".
  • Use p.error as best-effort diagnostic text when the solve returns a non-OPTIMAL status.
  • p.error is only available after a solve where results were successfully imported into the Problem. Accessing it before that can raise AttributeError.
  • p.error can be None even after a solve returns.
  • If Problem.solve() raises before results import completes, p.error is typically not available. Use the raised exception details and retry with log_to_console=True to capture solver logs.

When Problem.solve() raises an exception, the most useful thing you can do first is capture logs. To stream solver logs to your console, pass log_to_console=True.

In particular, Problem.solve() can raise:

  • ValueError if you call solve() before declaring any decision variables.
  • RuntimeError if the solve job fails, or if the Problem is in a degraded state.
  • TimeoutError if the solve job doesn’t complete within the timeout.

Here’s an example of how to capture logs and retry on failure:

from relationalai.semantics import Float, Model
from relationalai.semantics.reasoners.prescriptive import Problem
def build_problem(model: Model) -> Problem:
p = Problem(model, Float)
# Declare decision variables, add requirements,
# and optionally add an objective.
return p
m = Model("MyDecisionProblem")
p = build_problem(m)
try:
p.solve("highs", log_to_console=True)
except (RuntimeError, TimeoutError):
# Rebuild and retry so you don't reuse a partially-failed Problem instance.
p = build_problem(m)
p.solve("highs", log_to_console=True)
  • If you only want logs on failure, solve once without logs and then retry with log_to_console=True.
  • If a solve completes but you suspect it terminated early, inspect p.termination_status and use the metadata fields in the previous section.

Use this table to troubleshoot common issues when solving a decision problem:

SymptomLikely causeWhat to try
Solve fails because no decision variables existYou didn’t call solve_for() or your where=[...] scope matches nothing.Declare at least one variable and verify p.num_variables > 0.
Solution is OPTIMAL but everything is 0You added an objective function but need a forcing constraint.Add a coverage or demand constraint like sum(x) == required or sum(x) >= demand.
The solve is slow or very largeVariable scope is too broad or missing a join.Tighten where=[...] and avoid Cartesian products.
The solve is infeasibleRequirements conflict, bounds are too tight, or joins don’t match real data.Inspect constraints with p.display() and verify your input facts match the intended scope.