marimo-learn / queueing /10_priority_starvation.py
Greg Wilson
feat: hiding a few cells
8eccedb
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "altair",
# "asimpy",
# "marimo",
# "polars==1.24.0",
# ]
# ///
import marimo
__generated_with = "0.20.4"
app = marimo.App(width="medium")
@app.cell(hide_code=True)
def _():
import marimo as mo
import random
import statistics
import altair as alt
import polars as pl
from asimpy import Environment, Process, Queue
return Environment, Process, Queue, alt, mo, pl, random, statistics
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""
# Priority Starvation
## *When High-Priority Traffic Crowds Out Low-Priority Jobs*
A single server processes two job classes:
- High-priority jobs (class H) arrive frequently and are served quickly.
- Low-priority jobs (class L) arrive rarely and take longer to serve.
The server always picks the highest-priority job available. Total server utilization $\rho = \rho_H + \rho_L < 1$, so the server has spare capacity on average. Yet low-priority jobs can wait far longer than the utilization level suggests they should.
### Static Priority: Starvation at Moderate Load
With a static priority queue, high-priority jobs *never* yield to low-priority ones. Even when $\rho_H < 1$, high-priority bursts can lock out low-priority jobs for extended periods. The mean wait for low-priority jobs under a static non-preemptive priority queue is:
$$W_L = \frac{\overline{s}_L}{1-\rho_H} \cdot \frac{1}{1-\rho_H-\rho_L}$$
This diverges as $\rho_H \to 1$ independently of $\rho_L$. As $\rho_H$ approaches 100%, low-priority jobs wait arbitrarily long, even if only a few low-priority jobs ever arrive.
### Aging: Solving Starvation Creates Oscillation
The standard remedy for starvation is *priority aging*: a waiting job's priority improves over time until it eventually beats even high-priority arrivals. This guarantees finite wait for all jobs.
However, aging introduces a new pathology. When aged low-priority jobs finally burst through, they occupy the server and leave a backlog of high-priority jobs waiting. The high-priority queue then drains, and the cycle repeats — producing oscillating bursts rather than smooth, uniform service.
### Intuition
Suppose H jobs arrive in random bursts. During a burst, the server never pauses for L jobs. An L job unlucky enough to arrive at the start of a long burst must wait for every H job in that burst to be served before getting its turn. As bursts grow more frequent (larger $\rho_H$), the expected burst length grows, and with it the expected wait for that unlucky L job. The math confirms: starvation is a real risk at moderate $\rho_H$, not just at extreme loads.
### What aging does
Aging assigns each waiting L job a maximum patience time $T_{\max}$. After waiting $T_{\max}$, the job is promoted to high priority. This caps the worst-case wait: no L job can wait longer than $T_{\max}$ plus one service time. Mathematically, the effective $W_L$ is bounded by $T_{\max} + 1/\mu_L$.
## Practical Implications
Priority queues appear throughout computing:
- OS scheduling: interactive processes (high priority) vs. batch jobs (low priority). Linux uses dynamic priority aging (nice values + sleep bonuses) to avoid starvation.
- Network QoS: real-time traffic (VoIP, video) vs. bulk data. Traffic shaping with Deficit Round Robin (DRR) or Weighted Fair Queuing (WFQ) guarantees bandwidth shares without starvation.
- Database query planning: short OLTP queries vs. long OLAP queries. Resource groups and query timeouts implement a form of aging.
""")
return
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""
## Implementation
Two runs are compared:
1. Static priority: H jobs are inserted as `(0, ...)` and L jobs as `(1, ...)` into a `Queue(priority=True)`. The server always picks the smallest key first, so H jobs are always served before L jobs.
2. Aging: an `Ager` process wakes up every `AGING_INTERVAL` time units, inspects waiting L jobs, and promotes sufficiently old ones by reducing their priority key until it falls below the H threshold and they move into the server's feed queue.
""")
return
@app.cell(hide_code=True)
def _(mo):
sim_time_slider = mo.ui.slider(
start=0,
stop=100_000,
step=1_000,
value=20_000,
label="Simulation time",
)
aging_threshold_slider = mo.ui.slider(
start=1.0,
stop=60.0,
step=1.0,
value=15.0,
label="Aging threshold",
)
seed_input = mo.ui.number(
value=192,
step=1,
label="Random seed",
)
run_button = mo.ui.run_button(label="Run simulation")
mo.vstack([
sim_time_slider,
aging_threshold_slider,
seed_input,
run_button,
])
return aging_threshold_slider, seed_input, sim_time_slider
@app.cell
def _(aging_threshold_slider, seed_input, sim_time_slider):
SIM_TIME = int(sim_time_slider.value)
AGING_THRESHOLD = float(aging_threshold_slider.value)
SEED = int(seed_input.value)
SERVICE_RATE_HI = 2.0
SERVICE_RATE_LO = 1.0
ARRIVAL_RATE_LO = 0.2
return (
AGING_THRESHOLD,
ARRIVAL_RATE_LO,
SEED,
SERVICE_RATE_HI,
SERVICE_RATE_LO,
SIM_TIME,
)
@app.cell
def _(Process):
class StaticPriorityServer(Process):
def init(self, hi_q, lo_q, sojourn_hi, sojourn_lo):
self.hi_q = hi_q
self.lo_q = lo_q
self.sojourn_hi = sojourn_hi
self.sojourn_lo = sojourn_lo
async def _serve(self, arrival, svc, record):
await self.timeout(svc)
record.append(self.now - arrival)
async def run(self):
while True:
if not self.hi_q.is_empty():
arrival, svc = await self.hi_q.get()
await self._serve(arrival, svc, self.sojourn_hi)
elif not self.lo_q.is_empty():
arrival, svc = await self.lo_q.get()
await self._serve(arrival, svc, self.sojourn_lo)
else:
await self.timeout(0.01)
return (StaticPriorityServer,)
@app.cell
def _(AGING_THRESHOLD, Process):
class AgingServer(Process):
def init(self, hi_q, lo_q, sojourn_hi, sojourn_lo):
self.hi_q = hi_q
self.lo_q = lo_q
self.sojourn_hi = sojourn_hi
self.sojourn_lo = sojourn_lo
async def run(self):
while True:
lo_aged = (
not self.lo_q.is_empty()
and self.now - self.lo_q._items[0][0] >= AGING_THRESHOLD
)
if lo_aged:
arrival, svc = await self.lo_q.get()
await self.timeout(svc)
self.sojourn_lo.append(self.now - arrival)
elif not self.hi_q.is_empty():
arrival, svc = await self.hi_q.get()
await self.timeout(svc)
self.sojourn_hi.append(self.now - arrival)
elif not self.lo_q.is_empty():
arrival, svc = await self.lo_q.get()
await self.timeout(svc)
self.sojourn_lo.append(self.now - arrival)
else:
await self.timeout(0.01)
return (AgingServer,)
@app.cell
def _(Process, SERVICE_RATE_HI, random):
class HiSource(Process):
def init(self, rate, q):
self.rate = rate
self.q = q
async def run(self):
while True:
await self.timeout(random.expovariate(self.rate))
svc = random.expovariate(SERVICE_RATE_HI)
await self.q.put((self.now, svc))
return (HiSource,)
@app.cell
def _(ARRIVAL_RATE_LO, Process, SERVICE_RATE_LO, random):
class LoSource(Process):
def init(self, q):
self.q = q
async def run(self):
while True:
await self.timeout(random.expovariate(ARRIVAL_RATE_LO))
svc = random.expovariate(SERVICE_RATE_LO)
await self.q.put((self.now, svc))
return (LoSource,)
@app.cell
def _(
AgingServer,
Environment,
HiSource,
LoSource,
Queue,
SIM_TIME,
StaticPriorityServer,
statistics,
):
def simulate(arrival_rate_hi, use_aging):
env = Environment()
hi_q = Queue(env)
lo_q = Queue(env)
sojourn_hi = []
sojourn_lo = []
HiSource(env, arrival_rate_hi, hi_q)
LoSource(env, lo_q)
if use_aging:
AgingServer(env, hi_q, lo_q, sojourn_hi, sojourn_lo)
else:
StaticPriorityServer(env, hi_q, lo_q, sojourn_hi, sojourn_lo)
env.run(until=SIM_TIME)
return sojourn_hi, sojourn_lo
def mean_or_none(lst):
return statistics.mean(lst) if lst else None
def pct_or_none(lst, p):
if not lst:
return None
return sorted(lst)[int(p * len(lst))]
return mean_or_none, pct_or_none, simulate
@app.cell
def _(
ARRIVAL_RATE_LO,
SEED,
SERVICE_RATE_HI,
SERVICE_RATE_LO,
mean_or_none,
pl,
random,
simulate,
):
def sweep():
sweep_rows = []
for rho_hi in [0.10, 0.20, 0.40, 0.60, 0.70, 0.80]:
rate_hi = rho_hi * SERVICE_RATE_HI
hi, lo = simulate(rate_hi, use_aging=False)
rho_total = rho_hi + ARRIVAL_RATE_LO / SERVICE_RATE_LO
sweep_rows.append({
"rho_hi": rho_hi,
"rho_total": rho_total,
"mean_W_hi": mean_or_none(hi),
"mean_W_lo": mean_or_none(lo),
})
return pl.DataFrame(sweep_rows)
random.seed(SEED)
df_sweep = sweep()
return (df_sweep,)
@app.cell
def _(
ARRIVAL_RATE_LO,
SERVICE_RATE_HI,
SERVICE_RATE_LO,
mean_or_none,
pct_or_none,
pl,
simulate,
):
FIXED_RHO_HI = 0.70
rho_total = FIXED_RHO_HI + ARRIVAL_RATE_LO / SERVICE_RATE_LO
def compare():
rate_hi = FIXED_RHO_HI * SERVICE_RATE_HI
hi_static, lo_static = simulate(rate_hi, use_aging=False)
hi_aging, lo_aging = simulate(rate_hi, use_aging=True)
compare_rows = [
{
"policy": "static", "class": "hi", "n": len(hi_static),
"mean_W": mean_or_none(hi_static),
"p95": pct_or_none(hi_static, 0.95),
"p99": pct_or_none(hi_static, 0.99),
},
{
"policy": "static", "class": "lo", "n": len(lo_static),
"mean_W": mean_or_none(lo_static),
"p95": pct_or_none(lo_static, 0.95),
"p99": pct_or_none(lo_static, 0.99),
},
{
"policy": "aging", "class": "hi", "n": len(hi_aging),
"mean_W": mean_or_none(hi_aging),
"p95": pct_or_none(hi_aging, 0.95),
"p99": pct_or_none(hi_aging, 0.99),
},
{
"policy": "aging", "class": "lo", "n": len(lo_aging),
"mean_W": mean_or_none(lo_aging),
"p95": pct_or_none(lo_aging, 0.95),
"p99": pct_or_none(lo_aging, 0.99),
},
]
return pl.DataFrame(compare_rows)
df_compare = compare()
return (df_compare, FIXED_RHO_HI, rho_total,)
@app.cell(hide_code=True)
def _(AGING_THRESHOLD, ARRIVAL_RATE_LO, SERVICE_RATE_LO, mo):
mo.md(f"""
## Part 1 — Static Priority: Effect of Hi-Priority Load on Lo-Priority Wait
Lo-priority: arrival rate {ARRIVAL_RATE_LO}, mean service {1 / SERVICE_RATE_LO:.1f},
ρ_lo = {ARRIVAL_RATE_LO / SERVICE_RATE_LO:.2f}
Aging threshold: {AGING_THRESHOLD} time units
""")
return
@app.cell
def _(df_sweep):
df_sweep
return
@app.cell(hide_code=True)
def _(FIXED_RHO_HI, mo, rho_total):
mo.md(f"""
## Part 2 — Static vs. Aging at ρ_hi = {FIXED_RHO_HI:.2f}, ρ_total = {rho_total:.2f}
""")
return
@app.cell
def _(df_compare):
df_compare
return
@app.cell
def _(alt, df_compare, df_sweep):
df_plot = df_sweep.unpivot(
on=["mean_W_hi", "mean_W_lo"],
index=["rho_hi", "rho_total"],
variable_name="job_class",
value_name="mean_W",
)
sweep_chart = (
alt.Chart(df_plot)
.mark_line(point=True)
.encode(
x=alt.X("rho_hi:Q", title="Hi-priority utilization (ρ_hi)"),
y=alt.Y("mean_W:Q", title="Mean sojourn time (W)"),
color=alt.Color("job_class:N", title="Job class"),
tooltip=["rho_hi:Q", "job_class:N", "mean_W:Q"],
)
.properties(title="Priority Starvation: Effect of Hi-Priority Load")
)
compare_chart = (
alt.Chart(df_compare)
.mark_bar()
.encode(
x=alt.X("class:N", title="Job class"),
y=alt.Y("mean_W:Q", title="Mean sojourn time (W)"),
color=alt.Color("policy:N", title="Policy"),
xOffset="policy:N",
tooltip=["policy:N", "class:N", "mean_W:Q", "p99:Q"],
)
.properties(title="Static Priority vs. Aging")
)
(sweep_chart | compare_chart)
return
@app.cell(hide_code=True)
def _(mo):
mo.md(r"""
## Understanding the Math
### Mean wait for two-priority queues
Let $\lambda_i$, $\mu_i$, and $\rho_i = \lambda_i / \mu_i$ be the arrival rate, service rate, and utilization of class $i \in \{H, L\}$. For a non-preemptive priority queue:
$$W_H = \frac{R_0}{1 - \rho_H}$$
$$W_L = \frac{R_0}{(1 - \rho_H)(1 - \rho_H - \rho_L)}$$
where $R_0 = \tfrac{1}{2}(\lambda_H \overline{s_H^2} + \lambda_L \overline{s_L^2})$ is the mean residual work seen by an arriving customer. The ratio $W_L / W_H = 1/(1 - \rho_H)$ grows without bound as $\rho_H \to 1$.
### Utilization of each class
Let $\lambda_H$ be the arrival rate of high-priority jobs (H) and $\mu_H$ be their service rate. The utilization contributed by H jobs alone is $\rho_H = \lambda_H / \mu_H$ — the fraction of server time that H jobs would consume if they were the only class. Similarly, $\rho_L = \lambda_L / \mu_L$ for low-priority jobs. The total utilization is $\rho = \rho_H + \rho_L$. Requiring $\rho < 1$ means the server has enough capacity for both classes on average.
### Why "on average" is not enough
Even when $\rho < 1$, randomness creates bursts of H arrivals. During a burst, the server is continuously occupied by H jobs, and L jobs must wait in the background. The mean wait for low-priority jobs in a non-preemptive priority queue is:
$$W_L = \frac{R_0}{(1 - \rho_H)(1 - \rho_H - \rho_L)}$$
where $R_0$ is the mean residual work in the system when a job arrives. The critical observation is the factor $(1 - \rho_H)$ in the denominator. As $\rho_H \to 1$, this factor approaches zero and $W_L \to \infty$ — even if $\rho_L$ stays small and the total load $\rho$ is comfortably below 1.
### The trade-off
Without aging, $W_L$ can be infinite when $\rho_H$ is large. With aging, $W_L \leq T_{\max} + 1/\mu_L$, but during promotion events the effective $\rho_H$ spikes temporarily, increasing $W_H$. Choosing $T_{\max}$ is a design decision: a small $T_{\max}$ protects L jobs but forces more promotions and penalizes H jobs more often; a large $T_{\max}$ is kinder to H jobs but allows L jobs to wait longer. There is no setting that simultaneously minimizes both — the trade-off is fundamental.
""")
return
if __name__ == "__main__":
app.run()