Source code for pystreaming.stream.interface

import zmq
import numpy as np


def send(*, socket, fno, ftime, meta, arr=None, buf=None, flags=0):
    """Internal video data send command.

    The frame goes out as the parts of one multipart message, in this
    order: array header (JSON) and raw array bytes when ``arr`` is given,
    ``buf`` when given, then ``meta``, ``ftime`` and ``fno``. Every part
    except ``fno`` is sent with ``zmq.SNDMORE``.

    Args:
        socket (zmq.Context.socket): Socket through which to send data.
        fno (int): Frame number.
        ftime (float): Frame timestamp.
        meta (pyobj): Any reasonably small picklable object.
        arr (numpy.ndarray, optional): Numpy array to send. An array that
            is not C-contiguous is copied into C order first; a
            C-contiguous array is sent without copying. Defaults to None.
        buf (bytes, optional): Byte buffer to send. Defaults to None.
        flags (int, optional): Zmq flags to execute with (zmq.NOBLOCK or
            zmq.SNDMORE). Defaults to 0.
    """
    more = zmq.SNDMORE | flags

    if arr is not None:
        # The raw memory of the array is what goes on the wire, and recv()
        # rebuilds it with a C-order reshape. A Fortran-ordered or strided
        # view would therefore arrive scrambled, or be rejected by the
        # zero-copy send after the header part has already been sent.
        # Normalise the layout *before* sending anything; this is a no-op
        # for arrays that are already C-contiguous.
        data = np.asarray(arr, order="C")
        md = {"dtype": str(data.dtype), "shape": data.shape}
        socket.send_json(md, flags=more)
        socket.send(data, copy=False, flags=more)

    if buf is not None:
        socket.send(buf, copy=False, flags=more)

    socket.send_pyobj(meta, flags=more)
    socket.send_pyobj(ftime, flags=more)
    # Last part: only the caller's own flags, so the message is closed
    # unless the caller explicitly passed zmq.SNDMORE.
    socket.send_pyobj(fno, flags=flags)


def recv(*, socket, arr=False, buf=False, flags=0):
    """Internal video data receive command.

    Reads the parts of a frame in the order they are written by ``send``:
    optional array (JSON header followed by raw bytes), optional byte
    buffer, then ``meta``, ``ftime`` and ``fno``.

    Args:
        socket (zmq.Context.socket): Socket through which to receive data.
        arr (bool, optional): Change to True if you expect an arr.
            Defaults to False.
        buf (bool, optional): Change to True if you expect a byte buffer.
            Defaults to False.
        flags (int, optional): Zmq flags to execute with (zmq.NOBLOCK).
            Defaults to 0.

    Returns:
        dict: Expected items, with possible keys:
            {arr, buf, meta, ftime, fno}.
    """
    frame = {}

    if arr:
        header = socket.recv_json(flags=flags)
        payload = socket.recv(copy=False, flags=flags)
        # Build the array directly on top of the received message memory
        # instead of copying it; the header carries dtype and shape.
        flat = np.frombuffer(memoryview(payload), dtype=header["dtype"])
        frame["arr"] = flat.reshape(header["shape"])

    if buf:
        # Stored exactly as returned by the non-copying receive.
        # NOTE(review): with pyzmq that is presumably a zmq.Frame rather
        # than a bytes object - confirm against callers.
        frame["buf"] = socket.recv(copy=False, flags=flags)

    # NOTE(review): recv_pyobj deserialises with pickle according to the
    # pyzmq docs, so this should only be used with trusted peers.
    for key in ("meta", "ftime", "fno"):
        frame[key] = socket.recv_pyobj(flags=flags)

    return frame