diff --git a/src/player/index.ts b/src/player/index.ts index f44b052..c97020c 100644 --- a/src/player/index.ts +++ b/src/player/index.ts @@ -1,22 +1,65 @@ -import { Readable, Stream } from "stream"; -import { Provider } from "../providers/base"; +import { EventEmitter, Readable, Stream } from "stream"; +import { AudioInformation, Provider } from "../providers/base"; import { SeekableStream } from "../utils/SeekableStream"; export class Player { private providers: Provider[]; private currentProvider: Provider | null = null; + private queue: AudioInformation[] = []; + private playerEvent: EventEmitter = new EventEmitter(); + private paused: boolean = false; + private currentAudioInformation: AudioInformation | null = null; public _stream: SeekableStream | null = null; - get stream() { - return this._stream?.stream; - } - constructor(providers: Provider[]) { this.providers = providers; } - public async play(url: string, seekTime: number = 0) { + public get stream() { + return this._stream?.stream; + } + + private _createStream( + information: AudioInformation, + url: string, + seekTime: number, + ) { + // If already playing, destroy the current stream + if (this._stream) { + this._stream.destroy(); + } + + this._stream = new SeekableStream(information, url, seekTime); + this.currentAudioInformation = information; + this._stream.on("destroy", () => { + console.log("Stream destroyed, total song", this.queue.length); + if (this.queue.length > 0) { + const next = this.queue.shift(); + console.log("Playing next in queue"); + if (next) { + this._createStream(next, next.url, 0); + } + } else { + this._stream = null; + this.currentAudioInformation = null; + } + }); + + this.playerEvent.emit("play", information); + } + + public endCurrentStream() { + if (this._stream) { + this._stream.destroy(); + } + } + + public on(event: string, listener: (...args: any[]) => void) { + this.playerEvent.on(event, listener); + } + + public async getInformation(url: string) { if (!this.currentProvider) { const providers = this.providers.filter((provider) => provider.canPlay(url), @@ -29,19 +72,42 @@ export class Player { this.currentProvider = providers[0]; } - const information = await this.currentProvider.getInformation(url); + return await this.currentProvider.getInformation(url); + } + + public async play(url: string, seekTime: number = 0) { + const information = await this.getInformation(url); //console.log(information); if (information.livestream) // TODO: Implement livestreams throw new Error("Livestreams are not supported yet"); - // If already playing, destroy the current stream - if (this._stream) { - this._stream.destroy(); + this._createStream(information, url, seekTime); + } + + public async enqueue(url: string, seekTime: number = 0) { + const information = await this.getInformation(url); + + if (information.livestream) + // TODO: Implement livestreams + throw new Error("Livestreams are not supported yet"); + + this.playerEvent.emit("enqueue", information); + + // If queue is empty, no stream is playing and not paused, play the current URL + if ( + this.queue.length === 0 && + !this.currentAudioInformation && + !this._stream && + !this.paused + ) { + this._createStream(information, url, seekTime); + } else { + this.queue.push(information); } - this._stream = new SeekableStream(information, url, seekTime); + console.log("Enqueued", url); } public async seek(time: number) { diff --git a/src/utils/SeekableStream.ts b/src/utils/SeekableStream.ts index 4038eba..b7b1980 100644 --- a/src/utils/SeekableStream.ts +++ b/src/utils/SeekableStream.ts @@ -1,4 +1,4 @@ -import { Readable } from "stream"; +import { EventEmitter, Readable } from "stream"; import { AudioInformation } from "../providers/base"; import { Timer } from "./Timer"; import { WebmSeeker, WebmSeekerState } from "./WebmSeeker"; @@ -18,6 +18,7 @@ export class SeekableStream { private locked: boolean = false; private firstTick: boolean = true; private destroyed: boolean = false; + private event: EventEmitter = new EventEmitter(); private bytesReceived: number = 0; private bytesRead: number = 0; @@ -240,12 +241,28 @@ export class SeekableStream { if ( !this.locked && this.bytesReceived >= this.information.fileSize && - this.stream.readableLength === 0 + this.stream.readableLength === 0 && + this.bytesRead >= this.information.fileSize && + this.stream.state === WebmSeekerState.READING_DATA && + !this.stream.readableEnded ) { console.debug(`[${this.id}] > Stream completed`); - this.destroy(); - return; + this.stream.push(null); } + + // if ( + // !this.locked && + // this.bytesReceived >= this.information.fileSize && + // this.stream.readableLength === 0 && + // this.bytesRead >= this.information.fileSize && + // this.stream.state === WebmSeekerState.READING_DATA && + // !this.stream.readableFlowing && + // this.stream.readableEnded && + // ) { + // console.debug(`[${this.id}] > Stream ended`); + // //this.destroy(); + // return; + // } } public async seek(): Promise { @@ -338,18 +355,22 @@ export class SeekableStream { } public on(event: string, listener: (...args: any[]) => void) { - this.stream.on(event, listener); + this.event.on(event, listener); } public destroy() { + if (this.destroyed) return; + console.debug(`[${this.id}] > Stream destroyed`); if (!this.timer.isDestroyed()) this.timer.destroy(); if (this.stream) { + //this.stream.push(null); this.stream.end(); this.stream.destroy(); } this.destroyed = true; + this.event.emit("destroy"); } private debugLog() {