from scripts.sardine.acquisition.waits import Waits
from scripts.sardine.acquisition.action import Shoot, Recover, Deploy, AdditionalDelay
class AcquisitionSequence:
    '''
    Ordered list of acquisition actions together with their dependency waits.

    Actions are stored in insertion order in ``self._sequence``. Dependency
    bookkeeping is delegated to a ``Waits`` instance; this class only asks it
    whether a wait still exists for an action. ``AdditionalDelay`` instances
    are the only action type treated specially: they never count towards a
    requested number of actions and never end a run of same-typed actions.
    '''

    def __init__(self):
        self._sequence = []
        # Set by pop_next_actions() when it stopped because of a pending wait.
        self._waiting_point_reached = False
        self._waits = Waits()

    def waiting_point_reached(self):
        '''
        Return True if the last pop_next_actions() call stopped at an action
        for which a wait still existed, False otherwise.
        '''
        return self._waiting_point_reached

    def add(self, action, dependencies=()):
        '''
        Append ``action`` to the end of the sequence and register it with the
        waits, passing each item of ``dependencies`` as a separate argument.
        '''
        self._sequence.append(action)
        self._waits.add_dependencies(action, *dependencies)

    def mark_dependencies_met(self, *dependencies):
        '''
        Forward the given dependencies to ``Waits.mark_dependencies_met``.
        '''
        self._waits.mark_dependencies_met(*dependencies)

    def first_action(self):
        '''
        Return the first action of the sequence without removing it.

        Raises EmptyAcquisitionSequence if the sequence is empty.
        '''
        if not self._sequence:
            raise EmptyAcquisitionSequence
        return self._sequence[0]

    def read_next_actions_ignoring_dependencies_and_extra_wait(self, count):
        '''
        Peek at the leading run of same-typed, non-delay actions.

        The sequence is NOT modified and the waits are not consulted.
        AdditionalDelay entries are skipped: they are neither returned nor do
        they interrupt the run. Collection stops as soon as
        - a non-delay action of a different type than the previous collected
          one is met, or
        - ``count`` actions have been collected, or
        - the sequence is exhausted.

        ``count`` is compared after each append, so a value below 1 never
        matches and the run is then limited only by the other two conditions.

        Returns the list of collected actions, or None (not an empty list)
        when the sequence is empty.
        '''
        if not self._sequence:
            return None
        actions = []
        for action in self._sequence:
            if isinstance(action, AdditionalDelay):
                continue
            # Exact type match on purpose: a subclass counts as a different
            # kind of action, so isinstance() would not be equivalent here.
            if actions and type(action) is not type(actions[-1]):
                break
            actions.append(action)
            if len(actions) == count:
                break
        return actions

    def pop_next_actions(self, count):
        '''
        Remove and return the next batch of actions from the sequence.

        The batch is built from the front of the sequence and ends just
        before the first action for which one of these holds:
        - it is a non-delay action and ``count`` non-delay actions have
          already been collected;
        - it is a non-delay action whose type differs from the previously
          collected non-delay action;
        - the waits report that a wait exists for it (delays included); in
          this case the waiting-point flag is set.

        AdditionalDelay entries are included in the returned batch but do not
        count towards ``count``; delays that directly follow the last
        collected action are therefore popped together with it. The limit is
        tested before a non-delay action is taken, so ``count == 0`` yields
        only leading delays and a negative ``count`` never matches.

        The waiting-point flag is reset to False at the start of every call.

        Returns the list of popped actions (possibly empty), or None when the
        sequence is empty.
        '''
        self._waiting_point_reached = False
        if not self._sequence:
            return None
        actions = []
        collected = 0     # non-delay actions taken so far
        last_kind = None  # exact type of the last non-delay action taken
        for action in self._sequence:
            is_delay = isinstance(action, AdditionalDelay)
            if not is_delay:
                if collected == count:
                    break  # enough activities collected, excluding any delays
                if last_kind is not None and type(action) is not last_kind:
                    break
            if self._waits.exists(action):
                self._waiting_point_reached = True
                break
            actions.append(action)
            if not is_delay:
                collected += 1
                last_kind = type(action)
        # Rebind instead of deleting in place so that iterators already
        # obtained over the previous list keep seeing a consistent snapshot.
        self._sequence = self._sequence[len(actions):]
        return actions

    def __bool__(self):
        '''
        Return True if the sequence holds at least one action.
        '''
        return bool(self._sequence)

    def __iter__(self):
        '''
        Iterate over the remaining actions in sequence order.
        '''
        return iter(self._sequence)
class EmptyAcquisitionSequence(Exception):
    '''
    Raised by AcquisitionSequence.first_action() when the sequence holds no
    actions.
    '''