feat: Queue system

This commit is contained in:
2024-07-04 20:59:00 +07:00
parent b338319f80
commit edc1a207be
2 changed files with 104 additions and 17 deletions

View File

@@ -1,22 +1,65 @@
import { Readable, Stream } from "stream"; import { EventEmitter, Readable, Stream } from "stream";
import { Provider } from "../providers/base"; import { AudioInformation, Provider } from "../providers/base";
import { SeekableStream } from "../utils/SeekableStream"; import { SeekableStream } from "../utils/SeekableStream";
export class Player { export class Player {
private providers: Provider[]; private providers: Provider[];
private currentProvider: Provider | null = null; private currentProvider: Provider | null = null;
private queue: AudioInformation[] = [];
private playerEvent: EventEmitter = new EventEmitter();
private paused: boolean = false;
private currentAudioInformation: AudioInformation | null = null;
public _stream: SeekableStream | null = null; public _stream: SeekableStream | null = null;
get stream() {
return this._stream?.stream;
}
constructor(providers: Provider[]) { constructor(providers: Provider[]) {
this.providers = providers; this.providers = providers;
} }
public async play(url: string, seekTime: number = 0) { public get stream() {
return this._stream?.stream;
}
private _createStream(
information: AudioInformation,
url: string,
seekTime: number,
) {
// If already playing, destroy the current stream
if (this._stream) {
this._stream.destroy();
}
this._stream = new SeekableStream(information, url, seekTime);
this.currentAudioInformation = information;
this._stream.on("destroy", () => {
console.log("Stream destroyed, total song", this.queue.length);
if (this.queue.length > 0) {
const next = this.queue.shift();
console.log("Playing next in queue");
if (next) {
this._createStream(next, next.url, 0);
}
} else {
this._stream = null;
this.currentAudioInformation = null;
}
});
this.playerEvent.emit("play", information);
}
public endCurrentStream() {
if (this._stream) {
this._stream.destroy();
}
}
public on(event: string, listener: (...args: any[]) => void) {
this.playerEvent.on(event, listener);
}
public async getInformation(url: string) {
if (!this.currentProvider) { if (!this.currentProvider) {
const providers = this.providers.filter((provider) => const providers = this.providers.filter((provider) =>
provider.canPlay(url), provider.canPlay(url),
@@ -29,19 +72,42 @@ export class Player {
this.currentProvider = providers[0]; this.currentProvider = providers[0];
} }
const information = await this.currentProvider.getInformation(url); return await this.currentProvider.getInformation(url);
}
public async play(url: string, seekTime: number = 0) {
const information = await this.getInformation(url);
//console.log(information); //console.log(information);
if (information.livestream) if (information.livestream)
// TODO: Implement livestreams // TODO: Implement livestreams
throw new Error("Livestreams are not supported yet"); throw new Error("Livestreams are not supported yet");
// If already playing, destroy the current stream this._createStream(information, url, seekTime);
if (this._stream) { }
this._stream.destroy();
public async enqueue(url: string, seekTime: number = 0) {
const information = await this.getInformation(url);
if (information.livestream)
// TODO: Implement livestreams
throw new Error("Livestreams are not supported yet");
this.playerEvent.emit("enqueue", information);
// If queue is empty, no stream is playing and not paused, play the current URL
if (
this.queue.length === 0 &&
!this.currentAudioInformation &&
!this._stream &&
!this.paused
) {
this._createStream(information, url, seekTime);
} else {
this.queue.push(information);
} }
this._stream = new SeekableStream(information, url, seekTime); console.log("Enqueued", url);
} }
public async seek(time: number) { public async seek(time: number) {

View File

@@ -1,4 +1,4 @@
import { Readable } from "stream"; import { EventEmitter, Readable } from "stream";
import { AudioInformation } from "../providers/base"; import { AudioInformation } from "../providers/base";
import { Timer } from "./Timer"; import { Timer } from "./Timer";
import { WebmSeeker, WebmSeekerState } from "./WebmSeeker"; import { WebmSeeker, WebmSeekerState } from "./WebmSeeker";
@@ -18,6 +18,7 @@ export class SeekableStream {
private locked: boolean = false; private locked: boolean = false;
private firstTick: boolean = true; private firstTick: boolean = true;
private destroyed: boolean = false; private destroyed: boolean = false;
private event: EventEmitter = new EventEmitter();
private bytesReceived: number = 0; private bytesReceived: number = 0;
private bytesRead: number = 0; private bytesRead: number = 0;
@@ -240,12 +241,28 @@ export class SeekableStream {
if ( if (
!this.locked && !this.locked &&
this.bytesReceived >= this.information.fileSize && this.bytesReceived >= this.information.fileSize &&
this.stream.readableLength === 0 this.stream.readableLength === 0 &&
this.bytesRead >= this.information.fileSize &&
this.stream.state === WebmSeekerState.READING_DATA &&
!this.stream.readableEnded
) { ) {
console.debug(`[${this.id}] > Stream completed`); console.debug(`[${this.id}] > Stream completed`);
this.destroy(); this.stream.push(null);
return;
} }
// if (
// !this.locked &&
// this.bytesReceived >= this.information.fileSize &&
// this.stream.readableLength === 0 &&
// this.bytesRead >= this.information.fileSize &&
// this.stream.state === WebmSeekerState.READING_DATA &&
// !this.stream.readableFlowing &&
// this.stream.readableEnded &&
// ) {
// console.debug(`[${this.id}] > Stream ended`);
// //this.destroy();
// return;
// }
} }
public async seek(): Promise<boolean> { public async seek(): Promise<boolean> {
@@ -338,18 +355,22 @@ export class SeekableStream {
} }
public on(event: string, listener: (...args: any[]) => void) { public on(event: string, listener: (...args: any[]) => void) {
this.stream.on(event, listener); this.event.on(event, listener);
} }
public destroy() { public destroy() {
if (this.destroyed) return;
console.debug(`[${this.id}] > Stream destroyed`); console.debug(`[${this.id}] > Stream destroyed`);
if (!this.timer.isDestroyed()) this.timer.destroy(); if (!this.timer.isDestroyed()) this.timer.destroy();
if (this.stream) { if (this.stream) {
//this.stream.push(null);
this.stream.end(); this.stream.end();
this.stream.destroy(); this.stream.destroy();
} }
this.destroyed = true; this.destroyed = true;
this.event.emit("destroy");
} }
private debugLog() { private debugLog() {