Skip to content

Commit

Permalink
Implement (kind of) queue
Browse files Browse the repository at this point in the history
  • Loading branch information
noituri committed Sep 19, 2024
1 parent db2c5d7 commit 8c93f92
Show file tree
Hide file tree
Showing 14 changed files with 176 additions and 82 deletions.
5 changes: 0 additions & 5 deletions ts/@live-compositor/browser-render/src/renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ export type RendererOptions = {
streamFallbackTimeoutMs: number;
};

export type Framerate = {
num: number;
den: number;
};

export type FrameSet = {
ptsMs: number;
frames: { [id: string]: Frame };
Expand Down
3 changes: 2 additions & 1 deletion ts/@live-compositor/core/src/api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Api } from 'live-compositor';
import { CompositorManager } from './compositorManager';
import { RegisterOutputRequest } from './api/output';
import { RegisterInputRequest } from './api/input';

export { Api };

Expand Down Expand Up @@ -41,7 +42,7 @@ export class ApiClient {
});
}

public async registerInput(inputId: string, request: Api.RegisterInput): Promise<object> {
public async registerInput(inputId: string, request: RegisterInputRequest): Promise<object> {
return this.serverManager.sendRequest({
method: 'POST',
route: `/api/input/${encodeURIComponent(inputId)}/register`,
Expand Down
35 changes: 26 additions & 9 deletions ts/@live-compositor/core/src/api/input.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import { Api } from '../api';
import { RegisterInput, Inputs } from 'live-compositor';

export function intoRegisterInput(input: RegisterInput): Api.RegisterInput {
if (input.type === 'mp4') {
return intoMp4RegisterInput(input);
} else if (input.type === 'rtp_stream') {
return intoRtpRegisterInput(input);
} else {
throw new Error(`Unknown input type ${(input as any).type}`);
export type RegisterInputRequest = Api.RegisterInput | RegisterBytesInput;

export type RegisterBytesInput = {
type: 'bytes';
video: Inputs.InputBytesVideoOptions;
};

export function intoRegisterInput(input: RegisterInput): RegisterInputRequest {
switch (input.type) {
case 'mp4':
return intoMp4RegisterInput(input);
case 'rtp_stream':
return intoRtpRegisterInput(input);
case 'bytes':
return intoBytesRegisterInput(input);
default:
throw new Error(`Unknown input type ${(input as any).type}`);
}
}

function intoMp4RegisterInput(input: Inputs.RegisterMp4Input): Api.RegisterInput {
function intoMp4RegisterInput(input: Inputs.RegisterMp4Input): RegisterInputRequest {
return {
type: 'mp4',
url: input.url,
Expand All @@ -21,7 +31,7 @@ function intoMp4RegisterInput(input: Inputs.RegisterMp4Input): Api.RegisterInput
};
}

function intoRtpRegisterInput(input: Inputs.RegisterRtpInput): Api.RegisterInput {
function intoRtpRegisterInput(input: Inputs.RegisterRtpInput): RegisterInputRequest {
return {
type: 'rtp_stream',
port: input.port,
Expand All @@ -33,6 +43,13 @@ function intoRtpRegisterInput(input: Inputs.RegisterRtpInput): Api.RegisterInput
};
}

function intoBytesRegisterInput(input: Inputs.RegisterBytesInput): RegisterInputRequest {
return {
type: 'bytes',
video: input.video,
};
}

function intoInputAudio(audio: Inputs.InputRtpAudioOptions): Api.InputRtpAudioOptions {
if (audio.decoder === 'opus') {
return {
Expand Down
12 changes: 7 additions & 5 deletions ts/@live-compositor/core/src/api/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ export type RegisterOutputRequest = Api.RegisterOutput | RegisterBytesOutput;

export type RegisterBytesOutput = {
type: 'bytes';
video?: {
format: OutputBytesFrameFormat;
resolution: Api.Resolution;
initial: Api.Video;
};
video?: OutputBytesVideoOptions;
};

export type OutputBytesVideoOptions = {
format: OutputBytesFrameFormat;
resolution: Api.Resolution;
initial: Api.Video;
};

export function intoRegisterOutput(
Expand Down
1 change: 1 addition & 0 deletions ts/@live-compositor/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export { ApiClient, ApiRequest } from './api';
export { LiveCompositor, createLiveCompositor } from './compositor';
export { CompositorManager } from './compositorManager';
export { RegisterInputRequest, RegisterBytesInput } from './api/input';
export { RegisterOutputRequest, RegisterBytesOutput } from './api/output';
57 changes: 57 additions & 0 deletions ts/@live-compositor/web/src/compositor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { ImageSpec, InputId, OutputId, Renderer } from '@live-compositor/browser-render';
import { LiveCompositor as CoreLiveCompositor, createLiveCompositor } from '@live-compositor/core';
import WasmInstance from './manager/wasmInstance';
import { RegisterInput, RegisterOutput } from 'live-compositor';
import { Queue } from './queue';
import Input from './input';
import Output from './output';

export type LiveCompositorOptions = {
framerate: Framerate;
streamFallbackTimeoutMs: number;
};

export type Framerate = {
num: number;
den: number;
};

export default class LiveCompositor {
private compositor: CoreLiveCompositor;
private renderer: Renderer;
private framerate: Framerate;
private inputs: Map<InputId, Input>;
private outputs: Map<OutputId, Output>;

private constructor(compositor: CoreLiveCompositor, renderer: Renderer, framerate: Framerate) {
this.compositor = compositor;
this.renderer = renderer;
this.framerate = framerate;
this.inputs = new Map();
this.outputs = new Map();
}

public static async create(options: LiveCompositorOptions): Promise<LiveCompositor> {
const renderer = await Renderer.create({
streamFallbackTimeoutMs: options.streamFallbackTimeoutMs,
});
const compositor = await createLiveCompositor(new WasmInstance(renderer));
return new LiveCompositor(compositor, renderer, options.framerate);
}

public async registerOutput(outputId: string, request: RegisterOutput): Promise<object> {
return this.compositor.registerOutput(outputId, request);
}

public async registerInput(inputId: string, request: RegisterInput): Promise<object> {
return this.compositor.registerInput(inputId, request);
}

public async registerImage(imageId: string, request: ImageSpec): Promise<object> {
return this.compositor.registerImage(imageId, request);
}

public async start(): Promise<void> {
Queue.start(this.framerate, this.inputs, this.outputs, this.renderer);
}
}
63 changes: 2 additions & 61 deletions ts/@live-compositor/web/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,63 +1,4 @@
import { LiveCompositor as CoreLiveCompositor, CompositorManager } from '@live-compositor/core';
import WasmInstance from './manager/wasmInstance';
import { Api, RegisterInput, RegisterOutput } from 'live-compositor';
import LiveCompositor from './compositor';

export { WasmInstance };

export default class LiveCompositor extends CoreLiveCompositor {
private constructor(manager: CompositorManager) {
super(manager);
}

public static async create(manager: CompositorManager): Promise<LiveCompositor> {
const compositor = new LiveCompositor(manager);
await compositor['setupInstance']();
return compositor;
}

public override async registerOutput(outputId: string, request: RegisterOutput): Promise<object> {
if (request.type !== 'bytes') {
throw `Output type "${request.type}" is unsupported on web`;
}

return super.registerOutput(outputId, request);
}

public override async registerInput(inputId: string, request: RegisterInput): Promise<object> {
if (request.type !== 'bytes') {
throw `Input type "${request.type}" is unsupported on web`;
}

return super.registerInput(inputId, request);
}

public override async registerImage(imageId: string, request: Api.ImageSpec): Promise<object> {
if (request.path) {
throw "Image's `path` field is not supported on web";
}

return super.registerImage(imageId, request);
}

public override async registerShader(
_shaderId: string,
_request: Api.ShaderSpec
): Promise<object> {
throw 'Shaders are unsupported';
}

public override async unregisterShader(_shaderId: string): Promise<object> {
throw 'Shaders are unsupported';
}

public override async registerWebRenderer(
_instanceId: string,
_request: Api.WebRendererSpec
): Promise<object> {
throw 'Web renderers are unsupported';
}

public override async unregisterWebRenderer(_instanceId: string): Promise<object> {
throw 'Web renderers are unsupported';
}
}
export { WasmInstance, LiveCompositor };
5 changes: 5 additions & 0 deletions ts/@live-compositor/web/src/input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import MP4Input from './mp4/input';

type Input = { type: 'mp4' } & MP4Input;

export default Input;
4 changes: 3 additions & 1 deletion ts/@live-compositor/web/src/manager/wasmInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class WasmInstance implements CompositorManager {
private renderer: Renderer;
private outputs: Map<string, Output>;

// TODO(noituri): I'm not sure if renderer should be passed here
public constructor(renderer: Renderer) {
this.renderer = renderer;
this.outputs = new Map();
Expand Down Expand Up @@ -50,9 +51,10 @@ class WasmInstance implements CompositorManager {

private handleInputRequest(inputId: string, operation: string): void {
switch (operation) {
case 'register':
case 'register': {
this.renderer.registerInput(inputId);
break;
}
case 'unregister':
this.renderer.unregisterInput(inputId);
break;
Expand Down
1 change: 1 addition & 0 deletions ts/@live-compositor/web/src/mp4/decoder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class MP4Decoder {}
1 change: 1 addition & 0 deletions ts/@live-compositor/web/src/mp4/demuxer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class MP4Demuxer {}
9 changes: 9 additions & 0 deletions ts/@live-compositor/web/src/mp4/input.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Frame } from '@live-compositor/browser-render';

export default class MP4Input {
// TODO(noituri): demuxer & decoder fields

public nextFrame(): Frame | undefined {
return undefined;
}
}
9 changes: 9 additions & 0 deletions ts/@live-compositor/web/src/output.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Frame } from '@live-compositor/browser-render';

// TODO(noituri): Implement output
// - Canvas output
type Output = object;

export async function sendOutput(_output: Output, _frame: Frame): Promise<void> {}

export default Output;
53 changes: 53 additions & 0 deletions ts/@live-compositor/web/src/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { InputId, OutputId, Renderer } from '@live-compositor/browser-render';
import Input from './input';
import { Framerate } from './compositor';
import Output, { sendOutput } from './output';

export class Queue {
private inputs: Map<InputId, Input>;
private outputs: Map<OutputId, Output>;
private renderer: Renderer;
private currentPts: number;

public constructor(
inputs: Map<InputId, Input>,
outputs: Map<OutputId, Output>,
renderer: Renderer
) {
this.inputs = inputs;
this.outputs = outputs;
this.renderer = renderer;
this.currentPts = 0;
}

public static start(
framerate: Framerate,
inputs: Map<InputId, Input>,
outputs: Map<OutputId, Output>,
renderer: Renderer
) {
const queue = new Queue(inputs, outputs, renderer);
const tickDuration = framerate.den / framerate.num;
setInterval(() => {
queue.onTick();
queue.currentPts += tickDuration;
}, tickDuration);
}

private onTick() {
const frames = {};
for (const [_inputId, _input] of this.inputs) {
// TODO(noituri): Populate input frames
}

const outputs = this.renderer.render({
ptsMs: this.currentPts,
frames: frames,
});

for (const [outputId, frame] of Object.entries(outputs.frames)) {
const output = this.outputs.get(outputId);
void sendOutput(output!, frame);
}
}
}

0 comments on commit 8c93f92

Please sign in to comment.