Source code for neurokernel.tools.zmq

#!/usr/bin/env python

"""
ZeroMQ utilities.
"""

import twiggy
import zmq

[docs]def is_poll_in(sock, poller, timeout=100): """ Check for incoming data on a socket using a poller. """ socks = dict(poller.poll(timeout)) if sock in socks and socks[sock] == zmq.POLLIN: return True else: return False
[docs]def get_random_port(min_port=49152, max_port=65536, max_tries=100): """ Return available random ZeroMQ port. """ ctx = zmq.Context() sock = ctx.socket(zmq.REQ) try: port = sock.bind_to_random_port('tcp://*', min_port, max_port, max_tries) except: raise zmq.ZMQError(msg='Could not find free port') finally: sock.close() return port
class ZMQOutput(twiggy.outputs.Output): """ Output messages to a ZeroMQ PUB socket. """ def __init__(self, addr, mode, format=None, close_atexit=True): self.addr = addr self.mode = mode super(ZMQOutput, self).__init__(format, close_atexit) def _open(self): self.ctx = zmq.Context() self.sock = self.ctx.socket(zmq.PUB) if self.mode == 'bind': self.sock.bind(addr) elif self.mode == 'connect': self.sock.connect(addr) else: raise ValueError('invalid connection mode') def _close(self): self.sock.close() def _write(self, x): self.sock.send(x)