Source code for ironman.communicator

"""
    This file implements all of the various communications one might need to do.

    Jarvis provides a callback structure that looks up (in its registry) for an appropriate communication protocol.
"""

from zope.interface import implementer
from .interfaces import ICommunicationSlave, ICommunicationDriver, IHardwareNode
from .constructs.ipbus import IPBusWords, IPBusWords_long


[docs]@implementer(ICommunicationSlave) class Jarvis: """This is the general communication slave. Jarvis is what lets us pass around communications to various routes/protocols while keeping the details separated from us. Here's an example of how one might use it: >>> from ironman.communicator import Jarvis, SimpleIO >>> # create a Jarvis instance to manage what we want to register >>> j = Jarvis() >>> # tell Jarvis to register this class for the given route >>> @j.register('fpgaOne') ... class FileOne(SimpleIO): ... __f__ = '/path/to/fileOne' ... >>> # tell Jarvis to register this class for the given route >>> @j.register('fpgaTwo') ... class FileTwo(SimpleIO): ... __f__ = '/path/to/fileTwo' ... >>> # print the available registered classes >>> import pprint >>> pprint.pprint(j.registry) {'fpgaOne': <class 'ironman.communicator.FileOne'>, 'fpgaTwo': <class 'ironman.communicator.FileTwo'>} Jarvis does the wrapping for :func:`Jarvis.register` so that a class defined at run-time is automatically inserted. """ def __init__(self): self.registry = {}
[docs] def set_hardware_manager(self, hwmanager): self.hwmanager = hwmanager
[docs] def parse_address(self, address): return self.hwmanager.get_route(address)
def __call__(self, packet): """ Handle CONTROL packets """ if packet.request.header.type_id == 'CONTROL': for transaction, response in zip( packet.request.transactions, packet.response.transactions ): response.data = self.__transaction__(transaction) return packet def __transaction__(self, transaction): protocol = self.registry.get(self.parse_address(transaction.address), None) if protocol is None: raise KeyError(transaction.address) protocol = protocol() if transaction.header.type_id == 'READ': return IPBusWords.parse( protocol.read(transaction.address, transaction.header.words) ) elif transaction.header.type_id == 'RMWBITS': # ECC - new case to handle RMWBITS protocol.rmwbits(transaction.address, IPBusWords_long.build(transaction.data)) return elif transaction.header.type_id == 'WRITE': protocol.write(transaction.address, IPBusWords.build(transaction.data)) return
[docs] def register(self, route): if route is None: raise ValueError('Must specify a route') if route in self.registry: raise KeyError( "{:s} is an existing route to {:s}".format( route, self.registry[route].__name__ ) ) def register_wrapper(cls): """Duck-typing checks""" getattr(cls, 'read') getattr(cls, 'write') self.registry[route] = cls return register_wrapper
[docs] def unregister(self, route): del self.registry[route]
[docs]@implementer(IHardwareNode) class SimpleIO: __f__ = None
[docs] def read(self, offset, size): with open(self.__f__, 'rb') as f: f.seek(offset) return f.read(4 * size)
[docs] def write(self, offset, data): with open(self.__f__, 'r+b') as f: f.seek(offset) return f.write(data)
[docs]@implementer(ICommunicationDriver) class ComplexIO: __f__ = {}
[docs] def read(self, offset, size): with open(self.__f__.get(offset), 'rb') as f: return f.read(4 * size)
[docs] def write(self, offset, data): with open(self.__f__.get(offset), 'r+b') as f: return f.write(data)