#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
#
# Please user PEP 0008 -- "Style Guide for Python Code" to format code
# https://www.python.org/dev/peps/pep-0008/
"""
Handles interaction with the data model(s). Adding, removing and
retrieving nodes are all done via this object. It is possible to
handle several models by using register_model and select_model.
"""
import json
import logging
import jsonpickle
from mxcubecore import HardwareRepository as HWR
from mxcubecore import queue_entry
from mxcubecore.BaseHardwareObjects import HardwareObject
from mxcubecore.model import queue_model_objects
class Serializer(object):
@staticmethod
def serialize(object):
return json.dumps(object, default=lambda o: o.__dict__.values()[0])
[docs]class QueueModel(HardwareObject):
def __init__(self, name):
HardwareObject.__init__(self, name)
self._ispyb_model = queue_model_objects.RootNode()
self._ispyb_model._node_id = 0
self._free_pin_model = queue_model_objects.RootNode()
self._free_pin_model._node_id = 0
self._plate_model = queue_model_objects.RootNode()
self._plate_model._node_id = 0
self._models = {
"ispyb": self._ispyb_model,
"free-pin": self._free_pin_model,
"plate": self._plate_model,
}
self._selected_model = self._ispyb_model
def __getstate__(self):
d = dict(self.__dict__)
return d
def __setstate__(self, d):
self.__dict__.update(d)
# Framework-2 method, inherited from HardwareObject and called
# by the framework after the object has been initialized.
[docs] def init(self):
"""
Framework-2 method, inherited from HardwareObject and called
by the framework after the object has been initialized.
You should normally not need to call this method.
"""
pass
[docs] def select_model(self, name):
"""
Selects the model with the name <name>
:param name: The name of the model to select.
:type name: str
:returns: None
:rtype: NoneType
"""
self._selected_model = self._models[name]
HWR.beamline.queue_manager.clear()
self._re_emit(self._selected_model)
[docs] def get_model_root(self):
"""
:returns: The selected model root.
:rtype: TaskNode
"""
return self._selected_model
[docs] def clear_model(self, name=None):
"""
Clears the model with name <name>, clears all if name is None
:param name: The name of the model to clear.
:type name: str
:returns: None
:rtype: NoneType
"""
self._models[name] = queue_model_objects.RootNode()
if not name:
for name in self._models.keys():
self._models[name] = queue_model_objects.RootNode()
HWR.beamline.queue_manager.clear()
[docs] def register_model(self, name, root_node):
"""
Register a new model with name <name> and root node <root_node>.
:param name: The name of the 'new' model.
:type name: str
:param root_node: The root of the model.
:type root_node: RootNode
:returns: None
:rtype: NoneType
"""
if name in self._models:
raise KeyError("The key %s is already registered" % name)
else:
self._models[name]
def _re_emit(self, parent_node):
"""
Re-emits the 'child_added' for all the nodes in the model.
"""
for child_node in parent_node.get_children():
self.emit("child_added", (parent_node, child_node))
self._re_emit(child_node)
[docs] def add_child(self, parent, child):
"""
Adds the child node <child>. Raises the exception TypeError
if child is not of type TaskNode.
Moves the child (re-parents it) if it already has a parent.
:param child: TaskNode to add
:type child: TaskNode
:returns: None
:rtype: None
"""
if True:
# if isinstance(child, queue_model_objects.TaskNode):
self._selected_model._total_node_count += 1
child._parent = parent
child._node_id = self._selected_model._total_node_count
parent._children.append(child)
child._set_name(child._name)
self.emit("child_added", (parent, child))
else:
raise TypeError("Expected type TaskNode, got %s " % str(type(child)))
[docs] def add_child_at_id(self, _id, child):
"""
Adds a child <child> at the node with the node id <_id>
:param _id: The id of the parent node.
:type _id: int
:param child: The child node to add.
:type child: TaskNode
:returns: The id of the child.
:rtype: int
"""
parent = self.get_node(_id)
self.add_child(parent, child)
return child._node_id
[docs] def get_node(self, _id, parent=None):
"""
Retrieves the node with the node id <_id>
:param _id: The id of the node to retrieve.
:type _id: int
:param parent: parent node to search in.
:type parent: TaskNode
:returns: The node with the id <_id>
:rtype: TaskNode
"""
if parent is None:
parent = self._selected_model
for node in parent._children:
if node._node_id == _id:
return node
else:
result = self.get_node(_id, node)
if result:
return result
[docs] def del_child(self, parent, child):
"""
Removes <child>
:param child: Child to remove.
:type child: TaskNode
:returns: None
:rtype: None
"""
if child in parent._children:
parent._children.remove(child)
self.emit("child_removed", (parent, child))
def _detach_child(self, parent, child):
"""
Detaches the child <child>
:param child: Child to detach.
:type child: TaskNode
:returns: None
:rtype: None
"""
child = parent._children.pop(child)
return child
[docs] def set_parent(self, parent, child):
"""
Sets the parent of the child <child> to <parent>
:param parent: The parent.
:type parent: TaskNode Object
:param child: The child
:type child: TaskNode Object
"""
if child._parent:
self._detach_child(parent, child)
child.set_parent(parent)
else:
child._parent = parent
[docs] def view_created(self, view_item, task_model):
"""
Method that should be called by the routine that adds
the view <view_item> for the model <task_model>
:param view_item: The view item that was added.
:type view_item: ViewItem
:param task_model: The associated task model.
:type task_model: TaskModel
:returns: None
:rtype: None
"""
view_item._data_model = task_model
cls = queue_entry.MODEL_QUEUE_ENTRY_MAPPINGS[task_model.__class__]
qe = cls(view_item, task_model)
# view_item.setText(0, task_model.get_name())
# if isinstance(task_model, queue_model_objects.Sample) or \
# isinstance(task_model, queue_model_objects.TaskGroup):
# view_item.setText(0, task_model.get_name())
# else:
view_item.setText(0, task_model.get_display_name())
view_item.setOn(task_model.is_enabled())
if isinstance(task_model, queue_model_objects.Sample):
HWR.beamline.queue_manager.enqueue(qe)
elif not isinstance(task_model, queue_model_objects.Basket):
# else:
view_item.parent().get_queue_entry().enqueue(qe)
view_item.update_tool_tip()
[docs] def get_next_run_number(self, new_path_template, exclude_current=True):
"""
Iterates through all the path templates of the tasks
in the model and returns the next available run number
for the path template <new_path_template>.
:param new_path_template: PathTempalte to match with.
:type new_path_template: PathTemplate
:param exclude_current: Skips it self when iterating through
the model, default Tree.
:type exlcude_current: bool
:returns: The next available run number for the given path_template.
:rtype: int
"""
all_path_templates = self.get_path_templates()
conflicting_path_templates = [0]
for pt in all_path_templates:
if exclude_current:
if pt[1] is not new_path_template:
if pt[1] == new_path_template:
conflicting_path_templates.append(pt[1].run_number)
else:
if pt[1] == new_path_template:
conflicting_path_templates.append(pt[1].run_number)
return max(conflicting_path_templates) + 1
[docs] def get_path_templates(self):
"""
Retrieves a list of all the path templates in the model.
"""
return self._get_path_templates_rec(self.get_model_root())
def _get_path_templates_rec(self, parent_node):
"""
Recursive part of get_path_templates.
"""
path_template_list = []
for child_node in parent_node.get_children():
path_template = child_node.get_path_template()
if path_template:
path_template_list.append((child_node, path_template))
child_path_template_list = self._get_path_templates_rec(child_node)
if child_path_template_list:
path_template_list.extend(child_path_template_list)
return path_template_list
[docs] def check_for_path_collisions(self, new_path_template):
"""
Returns True if there is a path template (task) in the model,
that produces the same files as this one.
:returns: True if there is a potential path collision.
"""
result = False
path_template_list = self.get_path_templates()
for pt in path_template_list:
if pt[1] is not new_path_template:
if new_path_template.intersection(pt[1]):
result = True
return result
[docs] def copy_node(self, node):
"""
Copies the node <node> and returns it.
:param node: The node to copy.
:type node: TaskModel
:returns: A copy of the node.
:rtype: TaskModel
"""
new_node = node.copy()
if new_node.get_path_template():
pt = new_node.get_path_template()
new_run_number = self.get_next_run_number(pt)
pt.run_number = new_run_number
new_node.set_number(new_run_number)
# We do not copy grid object, but keep a link to the original grid
if hasattr(new_node, "grid"):
new_node.grid = node.grid
new_node.set_executed(False)
return new_node
def get_nodes(self):
node_list = []
def get_nodes_list(entry):
for child in entry._children:
node_list.append(child)
get_nodes_list(child)
for qe in self._selected_model._children:
get_nodes_list(qe)
return node_list
def get_all_queue_entries(self):
node_list = []
def get_nodes_list(entry):
for child in entry._queue_entry_list:
node_list.append(child)
get_nodes_list(child)
for qe in HWR.beamline.queue_manager._queue_entry_list:
get_nodes_list(qe)
return node_list
def get_all_dc_queue_entries(self):
result = []
for item in self.get_all_queue_entries():
if isinstance(item, queue_entry.DataCollectionQueueEntry):
result.append(item)
return result
[docs] def save_queue(self, filename=None):
"""Saves queue in the file. Current selected model is saved as a list
of dictionaries. Information about samples and baskets is not saved
"""
if not filename:
# filename = os.path.join(self.user_file_directory, "queue_active.dat")
filename = "queue_active.dat"
items_to_save = []
selected_model = ""
for key in self._models:
if self._selected_model == self._models[key]:
selected_model = key
queue_entry_list = HWR.beamline.queue_manager.get_queue_entry_list()
for item in queue_entry_list:
# On the top level is Sample or Basket
if isinstance(item, queue_entry.SampleQueueEntry):
for task_item in item.get_queue_entry_list():
task_item_dict = {
"sample_location": item.get_data_model().location,
"task_group_entry": jsonpickle.encode(
task_item.get_data_model()
),
}
items_to_save.append(task_item_dict)
save_file = None
try:
save_file = open(filename, "w")
save_file.write(repr((selected_model, items_to_save)))
except Exception:
logging.getLogger().exception(
"Unable to save queue " + "in file %s", filename
)
if save_file:
save_file.close()
def get_queue_as_json_list(self):
items_to_save = []
selected_model = ""
for key in self._models:
if self._selected_model == self._models[key]:
selected_model = key
queue_entry_list = HWR.beamline.queue_manager.get_queue_entry_list()
for item in queue_entry_list:
# On the top level is Sample or Basket
if isinstance(item, queue_entry.SampleQueueEntry):
for task_item in item.get_queue_entry_list():
task_item_dict = {
"sample_location": item.get_data_model().location,
# "task_group_entry": Serializer.serialize(task_item.get_data_model())}
# "task_group_entry" : jsonpickle.encode(task_item.get_data_model())}
"task_group_entry": json.dumps(task_item.get_data_model()),
}
items_to_save.append(task_item_dict)
return selected_model, items_to_save
def load_queue_from_json_list(self, queue_list, snapshot):
# Prepare list of samplesL
sample_dict = {}
for item in HWR.beamline.queue_manager.get_queue_entry_list():
if isinstance(item, queue_entry.SampleQueueEntry):
sample_data_model = item.get_data_model()
sample_dict[sample_data_model.location] = sample_data_model
elif isinstance(item, queue_entry.BasketQueueEntry):
for sample_item in item.get_queue_entry_list():
sample_data_model = sample_item.get_data_model()
sample_dict[sample_data_model.location] = sample_data_model
if len(queue_list) > 0:
try:
for task_group_item in queue_list:
task_group_entry = json.load(task_group_item["task_group_entry"])
self.add_child(
sample_dict[task_group_item["sample_location"]],
task_group_entry,
)
for child in task_group_entry.get_children():
child.set_snapshot(snapshot)
self.log.info("Queue loading done")
except Exception:
self.log.exception("Unable to load queue")
[docs] def load_queue_from_file(self, filename, snapshot=None):
"""Loads queue from file. The problem is snapshots that are
not stored in the file, so we have to add new ones in
the loading process
:returns: model name 'free-pin', 'ispyb' or 'plate'
"""
self.log.info("Loading queue from file %s" % filename)
load_file = None
try:
# Read file and clear the model
load_file = open(filename, "r")
decoded_file = eval(load_file.read())
self.select_model(decoded_file[0])
# Prepare list of samples
sample_dict = {}
for item in HWR.beamline.queue_manager.get_queue_entry_list():
if isinstance(item, queue_entry.SampleQueueEntry):
sample_data_model = item.get_data_model()
sample_dict[sample_data_model.location] = sample_data_model
elif isinstance(item, queue_entry.BasketQueueEntry):
for sample_item in item.get_queue_entry_list():
sample_data_model = sample_item.get_data_model()
sample_dict[sample_data_model.location] = sample_data_model
if len(decoded_file[1]) > 0:
for task_group_item in decoded_file[1]:
task_group_entry = jsonpickle.decode(
task_group_item["task_group_entry"]
)
self.add_child(
sample_dict[task_group_item["sample_location"]],
task_group_entry,
)
for child in task_group_entry.get_children():
child.set_snapshot(snapshot)
self.log.info("Queue loading done")
else:
self.log.info("No queue content available in file")
return decoded_file[0]
except Exception:
self.log.exception("Unable to load queue " + "from file %s", filename)
if load_file:
load_file.close()