libstp.step.timeout

Classes

Timeout

A step that executes another step with a timeout limit.

Functions

timeout(→ Timeout)

Wrap a step with a time limit, cancelling it if it runs too long.

Module Contents

class libstp.step.timeout.Timeout(step: libstp.step.Step, timeout_seconds: float | int)

Bases: libstp.step.Step

A step that executes another step with a timeout limit. Provides handlers for different outcomes (completion, timeout, error).

step
timeout_seconds
result = None

libstp.step.timeout.timeout(step: libstp.step.StepProtocol, seconds: float) → Timeout

Wrap a step with a time limit, cancelling it if it runs too long.

Executes the given step normally but enforces a maximum wall-clock duration. If the wrapped step completes within the budget, the timeout step finishes successfully. If the step exceeds the time limit, it is cancelled via asyncio.wait_for and an error is logged. Any exception raised by the wrapped step propagates normally.

This is especially useful around blocking steps like motor_move_to or wait_for_button that could stall indefinitely if the hardware misbehaves.
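
The behaviour described above can be approximated with plain asyncio. The following is a minimal sketch, not the library's implementation: run_with_timeout, fast_step, slow_step, and failing_step are hypothetical names introduced for illustration, and returning False on timeout is an assumption about how "an error is logged" might surface to the caller.

```python
import asyncio
import logging

logger = logging.getLogger("timeout_sketch")


async def run_with_timeout(coro, seconds: float) -> bool:
    """Await `coro` for at most `seconds`.

    Returns True if it finished in time, False if it was cancelled.
    Exceptions raised by `coro` itself are not caught here.
    """
    try:
        # wait_for cancels the awaited coroutine when the deadline passes.
        await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        logger.error("step exceeded %.2f s and was cancelled", seconds)
        return False
    return True


async def fast_step() -> None:
    await asyncio.sleep(0)  # completes immediately


async def slow_step() -> None:
    await asyncio.sleep(10)  # stands in for a stalled hardware call


async def failing_step() -> None:
    raise RuntimeError("sensor fault")  # propagates through the wrapper
```

In this sketch only the timeout is intercepted; an error such as the RuntimeError from failing_step reaches the caller unchanged, which mirrors the "propagates normally" behaviour described above.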

Parameters:
  • step – The step to execute under a time constraint. Must be a valid Step (or StepProtocol) instance.

  • seconds – Maximum allowed execution time in seconds. Must be positive.

Returns:

A Timeout step that wraps the original step with the given time limit.

Example:

from libstp.step import timeout
from libstp.step.motor import motor_move_to

# `robot` and `wait_for_button` are assumed to be defined or imported elsewhere.

# Give the arm 5 seconds to reach position 300; cancel if stuck
timeout(
    motor_move_to(robot.motor(2), position=300, velocity=800),
    seconds=5.0,
)

# Ensure the operator presses the button within 30 seconds
timeout(wait_for_button(), seconds=30.0)