Skip to content
Snippets Groups Projects
Commit 9fa4ce41 authored by Nicolas Pope's avatar Nicolas Pope
Browse files

Merge branch 'master' of gitlab.utu.fi:nicolas.pope/ftl

parents 5c894d17 4d5a9e7a
No related branches found
No related tags found
No related merge requests found
Pipeline #23160 passed
......@@ -3,7 +3,7 @@
**/include/ftl/config.h
**/src/config.cpp
**/*.blend1
SDK/Python/ftl/__pycache__
__pycache__
fabric/
fabric-js/
build/
......
......@@ -18,6 +18,8 @@ linux:
stage: all
tags:
- linux
variables:
FTL_LIB: ../../build/SDK/C/libftl-dev.so
# before_script:
# - export DEBIAN_FRONTEND=noninteractive
# - apt-get update -qq && apt-get install -y -qq g++ cmake git
......@@ -28,6 +30,8 @@ linux:
- cmake .. -DWITH_OPTFLOW=TRUE -DUSE_CPPCHECK=FALSE -DBUILD_CALIBRATION=TRUE -DWITH_CERES=TRUE -DCMAKE_BUILD_TYPE=Release
- make
- ctest --output-on-failure
- cd ../SDK/Python
- python3 -m unittest discover test
webserver-deploy:
only:
......
../../LICENSE
\ No newline at end of file
from . types import Channel, is_float_channel, Camera, Pipeline
from . types import FTLException, Channel, is_float_channel, Camera, Pipeline
import numpy as np
from enum import IntEnum
import ctypes
import sys
import os.path
################################################################################
# Try to locate shared library
_paths = [
".",
"/usr/lib",
"/usr/local/lib",
]
_libpath = None
if "FTL_LIB" in os.environ and os.path.exists(os.environ["FTL_LIB"]):
_libpath = os.environ["FTL_LIB"]
else:
for p in _paths:
p = os.path.join(p, "libftl-dev.so")
if os.path.exists(p):
_libpath = p
break
if _libpath is None:
raise FileNotFoundError("libftl-dev.so not found")
################################################################################
class _imageformat_t(IntEnum):
FLOAT = 0
BGRA = 1
......@@ -20,16 +39,6 @@ class _imageformat_t(IntEnum):
BGR = 4
RGB_FLOAT = 5
_libpath = None
for p in _paths:
p = os.path.join(p, "libftl-dev.so")
if os.path.exists(p):
_libpath = p
break
if _libpath is None:
raise FileNotFoundError("libftl-dev.so not found")
_c_api = ctypes.CDLL(_libpath)
_c_api.ftlCreateWriteStream.restype = ctypes.c_void_p
......@@ -70,12 +79,22 @@ _c_api.ftlDestroyStream.argtypes = [ctypes.c_void_p]
def _ftl_check(retval):
if retval != 0:
raise Exception("FTL api returned %i" % retval)
raise FTLException(retval ,"FTL api returned %i" % retval)
class FTLStreamWriter:
def __init__(self, fname):
self._sources = {}
self._instance = _c_api.ftlCreateWriteStream(bytes(fname, "utf8"))
if isinstance(fname, str):
fname_ = bytes(fname, sys.getdefaultencoding())
elif isinstance(fname, bytes):
fname_ = fname
else:
raise TypeError()
self._instance = _c_api.ftlCreateWriteStream(fname_)
if self._instance is None:
raise Exception("Error: ftlCreateWriteStream")
......@@ -148,9 +167,11 @@ class FTLStreamWriter:
func = _c_api.ftlIntrinsicsWriteLeft if channel == Channel.Calibration else _c_api.ftlIntrinsicsWriteRight
_ftl_check(func(self._instance, source, camera.width, camera.height,
camera.fx, camera.cx, camera.cy, camera.baseline,
camera.min_depth, camera.max_depth))
_ftl_check(func(self._instance, source,
int(camera.width), int(camera.height),
float(camera.fx), float(camera.cx),
float(camera.cy), float(camera.baseline),
float(camera.min_depth), float(camera.max_depth)))
self._sources[source] = camera
......@@ -168,6 +189,8 @@ class FTLStreamWriter:
def write(self, source, channel, data):
""" Write data to stream """
source = int(source)
channel = Channel(channel)
if channel in [Channel.Calibration, Channel.Calibration2]:
self._write_calibration(source, channel, data)
......
from typing import NamedTuple
from enum import IntEnum
class FTLException(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
class Pipeline(IntEnum):
DEPTH = 0
RECONSTRUCT = 1
......
import setuptools
with open("README.md"), "r") as fh:
long_description = fh.read()
setuptools.setup(
name="ftl-python",
version="0.0.1", # TODO: CMAKE
long_description=long_description,
long_description_content_type="text/markdown",
packages=setuptools.find_packages(),
classifiers=[
"Programming Language :: Python :: 3"
],
python_requires='>=3.6',
)
import unittest
import tempfile
import os
from ftl import FTLStreamWriter
from ftl.types import Channel, Camera, FTLException
import numpy as np
class TestStreamWriter(unittest.TestCase):
def test_create_and_delete_file(self):
""" Test constructor and destructor (empty file) """
with tempfile.NamedTemporaryFile(suffix=".ftl") as f:
stream = FTLStreamWriter(f.name)
stream.__del__()
def test_write_frames_float32_1080p(self):
""" Write random image, correct types and values """
with tempfile.NamedTemporaryFile(suffix=".ftl") as f:
stream = FTLStreamWriter(f.name)
calib = Camera(700.0, 700.0, 960.0, 540.0, 1920, 1080, 0.0, 10.0, 0.20, 0.0)
im = np.random.rand(1080, 1920, 3).astype(np.float32)
stream.write(0, Channel.Calibration, calib)
stream.write(0, Channel.Colour, im)
def test_write_calib_wrong_compatible_type(self):
""" Write calibration with incorrect but compatible types (float/int) """
with tempfile.NamedTemporaryFile(suffix=".ftl") as f:
stream = FTLStreamWriter(f.name)
calib = Camera(700, 700.0, 960, 540.0, 1920.0, 1080, 0, 10.0, 0.2, 0)
stream.write(0, Channel.Calibration, calib)
def test_write_calib_wrong_incompatible_type(self):
""" Write calibration with incorrect and incompatible types """
with tempfile.NamedTemporaryFile(suffix=".ftl") as f:
stream = FTLStreamWriter(f.name)
calib = Camera("foo", "bar", 960, 540.0, 1920.0, 1080, 0, 10.0, 0.2, 0)
with self.assertRaises(ValueError):
stream.write(0, Channel.Calibration, calib)
def test_empty_nextframe(self):
""" Call nextframe() on empty stream """
with tempfile.NamedTemporaryFile(suffix=".ftl") as f:
stream = FTLStreamWriter(f.name)
with self.assertRaises(FTLException):
stream.next_frame()
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment