step.resource

Hardware resource tracking and conflict detection for parallel step execution.

Every step declares which hardware resources it requires exclusive access to via required_resources(). Composite steps (Parallel, DoWhileActive) validate at construction time that no two concurrent branches claim the same resource. A lightweight runtime ResourceManager acts as a safety net for dynamically constructed steps (e.g. Defer) that cannot be checked statically.

Resource identifiers are plain strings with the format "<type>" or "<type>:<qualifier>":

“drive” — chassis drive system “motor:0” — motor on port 0 “servo:3” — servo on port 3 “servo:*” — all servo ports (used by FullyDisableServos)

Exceptions

ResourceConflictError

Raised when two concurrent steps require the same hardware resource.

Classes

ResourceManager

Non-blocking, fail-fast resource lock attached to the robot instance.

Functions

validate_no_overlap(→ None)

Check pairwise that no two branches claim the same resource.

get_resource_manager(→ ResourceManager)

Get or create the ResourceManager attached to robot.

Module Contents

exception step.resource.ResourceConflictError(resource: str, holder: str, requester: str)

Bases: Exception

Raised when two concurrent steps require the same hardware resource.

resource
holder
requester
step.resource.validate_no_overlap(branches: Sequence[_HasResources], context: str = 'Parallel') None

Check pairwise that no two branches claim the same resource.

Parameters:
  • branches – Sequence of objects with a required_resources() method.

  • context – Name used in the error message (e.g. "Parallel" or "DoWhileActive").

Raises:

ResourceConflictError – If any two branches share a resource.

class step.resource.ResourceManager

Non-blocking, fail-fast resource lock attached to the robot instance.

Since all steps run in a single asyncio event loop (cooperative multitasking), no real lock is needed — a simple dict suffices.

acquire(resources: frozenset[str], holder: str) None

Claim resources for holder. Raises on conflict.

release(resources: frozenset[str]) None

Release previously acquired resources.

step.resource.get_resource_manager(robot: object) ResourceManager

Get or create the ResourceManager attached to robot.