Source code for pystreaming.stream.patterns

from pystreaming import (
    EncoderDevice,
    DistributorDevice,
    DecoderDevice,
    RequesterDevice,
    PublisherDevice,
    SubscriberDevice,
    CollectDevice,
)
import zmq
import uuid
from ..stream import interface as intf
from . import WORKER_HWM


[docs]class Streamer: def __init__(self, endpoint, tracks=None, nproc=2, mapreduce=False): """Video streamer with P2P and Map-Reduce functionality. Args: endpoint (str): Descriptor of video stream endpoint. tracks (list[str], optional): Video stream tracks. Defaults to None which is ["none"]. nproc (int, optional): Number of processes devoted to encoding. Defaults to 2. mapreduce (bool, optional): Enable Map-Reduce streaming pattern. Defaults to False. """ seed = uuid.uuid1().hex if tracks is None: tracks = ["none"] self.encoder = EncoderDevice(nproc, seed) if mapreduce: self.distributor = DistributorDevice(tracks, endpoint, seed) else: self.distributor = PublisherDevice(endpoint, seed) self.started = False
[docs] def start(self): """Start internal pystreaming devices.""" self.encoder.start() self.distributor.start() self.started = True
[docs] def stop(self): """Cleanup and stop interal pystreaming objects.""" self.encoder.stop() self.distributor.stop() self.started = False
[docs] def send(self, frame): """Send a video frame into the stream. Args: frame (np.ndarray): Video frame """ if not self.started: raise RuntimeError("Start the Streamer before sending frames") self.encoder.send(frame)
def __enter__(self): self.start() return self def __exit__(self, exception_type, exception_value, traceback): self.stop()
[docs] def testcard(self, card, animated=False): """Display a testcard or a test image. Automatically starts the device if not already started. This method is blocking. Args: card (int): One of pystreaming.TEST_S pystreaming.TEST_M pystreaming.TEST_L pystreaming.IMAG_S pystreaming.IMAG_M pystreaming.IMAG_L animated (bool, optional): Set True to make image rotate. Defaults to False. """ import cv2 import numpy as np from itertools import count import time from pystreaming import loadimage self.start() testimage = loadimage(card) if animated: storage = [] for ang in range(0, 360, 3): storage.append( cv2.cvtColor(np.asarray(testimage.rotate(ang)), cv2.COLOR_RGB2BGR) ) else: storage = np.asarray(testimage) for i in count(): print(i) self.send(storage[i % 360] if animated else storage) time.sleep(1 / 30)
[docs]class Worker: def __init__(self, source, drain, track="none", reqthread=3, decproc=2): """Worker in the Map-Reduce streaming pattern. Args: source (str): Descriptor of Streamer. drain (str): Descriptor of Receiver. track (str, optional): Video stream track name. Defaults to "none". reqthread (int, optional): Number of request threads. Defaults to 3. decproc (int, optional): Number of decoder processes. Defaults to 2. """ seed = uuid.uuid1().hex self.requester = RequesterDevice(source, track, reqthread, seed) self.decoder = DecoderDevice(decproc, seed, fwdbuf=True) self.drain = zmq.Context.instance().socket(zmq.PUSH) self.drain.setsockopt(zmq.SNDHWM, WORKER_HWM) self.drain.connect(drain)
[docs] def start(self): """Start internal pystreaming devices.""" self.requester.start() self.decoder.start() self.started = True
[docs] def stop(self): """Cleanup and stop interal pystreaming objects.""" self.requester.stop() self.decoder.stop() self.started = False
def __enter__(self): self.start() return self def __exit__(self, exception_type, exception_value, traceback): self.stop() @property def handler(self): """Returns a handler that is used for worker data handling. Returns: generator: Generator that yields dicts with keys {arr, buf, meta, ftime, fno}. """ return self.decoder.handler
[docs] def send(self, data): del data["arr"] try: intf.send( socket=self.drain, flags=zmq.NOBLOCK, **data, ) except zmq.error.Again: pass # ignore send misses
[docs]class Receiver: def __init__(self, endpoint, nproc=2, mapreduce=False): """Receiver frames from a video stream. Args: endpoint (str): Descriptor of collection endpoint. nproc (int, optional): Number of processes devoted to decoding. Defaults to 2. mapreduce (bool, optional): Enable Map-Reduce streaming pattern. Defaults to False. """ seed = uuid.uuid1().hex self.mapreduce = mapreduce self.startedonce = False if mapreduce: self.receive = CollectDevice(endpoint, seed) else: self.receive = SubscriberDevice(endpoint, seed) self.decoder = DecoderDevice(nproc, seed) self.started = False
[docs] def start(self): """Start internal pystreaming objects.""" self.decoder.start() self.receive.start() self.started = True
[docs] def stop(self): """Cleanup and stop interal pystreaming objects.""" self.receive.stop() self.decoder.stop() self.started = False
def __enter__(self): self.start() return self def __exit__(self, exception_type, exception_value, traceback): self.stop() @property def handler(self): """Returns a handler that is used for future data handling. Returns: generator: Generator that yields dicts with keys {arr, meta, ftime, fno}. """ return self.decoder.handler