# Source code for openlcb.communication
import logging
import socket
# Module-level logger, named after this module via ``__name__``
logger = logging.getLogger(__name__)
class CommunicationException(Exception):
    '''Raised if an error occurs while communicating with a node'''
class EthernetConnection(object):
    '''Class for communicating with nodes via Eth2CAN

    Instances can be used as context managers: the connection is opened
    when the ``with`` block is entered and closed when it is left.

    :param str hostname: Host name or IP address of the Eth2CAN device
    :param int port: TCP port of the Eth2CAN device
    '''

    #: Amount of time to wait for a response (in seconds)
    SOCKET_TIMEOUT = 1.0

    #: Maximum amount of data to read per ``recv`` call (in bytes)
    BUFFER_SIZE = 4096

    def __init__(self, hostname, port):
        self.hostname = hostname
        self.port = port
        # Created by connect(); stays None until then
        self._socket = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, trace):
        self.close()
        if exc_type is not None:
            # Pass the exception explicitly instead of relying on the
            # interpreter's "current exception" state inside __exit__.
            logger.error('Error communicating with node',
                         exc_info=(exc_type, exc_value, trace))
        # Returning None lets any exception propagate to the caller.

    def connect(self):
        '''Connect to the Eth2CAN device over TCP/IP

        The receive timeout (:py:attr:`SOCKET_TIMEOUT`) is applied only
        after the connection has been established. If connecting fails the
        socket is closed again and the original exception is re-raised.
        '''
        logger.debug('Connecting to Eth2CAN at %s:%s',
                     self.hostname, self.port)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.hostname, self.port))
            logger.debug('Socket timeout set to %ss', self.SOCKET_TIMEOUT)
            sock.settimeout(self.SOCKET_TIMEOUT)
        except Exception:
            # Do not leak the file descriptor when the connection fails
            sock.close()
            raise
        self._socket = sock

    def send(self, message):
        '''Send a CAN message

        The message is converted with ``str()``, encoded, and terminated
        with a newline before being written to the socket.

        :param CANMessage message: An instance of a
            :py:class:`~olcbtests.messages.can.CANMessage` containing
            the message to send
        '''
        logger.debug('Sent message: %s', message)
        # sendall() keeps writing until the whole buffer is transmitted;
        # plain send() may stop after a partial write.
        self._socket.sendall(str(message).encode() + b'\n')

    def receive(self):
        '''Retrieve a response from the node

        :returns str: A string containing the CAN message, suitable for
            creating a new instance of a
            :py:class:`~olcbtests.messages.can.CANMessage` subclass

        .. deprecated:: 0.1
            Use :py:meth:`receive_one` instead
        '''
        return self.receive_one()

    def receive_one(self):
        '''Retrieve a single response message from the node

        This reads everything :py:meth:`receive_multi` returns and keeps
        only the first message; any further messages read in the same call
        are discarded.

        :returns str: A string containing the first message received,
            suitable for creating a new instance of a
            :py:class:`~olcbtests.messages.can.CANMessage` subclass, or
            ``None`` if nothing was received
        '''
        messages = self.receive_multi()
        return messages[0] if messages else None

    def receive_multi(self):
        '''Retrieve multiple responses from the node

        Data is read until the socket times out or the remote end closes
        the connection, then split into lines.

        :returns list: A list of strings containing CAN messages,
            suitable for creating new instances of a
            :py:class:`~olcbtests.messages.can.CANMessage` subclass
        :raises CommunicationException: if a socket error other than a
            timeout occurs while reading
        '''
        chunks = []
        try:
            while True:
                chunk = self._socket.recv(self.BUFFER_SIZE)
                if not chunk:
                    # recv() returns an empty result once the peer has
                    # closed the connection; stop instead of spinning.
                    logger.debug('Connection closed by remote host')
                    break
                chunks.append(chunk)
        except socket.timeout:
            # Expected way out of the loop: no more data arrived in time
            pass
        except socket.error as e:
            raise CommunicationException(e)

        response = b''.join(chunks).decode()
        if response:
            logger.debug('Received response: %s', response.strip())
        else:
            logger.debug('Did not receive a response within %s s',
                         self.SOCKET_TIMEOUT)
        return response.splitlines()

    def close(self):
        '''Close the TCP/IP communication socket

        Calling this before :py:meth:`connect`, or more than once, is
        harmless.
        '''
        logger.debug('Closing connection to %s:%s',
                     self.hostname, self.port)
        if self._socket is not None:
            self._socket.close()