import asyncio
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

import gi
import numpy as np

gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib  # noqa: E402  (must follow require_version)
class AsyncVideoSlicer:
    """
    Retrieve a time slice of an S3-hosted video as raw RGB frames.

    Each call to ``get_slice`` builds its own GStreamer pipeline and drains it
    on a worker thread taken from the supplied executor. No per-call state is
    stored on the instance, so several slices may be awaited concurrently on
    one ``AsyncVideoSlicer``.
    """

    # Longest single blocking wait on the pipeline (100 ms, in nanoseconds),
    # so the worker regularly re-checks its deadline and the bus.
    _POLL_INTERVAL_NS = 100_000_000
    # Grace period added to the requested duration before giving up on a
    # pipeline that appears stuck.
    _TIMEOUT_MARGIN_SECONDS = 5
    # The pipeline negotiates packed RGB: three one-byte channels per pixel.
    _BYTES_PER_PIXEL = 3
    # Bucket and key are applied with set_property() rather than interpolated
    # here, so keys containing spaces or '!' cannot corrupt the description.
    # The audio fakesink is async=false so a video-only file (where that
    # branch never links) does not hold up preroll.
    _PIPELINE_DESCRIPTION = (
        "s3src name=s3source ! decodebin name=dec "
        "dec. ! queue ! videoconvert ! video/x-raw,format=RGB ! "
        "appsink name=videosink emit-signals=false sync=false "
        "dec. ! queue ! fakesink async=false"
    )

    def __init__(self, executor: ThreadPoolExecutor):
        """
        Initialise GStreamer and remember the executor used for blocking work.

        Args:
            executor: Pool whose threads run the blocking pipeline loop.
        """
        Gst.init(None)
        self.executor = executor

    async def get_slice(self, bucket_name: str, key: str, start_seconds: int, duration_seconds: int) -> List[np.ndarray]:
        """
        Asynchronously retrieve the frames of one time slice of a video.

        Args:
            bucket_name: S3 bucket holding the video.
            key: Object key of the video inside the bucket.
            start_seconds: Offset of the slice from the start of the video.
            duration_seconds: Length of the slice.

        Returns:
            The decoded frames as ``(height, width, 3)`` ``uint8`` arrays, in
            stream order. The list is partial (possibly empty) if the pipeline
            reports an error or the timeout of ``duration_seconds`` plus a
            five second margin expires. A non-positive duration yields ``[]``
            without building a pipeline.

        Raises:
            ValueError: If ``start_seconds`` is negative.
        """
        if start_seconds < 0:
            raise ValueError(f"start_seconds must be >= 0, got {start_seconds}")
        if duration_seconds <= 0:
            return []

        print(f"Slicing {key} from {start_seconds}s for {duration_seconds}s...")
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self._collect_frames,
            bucket_name,
            key,
            start_seconds,
            duration_seconds,
        )

    def _collect_frames(self, bucket_name, key, start_seconds, duration_seconds):
        """Blocking worker: build the pipeline, seek to the slice, pull its frames."""
        # The deadline starts when the worker actually runs, so time spent
        # queued behind other jobs in the executor does not eat into it.
        deadline = time.monotonic() + duration_seconds + self._TIMEOUT_MARGIN_SECONDS
        frames = []
        pipeline = Gst.parse_launch(self._PIPELINE_DESCRIPTION)
        try:
            source = pipeline.get_by_name("s3source")
            source.set_property("bucket", bucket_name)
            source.set_property("key", key)
            appsink = pipeline.get_by_name("videosink")
            bus = pipeline.get_bus()

            if not self._preroll(pipeline, bus, deadline):
                return frames
            if not self._seek_to_slice(pipeline, start_seconds, duration_seconds):
                print(f"Seek to {start_seconds}s failed for {key}.")
                return frames

            pipeline.set_state(Gst.State.PLAYING)
            self._drain(appsink, bus, deadline, frames)
            return frames
        finally:
            # Always release the pipeline's resources, whatever happened above.
            pipeline.set_state(Gst.State.NULL)

    def _preroll(self, pipeline, bus, deadline):
        """Bring the pipeline to PAUSED; return True once it is ready to seek."""
        if pipeline.set_state(Gst.State.PAUSED) == Gst.StateChangeReturn.FAILURE:
            self._pop_error(bus)
            return False

        # Poll in short steps so an error posted on the bus is noticed
        # promptly instead of only when the state wait times out.
        while time.monotonic() < deadline:
            if self._pop_error(bus):
                return False
            result, _state, _pending = pipeline.get_state(self._POLL_INTERVAL_NS)
            if result == Gst.StateChangeReturn.FAILURE:
                self._pop_error(bus)
                return False
            if result != Gst.StateChangeReturn.ASYNC:
                return True

        print("Timeout reached while waiting for the pipeline to preroll.")
        return False

    @staticmethod
    def _seek_to_slice(pipeline, start_seconds, duration_seconds):
        """Seek so that playback covers exactly the requested time window."""
        start_ns = int(start_seconds * Gst.SECOND)
        stop_ns = start_ns + int(duration_seconds * Gst.SECOND)
        # A stop position bounds the segment, so the sink reaches EOS at the
        # end of the slice instead of decoding the rest of the file.
        return pipeline.seek(
            1.0,
            Gst.Format.TIME,
            Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE,
            Gst.SeekType.SET,
            start_ns,
            Gst.SeekType.SET,
            stop_ns,
        )

    def _drain(self, appsink, bus, deadline, frames):
        """Append frames pulled from ``appsink`` until EOS, error, or deadline."""
        while time.monotonic() < deadline:
            # The action signal works on a plain Gst.Element, i.e. without
            # the GstApp typelib having been imported.
            sample = appsink.emit("try-pull-sample", self._POLL_INTERVAL_NS)
            if sample is not None:
                try:
                    frame = self._sample_to_frame(sample)
                except ValueError as exc:
                    print(f"Skipping malformed frame: {exc}")
                    continue
                if frame is not None:
                    frames.append(frame)
                continue
            # No sample within the poll interval: find out why.
            if self._pop_error(bus):
                break
            if appsink.get_property("eos"):
                break
        else:
            print("Timeout reached, forcefully stopping GStreamer loop.")

    def _sample_to_frame(self, sample):
        """Convert a Gst.Sample to an RGB array, or None if it has no data."""
        buf = sample.get_buffer()
        caps = sample.get_caps()
        if buf is None or caps is None:
            return None
        structure = caps.get_structure(0)
        width = structure.get_value("width")
        height = structure.get_value("height")
        data = buf.extract_dup(0, buf.get_size())
        return self._frame_from_bytes(data, width, height)

    @staticmethod
    def _frame_from_bytes(data, width, height):
        """
        Interpret raw RGB bytes as a ``(height, width, 3)`` ``uint8`` array.

        Rows padded to a multiple of four bytes (GStreamer's default RGB
        layout) have the padding stripped; tightly packed data is used as is.

        Raises:
            ValueError: If the dimensions are not positive or ``data`` is too
                short to hold ``height * width`` RGB pixels.
        """
        channels = AsyncVideoSlicer._BYTES_PER_PIXEL
        row_bytes = width * channels
        packed_size = row_bytes * height
        if width <= 0 or height <= 0 or len(data) < packed_size:
            raise ValueError(
                f"buffer of {len(data)} bytes cannot hold a {width}x{height} RGB frame"
            )

        stride = (row_bytes + 3) & ~3  # row length rounded up to 4 bytes
        if stride == row_bytes or len(data) < stride * height:
            return np.frombuffer(data, dtype=np.uint8, count=packed_size).reshape(
                height, width, channels
            )
        rows = np.frombuffer(data, dtype=np.uint8, count=stride * height).reshape(
            height, stride
        )
        return rows[:, :row_bytes].reshape(height, width, channels)

    @staticmethod
    def _pop_error(bus):
        """Report a pending ERROR message from ``bus``; return True if one existed."""
        message = bus.pop_filtered(Gst.MessageType.ERROR)
        if message is None:
            return False
        err, debug = message.parse_error()
        print(f"Error in GStreamer: {err.message} ({debug})")
        return True
# --- HOW TO RUN ---
async def main():
    """
    Demo entry point: slice the same time window out of several S3 videos.

    One ``AsyncVideoSlicer`` is created on a thread pool, a ``get_slice``
    coroutine is started for every key in ``VIDEO_KEYS``, and a summary line
    is printed per video once all of them have finished.
    """
    S3_BUCKET = "your-s3-bucket-name"  # <--- CHANGE THIS
    VIDEO_KEYS = [
        "source-videos/my-awesome-video.mp4",
        "source-videos/another-video.mp4",
    ]

    # Use a ThreadPoolExecutor to run the blocking GStreamer work
    with ThreadPoolExecutor() as executor:
        slicer = AsyncVideoSlicer(executor)

        # Create concurrent tasks to slice both videos at the same time
        tasks = [
            slicer.get_slice(S3_BUCKET, key, start_seconds=30, duration_seconds=2)
            for key in VIDEO_KEYS
        ]

        # Wait for all tasks to complete; results keep the order of VIDEO_KEYS
        results = await asyncio.gather(*tasks)

    for key, frames in zip(VIDEO_KEYS, results):
        if frames:
            print(f"Result for {key}: Got {len(frames)} frames. Shape of first frame: {frames[0].shape}")
        else:
            print(f"Result for {key}: No frames returned.")
if __name__ == "__main__":
    # Script entry point: drive the async demo to completion on a new event loop.
    asyncio.run(main())
🚀 Feature
Sample code
Bookmarking for video streaming:
Motivation
Pitch
Alternatives
Additional context