Skip to content
Snippets Groups Projects

Feature/python

Merged Sebastian Hahta requested to merge feature/python into master
1 file
+ 70
43
Compare changes
  • Side-by-side
  • Inline
+ 129
43
@@ -20,14 +20,28 @@ except ImportError:
# order: 0 nn, 1 bilinear, 3 bicubic
return (resize_skimage(img, size, order=3, mode="constant", cval=0) * 255).astype(np.uint8)
from warnings import warn
import ctypes
from enum import IntEnum
import numpy as np
import os
'''
# default number of worker threads for decoder: half of os.cpu_count()
_threads = os.cpu_count() // 2
if _threads is None:
_threads = 1
'''
_threads = 1
# error codes copied from header (de265.h)
class libde265error(IntEnum):
class _libde265error(IntEnum):
DE265_OK = 0
DE265_ERROR_NO_SUCH_FILE=1
DE265_ERROR_COEFFICIENT_OUT_OF_IMAGE_BOUNDS=4
@@ -74,10 +88,20 @@ class libde265error(IntEnum):
DE265_WARNING_SPS_MISSING_CANNOT_DECODE_SEI=1025
DE265_WARNING_COLLOCATED_MOTION_VECTOR_OUTSIDE_IMAGE_AREA=1026
class de265_chroma(IntEnum):
de265_chroma_mono = 0
de265_chroma_420 = 1
de265_chroma_422 = 2
de265_chroma_444 = 3
libde265 = ctypes.cdll.LoadLibrary("libde265.so.0")
libde265.de265_get_error_text.argtypes = [ctypes.c_void_p]
libde265.de265_get_error_text.restype = ctypes.c_char_p
libde265.de265_get_warning.argtypes = [ctypes.c_void_p]
libde265.de265_get_warning.restype = ctypes.c_int
libde265.de265_get_version_number_major.restype = ctypes.c_uint32
libde265.de265_get_version_number_minor.restype = ctypes.c_uint32
@@ -96,6 +120,7 @@ libde265.de265_push_NAL.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_i
libde265.de265_push_data.restype = ctypes.c_int
libde265.de265_push_end_of_frame.argtypes = [ctypes.c_void_p]
libde265.de265_push_end_of_frame.restype = None
libde265.de265_flush_data.argtypes = [ctypes.c_void_p]
libde265.de265_flush_data.restype = ctypes.c_int
@@ -106,6 +131,15 @@ libde265.de265_decode.restype = ctypes.c_int
libde265.de265_get_next_picture.argtypes = [ctypes.c_void_p]
libde265.de265_get_next_picture.restype = ctypes.c_void_p
libde265.de265_peek_next_picture.argtypes = [ctypes.c_void_p]
libde265.de265_peek_next_picture.restype = ctypes.c_void_p
libde265.de265_release_next_picture.argtypes = [ctypes.c_void_p]
libde265.de265_release_next_picture.restype = None
libde265.de265_get_chroma_format.argtypes = [ctypes.c_void_p]
libde265.de265_get_chroma_format.restype = ctypes.c_int
libde265.de265_get_image_width.argtypes = [ctypes.c_void_p, ctypes.c_int]
libde265.de265_get_image_width.restype = ctypes.c_int
@@ -118,21 +152,85 @@ libde265.de265_get_bits_per_pixel.restype = ctypes.c_int
libde265.de265_get_image_plane.argtypes = [ctypes.c_void_p, ctypes.c_int, ctypes.POINTER(ctypes.c_int)]
libde265.de265_get_image_plane.restype = ctypes.POINTER(ctypes.c_char)
libde265.de265_get_number_of_input_bytes_pending.argtypes = [ctypes.c_void_p]
libde265.de265_get_number_of_input_bytes_pending.restype = ctypes.c_int
class libde265Error(Exception):
def __init__(self, code):
super(libde265Error, self).__init__(
libde265.de265_get_error_text(code).decode("ascii"))
class WaitingForInput(libde265Error):
pass
class Decoder:
def __init__(self, size, threads=1):
def __init__(self, size, threads=_threads):
self._size = size
self._more = ctypes.c_int()
self._out_stride = ctypes.c_int()
self._ctx = libde265.de265_new_decoder()
self._supress_warnings = False
err = libde265.de265_start_worker_threads(self._ctx, threads)
if err:
raise Exception(self.get_error_str(err))
raise libde265Error(err)
def __del__(self):
libde265.de265_free_decoder(self._ctx)
def _copy_image(self, de265_image):
res = np.zeros((self._size[0], self._size[1], 3), dtype=np.uint8)
# libde265: always 420 (???)
# chroma_format = libde265.de265_get_chroma_format(de265_image)
for c in range(0, 3):
size = (libde265.de265_get_image_height(de265_image, c),
libde265.de265_get_image_width(de265_image, c))
bpp = libde265.de265_get_bits_per_pixel(de265_image, c)
if bpp != 8:
raise NotImplementedError("unsupported bits per pixel %i" % bpp)
img_ptr = libde265.de265_get_image_plane(de265_image, c, self._out_stride)
# for frombuffer() no copy assumed
ch = np.frombuffer(img_ptr[:size[0] * size[1]], dtype=np.uint8)
ch.shape = size
res[:,:,c] = _resize(ch, self._size)
return res
def _warning(self):
if self._supress_warnings:
return
code = libde265.de265_get_warning(self._ctx)
if code != _libde265error.DE265_OK:
msg = libde265.de265_get_error_text(code).decode("ascii")
warn(msg)
def decode(self):
err = libde265.de265_decode(self._ctx, self._more)
if err:
if err == _libde265error.DE265_ERROR_WAITING_FOR_INPUT_DATA:
raise WaitingForInput(err)
raise libde265Error(err)
self._warning()
return self._more.value != 0
def get_error_str(self, code):
return libde265.de265_get_error_text(code).decode("ascii")
def flush_data(self):
err = libde265.de265_flush_data(self._ctx)
if err:
raise libde265Error(err)
def push_data(self, data):
if not isinstance(data, bytes):
@@ -141,13 +239,13 @@ class Decoder:
err = libde265.de265_push_data(self._ctx, data, len(data), None, None)
if err:
raise Exception(self.get_error_str(err))
raise libde265Error(err)
def push_end_of_frame(self):
err = libde265.de265_push_end_of_frame(self._ctx)
if err:
raise Exception(self.get_error_str(err))
raise libde265Error(err)
def push_NAL(self, data):
if not isinstance(data, bytes):
@@ -156,48 +254,36 @@ class Decoder:
err = libde265.de265_push_NAL(self._ctx, data, len(data), None, None)
if err:
raise Exception(self.get_error_str(err))
def decode(self):
err = libde265.de265_decode(self._ctx, self._more)
if err and err != libde265error.DE265_ERROR_WAITING_FOR_INPUT_DATA:
raise Exception(self.get_error_str(err))
return self._more.value != 0
def flush_data(self):
err = libde265.de265_flush_data(self._ctx)
if err:
raise Exception(self.get_error_str(err))
raise libde265Error(err)
def get_next_picture(self):
'''
Returns next decoded frame. Image in YCbCr format. If no frame available
returns None.
'''
img = libde265.de265_get_next_picture(self._ctx)
if not img:
de265_image = libde265.de265_get_next_picture(self._ctx)
if not de265_image:
return None
res = np.zeros((self._size[0], self._size[1], 3), dtype=np.uint8)
res = self._copy_image(de265_image)
libde265.de265_release_next_picture(self._ctx)
return res
def get_number_of_input_bytes_pending(self):
return libde265.de265_get_number_of_input_bytes_pending(self._ctx)
def peek_next_picture(self):
de265_image = libde265.de265_peek_next_picture(self._ctx)
if not de265_image:
return None
for c in range(0, 3):
size = (libde265.de265_get_image_height(img, c),
libde265.de265_get_image_width(img, c))
bpp = libde265.de265_get_bits_per_pixel(img, c)
res = self._copy_image(de265_image)
if bpp != 8:
raise NotImplementedError("unsupported bits per pixel %i" % bpp)
img_ptr = libde265.de265_get_image_plane(img, c, self._out_stride)
ch = np.frombuffer(img_ptr[:size[0] * size[1]], dtype=np.uint8)
ch.shape = size
res[:,:,c] = _resize(ch, self._size)
libde265.de265_release_next_picture(self._ctx)
return res
\ No newline at end of file
return res
Loading