A Coding Guide to Build a Production-Grade Background Task Processing System Using Huey with SQLite, Scheduling, Retries, Pipelines, and Concurrency Control
consumer = huey.create_consumer(
workers=4,
worker_type=WORKER_THREAD,
periodic=True,
initial_delay=0.1,
backoff=1.15,
max_delay=2.0,
scheduler_interval=1,
check_worker_health=True,
health_check_interval=10,
flush_locks=False,
)
consumer_thread = threading.Thread(target=consumer.run, daemon=True)
consumer_thread.start()
print("Consumer started (threaded).")
print("\nEnqueue basics...")
r1 = quick_add(10, 32)
r2 = slow_io(0.75)
print("quick_add result:", r1(blocking=True, timeout=5))
print("slow_io result:", r2(blocking=True, timeout=5))
print("\nRetries + priority demo (flaky task)...")
rf = flaky_network_call(p_fail=0.7)
try:
print("flaky_network_call result:", rf(blocking=True, timeout=10))
except Exception as e:
print("flaky_network_call failed even after retries:", repr(e))
print("\nContext task (task id inside payload)...")
rp = cpu_pi_estimate(samples=150_000)
print("pi payload:", rp(blocking=True, timeout=20))
print("\nLocks demo: enqueue multiple locked jobs quickly (should serialize)...")
locked_results = [locked_sync_job(tag=f"run{i}") for i in range(3)]
print([res(blocking=True, timeout=10) for res in locked_results])
print("\nScheduling demo: run slow_io in ~3 seconds...")
rs = slow_io.schedule(args=(0.25,), delay=3)
print("scheduled handle:", rs)
print("scheduled slow_io result:", rs(blocking=True, timeout=10))
print("\nRevoke demo: schedule a task in 5s then revoke before it runs...")
rv = slow_io.schedule(args=(0.1,), delay=5)
rv.revoke()
time.sleep(6)
try:
out = rv(blocking=False)
print("revoked task output:", out)
except Exception as e:
print("revoked task did not produce result (expected):", type(e).__name__, str(e)[:120])
print("\nPipeline demo...")
pipeline = (
fetch_number.s(123)
.then(transform_number, 5)
.then(store_result)
)
pipe_res = huey.enqueue(pipeline)
print("pipeline final result:", pipe_res(blocking=True, timeout=10))
print("\nStarting 15-second heartbeat demo for ~40 seconds...")
start_seconds_heartbeat(interval_sec=15)
time.sleep(40)
stop_seconds_heartbeat()
print("Stopped 15-second heartbeat demo.")
print_latest_events(12)
print("\nStopping consumer gracefully...")
consumer.stop(graceful=True)
consumer_thread.join(timeout=5)
print("Consumer stopped.")


