Source code for mxcubecore.HardwareObjects.DESY.P11DoorInterlock
# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
__copyright__ = """Copyright The MXCuBE Collaboration"""
__license__ = "LGPLv3+"
import logging
import sys
import urllib.request
from mxcubecore.BaseHardwareObjects import HardwareObject
__credits__ = ["MXCuBE collaboration"]
[docs]class P11DoorInterlock(HardwareObject):
def __init__(self, name):
HardwareObject.__init__(self, name)
self.door_interlock_state = None
self.simulationMode = False
def connected(self):
self.set_is_ready(True)
def disconnected(self):
self.set_is_ready(False)
def door_is_interlocked(self):
return self.door_interlock_state in [self.STATES.READY]
def unlock(self):
self.breakInterlockEH()
def unlock_door_interlock(self):
if self.door_interlock_state == "locked_active":
self.door_interlock_state = "unlocked"
self.emit("doorInterlockStateChanged", self.door_interlock_state, "")
[docs] def breakInterlockEH(self):
"""Command to break the interlock by sending a request to a URL."""
if not self.simulationMode:
logging.info("Attempting to break interlock by opening EH")
url = self.get_property("unlockEH_url")
success = self.fetch_url(url)
if success:
logging.info("EH opened successfully")
# Update the state to reflect that the interlock was broken
self.emit("doorInterlockStateChanged", self.door_interlock_state, "")
else:
logging.error("Failed to open EH")
[docs] def fetch_url(self, url, timeout=3, retries=10):
"""Fetch URL with retry mechanism."""
for attempt in range(retries):
try:
result = urllib.request.urlopen(url, None, timeout).readlines()
logging.info(f"Successfully fetched URL: {url}")
return True # Success
except Exception as e:
logging.error(f"Error fetching URL: {url} on attempt {attempt + 1}")
logging.error(f"Error details: {sys.exc_info()}")
if attempt + 1 == retries:
logging.error(f"All {retries} attempts failed.")
return False # Failed after retries