From 5ce9ff89e91f2357efe9ab94587e627185f97c4e Mon Sep 17 00:00:00 2001 From: stratuma Date: Fri, 24 May 2024 03:11:34 +0200 Subject: [PATCH] changed audio parts fetch --- src/api/routes/service/service.service.ts | 31 ++++- src/api/services/audio.ts | 142 +++++++--------------- 2 files changed, 69 insertions(+), 104 deletions(-) diff --git a/src/api/routes/service/service.service.ts b/src/api/routes/service/service.service.ts index 5fbfc12..5cf7a92 100644 --- a/src/api/routes/service/service.service.ts +++ b/src/api/routes/service/service.service.ts @@ -679,7 +679,10 @@ export async function downloadCrunchyrollPlaylist( const audioDownload = async () => { const audios: Array = [] - for (const v of dubDownloadList) { + const concurrentDownloads = 2 + const tasks: Array> = [] + + const downloadTask = async (v: any) => { const list = await crunchyGetPlaylist(v.guid, v.geo) if (!list) return @@ -707,10 +710,10 @@ export async function downloadCrunchyrollPlaylist( return } - var pssh - var keys: { kid: string; key: string }[] | undefined + let pssh + let keys: { kid: string; key: string }[] | undefined - var p: { filename: string; url: string }[] = [] + let p: { filename: string; url: string }[] = [] if (playlist.mediaGroups.AUDIO.audio.main.playlists[0].contentProtection) { if (!playlist.mediaGroups.AUDIO.audio.main.playlists[0].contentProtection['com.widevine.alpha'].pssh) { @@ -745,6 +748,7 @@ export async function downloadCrunchyrollPlaylist( }) return } + p.push({ filename: (playlist.mediaGroups.AUDIO.audio.main.playlists[0].segments[0].map.uri.match(/([^\/]+)\?/) as RegExpMatchArray)[1], url: playlist.mediaGroups.AUDIO.audio.main.playlists[0].segments[0].map.resolvedUri @@ -759,8 +763,25 @@ export async function downloadCrunchyrollPlaylist( const path = await downloadMPDAudio(p, audioFolder, list.data.audioLocale, downloadID, keys ? keys : undefined) - audios.push(path as string) + if (path) { + audios.push(path as string) + } } + + for (const v of dubDownloadList) { + const task = downloadTask(v).finally(() => { + tasks.splice(tasks.indexOf(task), 1) + }) + + tasks.push(task) + + if (tasks.length >= concurrentDownloads) { + await Promise.race(tasks) + } + } + + await Promise.all(tasks) + return audios } diff --git a/src/api/services/audio.ts b/src/api/services/audio.ts index de9c4a1..7eb0e24 100644 --- a/src/api/services/audio.ts +++ b/src/api/services/audio.ts @@ -10,6 +10,7 @@ const mp4e = getMP4DecryptPath() import util from 'util' import { server } from '../api' const exec = util.promisify(require('child_process').exec) +import { finished } from 'stream/promises' // Define Downloading Array var downloading: Array<{ @@ -41,114 +42,47 @@ export async function downloadMPDAudio( audio: name }) - const maxParallelDownloads = 5 - const downloadPromises = [] - for (const [index, part] of parts.entries()) { - let retries = 0 + let success = false + while (!success) { + try { + var stream - const downloadPromise = async () => { - let downloadSuccess = false - while (!downloadSuccess) { - try { - const stream = fs.createWriteStream(`${path}/${part.filename}`) - await fetchAndPipe(part.url, stream, index + 1, downloadID, name) - downloadSuccess = true - } catch (error) { - retries++ - console.error(`Failed to download part ${part.filename}, retrying (${retries})...`) - await new Promise((resolve) => setTimeout(resolve, 1000)) - } + stream = fs.createWriteStream(`${path}/${part.filename}`) + + const { body } = await fetch(part.url) + + const readableStream = Readable.from(body as any) + + await finished(readableStream.pipe(stream)) + + console.log(`Fragment ${index + 1} downloaded`) + + success = true + } catch (error) { + console.error(`Error occurred during download of fragment ${index + 1}:`, error) + server.logger.log({ + level: 'error', + message: `Error occurred during download of fragment ${index + 1}`, + error: error, + timestamp: new Date().toISOString(), + section: 'crunchyrollDownloadProcessAudioDownload' + }) + console.log(`Retrying download of fragment ${index + 1}...`) + server.logger.log({ + level: 'warn', + message: `Retrying download of fragment ${index + 1} because failed`, + timestamp: new Date().toISOString(), + section: 'crunchyrollDownloadProcessAudioDownload' + }) + await new Promise((resolve) => setTimeout(resolve, 5000)) } } - - downloadPromises.push(downloadPromise()) - - if (downloadPromises.length === maxParallelDownloads || index === parts.length - 1) { - await Promise.all(downloadPromises) - downloadPromises.length = 0 - } } return await mergePartsAudio(parts, path, dir, name, downloadID, drmkeys) } -async function fetchAndPipe(url: string, stream: fs.WriteStream, index: number, downloadID: number, name: string) { - try { - const dn = downloading.find((i) => i.id === downloadID && i.audio === name) - - const response = await fetch(url) - - if (!response.ok) { - if (dn) { - dn.status = 'failed' - } - server.logger.log({ - level: 'error', - message: 'Error while downloading an Audio Fragment', - fragment: index, - error: await response.text(), - timestamp: new Date().toISOString(), - section: 'audiofragmentCrunchyrollFetch' - }) - throw new Error(`Failed to fetch URL: ${response.statusText}`) - } - - const body = response.body - - if (!body) { - if (dn) { - dn.status = 'failed' - } - server.logger.log({ - level: 'error', - message: 'Error while downloading an Audio Fragment', - fragment: index, - error: 'Response body is not a readable stream', - timestamp: new Date().toISOString(), - section: 'audiofragmentCrunchyrollFetch' - }) - throw new Error('Response body is not a readable stream') - } - - const readableStream = Readable.from(body as any) - - return new Promise((resolve, reject) => { - readableStream - .pipe(stream) - .on('finish', () => { - console.log(`Fragment ${index} downloaded`) - resolve() - }) - .on('error', (error) => { - if (dn) { - dn.status = 'failed' - } - server.logger.log({ - level: 'error', - message: 'Error while downloading an Audio Fragment', - fragment: index, - error: error, - timestamp: new Date().toISOString(), - section: 'audiofragmentCrunchyrollFetch' - }) - reject(error) - }) - }) - } catch (error) { - server.logger.log({ - level: 'error', - message: 'Error while downloading an Audio Fragment, retrying', - fragment: index, - error: error, - timestamp: new Date().toISOString(), - section: 'audiofragmentCrunchyrollFetch' - }) - console.error(`Retrying fragment ${index} due to error:`, error) - await new Promise((resolve) => setTimeout(resolve, 1000)) - } -} - async function mergePartsAudio( parts: { filename: string; url: string }[], tmp: string, @@ -217,5 +151,15 @@ async function mergePartsAudio( }) } catch (error) { console.error('Error merging parts:', error) + if (dn) { + dn.status = 'failed' + } + server.logger.log({ + level: 'error', + message: 'Error while merging Audio', + error: error, + timestamp: new Date().toISOString(), + section: 'audioCrunchyrollMerging' + }) } }