# -*- coding: utf-8 -*-
#
# Project name: MXCuBE
# https://github.com/mxcube
#
# This file is part of MXCuBE software.
#
# MXCuBE is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MXCuBE is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU General Lesser Public License
# along with MXCuBE. If not, see <http://www.gnu.org/licenses/>.
"""ProtocolError and StandardClient implementation"""
import logging
import socket
import sys

import gevent
import gevent.event
import gevent.lock
__copyright__ = """ Copyright © 2019 by the MXCuBE collaboration """
__license__ = "LGPLv3+"
class TimeoutError(Exception):
    """Raised when no reply is received within the configured timeout.

    NOTE: on Python 3 this name shadows the builtin ``TimeoutError`` inside
    this module; the two classes are unrelated.
    """
class ProtocolError(Exception):
    """Raised when the client is used in a way, or receives data, that the
    protocol does not allow (e.g. an undecodable stream message, or a plain
    ``send`` on a datagram client).
    """
class SocketError(Exception):
    """Raised when the underlying socket fails or the connection is lost."""
if sys.version_info > (3, 0):
STX = 2 # b'\x02'
ETX = 3 # b'\x03'
def empty_buffer():
"""Empty buffer"""
return b""
_bytes = bytes
encode = str.encode
else:
STX = chr(2)
ETX = chr(3)
[docs] def empty_buffer():
"""Empty buffer"""
return ""
_bytes = str
encode = str
# Upper bound on the length of a buffered stream message: the receive loop
# discards the partially received message once its buffer grows beyond this.
MAX_SIZE_STREAM_MSG = 500000
class PROTOCOL:
    """Transport protocol identifiers accepted by ``StandardClient``."""

    # Request/reply over a SOCK_DGRAM socket.
    DATAGRAM = 1
    # STX/ETX framed messages over a SOCK_STREAM socket.
    STREAM = 2
class StandardClient:
    """Standard JLib client.

    Depending on ``protocol`` the client exchanges messages either as
    datagrams (each request is prefixed with a 4-digit message number that
    the reply must echo) or over a stream connection (each message is framed
    between STX and ETX bytes and replies are read by a background greenlet).
    """

    def __init__(self, server_ip, server_port, protocol, timeout, retries):
        """Store the connection parameters; no socket is opened yet.

        Args:
            server_ip(str): Server address.
            server_port(int): Server port.
            protocol(int): PROTOCOL.DATAGRAM or PROTOCOL.STREAM.
            timeout(float): Default reply timeout.
            retries(int): Number of attempts for a datagram request.
        """
        self.server_ip = server_ip
        self.server_port = server_port
        self.timeout = timeout
        self.default_timeout = timeout
        self.retries = retries
        self.protocol = protocol
        # Last stream error, reported to a waiting send_receive call.
        self.error = None
        # Last decoded stream message (None while waiting for a reply).
        self.received_msg = None
        self.receiving_greenlet = None
        # Set by the receiving greenlet on a new message or on a failure.
        self.msg_received_event = gevent.event.Event()
        # Serialises send_receive calls.
        self._lock = gevent.lock.Semaphore()
        self.__msg_index__ = -1
        self.__sock = None
        # When True the datagram socket is kept open between requests.
        self.__constant_local_port = True
        self._is_connected = False

    def __create_socket(self):
        """Create the socket matching the configured protocol."""
        if self.protocol == PROTOCOL.DATAGRAM:
            self.__sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self.__sock.settimeout(self.timeout)
        else:
            self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    def __close_socket(self):
        """Close the socket (if any) and reset the connection state."""
        # Guard against "no socket yet": closing None would only log a
        # spurious AttributeError traceback.
        if self.__sock is not None:
            try:
                self.__sock.close()
            except Exception:
                logging.getLogger("HWR").exception("")
        self._is_connected = False
        self.__sock = None
        self.received_msg = None

    def __connection_lost(self, reason):
        """Record a stream failure, close the socket and wake any waiter.

        Args:
            reason(str): Text stored in ``self.error``.
        """
        self.error = reason
        self.__close_socket()
        # Wake a pending __send_receive_stream so it can report the error
        # immediately instead of blocking until its timeout expires.
        self.msg_received_event.set()

    def connect(self):
        """Open the stream connection and start the receiving greenlet.

        Does nothing for datagram clients.
        """
        if self.protocol == PROTOCOL.DATAGRAM:
            return
        if self.__sock is None:
            self.__create_socket()
        self.__sock.connect((self.server_ip, self.server_port))
        self._is_connected = True
        self.error = None
        self.received_msg = None
        self.receiving_greenlet = gevent.spawn(self.recv_thread)

    def is_connected(self):
        """Check if connected.

        Returns:
            (bool): True if connected (always False for datagram clients).
        """
        if self.protocol == PROTOCOL.DATAGRAM:
            return False
        if self.__sock is None:
            return False
        return self._is_connected

    def disconnect(self):
        """Stop the receiving greenlet and close the stream socket."""
        if self.is_connected():
            self.receiving_greenlet.kill()
            self.__close_socket()

    def __send_receive_datagram_single(self, cmd):
        """Send one datagram and wait for the reply carrying its number.

        Args:
            cmd(str): Command (text, not yet encoded).
        Returns:
            (str): Reply with the message-number prefix removed.
        Raises:
            SocketError, TimeoutError
        """
        try:
            if self.__constant_local_port is False or self.__sock is None:
                self.__create_socket()
            msg_number = "%04d " % self.__msg_index__
            msg = msg_number + cmd
            try:
                self.__sock.sendto(encode(msg), (self.server_ip, self.server_port))
            except Exception as ex:
                raise SocketError("Socket error:" + str(ex))
            # Skip replies that belong to other (e.g. timed out) requests.
            while True:
                try:
                    ret = self.__sock.recv(4096).decode()
                except socket.timeout as ex:
                    raise TimeoutError("Timeout error:" + str(ex))
                except Exception as ex:
                    raise SocketError("Socket error:" + str(ex))
                if ret[0:5] == msg_number:
                    ret = ret[5:]
                    break
        except SocketError:
            self.__close_socket()
            raise
        except Exception:
            if self.__constant_local_port is False:
                self.__close_socket()
            raise
        if self.__constant_local_port is False:
            self.__close_socket()
        return ret

    def __send_receive_datagram(self, cmd):
        """Send a datagram request, retrying up to ``self.retries`` times.

        Args:
            cmd(str): Command.
        Returns:
            (str): Reply, or None when ``self.retries`` is not positive.
        Raises:
            TimeoutError, ProtocolError, SocketError: If the last attempt fails.
        """
        self.__msg_index__ = self.__msg_index__ + 1
        if self.__msg_index__ >= 10000:
            self.__msg_index__ = 1
        for attempt in range(0, self.retries):
            try:
                # Pass the command as text: the single-shot helper prepends
                # the message number and encodes the whole message itself.
                # Encoding here as well made ``str + bytes`` fail on Python 3.
                return self.__send_receive_datagram_single(cmd)
            except (TimeoutError, ProtocolError, SocketError):
                if attempt >= self.retries - 1:
                    raise

    def set_timeout(self, timeout):
        """Set the reply timeout.

        Args:
            timeout(float): Timeout value
        """
        self.timeout = timeout
        if self.protocol == PROTOCOL.DATAGRAM:
            if self.__sock is not None:
                self.__sock.settimeout(self.timeout)

    def restore_timeout(self):
        """Restore the default timeout"""
        self.set_timeout(self.default_timeout)

    def dispose(self):
        """Disconnect or close socket"""
        if self.protocol == PROTOCOL.DATAGRAM:
            if self.__constant_local_port:
                self.__close_socket()
        else:
            self.disconnect()

    def on_message_received(self, msg):
        """Store a decoded stream message and wake the waiting caller.

        Args:
            msg(str): Message
        """
        self.received_msg = msg
        self.msg_received_event.set()

    def recv_thread(self):
        """Receive loop of a stream connection (runs in its own greenlet).

        Reads from the socket, extracts the messages framed between STX and
        ETX and hands each one to ``on_message_received``. The loop ends when
        the peer closes the connection or the socket fails.

        Raises:
            ProtocolError: If a framed message cannot be decoded.
        """
        try:
            self.on_connected()
        except Exception:
            logging.getLogger("HWR").exception("")
        buffer = empty_buffer()
        received_stx = False
        while True:
            try:
                ret = self.__sock.recv(4096)
            except socket.error as ex:
                # Treat a failing socket like a lost connection instead of
                # leaving the client flagged as connected with no reader.
                self.__connection_lost(str(ex))
                break
            if not ret:
                # connection reset by peer
                self.__connection_lost("Disconnected")
                break
            for b in ret:
                if b == STX:
                    buffer = empty_buffer()
                    received_stx = True
                elif b == ETX:
                    if received_stx:
                        try:
                            # Unicode decoding exception catching,
                            # consider errors='ignore'
                            buffer_utf8 = buffer.decode()
                        except UnicodeDecodeError:
                            # ``raise ... from`` is not allowed in Python 2
                            raise ProtocolError(
                                "UnicodeDecodeError: %s" % (sys.exc_info(),)
                            )
                        self.on_message_received(buffer_utf8)
                        received_stx = False
                        buffer = empty_buffer()
                elif received_stx:
                    buffer += _bytes([b])
                    if len(buffer) > MAX_SIZE_STREAM_MSG:
                        # Oversized message: drop it and wait for next STX.
                        received_stx = False
                        buffer = empty_buffer()
        try:
            self.on_disconnected()
        except Exception:
            logging.getLogger("HWR").exception("")

    def __send_stream(self, cmd):
        """Send a command framed between STX and ETX, connecting if needed.

        Args:
            cmd(str): command
        Raises:
            socket.error: If sending fails; the connection is closed first
                so that the next call reconnects.
        """
        if not self.is_connected():
            self.connect()
        try:
            pack = _bytes([STX]) + encode(cmd) + _bytes([ETX])
            # sendall: plain send() may transmit only part of the frame.
            self.__sock.sendall(pack)
        except (SocketError, socket.error):
            # The socket raises socket.error, never the project SocketError,
            # so both are caught to make this cleanup reachable.
            self.disconnect()
            raise

    def __send_receive_stream(self, cmd):
        """Send a command over the stream and wait for the next message.

        Args:
            cmd(str): command
        Returns:
            (str): reply from the socket
        Raises:
            TimeoutError: If no reply arrives within ``self.timeout``.
            SocketError: If the connection is lost while waiting.
        """
        self.error = None
        self.received_msg = None
        self.msg_received_event.clear()
        if not self.is_connected():
            self.connect()
        self.__send_stream(cmd)
        with gevent.Timeout(self.timeout, TimeoutError):
            while self.received_msg is None:
                if self.error is not None:
                    raise SocketError("Socket error:" + str(self.error))
                self.msg_received_event.wait()
                # Re-arm the event so a wake-up that carried neither a
                # message nor an error cannot turn this loop into a spin
                # that never yields to the timeout.
                self.msg_received_event.clear()
        return self.received_msg

    def send_receive(self, cmd, timeout=-1):
        """Send/receive command, locking the socket.

        Args:
            cmd(str): command
            timeout(float): Timeout for this call only; a negative value
                (default) keeps the current timeout, None or a non-negative
                value is applied and the default restored afterwards.
        Returns:
            (str): reply from the socket
        """
        with self._lock:
            custom_timeout = (timeout is None) or (timeout >= 0)
            try:
                if custom_timeout:
                    self.set_timeout(timeout)
                if self.protocol == PROTOCOL.DATAGRAM:
                    return self.__send_receive_datagram(cmd)
                return self.__send_receive_stream(cmd)
            finally:
                if custom_timeout:
                    self.restore_timeout()

    def send(self, cmd):
        """Send a command over the stream without waiting for a reply.

        Args:
            cmd(str): command
        Returns:
            None
        Raises:
            ProtocolError: If the client uses the datagram protocol.
        """
        if self.protocol == PROTOCOL.DATAGRAM:
            raise ProtocolError(
                "Protocol error: send command not support in datagram clients"
            )
        return self.__send_stream(cmd)

    def on_connected(self):
        """Hook called by the receiving greenlet when it starts."""

    def on_disconnected(self):
        """Hook called by the receiving greenlet when its loop ends."""