diff --git a/packages/client/src/views/Virtuals/Virtuals.tsx b/packages/client/src/views/Virtuals/Virtuals.tsx new file mode 100644 index 0000000000000000000000000000000000000000..780da1b5aa867395579bd9c48f6c649855d76a34 --- /dev/null +++ b/packages/client/src/views/Virtuals/Virtuals.tsx @@ -0,0 +1,6 @@ +import React from 'react'; +import {Table} from '../../components/Table'; + +export function Virtuals() { + return +} \ No newline at end of file diff --git a/packages/client/src/views/Virtuals/index.ts b/packages/client/src/views/Virtuals/index.ts new file mode 100644 index 0000000000000000000000000000000000000000..2e0f0b047688d66e70c2392f2e3b7f0e49ed10e4 --- /dev/null +++ b/packages/client/src/views/Virtuals/index.ts @@ -0,0 +1 @@ +export * from './Virtuals'; diff --git a/packages/playback-service/.eslintrc.json b/packages/playback-service/.eslintrc.json new file mode 100644 index 0000000000000000000000000000000000000000..a9e9269d625c45f8b692cb7db5ee9bec41c10199 --- /dev/null +++ b/packages/playback-service/.eslintrc.json @@ -0,0 +1,28 @@ +{ + "env": { + "browser": true, + "es2021": true + }, + "extends": [ + "plugin:react/recommended", + "airbnb" + ], + "parser": "@typescript-eslint/parser", + "parserOptions": { + "ecmaFeatures": { + "jsx": true + }, + "ecmaVersion": 12, + "sourceType": "module" + }, + "plugins": [ + "react", + "@typescript-eslint" + ], + "rules": { + "semi": 0, + "import/extensions": ["error", "never"], + "import/no-unresolved": 0, + "class-methods-use-this": 0 + } +} diff --git a/packages/playback-service/README.md b/packages/playback-service/README.md new file mode 100644 index 0000000000000000000000000000000000000000..3a48826fd16e6afdedc21989cc5207cf926ce788 --- /dev/null +++ b/packages/playback-service/README.md @@ -0,0 +1,3 @@ +# Playback Service + +Create streams from files to playback previous recordings. diff --git a/packages/playback-service/_package.json b/packages/playback-service/_package.json new file mode 100644 index 0000000000000000000000000000000000000000..8688eaae142c405c88ac1ce67d996645c8a8fe31 --- /dev/null +++ b/packages/playback-service/_package.json @@ -0,0 +1,52 @@ +{ + "name": "@ftl/playback-service", + "version": "1.0.0", + "main": "./dist/index.js", + "scripts": { + "clean": "rm -rf ./dist && rm -rf ./node_modules", + "build": "tsc", + "start": "node ./dist/index.js", + "start:dev": "ts-node-dev --respawn --transpile-only --watch src --ignore-watch node_modules src/index.ts" + }, + "devDependencies": { + "@ftl/types": "1.0.0", + "@types/compression": "^1.7.2", + "@types/express": "^4.17.13", + "@types/jest": "^26.0.24", + "@types/multer": "^1.4.7", + "@types/node": "^16.4.9", + "@typescript-eslint/eslint-plugin": "^4.31.1", + "@typescript-eslint/parser": "^4.31.1", + "eslint": "^7.32.0", + "eslint-config-airbnb": "^18.2.1", + "eslint-plugin-import": "^2.24.2", + "eslint-plugin-jsx-a11y": "^6.4.1", + "eslint-plugin-react": "^7.25.2", + "eslint-plugin-react-hooks": "^4.2.0", + "ts-node-dev": "^1.1.8", + "typescript": "^4.3.5" + }, + "dependencies": { + "@ftl/api": "1.0.0", + "@ftl/common": "1.0.0", + "@ftl/types": "1.0.0", + "@tsed/ajv": "^6.62.0", + "@tsed/common": "^6.62.0", + "@tsed/core": "^6.62.0", + "@tsed/di": "^6.62.0", + "@tsed/exceptions": "^6.62.0", + "@tsed/json-mapper": "^6.62.0", + "@tsed/logger": "^5.17.0", + "@tsed/mongoose": "^6.62.0", + "@tsed/platform-express": "^6.62.0", + "@tsed/schema": "^6.62.0", + "ajv": "^8.6.3", + "body-parser": "^1.19.0", + "compression": "^1.7.4", + "cookie-parser": "^1.4.5", + "express": "4", + "express-session": "^1.17.2", + "msgpack5": "^5.3.2", + "uuid": "^8.3.2" + } +} diff --git a/packages/playback-service/src/controllers/playback.ts b/packages/playback-service/src/controllers/playback.ts new file mode 100644 index 0000000000000000000000000000000000000000..6252d55dd9039cbf5be102850515417a29459e0e --- /dev/null +++ b/packages/playback-service/src/controllers/playback.ts @@ -0,0 +1,38 @@ +/* eslint-disable class-methods-use-this */ +import { + BodyParams, Get, Inject, Post, PathParams, Put, +} from '@tsed/common'; +import { Description, Groups } from '@tsed/schema'; +import { AccessToken } from '@ftl/types'; +import { Controller, UseToken } from '@ftl/common'; +import PlaybackService from '../services/playback'; +import Playback from '../models/playback'; + +@Controller('/playback') +export default class PlaybackController { + @Inject() + playbackService: PlaybackService; + + @Get('/') + @Description('Get all active playbacks for user') + async find( + @UseToken() token: AccessToken, + ): Promise<Playback[]> { + return this.playbackService.find(token.user?.id); + } + + @Post('/') + async create(@BodyParams() @Groups('creation') request: Playback, @UseToken() token: AccessToken): Promise<Playback> { + return this.playbackService.create(request, token.user?.id); + } + + @Get('/:id') + async get(@PathParams('id') id: string, @UseToken() token: AccessToken): Promise<Playback> { + return this.playbackService.get(id, token.user?.id); + } + + @Put('/:id') + async update(@PathParams('id') id: string, @BodyParams() @Groups('update') request: Playback, @UseToken() token: AccessToken): Promise<Playback> { + return this.playbackService.update(id, token.user?.id, request); + } +} diff --git a/packages/playback-service/src/index.ts b/packages/playback-service/src/index.ts new file mode 100644 index 0000000000000000000000000000000000000000..9a3772c7dbd43d809cff0a707ccb0521ef8a6de9 --- /dev/null +++ b/packages/playback-service/src/index.ts @@ -0,0 +1,19 @@ +import { $log } from '@tsed/common'; +import { PlatformExpress } from '@tsed/platform-express'; +import Server from './server'; + +async function bootstrap() { + try { + $log.debug('Start server...'); + const platform = await PlatformExpress.bootstrap(Server, { + // extra settings + }); + + await platform.listen(); + $log.debug('Server initialized'); + } catch (er) { + $log.error(er); + } +} + +bootstrap(); diff --git a/packages/playback-service/src/models/archive.ts b/packages/playback-service/src/models/archive.ts new file mode 100644 index 0000000000000000000000000000000000000000..f94fe15144732fc4750b87a642f5d84a672b6c0b --- /dev/null +++ b/packages/playback-service/src/models/archive.ts @@ -0,0 +1,43 @@ +import { + Required, Groups, DateTime, Default, CollectionOf, RequiredGroups, +} from '@tsed/schema'; + +export default class Archive { + @Required() + @Groups('!update') + id: string; + + @Required() + @Groups('!update') + @DateTime() + @Default(new Date()) + created: Date; + + @Required() + @Groups('!update') + owner: string; + + @Required() + @Groups('!update') + filename: string; + + @Required() + @Groups('!update') + size: number; + + @Required() + @Groups('!update') + duration: number; + + @Required() + @RequiredGroups('!update') + @CollectionOf(String) + tags: string[]; + + @Required() + @RequiredGroups('!update') + name: string; + + // Thumbnail + // Description +} diff --git a/packages/playback-service/src/models/playback.ts b/packages/playback-service/src/models/playback.ts new file mode 100644 index 0000000000000000000000000000000000000000..04f7f6864c18f1baf553d27d1294b448a4e5aead --- /dev/null +++ b/packages/playback-service/src/models/playback.ts @@ -0,0 +1,39 @@ +import { + Property, Required, Groups, DateTime, Default, +} from '@tsed/schema'; + +export default class Playback { + @Required() + @Groups('!creation', '!update') + id: string; + + @Required() + archiveID: string; + + @Property() + @Groups('!creation', '!update') + streams: string; + + @Required() + @DateTime() + @Default(new Date()) + startTime: Date; + + @Required() + @Groups('!creation', '!update') + owner: string; + + filename?: string; + + @Property() + @Groups('!creation', '!update') + size?: number; + + @Property() + @Groups('!creation') + status: 'playing' | 'paused' | 'stopped' | 'finished'; + + @Property() + @Groups('!creation', '!update') + duration: number; +} diff --git a/packages/playback-service/src/server.ts b/packages/playback-service/src/server.ts new file mode 100644 index 0000000000000000000000000000000000000000..e96046c97b9dc33251ef33dbf9645633ad283914 --- /dev/null +++ b/packages/playback-service/src/server.ts @@ -0,0 +1,44 @@ +import { Configuration, Inject, PlatformApplication } from '@tsed/common'; +import express from 'express'; +import compress from 'compression'; +import cookieParser from 'cookie-parser'; +import { redisStreamListen, redisSetGroup } from '@ftl/common'; + +const rootDir = __dirname; + +@Configuration({ + rootDir, + acceptMimes: ['application/json'], + port: 8080, + debug: false, + mount: { + '/v1': `${rootDir}/controllers/**/*.ts`, + }, +}) +export default class Server { + @Inject() + app: PlatformApplication; + + @Configuration() + settings: Configuration; + + public $beforeInit() { + redisSetGroup('playback-service'); + } + + public $afterInit() { + redisStreamListen('consumer1'); + } + + /** + * This method let you configure the express middleware required by your application to works. + * @returns {Server} + */ + public $beforeRoutesInit(): void | Promise<any> { + this.app + .use(compress({})) + .use(cookieParser()) + .use(express.urlencoded()) + .use(express.json()); + } +} diff --git a/packages/playback-service/src/services/playback.ts b/packages/playback-service/src/services/playback.ts new file mode 100644 index 0000000000000000000000000000000000000000..d6800f77099639ccd71224e4ba6ed02b2aa1a386 --- /dev/null +++ b/packages/playback-service/src/services/playback.ts @@ -0,0 +1,232 @@ +import { Service } from '@tsed/common'; +import { $log } from '@tsed/logger'; +import { v4 as uuidv4 } from 'uuid'; +import { + redisAddItem, + redisTopItems, + redisSet, + redisMGet, + redisSubscribe, + redisPublish, + redisUnsubscribe, + redisRemoveItem, + redisGet, +} from '@ftl/common'; +import { sendRecordingEvent } from '@ftl/api'; +import fs from 'fs/promises'; +import { BadRequest, NotFound } from '@tsed/exceptions'; +import Playback from '../models/playback'; + +const { encode, decode } = require('msgpack5')(); + +const HOUR = 60 * 60 * 1000; +const DAY = 24 * HOUR; +const GIGABYTE = 1024 * 1024 * 1024; +const BASEPATH = '/data/ftl'; + +interface ActiveRecording { + id: string; + owner: string; + streams: string[]; + streamCallbacks: Function[]; + channels: Set<number>; + fd: fs.FileHandle; + size: number; + timestamp: number; + queue: Map<number, Uint8Array[]>; +} + +const activeRecordings = new Map<string, ActiveRecording>(); + +function createHeader(version: number) { + const header = Buffer.alloc(69); + header.writeUInt8(70, 0); + header.writeUInt8(84, 1); + header.writeUInt8(76, 2); + header.writeUInt8(70, 3); + header.writeUInt8(version, 4); + header.writeBigUInt64BE(BigInt(0), 5 + 0 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 1 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 2 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 3 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 4 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 5 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 6 * 8); + header.writeBigUInt64BE(BigInt(0), 5 + 7 * 8); + return header; +} + +async function createFTLFile(filename: string) { + const header = createHeader(5); + const fd = await fs.open(filename, 'w'); + await fs.appendFile(fd, header); + return fd; +} + +function sendRequest(uri: string, fsid: number, channel: number) { + const latency = 0; + const spkt = [1, fsid, 255, channel, 1]; + const pkt = [255, 7, 35, 255, 0, Buffer.alloc(0)]; + redisPublish(`stream-out:${uri}`, encode([latency, spkt, pkt])); +} + +async function processPackets(rec: ActiveRecording, entry: Recording) { + // Write data in correct timestamp order + const timestamps = Array.from(rec.queue.keys()).sort(); + // Most recent timestamp may be incomplete so skip that one. + if (timestamps.length > 1) { + const q = rec.queue.get(timestamps[0]); + rec.queue.delete(timestamps[0]); + const proms = q.map((buf) => rec.fd.appendFile(buf)); + Promise.all(proms).then(() => processPackets(rec, entry)); + } else if (entry.status === 'stopped') { + // Remove the recording. + $log.info('Remove recording entry...', rec); + rec.streamCallbacks.forEach((cb, ix) => redisUnsubscribe(`stream-in:${rec.streams[ix]}`, cb)); + activeRecordings.delete(rec.id); + await redisRemoveItem(`recordings:list:${rec.owner}`, rec.id); + await rec.fd.close(); + sendRecordingEvent({ + id: rec.id, + owner: rec.owner, + filename: entry.filename, + event: 'complete', + size: rec.size, + duration: entry.duration, + date: entry.startTime, + }); + } + + // Send a request to every stream for each recorded channel to ensure data is sent. + // eslint-disable-next-line no-restricted-syntax + for (const s of rec.streams) { + // eslint-disable-next-line no-restricted-syntax + for (const c of rec.channels) { + sendRequest(s, 0, c); + } + } +} + +async function processRecording(rec: ActiveRecording) { + const key = `recording:${rec.owner}:${rec.id}`; + const entry = await redisGet<Playback>(key); + + entry.size = rec.size; + entry.duration = Date.now() - new Date(entry.startTime).getTime(); + + if (entry.size > 3 * GIGABYTE || entry.duration > 2 * HOUR) { + entry.status = 'stopped'; + } + + processPackets(rec, entry); + await redisSet<Playback>(key, entry); +} + +setInterval(() => { + // Write stuff to files + activeRecordings.forEach(processRecording); +}, 1000); + +@Service() +export default class PlaybackService { + async find(owner: string): Promise<Playback[]> { + if (!owner) { + throw new BadRequest('bad_owner'); + } + const ids = await redisTopItems(`recordings:list:${owner}`); + const keys = ids.map((id) => `recording:${owner}:${id}`); + const results = await redisMGet<Playback>(keys); + return results; + } + + async update(id: string, owner: string, recording: Playback): Promise<Playback> { + const key = `recording:${owner}:${id}`; + const rec = await redisGet<Playback>(key); + if (!rec) { + throw new NotFound('recording_not_found'); + } + if (recording.status) rec.status = recording.status; + await redisSet<Playback>(key, rec); + return rec; + } + + async get(id: string, owner: string): Promise<Playback> { + const key = `recording:${owner}:${id}`; + const rec = redisGet<Playback>(key); + if (!rec) { + throw new NotFound('recording_not_found'); + } + return rec; + } + + async create(recording: Playback, owner: string): Promise<Playback | null> { + const filename = `${BASEPATH}/record-${owner}-${new Date().toISOString()}.ftl`; + const newEntry: Playback = { + ...recording, + id: uuidv4(), + owner, + status: 'playing', + filename, + startTime: new Date(), + } + redisSet<Playback>(`recording:${owner}:${newEntry.id}`, newEntry, 1 * DAY); + redisAddItem(`recordings:list:${owner}`, newEntry.id, Date.now()); + + const fd = await createFTLFile(filename); + + const r: ActiveRecording = { + id: newEntry.id, + owner, + streamCallbacks: [], + channels: new Set<number>(recording.channels), + fd, + size: 0, + timestamp: 0, + queue: new Map<number, Uint8Array[]>(), + streams: recording.streams, + }; + + $log.info('Starting recording: ', filename); + + // eslint-disable-next-line no-restricted-syntax + for (const uri of recording.streams) { + const f = async (data: Buffer) => { + const [, spkt, pkt] = decode(data); + const [timestamp,,, channel] = spkt; + // TODO: Filter by FS + // Skip unwanted channels and empty data. + if ((!r.channels.has(channel) && channel < 64) || pkt[5].byteLength === 0) { + return; + } + + const reencoded = encode([spkt, pkt]); + r.timestamp = timestamp; + r.size += reencoded.byteLength; + + if (!r.queue.has(timestamp)) { + r.queue.set(timestamp, []); + } + const q = r.queue.get(timestamp); + q.push(reencoded); + }; + r.streamCallbacks.push(f); + redisSubscribe(`stream-in:${uri}`, f); + // TODO: Set a recording data channel + sendReset(uri, 255, 0); + $log.info(` -- ${uri}`); + } + activeRecordings.set(newEntry.id, r); + + sendRecordingEvent({ + id: r.id, + owner: r.owner, + filename, + event: 'start', + size: 0, + duration: 0, + date: newEntry.startTime, + }); + + return { ...newEntry, filename: undefined }; + } +} diff --git a/packages/playback-service/tsconfig.json b/packages/playback-service/tsconfig.json new file mode 100644 index 0000000000000000000000000000000000000000..e32f95bf327a9acdda1327382d4b638e57ab8576 --- /dev/null +++ b/packages/playback-service/tsconfig.json @@ -0,0 +1,22 @@ +{ + "compilerOptions": { + "module": "CommonJS", + "resolveJsonModule": true, + "esModuleInterop": true, + "target": "ES2019", + "moduleResolution": "node", + "sourceMap": true, + "outDir": "dist", + "lib": ["es2018", "dom", "esnext.asynciterable"], + "allowSyntheticDefaultImports": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "declaration": true, + "types": ["jest", "node"], + "baseUrl": ".", + "jsx": "react", + "allowJs": true + }, + "include": ["./src"], + "exclude": ["**/node_modules", "**/jest.config.js", "**/*.test.ts"] +} \ No newline at end of file