Source code for pystreaming.stream.handlers

import time
from collections import OrderedDict, deque


def buffer(bufferlen, handlers):
    """Buffer and reorder incoming packets of data.

    The handlers are polled round-robin. Each received packet is stored in a
    per-stream buffer keyed by its ``ftime`` timestamp, and the oldest
    buffered packet of a stream is released once it is more than
    ``bufferlen`` seconds old. Age is measured against the local clock,
    shifted by the offset observed on the very first packet received.

    Handlers are assumed to yield indefinitely; exhaustion of a handler is
    not handled here.

    Args:
        bufferlen (float): Length of buffer, in seconds.
        handlers (dict): Dictionary of str: generator. The key is a stream
            name, the value is an unprimed generator (it is called once,
            with no arguments, to obtain the generator). Each generator
            yields either ``None`` (no data available right now) or a packet
            that supports ``packet["ftime"]``.

    Yields:
        tuple(str, data): str is stream name, data is data packet from
            corresponding handler.
    """
    assert isinstance(handlers, dict), "handlers must be a dict"
    streams = {name: handlers[name]() for name in handlers}
    pending = {name: OrderedDict() for name in streams}
    # Offset between packet timestamps and the local clock, fixed by the
    # first packet that arrives on any stream.
    tdelta = None
    while True:
        for name, stream in streams.items():
            data = next(stream)
            queue = pending[name]
            if data is None:  # handler miss
                # Back off briefly so an idle stream does not busy-spin.
                time.sleep(0.001)
            else:  # handler hit
                ftime = data["ftime"]
                if tdelta is None:
                    tdelta = ftime - time.time()
                # A packet older than the buffer window is stale and dropped.
                # We deliberately fall through (instead of `continue`) so that
                # a run of stale packets cannot hold back packets that are
                # already buffered and due for release.
                stale = ftime < time.time() + tdelta - bufferlen
                if not stale:
                    queue[ftime] = data
            # A non-empty queue implies tdelta has been set, because packets
            # are only stored after the offset was computed.
            if queue:
                mint = min(queue)
                # Release the oldest packet once it has aged past the window.
                if mint < time.time() + tdelta - bufferlen:
                    yield (name, queue.pop(mint))
def display(frame, BGR=True):
    """Show a single frame in an OpenCV window. Press [ESC] to stop.

    Args:
        frame (np.ndarray): Frame must have dtype uint8.
        BGR (bool, optional): Whether the frame is already in BGR channel
            order (the OpenCV format). When false, the frame is converted
            from RGB before being shown. Defaults to True.

    Raises:
        KeyboardInterrupt: Raised when [ESC] is pressed. Catch this exception
            to gracefully exit the stream.
    """
    import cv2

    shown = frame if BGR else cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    cv2.imshow("pystreaming-display", shown)
    # Keep only the low byte of the key code before comparing.
    pressed = cv2.waitKey(1) & 0xFF
    if pressed == 27:  # esc pressed
        raise KeyboardInterrupt("StopStream")
def dispfps(handler, n=100, write=False):
    """Average iterations per second over last n iterations.

    Data is forwarded unchanged except that, when ``write`` is true,
    ``data['arr']`` is replaced by the result of drawing the FPS text on it.
    No rate is reported until more than ``n`` items have been seen, nor for
    an iteration whose measured time window is not positive.

    Args:
        handler (generator): Generator that yields data.
        n (int, optional): Number of iterations to average over.
            Defaults to 100.
        write (bool, optional): Set to true to write FPS to data['arr']
            instead of console, assuming it is a video frame.
            Defaults to False.

    Yields:
        pyobj: Forwards data from handler
    """
    if write:
        import cv2
    times = deque()
    for data in handler:
        # perf_counter is monotonic, so the window cannot go negative the way
        # a wall-clock difference can when the system time is adjusted.
        end = time.perf_counter()
        times.append(end)
        if len(times) > n:
            # The popped stamp was taken n iterations ago.
            diff = end - times.popleft()
            # A zero-length window (n == 0, or a coarse clock) would divide
            # by zero; skip the report for this iteration instead.
            if diff > 0:
                label = f"FPS: {(n / diff):.3f}"
                if write:
                    data["arr"] = cv2.putText(
                        data["arr"],
                        label,
                        (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        1,
                        (0, 255, 0),
                        2,
                        cv2.LINE_4,
                    )
                else:
                    print(f"\r{label}", end="")
        yield data