From 5e5a1091a5af936be5ddb8af4927be7ed22c49d2 Mon Sep 17 00:00:00 2001 From: Yuzu Date: Fri, 21 Jun 2024 12:17:32 +0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20Seeking=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- example/test.ts | 47 ++++++++-- src/player/index.ts | 18 +++- src/utils/SeekableStream.ts | 172 ++++++++++++++++++++++++++++++++++-- src/utils/WebmSeeker.ts | 14 +++ 4 files changed, 231 insertions(+), 20 deletions(-) diff --git a/example/test.ts b/example/test.ts index 0b0b3aa..2a83891 100644 --- a/example/test.ts +++ b/example/test.ts @@ -1,26 +1,55 @@ -import NekoMelody from "../src"; +import NekoMelody, { Player } from "../src"; import Speaker from "speaker"; import ffmpeg from "fluent-ffmpeg"; import { YtDlpProvider } from "../src/providers"; const main = async () => { - // Create the Speaker instance - const speaker = new Speaker(); - - const videoId = "9PuudPiyma4"; + const videoId = "2gigEGxnsmo"; // Providers const providers = [new YtDlpProvider()]; const player = NekoMelody.createPlayer(providers); await player.play(`https://www.youtube.com/watch?v=${videoId}`); + playSpeaker(player); + // setTimeout(async () => { + // await player.seek(100); + // playSpeaker(player); + // }, 5000); +}; + +// TODO: player end event to automate changing the stream +let lastFFmpeg: ffmpeg.FfmpegCommand | null = null; +let lastSpeaker: Speaker | null = null; +const playSpeaker = async (player: Player) => { if (!player.stream) { console.error("No input stream"); return; } + // A function that resolves when the speaker is closed and the ffmpeg process is killed + const closeSpeaker = () => { + return new Promise((resolve) => { + if (lastSpeaker) { + lastSpeaker.on("close", () => { + resolve(); + }); + if (lastFFmpeg) lastFFmpeg.kill("SIGKILL"); + lastSpeaker.close(true); + } else { + resolve(); + } + }); + }; + + await closeSpeaker(); + + // Create the Speaker instance + const speaker = new Speaker(); + lastSpeaker = speaker; + // PCM data from stdin gets piped into the speaker const ffmpegProcess = ffmpeg() .input(player.stream) @@ -29,8 +58,12 @@ const main = async () => { .audioFrequency(44100) .on("error", (err) => { console.error("An error occurred:", err.message); - }) - .pipe(speaker, { end: true }); + }); + + // Pipe the ffmpeg output to the speaker + ffmpegProcess.pipe(speaker, { end: true }); + + lastFFmpeg = ffmpegProcess; }; main(); diff --git a/src/player/index.ts b/src/player/index.ts index faef57b..f44b052 100644 --- a/src/player/index.ts +++ b/src/player/index.ts @@ -16,7 +16,7 @@ export class Player { this.providers = providers; } - public async play(url: string) { + public async play(url: string, seekTime: number = 0) { if (!this.currentProvider) { const providers = this.providers.filter((provider) => provider.canPlay(url), @@ -29,15 +29,25 @@ export class Player { this.currentProvider = providers[0]; } - // TODO: Handle if already playing - const information = await this.currentProvider.getInformation(url); //console.log(information); if (information.livestream) // TODO: Implement livestreams throw new Error("Livestreams are not supported yet"); - else this._stream = new SeekableStream(information, url); + + // If already playing, destroy the current stream + if (this._stream) { + this._stream.destroy(); + } + + this._stream = new SeekableStream(information, url, seekTime); + } + + public async seek(time: number) { + if (!this._stream) throw new Error("No stream to seek"); + + await this.play(this._stream.referenceUrl, time); } public getCurrentSampleRate() { diff --git a/src/utils/SeekableStream.ts b/src/utils/SeekableStream.ts index 8815c31..1915b4b 100644 --- a/src/utils/SeekableStream.ts +++ b/src/utils/SeekableStream.ts @@ -1,7 +1,7 @@ import { Readable } from "stream"; import { AudioInformation } from "../providers/base"; import { Timer } from "./Timer"; -import { WebmSeeker } from "./WebmSeeker"; +import { WebmSeeker, WebmSeekerState } from "./WebmSeeker"; import { getStream } from "./Request"; const DEBUG_SIMULATE_FAILURE = false; @@ -9,25 +9,30 @@ const DEBUG_SIMULATE_FAILURE = false; export class SeekableStream { private id: string; public information: AudioInformation; - private referenceUrl: string; + public readonly referenceUrl: string; public stream: WebmSeeker; private timer: Timer; private ticking: boolean = false; private locked: boolean = false; + private firstTick: boolean = true; private destroyed: boolean = false; private bytesReceived: number = 0; private bytesRead: number = 0; private bytesPerRequestLimit = 1 * 1024 * 1024; // 1 MB per request - constructor(information: AudioInformation, referenceUrl: string) { + constructor( + information: AudioInformation, + referenceUrl: string, + seekTime: number = 0, + ) { this.id = Math.random().toString(36).substring(8); this.information = information; this.referenceUrl = referenceUrl; - this.stream = new WebmSeeker(0, { + this.stream = new WebmSeeker(seekTime, { highWaterMark: 5 * 1024 * 1024, }); @@ -45,11 +50,10 @@ export class SeekableStream { this.timer.start(); this.tick(); + if (seekTime !== 0) this.seek(); } private async tick() { - console.log(`[${this.id}] > Ticking...`); - if (this.destroyed) { console.debug( `[${this.id}] > Stream already destroyed, not ticking`, @@ -60,6 +64,81 @@ export class SeekableStream { this.debugLog(); + if (this.firstTick) { + this.firstTick = false; + this.locked = true; + + // Get header + const rangeHeader = `bytes=${this.bytesReceived}-${this.information.indexRange.end}`; + const request = await getStream(this.information.url, { + headers: { + range: rangeHeader, + }, + }).catch((err: Error) => err); + + console.debug( + `[${this.id}] > Request first tick header range | ${rangeHeader}`, + ); + + if (request instanceof Error) { + console.debug( + `[${this.id}] > Request first tick error: ${request.message}`, + ); + await this.refreshInformation(); + this.timer.reset(); + this.tick(); + + this.locked = false; + return; + } + + // Simulate failed request 25% of the time + if (DEBUG_SIMULATE_FAILURE && Math.random() < 0.25) { + console.debug(`[${this.id}] > Simulating request failure`); + request.status = 416; + } + + if (request.status >= 400) { + console.debug( + `[${this.id}] > Request first tick failed with status ${request.status}`, + ); + await this.refreshInformation(); + this.timer.reset(); + this.tick(); + + this.locked = false; + return; + } + + if (!request.data) { + this.timer.reset(); + this.tick(); + + this.locked = false; + return; + } + + console.debug(`[${this.id}] > Request first tick successful`); + + const incomingStream = request.data; + + incomingStream.on("data", (chunk: any) => { + this.stream.push(chunk); + this.bytesReceived += chunk.length; + }); + + incomingStream.on("end", async () => { + console.debug(`[${this.id}] > Header received, unlocking`); + this.locked = false; + incomingStream.destroy(); + this.debugLog(); + + this.locked = false; + }); + + return; + } + const isBufferSufficient = this.stream.readableLength >= this.bytesPerRequestLimit; @@ -147,6 +226,7 @@ export class SeekableStream { } if ( + !this.locked && this.bytesReceived >= this.information.fileSize && this.stream.readableLength === 0 ) { @@ -154,8 +234,78 @@ export class SeekableStream { this.destroy(); return; } + } - console.debug(`[${this.id}] > Tick completed`); + public async seek(): Promise { + const parse = await new Promise(async (resolve, reject) => { + if (!this.stream.headerparsed) { + console.debug(`[${this.id}] > Parsing header...`); + console.debug( + `[${this.id}] > Requesting range | 0-${this.information.indexRange.end}`, + ); + + let req = await getStream(this.information.url, { + headers: { + range: `bytes=0-${this.information.indexRange.end}`, + }, + }).catch((err: Error) => err); + + if (req instanceof Error || req.status >= 400) { + reject(false); + return; + } + + const incomingStream = req.data; + incomingStream.pipe(this.stream, { end: false }); + + this.stream.once("headComplete", () => { + console.debug(`[${this.id}] > Header parsed, unpiping...`); + incomingStream.unpipe(this.stream); + incomingStream.destroy(); + this.stream.state = WebmSeekerState.READING_DATA; + resolve(true); + }); + } + + resolve(true); + }).catch((err) => err); + + if (parse instanceof Error || parse === false) { + await this.refreshInformation(); + this.timer.reset(); + return this.seek(); + } + + // Wait for lock to be released + while (this.locked) { + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + this.locked = true; + + const bytes = this.stream.seek(this.information.fileSize); + if (bytes instanceof Error) { + // TODO: Handle seek error + this.destroy(); + return false; + } + + console.debug( + `[${this.id}] > Seeking... Byte located at ${bytes} / ${this.information.fileSize}`, + ); + + // Offset the counter + this.bytesReceived = bytes; + this.bytesRead = bytes; + + this.stream.seekfound = false; + this.locked = false; + + // Tick to start fetching data + this.timer.reset(); + this.tick(); + + return true; } private getCurrentTimestamp() { @@ -177,10 +327,14 @@ export class SeekableStream { this.stream.on(event, listener); } - private destroy() { + public destroy() { console.debug(`[${this.id}] > Stream destroyed`); if (!this.timer.isDestroyed()) this.timer.destroy(); - if (this.stream) this.stream.destroy(); + if (this.stream) { + this.stream.end(); + this.stream.destroy(); + } + this.destroyed = true; } diff --git a/src/utils/WebmSeeker.ts b/src/utils/WebmSeeker.ts index 0b9799b..c19ab10 100644 --- a/src/utils/WebmSeeker.ts +++ b/src/utils/WebmSeeker.ts @@ -766,6 +766,7 @@ export class WebmSeeker extends Duplex { position = 0; let time_left = (this.sec - this.time) * 1000 || 0; time_left = Math.round(time_left / 20) * 20; + if (!this.header.segment.cues) return new Error("Failed to Parse Cues"); for (let i = 0; i < this.header.segment.cues.length; i++) { @@ -781,6 +782,14 @@ export class WebmSeeker extends Duplex { } else continue; } if (clusterlength === 0) return position; + + console.debug( + `[WebmSeeker] > Seeking to ${this.sec} seconds, ${ + this.offset + + Math.round(position + (time_left / 20) * (clusterlength / 500)) + } bytes`, + ); + return ( this.offset + Math.round(position + (time_left / 20) * (clusterlength / 500)) @@ -810,6 +819,8 @@ export class WebmSeeker extends Duplex { private readHead(): Error | undefined { if (!this.chunk) return new Error("Chunk is missing"); + console.debug("[WebmSeeker] > Reading Head"); + while (this.chunk.length > this.cursor) { const oldCursor = this.cursor; const id = this.vint_length; @@ -870,6 +881,9 @@ export class WebmSeeker extends Duplex { } this.remaining = this.chunk.slice(this.cursor); this.cursor = 0; + + console.debug("[WebmSeeker] > Finished Reading Head"); + this.emit("headComplete"); } private readTag(): Error | undefined {