# encoding: utf-8
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
import copy
import logging
import time
try:
import epics
except ImportError:
logging.getLogger("HWR").warning("EPICS support not available.")
from mxcubecore import Poller
from mxcubecore.CommandContainer import (
ChannelObject,
CommandObject,
)
from mxcubecore.dispatcher import saferef
__copyright__ = """ Copyright © 2010 - 2020 by MXCuBE Collaboration """
__license__ = "LGPLv3+"
[docs]class EpicsCommand(CommandObject):
"""Epics Command"""
def __init__(self, name, pv_name, username=None, args=None, **kwargs):
CommandObject.__init__(self, name, username, **kwargs)
self.pv_name = pv_name
self.read_as_str = kwargs.get("read_as_str", False)
self.auto_monitor = kwargs.get("auto_monitor", True)
self.pollers = {}
self.__value_changed_callback_ref = None
self.__timeout_callback_ref = None
if args is None:
self.arg_list = ()
else:
# not very nice...
args = str(args)
if not args.endswith(","):
args += ","
self.arg_list = eval("(" + args + ")")
if len(self.arg_list) > 1:
logging.getLogger("HWR").error(
"EpicsCommand: ftm only scalar arguments are supported."
)
return
logging.getLogger("HWR").debug(
"EpicsCommand: creating pv %s: read_as_str = %s",
self.pv_name,
self.read_as_str,
)
self.pv = epics.PV(pv_name, auto_monitor=self.auto_monitor)
time.sleep(0.1)
self.pv_connected = self.pv.connect(timeout=0.1)
if not self.pv_connected:
logging.getLogger("HWR").error(
"EpicsCommand: Error connecting to pv %s.", self.pv_name
)
def __call__(self, *args, **kwargs):
self.emit("commandBeginWaitReply", (str(self.name()),))
if len(args) > 0 and len(self.arg_list) > 0:
# arguments given both given in command call _AND_ in the xml file
logging.getLogger("HWR").error(
"%s: cannot execute command with arguments when 'args' is defined from XML",
str(self.name()),
)
self.emit("commandFailed", (-1, str(self.name())))
return
elif len(args) == 0 and len(self.arg_list) > 0:
# no argument given in the command call but inside the xml file -> use the
# default argument from the xml file
args = self.arg_list
if self.pv is not None:
if len(args) == 0:
# no arguments available -> get the pv's current value
try:
ret = self.pv.get(as_string=self.read_as_str, timeout=0.2)
if ret is None:
ret = self.reconnect()
except TypeError:
# When a cached info is lost internally Epics return a TypeError
ret = self.reconnect()
if ret is not None:
self.emit("commandReplyArrived", (ret, str(self.name())))
return ret
except Exception as e:
logging.getLogger("HWR").error(
"%s: an error occured when getting value with Epics command %s",
str(self.name()),
self.pv_name,
)
else:
self.emit("commandReplyArrived", (ret, str(self.name())))
return ret
else:
# use the given argument to change the pv's value
try:
value = args[0]
wait = kwargs.get("wait", False)
self.pv.put(value, wait=wait)
except:
logging.getLogger("HWR").error(
"%s: an error occured when putting a value with Epics command %s",
str(self.name()),
self.pv_name,
)
else:
self.emit("commandReplyArrived", (0, str(self.name())))
return 0
self.emit("commandFailed", (-1, str(self.name())))
def value_changed(self, value):
try:
callback = self.__value_changed_callback_ref()
except Exception:
logging.getLogger("HWR").exception("")
else:
if callback is not None:
callback(value)
def on_polling_error(self, exception, poller_id):
# try to reconnect the pv
self.pv.connect()
poller = Poller.get_poller(poller_id)
if poller is not None:
try:
poller.restart(1000)
except Exception:
logging.getLogger("HWR").exception("")
[docs] def get_value(self):
"""wrapper function to pv.get() in order to supply additional named parameter"""
return self.pv.get(as_string=self.read_as_str)
def poll(
self,
polling_time=500,
arguments_list=(),
value_changed_callback=None,
timeout_callback=None,
direct=True,
compare=True,
):
self.__value_changed_callback_ref = saferef.safe_ref(value_changed_callback)
# store the call to get as a function object
poll_cmd = self.get_value
Poller.poll(
poll_cmd,
copy.deepcopy(arguments_list),
polling_time,
self.value_changed,
self.on_polling_error,
compare,
)
def stop_polling(self):
pass
def abort(self):
pass
[docs] def is_connected(self):
return self.pv_connected
def reconnect(self):
epics.ca._cache.clear()
# Reconnect PV
self.pv = epics.PV(self.pv_name, auto_monitor=self.auto_monitor)
self.pv_connected = self.pv.connect(timeout=0.2)
# Return the result of get()
ret = self.pv.get(as_string=self.read_as_str, timeout=0.2)
return ret
[docs]class EpicsChannel(ChannelObject):
"""Emulates an *Epics channel* with an EpicsCommand +polling"""
def __init__(self, name, command, username=None, polling=None, args=None, **kwargs):
ChannelObject.__init__(self, name, username, **kwargs)
self.command = EpicsCommand(
name + "_internalCmd", command, username, args, **kwargs
)
try:
self.polling = int(polling)
except Exception:
self.polling = None
else:
self.command.poll(self.polling, self.command.arg_list, self.value_changed)
def value_changed(self, value):
self.emit("update", value)
[docs] def get_value(self):
return self.command()
def set_value(self, value):
self.command(value)
[docs] def is_connected(self):
return self.command.is_connected()