Source code for pystreaming.audio.patterns

import zmq
import time
from ..stream import interface as intf
from . import STR_HWM, RCV_HWM


class AudioStreamer:
    """Publish buffers of audio on a zmq PUB socket."""

    def __init__(self, endpoint):
        """Audio streamer. Binds to a zmq PUB socket.

        Args:
            endpoint (str): Descriptor of stream publishing endpoint.
        """
        self.socket = zmq.Context.instance().socket(zmq.PUB)
        # Configure the send high-water mark BEFORE binding. libzmq documents
        # that, on some versions, socket options only take effect for
        # subsequent bind/connect calls, so setting it afterwards may leave the
        # bound endpoint on the default HWM.
        self.socket.setsockopt(zmq.SNDHWM, STR_HWM)
        self.socket.bind(endpoint)
        self.endpoint = endpoint
        # Frame number attached to the next buffer handed to send().
        self.fno = 0

    def send(self, arr):
        """Send a buffer of audio.

        The buffer is passed to ``intf.send`` together with the current frame
        number, the current ``time.time()`` timestamp and ``meta=None``, using
        a non-blocking send. If the send raises ``zmq.error.Again`` the buffer
        is dropped without raising. The frame number is incremented on every
        call, whether or not the send succeeded.

        Args:
            arr (np.ndarray): A segment of audio as a numpy array.
        """
        try:
            intf.send(
                socket=self.socket,
                fno=self.fno,
                ftime=time.time(),
                meta=None,
                arr=arr,
                flags=zmq.NOBLOCK,
            )
        except zmq.error.Again:
            # Deliberate best effort: never block the producer; drop instead.
            pass
        self.fno += 1

    def __repr__(self):
        """Return a multi-line summary showing the endpoint and send HWM."""
        rpr = "-----AudioStreamer-----\n"
        rpr += f"{'OUT': <8}{self.endpoint}\n"
        rpr += f"{'HWM': <8}({STR_HWM} >"
        return rpr
class AudioReceiver:
    """Receive packages of audio data from a zmq SUB socket."""

    def __init__(self, endpoint):
        """Audio receiver. Connects using a zmq SUB socket.

        The receive high-water mark is applied before connecting, and the
        socket subscribes with an empty prefix.

        Args:
            endpoint (str): Descriptor of stream publishing endpoint.
        """
        context = zmq.Context.instance()
        self.socket = context.socket(zmq.SUB)
        self.socket.setsockopt(zmq.RCVHWM, RCV_HWM)
        self.socket.connect(endpoint)
        self.socket.subscribe("")
        self.endpoint = endpoint

    def recv(self, timeout):
        """Receive a package of data from the audio channel.

        Polls the socket for up to ``timeout`` and, if the poll reports
        activity, returns the result of a non-blocking ``intf.recv``.

        Args:
            timeout (int): Timeout period in milliseconds.

        Raises:
            TimeoutError: Raised when no messages are received in the
                timeout period.
        """
        if not self.socket.poll(timeout):
            raise TimeoutError(
                f"No messages were received within the timeout period {timeout}ms"
            )
        return intf.recv(
            socket=self.socket,
            arr=True,
            flags=zmq.NOBLOCK,
        )

    def handler(self, timeout=0):
        """Yield a package of data from audio channel.

        Runs forever. Each iteration yields the result of ``recv``; when
        ``recv`` times out, ``None`` is yielded instead.

        Args:
            timeout (int, optional): Timeout period in milliseconds.
                Defaults to 0.

        Yields:
            dict: Expected items, with keys: {arr, meta, ftime, fno}.
            ``None`` when no message arrived within the timeout.
        """
        while True:
            try:
                yield self.recv(timeout=timeout)
            except TimeoutError:
                yield None

    def __repr__(self):
        """Return a multi-line summary showing the endpoint and receive HWM."""
        pieces = (
            "-----AudioReceiver-----\n",
            f"{'IN': <8}{self.endpoint}\n",
            f"{'HWM': <8}> {RCV_HWM})",
        )
        return "".join(pieces)