feat: Seeking support

This commit is contained in:
2024-06-21 12:17:32 +07:00
parent a8062b5fe2
commit 5e5a1091a5
4 changed files with 231 additions and 20 deletions

View File

@@ -16,7 +16,7 @@ export class Player {
this.providers = providers;
}
public async play(url: string) {
public async play(url: string, seekTime: number = 0) {
if (!this.currentProvider) {
const providers = this.providers.filter((provider) =>
provider.canPlay(url),
@@ -29,15 +29,25 @@ export class Player {
this.currentProvider = providers[0];
}
// TODO: Handle if already playing
const information = await this.currentProvider.getInformation(url);
//console.log(information);
if (information.livestream)
// TODO: Implement livestreams
throw new Error("Livestreams are not supported yet");
else this._stream = new SeekableStream(information, url);
// If already playing, destroy the current stream
if (this._stream) {
this._stream.destroy();
}
this._stream = new SeekableStream(information, url, seekTime);
}
public async seek(time: number) {
if (!this._stream) throw new Error("No stream to seek");
await this.play(this._stream.referenceUrl, time);
}
public getCurrentSampleRate() {

View File

@@ -1,7 +1,7 @@
import { Readable } from "stream";
import { AudioInformation } from "../providers/base";
import { Timer } from "./Timer";
import { WebmSeeker } from "./WebmSeeker";
import { WebmSeeker, WebmSeekerState } from "./WebmSeeker";
import { getStream } from "./Request";
const DEBUG_SIMULATE_FAILURE = false;
@@ -9,25 +9,30 @@ const DEBUG_SIMULATE_FAILURE = false;
export class SeekableStream {
private id: string;
public information: AudioInformation;
private referenceUrl: string;
public readonly referenceUrl: string;
public stream: WebmSeeker;
private timer: Timer;
private ticking: boolean = false;
private locked: boolean = false;
private firstTick: boolean = true;
private destroyed: boolean = false;
private bytesReceived: number = 0;
private bytesRead: number = 0;
private bytesPerRequestLimit = 1 * 1024 * 1024; // 1 MB per request
constructor(information: AudioInformation, referenceUrl: string) {
constructor(
information: AudioInformation,
referenceUrl: string,
seekTime: number = 0,
) {
this.id = Math.random().toString(36).substring(8);
this.information = information;
this.referenceUrl = referenceUrl;
this.stream = new WebmSeeker(0, {
this.stream = new WebmSeeker(seekTime, {
highWaterMark: 5 * 1024 * 1024,
});
@@ -45,11 +50,10 @@ export class SeekableStream {
this.timer.start();
this.tick();
if (seekTime !== 0) this.seek();
}
private async tick() {
console.log(`[${this.id}] > Ticking...`);
if (this.destroyed) {
console.debug(
`[${this.id}] > Stream already destroyed, not ticking`,
@@ -60,6 +64,81 @@ export class SeekableStream {
this.debugLog();
if (this.firstTick) {
this.firstTick = false;
this.locked = true;
// Get header
const rangeHeader = `bytes=${this.bytesReceived}-${this.information.indexRange.end}`;
const request = await getStream(this.information.url, {
headers: {
range: rangeHeader,
},
}).catch((err: Error) => err);
console.debug(
`[${this.id}] > Request first tick header range | ${rangeHeader}`,
);
if (request instanceof Error) {
console.debug(
`[${this.id}] > Request first tick error: ${request.message}`,
);
await this.refreshInformation();
this.timer.reset();
this.tick();
this.locked = false;
return;
}
// Simulate failed request 25% of the time
if (DEBUG_SIMULATE_FAILURE && Math.random() < 0.25) {
console.debug(`[${this.id}] > Simulating request failure`);
request.status = 416;
}
if (request.status >= 400) {
console.debug(
`[${this.id}] > Request first tick failed with status ${request.status}`,
);
await this.refreshInformation();
this.timer.reset();
this.tick();
this.locked = false;
return;
}
if (!request.data) {
this.timer.reset();
this.tick();
this.locked = false;
return;
}
console.debug(`[${this.id}] > Request first tick successful`);
const incomingStream = request.data;
incomingStream.on("data", (chunk: any) => {
this.stream.push(chunk);
this.bytesReceived += chunk.length;
});
incomingStream.on("end", async () => {
console.debug(`[${this.id}] > Header received, unlocking`);
this.locked = false;
incomingStream.destroy();
this.debugLog();
this.locked = false;
});
return;
}
const isBufferSufficient =
this.stream.readableLength >= this.bytesPerRequestLimit;
@@ -147,6 +226,7 @@ export class SeekableStream {
}
if (
!this.locked &&
this.bytesReceived >= this.information.fileSize &&
this.stream.readableLength === 0
) {
@@ -154,8 +234,78 @@ export class SeekableStream {
this.destroy();
return;
}
}
console.debug(`[${this.id}] > Tick completed`);
public async seek(): Promise<boolean> {
const parse = await new Promise(async (resolve, reject) => {
if (!this.stream.headerparsed) {
console.debug(`[${this.id}] > Parsing header...`);
console.debug(
`[${this.id}] > Requesting range | 0-${this.information.indexRange.end}`,
);
let req = await getStream(this.information.url, {
headers: {
range: `bytes=0-${this.information.indexRange.end}`,
},
}).catch((err: Error) => err);
if (req instanceof Error || req.status >= 400) {
reject(false);
return;
}
const incomingStream = req.data;
incomingStream.pipe(this.stream, { end: false });
this.stream.once("headComplete", () => {
console.debug(`[${this.id}] > Header parsed, unpiping...`);
incomingStream.unpipe(this.stream);
incomingStream.destroy();
this.stream.state = WebmSeekerState.READING_DATA;
resolve(true);
});
}
resolve(true);
}).catch((err) => err);
if (parse instanceof Error || parse === false) {
await this.refreshInformation();
this.timer.reset();
return this.seek();
}
// Wait for lock to be released
while (this.locked) {
await new Promise((resolve) => setTimeout(resolve, 100));
}
this.locked = true;
const bytes = this.stream.seek(this.information.fileSize);
if (bytes instanceof Error) {
// TODO: Handle seek error
this.destroy();
return false;
}
console.debug(
`[${this.id}] > Seeking... Byte located at ${bytes} / ${this.information.fileSize}`,
);
// Offset the counter
this.bytesReceived = bytes;
this.bytesRead = bytes;
this.stream.seekfound = false;
this.locked = false;
// Tick to start fetching data
this.timer.reset();
this.tick();
return true;
}
private getCurrentTimestamp() {
@@ -177,10 +327,14 @@ export class SeekableStream {
this.stream.on(event, listener);
}
private destroy() {
public destroy() {
console.debug(`[${this.id}] > Stream destroyed`);
if (!this.timer.isDestroyed()) this.timer.destroy();
if (this.stream) this.stream.destroy();
if (this.stream) {
this.stream.end();
this.stream.destroy();
}
this.destroyed = true;
}

View File

@@ -766,6 +766,7 @@ export class WebmSeeker extends Duplex {
position = 0;
let time_left = (this.sec - this.time) * 1000 || 0;
time_left = Math.round(time_left / 20) * 20;
if (!this.header.segment.cues) return new Error("Failed to Parse Cues");
for (let i = 0; i < this.header.segment.cues.length; i++) {
@@ -781,6 +782,14 @@ export class WebmSeeker extends Duplex {
} else continue;
}
if (clusterlength === 0) return position;
console.debug(
`[WebmSeeker] > Seeking to ${this.sec} seconds, ${
this.offset +
Math.round(position + (time_left / 20) * (clusterlength / 500))
} bytes`,
);
return (
this.offset +
Math.round(position + (time_left / 20) * (clusterlength / 500))
@@ -810,6 +819,8 @@ export class WebmSeeker extends Duplex {
private readHead(): Error | undefined {
if (!this.chunk) return new Error("Chunk is missing");
console.debug("[WebmSeeker] > Reading Head");
while (this.chunk.length > this.cursor) {
const oldCursor = this.cursor;
const id = this.vint_length;
@@ -870,6 +881,9 @@ export class WebmSeeker extends Duplex {
}
this.remaining = this.chunk.slice(this.cursor);
this.cursor = 0;
console.debug("[WebmSeeker] > Finished Reading Head");
this.emit("headComplete");
}
private readTag(): Error | undefined {