#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
import time
from datetime import datetime
import yaml
from mxcubecore.BaseHardwareObjects import HardwareObject
__author__ = "EMBL Hamburg"
__credits__ = ["MXCuBE collaboration"]
__version__ = "2.3."
[docs]class StateMachine(HardwareObject):
"""Finite State Machine (FSM) is a mathematical model of a closed or
opened loop discrete-event systems with well defined state.
It is wildly used to define functioning system and control their
execution. In the case of MX beamlines and MXCuBE FSM represents
different state where certain action from a user is requested.
It is possible to describe a sequence of user actions as a discrete
state that are logically connected.
For example, each MX experiment requires a crystal to be mounted on a
goniostat. If not crystal is mounted then it makes no sense to
continue an experiment.
The actual transition logic is implemented in the update_fsm_state()
"""
def __init__(self, name):
HardwareObject.__init__(self, name)
self.state_list = []
self.condition_list = None
self.transition_list = None
self.current_state = None
self.previous_state = ""
self.history_state_list = []
[docs] def init(self):
with open(self.get_property("structure_file"), "r") as stream:
data_loaded = yaml.load(stream)
self.state_list = data_loaded["states"]
self.condition_list = data_loaded["conditions"]
self.transition_list = data_loaded["transitions"]
self.current_state = data_loaded["initial_state"]
self.previous_state = data_loaded["initial_state"]
for condition in self.condition_list:
condition["value"] = False
if "desc" not in condition:
condition["desc"] = condition["name"].title().replace("_", " ")
for transition in self.transition_list:
if not self.get_state_by_name(transition["source"]):
self.log.error(
"Transition %s " % str(transition)
+ "has a none existing source state: %s" % transition["source"]
)
if not self.get_state_by_name(transition["dest"]):
self.log.error(
"Transition %s " % str(transition)
+ "has a none existing destination state: %s" % transition["dest"]
)
if "conditions_true" not in transition:
transition["conditions_true"] = []
if "conditions_false" not in transition:
transition["conditions_false"] = []
if "conditions_false_or" not in transition:
transition["conditions_false_or"] = []
for condition_name in transition["conditions_true"]:
if not self.get_condition_by_name(condition_name):
self.log.error(
"Transition %s " % str(transition)
+ "has a none existing condition: %s" % condition_name
)
for condition_name in transition["conditions_false"]:
if not self.get_condition_by_name(condition_name):
self.log.error(
"Transition %s " % str(transition)
+ "has a none existing condition: %s" % condition_name
)
for condition_name in transition["conditions_false_or"]:
if not self.get_condition_by_name(condition_name):
self.log.error(
"Transition %s " % str(transition)
+ "has a none existing condition: %s" % condition_name
)
self.update_fsm_state()
self.bl_setup_hwobj = self.get_object_by_role("beamline_setup")
for hwobj_name in dir(self.bl_setup_hwobj):
if hwobj_name.endswith("hwobj"):
# self.log.debug(\
# "StateMachine: Attaching hwobj: %s " % hwobj_name)
self.connect(
getattr(self.bl_setup_hwobj, hwobj_name),
"fsmConditionChanged",
self.condition_changed,
)
getattr(self.bl_setup_hwobj, hwobj_name).re_emit_values()
def get_state_by_name(self, state_name):
for state in self.state_list:
if state["name"] == state_name:
return state
def get_condition_by_name(self, condition_name):
for condition in self.condition_list:
if condition["name"] == condition_name:
return condition
[docs] def condition_changed(self, condition_name, value):
"""Event when condition of a hardware object has been changed"""
condition = self.get_condition_by_name(condition_name)
if condition:
# self.log.debug(\
# "StateMachine: condition '%s' changed to '%s'" \
# % (condition_name, value))
if condition["value"] != value:
condition["value"] = value
self.emit("conditionChanged", self.condition_list)
self.update_fsm_state()
else:
self.log.debug(
"StateMachine: condition '%s' not in the condition list"
% condition_name
)
[docs] def update_fsm_state(self):
"""Updates state machine
We look at the current state and available transitions from it
If all conditions of a transition is met then the transition is
executed and signal is emitted.
"""
for transition in self.transition_list:
if transition["source"] == self.current_state:
allow_transition = True
for cond_name in transition["conditions_true"]:
cond = self.get_condition_by_name(cond_name)
if cond["value"] is False:
allow_transition = False
for cond_name in transition["conditions_false"]:
cond = self.get_condition_by_name(cond_name)
if cond["value"] is True:
allow_transition = False
for cond_name in transition["conditions_false_or"]:
cond = self.get_condition_by_name(cond_name)
if cond["value"] is False:
allow_transition = True
if allow_transition:
self.current_state = transition["dest"]
break
if self.previous_state != self.current_state:
if self.history_state_list:
self.history_state_list[-1]["end_time"] = time.strftime(
"%Y-%m-%d %H:%M:%S"
)
self.history_state_list[-1]["total_time"] = str(
datetime.strptime(
self.history_state_list[-1]["end_time"], "%Y-%m-%d %H:%M:%S"
)
- datetime.strptime(
self.history_state_list[-1]["start_time"], "%Y-%m-%d %H:%M:%S"
)
)
history_state_item = {
"current_state": self.current_state,
"previous_state": self.previous_state,
"start_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"end_time": "... ",
"total_time": "",
}
self.history_state_list.append(history_state_item)
self.previous_state = self.current_state
self.log.debug(
"StateMachine: current state " + "changed to : %s" % self.current_state
)
self.emit("stateChanged", self.history_state_list)
self.update_fsm_state()
[docs] def get_condition_list(self):
"""Returns list of conditions"""
return self.condition_list
[docs] def get_state_list(self):
"""Returns list of available state"""
return self.state_list
[docs] def get_transition_list(self):
"""Returns list of available transitions"""
return self.transition_list
[docs] def re_emit_values(self):
"""Reemits signals"""
if len(self.history_state_list):
self.emit("stateChanged", (self.history_state_list))