libstp.step.timeout
Classes
Timeout | A step that executes another step with a timeout limit.
Functions
timeout | Wrap a step with a time limit, cancelling it if it runs too long.
Module Contents
- class libstp.step.timeout.Timeout(step: libstp.step.Step, timeout_seconds: float | int)
Bases: libstp.step.Step
A step that executes another step with a timeout limit. Provides handlers for different outcomes (completion, timeout, error).
- step
- timeout_seconds
- result = None
- libstp.step.timeout.timeout(step: libstp.step.StepProtocol, seconds: float) → Timeout
Wrap a step with a time limit, cancelling it if it runs too long.
Executes the given step normally but enforces a maximum wall-clock duration. If the wrapped step completes within the budget, the timeout step finishes successfully. If the step exceeds the time limit, it is cancelled via asyncio.wait_for and an error is logged. Any exception raised by the wrapped step propagates normally.

This is especially useful around blocking steps like motor_move_to or wait_for_button that could stall indefinitely if the hardware misbehaves.

- Parameters:
step – The step to execute under a time constraint. Must be a valid Step (or StepProtocol) instance.
seconds – Maximum allowed execution time in seconds. Must be positive.
- Returns:
A Timeout step that wraps the original step with the given time limit.
Example:
    from libstp.step import timeout
    from libstp.step.motor import motor_move_to

    # Give the arm 5 seconds to reach position 300; cancel if stuck
    timeout(
        motor_move_to(robot.motor(2), position=300, velocity=800),
        seconds=5.0,
    )

    # Ensure operator presses button within 30 seconds
    timeout(wait_for_button(), seconds=30.0)
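The cancellation behaviour described for timeout() can be illustrated with plain asyncio, independent of libstp. The following is only a minimal sketch: run_with_limit, quick_step, stalled_step and failing_step are hypothetical names invented for illustration and are not part of the libstp API, and the actual Timeout step may be implemented differently. It shows the three outcomes the description mentions: completion within the budget, cancellation with a logged error on timeout, and an exception from the wrapped coroutine propagating unchanged.

```python
import asyncio
import logging

logger = logging.getLogger("timeout_sketch")

# Records side effects so the cancellation is observable from outside.
events = []


async def run_with_limit(make_coro, seconds):
    """Hypothetical helper (not libstp API): await make_coro() for at most `seconds`."""
    if seconds <= 0:
        raise ValueError("seconds must be positive")
    try:
        # wait_for cancels the inner awaitable when the limit expires,
        # then raises asyncio.TimeoutError in the caller.
        result = await asyncio.wait_for(make_coro(), timeout=seconds)
    except asyncio.TimeoutError:
        logger.error("step exceeded %.2f s and was cancelled", seconds)
        return ("timed_out", None)
    # Any other exception from the coroutine is not caught and propagates.
    return ("completed", result)


async def quick_step():
    # Stand-in for a step that finishes well within its budget.
    await asyncio.sleep(0.01)
    return "done"


async def stalled_step():
    # Stand-in for a blocking step that never finishes on its own.
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        events.append("cancelled")
        raise


async def failing_step():
    # Stand-in for a step that raises, e.g. on a hardware fault.
    raise RuntimeError("hardware fault")
```

Calling asyncio.run(run_with_limit(stalled_step, 0.05)) returns after roughly the time limit rather than after the full ten-second sleep, because the stalled coroutine receives a CancelledError at its await point. A coroutine that swallows that CancelledError without re-raising would defeat the limit, which is why stalled_step re-raises it.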