diff --git a/example/test.ts b/example/test.ts index 38903ff..7e1f333 100644 --- a/example/test.ts +++ b/example/test.ts @@ -7,9 +7,11 @@ const main = async () => { // Create the Speaker instance const speaker = new Speaker(); + const videoId = "9PuudPiyma4"; + // Get the stream from the URL const stream = await NekoMelody.stream( - "https://www.youtube.com/watch?v=9PuudPiyma4", + `https://www.youtube.com/watch?v=${videoId}`, ); // PCM data from stdin gets piped into the speaker @@ -22,10 +24,7 @@ const main = async () => { .on("error", (err) => { console.error("An error occurred:", err.message); }) - .pipe(speaker, { end: true }) - .on("end", () => { - console.log("Audio playback finished."); - }); + .pipe(speaker, { end: true }); }; main(); diff --git a/package.json b/package.json index 3c9af5d..db8b2ac 100644 --- a/package.json +++ b/package.json @@ -40,8 +40,10 @@ "module": "./dist/index.mjs", "types": "./dist/index.d.ts", "dependencies": { + "axios": "^1.7.2", "fluent-ffmpeg": "^2.1.3", "play-dl": "github:YuzuZensai/play-dl-test#test", - "speaker": "^0.5.5" + "speaker": "^0.5.5", + "yt-dlp-wrap": "^2.3.12" } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 1512efa..b30209a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -8,6 +8,9 @@ importers: .: dependencies: + axios: + specifier: ^1.7.2 + version: 1.7.2 fluent-ffmpeg: specifier: ^2.1.3 version: 2.1.3 @@ -17,6 +20,9 @@ importers: speaker: specifier: ^0.5.5 version: 0.5.5 + yt-dlp-wrap: + specifier: ^2.3.12 + version: 2.3.12 devDependencies: '@swc/core': specifier: ^1.6.1 @@ -539,6 +545,12 @@ packages: async@0.2.10: resolution: {integrity: sha512-eAkdoKxU6/LkKDBzLpT+t6Ff5EtfSF4wx1WfJiPEEV7WNLnDaRXk0oVysiEPm262roaachGexwUv94WhSgN5TQ==} + asynckit@0.4.0: + resolution: {integrity: sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==} + + axios@1.7.2: + resolution: {integrity: sha512-2A8QhOMrbomlDuiLeK9XibIBzuHeRcqqNOHp0Cyp5EoJ1IFDh+XZH3A6BkXtv0K4gFGCI0Y4BM7B1wOEi0Rmgw==} + balanced-match@1.0.2: resolution: {integrity: sha512-3oSeUO0TMV67hN1AmbXsK4yaqU7tjiHlbxRDZOpH0KW9+CeX4bRAaX0Anxt0tx2MrpRpWwQaPwIlISEJhYU5Pw==} @@ -597,6 +609,10 @@ packages: color-name@1.1.4: resolution: {integrity: sha512-dOy+3AuW3a2wNbZHIuMZpTcgjGuLU/uBL/ubcZF9OXbDo8ff4O8yVp5Bf0efS8uEoYo5q4Fx7dY9OgQGXgAsQA==} + combined-stream@1.0.8: + resolution: {integrity: sha512-FQN4MRfuJeHf7cBbBMJFXhKSDq+2kAArBlmRBvcvFE5BB1HZKXtSFASDhdlz9zOYwxh8lDdnvmMOe/+5cdoEdg==} + engines: {node: '>= 0.8'} + commander@4.1.1: resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==} engines: {node: '>= 6'} @@ -620,6 +636,10 @@ packages: deep-is@0.1.4: resolution: {integrity: sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ==} + delayed-stream@1.0.0: + resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} + engines: {node: '>=0.4.0'} + dir-glob@3.0.1: resolution: {integrity: sha512-WkrWp9GR4KXfKGYzOLmTuGVi1UWFfws377n9cc55/tb6DuqyF6pcQ5AbiHEshaDpY9v6oaSr2XCDidGmMwdzIA==} engines: {node: '>=8'} @@ -748,10 +768,23 @@ packages: resolution: {integrity: sha512-Be3narBNt2s6bsaqP6Jzq91heDgOEaDCJAXcE3qcma/EJBSy5FB4cvO31XBInuAuKBx8Kptf8dkhjK0IOru39Q==} engines: {node: '>=18'} + follow-redirects@1.15.6: + resolution: {integrity: sha512-wWN62YITEaOpSK584EZXJafH1AGpO8RVgElfkuXbTOrPX4fIfOyEpW/CsiNd8JdYrAoOvafRTOEnvsO++qCqFA==} + engines: {node: '>=4.0'} + peerDependencies: + debug: '*' + peerDependenciesMeta: + debug: + optional: true + foreground-child@3.2.1: resolution: {integrity: sha512-PXUUyLqrR2XCWICfv6ukppP96sdFwWbNEnfEMt7jNsISjMsvaLNinAHNDYyvkyU+SZG2BTSbT5NjG+vZslfGTA==} engines: {node: '>=14'} + form-data@4.0.0: + resolution: {integrity: sha512-ETEklSGi5t0QMZuiXoA/Q6vcnxcLQP5vdugSpuAyi6SVGi2clPPp+xgEhuMaHC+zGgn31Kd235W35f7Hykkaww==} + engines: {node: '>= 6'} + fsevents@2.3.3: resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} engines: {node: ^8.16.0 || ^10.6.0 || >=11.0.0} @@ -903,6 +936,14 @@ packages: resolution: {integrity: sha512-LPP/3KorzCwBxfeUuZmaR6bG2kdeHSbe0P2tY3FLRU4vYrjYz5hI4QZwV0njUx3jeuKe67YukQ1LSPZBKDqO/Q==} engines: {node: '>=8.6'} + mime-db@1.52.0: + resolution: {integrity: sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==} + engines: {node: '>= 0.6'} + + mime-types@2.1.35: + resolution: {integrity: sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==} + engines: {node: '>= 0.6'} + mimic-fn@2.1.0: resolution: {integrity: sha512-OqbOk5oEQeAZ8WXWydlu9HJjz9WVdEIvamMCcXmuqUYjTknH/sqsWvhQ3vgwKFRR1HpjvNBKQ37nbJgYzGqGcg==} engines: {node: '>=6'} @@ -1019,6 +1060,9 @@ packages: engines: {node: '>=14'} hasBin: true + proxy-from-env@1.1.0: + resolution: {integrity: sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==} + punycode@2.3.1: resolution: {integrity: sha512-vYt7UD1U9Wg6138shLtLOvdAu+8DsC/ilFtEVHcH+wydcSpNE20AfSOduf6MkRFahL5FY7X1oU7nKVZFtfq8Fg==} engines: {node: '>=6'} @@ -1230,6 +1274,9 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + yt-dlp-wrap@2.3.12: + resolution: {integrity: sha512-P8fJ+6M1YjukyJENCTviNLiZ8mokxprR54ho3DsSKPWDcac489OjRiStGEARJr6un6ETS6goTn4CWl/b/rM3aA==} + snapshots: '@esbuild/aix-ppc64@0.21.5': @@ -1608,6 +1655,16 @@ snapshots: async@0.2.10: {} + asynckit@0.4.0: {} + + axios@1.7.2: + dependencies: + follow-redirects: 1.15.6 + form-data: 4.0.0 + proxy-from-env: 1.1.0 + transitivePeerDependencies: + - debug + balanced-match@1.0.2: {} binary-extensions@2.3.0: {} @@ -1670,6 +1727,10 @@ snapshots: color-name@1.1.4: {} + combined-stream@1.0.8: + dependencies: + delayed-stream: 1.0.0 + commander@4.1.1: {} concat-map@0.0.1: {} @@ -1686,6 +1747,8 @@ snapshots: deep-is@0.1.4: {} + delayed-stream@1.0.0: {} + dir-glob@3.0.1: dependencies: path-type: 4.0.0 @@ -1862,11 +1925,19 @@ snapshots: async: 0.2.10 which: 1.3.1 + follow-redirects@1.15.6: {} + foreground-child@3.2.1: dependencies: cross-spawn: 7.0.3 signal-exit: 4.1.0 + form-data@4.0.0: + dependencies: + asynckit: 0.4.0 + combined-stream: 1.0.8 + mime-types: 2.1.35 + fsevents@2.3.3: optional: true @@ -1991,6 +2062,12 @@ snapshots: braces: 3.0.3 picomatch: 2.3.1 + mime-db@1.52.0: {} + + mime-types@2.1.35: + dependencies: + mime-db: 1.52.0 + mimic-fn@2.1.0: {} minimatch@3.1.2: @@ -2082,6 +2159,8 @@ snapshots: prettier@3.3.2: {} + proxy-from-env@1.1.0: {} + punycode@2.3.1: {} queue-microtask@1.2.3: {} @@ -2296,3 +2375,5 @@ snapshots: yaml@2.4.5: {} yocto-queue@0.1.0: {} + + yt-dlp-wrap@2.3.12: {} diff --git a/src/stream.ts b/src/stream.ts index e616599..dba0845 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,58 @@ import { Readable } from "stream"; import playdl from "play-dl/play-dl"; +import YTDlpWrap from "yt-dlp-wrap"; +import { Stream } from "./utils/stream"; +const ytDlpWrap = new YTDlpWrap(); export const stream = async (url: string): Promise => { - let playdlStream = await playdl.stream(url); - return playdlStream.stream; + const playdlData = await playdl.stream(url); + const playdlStream = playdlData.stream; + + const getInfo = async () => { + return JSON.parse( + await ytDlpWrap.execPromise([ + url, + "-f", + "140", + "--extractor-args", + "youtube:player_client=ios", + "--dump-json", + ]), + ); + }; + + const ytDlpWrapInfo = await getInfo(); + const refreshStreamUrlFunction = async () => { + const info = await getInfo(); + return info.url; + }; + + const ytDlpWrapStream = new Stream( + ytDlpWrapInfo.url, + url, + //playdlData.type, + ytDlpWrapInfo.filesize, + ytDlpWrapInfo.duration, + refreshStreamUrlFunction, + ); + + const stream = ytDlpWrapStream.stream; + + // stream.on("error", (err) => { + // console.error("An error occurred:", err.message); + // }); + + // stream.on("end", () => { + // console.log("Stream ended."); + // }); + + // stream.on("close", () => { + // console.log("Stream closed."); + // }); + + stream.on("finished", () => { + console.log("Stream finished."); + }); + + return stream; }; diff --git a/src/utils/stream.ts b/src/utils/stream.ts new file mode 100644 index 0000000..093efe4 --- /dev/null +++ b/src/utils/stream.ts @@ -0,0 +1,283 @@ +import { Readable } from "stream"; +import axios, { AxiosRequestConfig, AxiosResponse } from "axios"; +import { Timer } from "./timer"; + +const DEBUG_SIMULATE_FAILURE = false; + +async function makeStreamRequest( + url: string, + options: AxiosRequestConfig = {}, + body?: any, +): Promise { + const { headers = {}, method = "GET" } = options; + + let config: AxiosRequestConfig = { + url, + method, + headers, + data: body, + responseType: "stream", + }; + + // Override / Add config + config = Object.assign(config, options); + + try { + const response = await axios(config); + return response; + } catch (err) { + throw err; + } +} + +export async function fetchStream( + url: string, + options: AxiosRequestConfig = { method: "GET" }, +): Promise> { + try { + let response = await makeStreamRequest(url, options); + const visitedUrls = new Set(); + + // Handle redirection and detect redirection loop + while ( + response.status >= 300 && + response.status < 400 && + response.headers.location + ) { + const redirectUrl = response.headers.location; + if (visitedUrls.has(redirectUrl)) { + throw new Error("Redirection loop detected"); + } + visitedUrls.add(redirectUrl); + response = await makeStreamRequest(redirectUrl, options); + } + + return response; + } catch (error) { + throw error; + } +} + +export class Stream { + private id: string; + private url: string; + private referenceUrl: string; + private duration: number; + + private timer: Timer; + private locked: boolean = false; + private destroyed: boolean = false; + private fetchCompleted: boolean = false; + + public stream: Readable; + private bytesReceived: number = 0; + + private contentLength: number; + + private inputReadable: Readable | null = null; + + private bytesPerRequestLimit = 1 * 1024 * 1024; // 1 MB per request + + private refreshStreamUrlFunction: () => Promise; + + constructor( + streamUrl: string, + referenceUrl: string, + contentLength: number, + duration: number, + refreshStreamUrlFunction: () => Promise, + ) { + this.id = Math.random().toString(36).substring(7); + this.url = streamUrl; + this.referenceUrl = referenceUrl; + this.duration = duration; + this.stream = new Readable({ + highWaterMark: 5 * 1024 * 1024, + read() {}, + }); + this.contentLength = contentLength; + this.refreshStreamUrlFunction = refreshStreamUrlFunction; + + this.timer = new Timer(() => { + this.timer.reset(); + this.tick(); + }, 2000); + + this.stream.on("close", () => { + console.debug( + `[${this.id}] > Destination stream closed, destroying...`, + ); + this.timer.destroy(); + this.destroy(); + }); + + this.timer.start(); + this.tick(); + } + + private debugLog() { + const isBufferSufficient = + this.stream.readableLength >= this.bytesPerRequestLimit; + + console.debug( + `[${this.id}] > ` + + `Data Received: ${(this.bytesReceived / (1024 * 1024)).toFixed(3)} MB / ${(this.contentLength / (1024 * 1024)).toFixed(3)} MB | ` + + `Buffer Remaining: ${(this.stream.readableLength / (1024 * 1024)).toFixed(3)} MB | ` + + `${!this.fetchCompleted ? `Buffer Sufficient: ${isBufferSufficient} | ` : ``}` + + `Locked: ${this.locked} | ` + + `Fetch Completed: ${this.fetchCompleted} | `, + ); + } + + private async tick() { + if (this.destroyed) { + console.debug(`[${this.id}] > Stream destroyed, not ticking`); + this.timer.destroy(); + return; + } + + if (this.stream.destroyed) { + console.debug( + `[${this.id}] > Destination stream destroyed, destroying...`, + ); + this.timer.destroy(); + this.destroy(); + return; + } + + const remainingBufferBytes = this.stream.readableLength; + const isBufferSufficient = + remainingBufferBytes >= this.bytesPerRequestLimit; + + this.debugLog(); + + if (!this.locked && !this.fetchCompleted) { + // Check if the remaining buffer size is less than a threshold before fetching the next chunk + if (!isBufferSufficient) { + this.locked = true; + const end = Math.min( + this.bytesReceived + this.bytesPerRequestLimit, + this.contentLength, + ); + + const rangeHeader = `bytes=${this.bytesReceived}-${end}`; + + const request = await fetchStream(this.url, { + headers: { + range: rangeHeader, + }, + }).catch((err: Error) => err); + + console.log(`[${this.id}] > Requesting range | ${rangeHeader}`); + + if (request instanceof Error) { + console.debug( + `[${this.id}] > Request error: ${request.message}`, + ); + + await this.refreshStreamUrl(); + this.locked = false; + this.timer.reset(); + this.tick(); + return; + } + + // Simulate failed request 25% of the time + if (DEBUG_SIMULATE_FAILURE && Math.random() < 0.25) { + console.debug(`[${this.id}] > Simulating request failure`); + request.status = 416; + } + + if (request.status >= 400) { + await this.refreshStreamUrl(); + this.locked = false; + this.timer.reset(); + this.tick(); + return; + } + + this.inputReadable = request.data; + + if (this.inputReadable === null) { + this.locked = false; + return; + } + + console.debug(`[${this.id}] > Request successful`); + + request.data.on("data", (data: any) => { + this.stream.push(data); + this.bytesReceived += data.length; + }); + + request.data.on("end", (data: any) => { + const check = () => { + // If still locked, delay the check + if (this.locked) { + console.debug( + `[${this.id}] > Still locked, delaying end check...`, + ); + setTimeout(check, 1000); + return; + } + + if (end >= this.contentLength - 1) { + console.debug( + `[${this.id}] > Fetching completed, permanently locking...`, + ); + this.locked = true; + + //this.timer.destroy(); + this.stream.push(null); + this.debugLog(); + this.fetchCompleted = true; + //this.destroy(); + } + }; + check(); + }); + + request.data.once("error", async () => { + this.destroy(); + await this.refreshStreamUrl(); + this.timer.reset(); + this.tick(); + }); + + this.locked = false; + } + } + + // If data fetch is completed, check if the buffer is empty, if so, destroy the stream + if (this.fetchCompleted && remainingBufferBytes === 0) { + console.debug(`[${this.id}] > Buffer empty, destroying...`); + this.stream.emit("finished"); + this.destroy(); + return; + } + + return; + } + + pause() { + this.timer.pause(); + } + + resume() { + this.timer.resume(); + } + + private async refreshStreamUrl() { + console.debug(`[${this.id}] > Refreshing stream URL...`); + let url = await this.refreshStreamUrlFunction(); + this.url = url; + console.debug(`[${this.id}] > Stream URL refreshed | ${url}`); + } + + private destroy() { + this.debugLog(); + console.debug(`[${this.id}] > Stream destroyed`); + if (this.inputReadable) this.inputReadable.destroy(); + this.destroyed = true; + } +} diff --git a/src/utils/timer.ts b/src/utils/timer.ts new file mode 100644 index 0000000..dccff28 --- /dev/null +++ b/src/utils/timer.ts @@ -0,0 +1,81 @@ +type TimerState = "idle" | "running" | "paused" | "finished"; + +export class Timer { + private callback: () => void | Promise; + private time: number; + private intervalId: NodeJS.Timeout | null; + private startTime: number; + private remainingTime: number; + private state: TimerState; + + constructor(callback: () => void | Promise, time: number) { + this.callback = callback; + this.time = time; + this.intervalId = null; + this.startTime = 0; + this.remainingTime = time; + this.state = "idle"; + } + + private clearExistingInterval() { + if (this.intervalId !== null) { + clearInterval(this.intervalId); + this.intervalId = null; + } + } + + private async runCallback() { + await this.callback(); + if (this.state === "running") { + this.start(); + } + } + + start() { + this.clearExistingInterval(); + this.state = "running"; + this.startTime = Date.now(); + this.intervalId = setTimeout(async () => { + await this.runCallback(); + }, this.remainingTime); + } + + pause() { + if (this.state === "running") { + this.clearExistingInterval(); + this.remainingTime -= Date.now() - this.startTime; + this.state = "paused"; + } + } + + resume() { + if (this.state === "paused") { + this.state = "running"; + this.startTime = Date.now(); + this.intervalId = setTimeout(async () => { + await this.runCallback(); + }, this.remainingTime); + } + } + + reset(startImmediately: boolean = true, newTime?: number) { + this.clearExistingInterval(); + this.time = newTime !== undefined ? newTime : this.time; + this.remainingTime = this.time; + + this.state = "idle"; + if (startImmediately) { + this.start(); + } + } + + destroy() { + this.clearExistingInterval(); + this.callback = () => {}; + this.state = "finished"; + } + + getState(): TimerState { + return this.state; + } +}