From 29371b21926b5252ef7a333f85835a08194527d3 Mon Sep 17 00:00:00 2001 From: iPromKnight Date: Mon, 5 Feb 2024 12:37:20 +0000 Subject: [PATCH] further lunchtime cleanup Three files left: torrentEntries, torrentFiles torrentSubtitles --- src/node/consumer/package-lock.json | 20 ++ src/node/consumer/package.json | 2 + src/node/consumer/src/index.js | 4 +- src/node/consumer/src/jobs/processTorrents.ts | 6 +- src/node/consumer/src/lib/cache.ts | 89 ------- src/node/consumer/src/lib/extension.ts | 62 ----- .../src/lib/helpers/promises_helpers.ts | 51 ++++ .../lib/interfaces/downloaded_torrent_file.ts | 3 + src/node/consumer/src/lib/logger.ts | 5 - src/node/consumer/src/lib/metadata.ts | 216 ----------------- src/node/consumer/src/lib/parseHelper.ts | 101 -------- src/node/consumer/src/lib/promises.ts | 52 ----- .../src/lib/services/cache_service.ts | 104 +++++++++ .../src/lib/services/extension_service.ts | 69 ++++++ .../src/lib/services/logging_service.ts | 26 +++ .../src/lib/services/metadata_service.ts | 217 ++++++++++++++++++ .../src/lib/services/parsing_service.ts | 106 +++++++++ .../lib/services/torrent_download_service.ts | 82 +++++++ .../services/torrent_processing_service.ts | 54 +++++ .../src/lib/services/tracker_service.ts | 32 +++ src/node/consumer/src/lib/torrent.js | 82 ------- src/node/consumer/src/lib/torrentEntries.js | 22 +- src/node/consumer/src/lib/torrentFiles.js | 32 +-- .../consumer/src/lib/torrent_processor.ts | 49 ---- src/node/consumer/src/lib/trackerService.ts | 27 --- .../src/repository/database_repository.ts | 8 +- 26 files changed, 802 insertions(+), 719 deletions(-) delete mode 100644 src/node/consumer/src/lib/cache.ts delete mode 100644 src/node/consumer/src/lib/extension.ts create mode 100644 src/node/consumer/src/lib/helpers/promises_helpers.ts create mode 100644 src/node/consumer/src/lib/interfaces/downloaded_torrent_file.ts delete mode 100644 src/node/consumer/src/lib/logger.ts delete mode 100644 src/node/consumer/src/lib/metadata.ts delete mode 100644 src/node/consumer/src/lib/parseHelper.ts delete mode 100644 src/node/consumer/src/lib/promises.ts create mode 100644 src/node/consumer/src/lib/services/cache_service.ts create mode 100644 src/node/consumer/src/lib/services/extension_service.ts create mode 100644 src/node/consumer/src/lib/services/logging_service.ts create mode 100644 src/node/consumer/src/lib/services/metadata_service.ts create mode 100644 src/node/consumer/src/lib/services/parsing_service.ts create mode 100644 src/node/consumer/src/lib/services/torrent_download_service.ts create mode 100644 src/node/consumer/src/lib/services/torrent_processing_service.ts create mode 100644 src/node/consumer/src/lib/services/tracker_service.ts delete mode 100644 src/node/consumer/src/lib/torrent.js delete mode 100644 src/node/consumer/src/lib/torrent_processor.ts delete mode 100644 src/node/consumer/src/lib/trackerService.ts diff --git a/src/node/consumer/package-lock.json b/src/node/consumer/package-lock.json index 3ad7ee7..2898608 100644 --- a/src/node/consumer/package-lock.json +++ b/src/node/consumer/package-lock.json @@ -31,8 +31,10 @@ }, "devDependencies": { "@types/amqplib": "^0.10.4", + "@types/magnet-uri": "^5.1.5", "@types/node": "^20.11.16", "@types/stremio-addon-sdk": "^1.6.10", + "@types/torrent-stream": "^0.0.9", "@types/validator": "^13.11.8", "esbuild": "^0.20.0", "eslint": "^8.56.0", @@ -586,6 +588,15 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/magnet-uri": { + "version": "5.1.5", + "resolved": "https://registry.npmjs.org/@types/magnet-uri/-/magnet-uri-5.1.5.tgz", + "integrity": "sha512-SbBjlb1KGe38VfjRR+mwqztJd/4skhdKkRbIzPDhTy7IAeEAPZWIVSEkZw00Qr4ZZOGR3/ATJ20WWPBfrKHGdA==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/ms": { "version": "0.7.34", "license": "MIT" @@ -607,6 +618,15 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/torrent-stream": { + "version": "0.0.9", + "resolved": "https://registry.npmjs.org/@types/torrent-stream/-/torrent-stream-0.0.9.tgz", + "integrity": "sha512-SY0K6HNlDdnU7yk4TWpLjlv65/liZnxmftMuOdjRriC2IGExqnAYfl8dprjU1j1KQMPVM/X174cusUPNPloghQ==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/validator": { "version": "13.11.8", "resolved": "https://registry.npmjs.org/@types/validator/-/validator-13.11.8.tgz", diff --git a/src/node/consumer/package.json b/src/node/consumer/package.json index 0c86906..f306029 100644 --- a/src/node/consumer/package.json +++ b/src/node/consumer/package.json @@ -32,8 +32,10 @@ }, "devDependencies": { "@types/amqplib": "^0.10.4", + "@types/magnet-uri": "^5.1.5", "@types/node": "^20.11.16", "@types/stremio-addon-sdk": "^1.6.10", + "@types/torrent-stream": "^0.0.9", "@types/validator": "^13.11.8", "esbuild": "^0.20.0", "eslint": "^8.56.0", diff --git a/src/node/consumer/src/index.js b/src/node/consumer/src/index.js index 3f7ef3a..7d3e675 100644 --- a/src/node/consumer/src/index.js +++ b/src/node/consumer/src/index.js @@ -1,9 +1,9 @@ import { listenToQueue } from './jobs/processTorrents'; import { repository } from "./repository/database_repository"; -import { getTrackers } from "./lib/trackerService"; +import { trackerService } from "./lib/services/tracker_service.js"; (async () => { - await getTrackers(); + await trackerService.getTrackers(); await repository.connect(); await listenToQueue(); })(); \ No newline at end of file diff --git a/src/node/consumer/src/jobs/processTorrents.ts b/src/node/consumer/src/jobs/processTorrents.ts index 6f43426..54a3a6b 100644 --- a/src/node/consumer/src/jobs/processTorrents.ts +++ b/src/node/consumer/src/jobs/processTorrents.ts @@ -1,7 +1,7 @@ import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' import {jobConfig, rabbitConfig} from '../lib/config'; -import {processTorrentRecord} from '../lib/torrent_processor'; -import {logger} from '../lib/logger'; +import {torrentProcessingService} from '../lib/services/torrent_processing_service'; +import {logger} from '../lib/services/logging_service'; import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; @@ -10,7 +10,7 @@ const consumeQueueOptions: Options.Consume = { noAck: false }; const processMessage = (msg: ConsumeMessage | null): Promise => { const ingestedTorrent: IngestedTorrentAttributes = getMessageAsJson(msg); - return processTorrentRecord(ingestedTorrent); + return torrentProcessingService.processTorrentRecord(ingestedTorrent); }; const getMessageAsJson = (msg: ConsumeMessage | null): IngestedTorrentAttributes => { diff --git a/src/node/consumer/src/lib/cache.ts b/src/node/consumer/src/lib/cache.ts deleted file mode 100644 index d389257..0000000 --- a/src/node/consumer/src/lib/cache.ts +++ /dev/null @@ -1,89 +0,0 @@ -import {Cache, createCache, memoryStore} from 'cache-manager'; -import { mongoDbStore } from '@tirke/node-cache-manager-mongodb' -import { cacheConfig } from './config'; -import { logger } from './logger'; -import { CacheType } from "./enums/cache_types"; -import {CacheOptions} from "./interfaces/cache_options"; - -const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer'; -const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`; -const KITSU_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|kitsu_id`; -const METADATA_PREFIX = `${GLOBAL_KEY_PREFIX}|metadata`; -const TRACKERS_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|trackers`; - -const GLOBAL_TTL: number = Number(process.env.METADATA_TTL) || 7 * 24 * 60 * 60; // 7 days -const MEMORY_TTL: number = Number(process.env.METADATA_TTL) || 2 * 60 * 60; // 2 hours -const TRACKERS_TTL: number = 2 * 24 * 60 * 60; // 2 days - -type CacheMethod = () => any; - -const initiateMemoryCache = () => - createCache(memoryStore(), { - ttl: MEMORY_TTL - }) as Cache; - -const initiateMongoCache = () => { - const store = mongoDbStore({ - collectionName: cacheConfig.COLLECTION_NAME, - ttl: GLOBAL_TTL, - url: cacheConfig.MONGO_URI, - mongoConfig:{ - socketTimeoutMS: 120000, - appName: 'knightcrawler-consumer', - } - }); - - return createCache(store, { - ttl: GLOBAL_TTL, - }); -} - -const initiateRemoteCache = (): Cache => { - if (cacheConfig.NO_CACHE) { - logger.debug('Cache is disabled'); - return null; - } - - return cacheConfig.MONGO_URI ? initiateMongoCache() : initiateMemoryCache(); -} - -const getCacheType = (cacheType: CacheType): typeof memoryCache | null => { - switch (cacheType) { - case CacheType.MEMORY: - return memoryCache; - case CacheType.MONGODB: - return remoteCache; - default: - return null; - } -} - -const memoryCache = initiateMemoryCache(); -const remoteCache = initiateRemoteCache(); - -const cacheWrap = async ( - cacheType: CacheType, key: string, method: CacheMethod, options: CacheOptions): Promise => { - const cache = getCacheType(cacheType); - - if (cacheConfig.NO_CACHE || !cache) { - return method(); - } - - logger.debug(`Cache type: ${cacheType}`); - logger.debug(`Cache key: ${key}`); - logger.debug(`Cache options: ${JSON.stringify(options)}`); - - return cache.wrap(key, method, options.ttl); -} - -export const cacheWrapImdbId = (key: string, method: CacheMethod): Promise => - cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); - -export const cacheWrapKitsuId = (key: string, method: CacheMethod): Promise => - cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); - -export const cacheWrapMetadata = (id: string, method: CacheMethod): Promise => - cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL }); - -export const cacheTrackers = (method: CacheMethod): Promise => - cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL }); \ No newline at end of file diff --git a/src/node/consumer/src/lib/extension.ts b/src/node/consumer/src/lib/extension.ts deleted file mode 100644 index 937dc35..0000000 --- a/src/node/consumer/src/lib/extension.ts +++ /dev/null @@ -1,62 +0,0 @@ -const VIDEO_EXTENSIONS: string[] = [ - "3g2", - "3gp", - "avi", - "flv", - "mkv", - "mk3d", - "mov", - "mp2", - "mp4", - "m4v", - "mpe", - "mpeg", - "mpg", - "mpv", - "webm", - "wmv", - "ogm", - "divx" -]; -const SUBTITLE_EXTENSIONS: string[] = [ - "aqt", - "gsub", - "jss", - "sub", - "ttxt", - "pjs", - "psb", - "rt", - "smi", - "slt", - "ssf", - "srt", - "ssa", - "ass", - "usf", - "idx", - "vtt" -]; -const DISK_EXTENSIONS: string[] = [ - "iso", - "m2ts", - "ts", - "vob" -] - -export function isVideo(filename: string): boolean { - return isExtension(filename, VIDEO_EXTENSIONS); -} - -export function isSubtitle(filename: string): boolean { - return isExtension(filename, SUBTITLE_EXTENSIONS); -} - -export function isDisk(filename: string): boolean { - return isExtension(filename, DISK_EXTENSIONS); -} - -export function isExtension(filename: string, extensions: string[]): boolean { - const extensionMatch = filename.match(/\.(\w{2,4})$/); - return extensionMatch !== null && extensions.includes(extensionMatch[1].toLowerCase()); -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/helpers/promises_helpers.ts b/src/node/consumer/src/lib/helpers/promises_helpers.ts new file mode 100644 index 0000000..3837d19 --- /dev/null +++ b/src/node/consumer/src/lib/helpers/promises_helpers.ts @@ -0,0 +1,51 @@ +export class PromiseHelpers { + public static async sequence(promises: Array<() => Promise>): Promise { + return promises.reduce((promise: Promise, func: () => Promise) => + promise.then(result => func().then(res => result.concat(res))), Promise.resolve([])); + } + + /** + * Return first resolved promise as the result. + */ + public static async first(promises: Array>): Promise { + return Promise.all(promises.map(p => { + // If a request fails, count that as a resolution so it will keep + // waiting for other possible successes. If a request succeeds, + // treat it as a rejection so Promise.all immediately bails out. + return p.then((val) => Promise.reject(val), (err) => Promise.resolve(err)); + })).then( + // If '.all' resolved, we've just got an array of errors. + (errors) => Promise.reject(errors), + // If '.all' rejected, we've got the result we wanted. + (val) => Promise.resolve(val) + ); + } + + /** + * Delay promise + */ + public static async delay(duration: number): Promise { + return new Promise((resolve) => setTimeout(() => resolve(), duration)); + } + + /** + * Timeout promise after a set time in ms + */ + public static async timeout(timeoutMs: number, promise: Promise, message = 'Timed out'): Promise { + return Promise.race([ + promise, + new Promise(function (resolve, reject) { + setTimeout(function () { + reject(message); + }, timeoutMs); + }) + ]); + } + + /** + * Return most common value from given array. + */ + public static mostCommonValue(array: any[]): any { + return array.sort((a, b) => array.filter(v => v === a).length - array.filter(v => v === b).length).pop(); + } +} diff --git a/src/node/consumer/src/lib/interfaces/downloaded_torrent_file.ts b/src/node/consumer/src/lib/interfaces/downloaded_torrent_file.ts new file mode 100644 index 0000000..4a0a5a5 --- /dev/null +++ b/src/node/consumer/src/lib/interfaces/downloaded_torrent_file.ts @@ -0,0 +1,3 @@ +export interface DownloadedTorrentFile extends TorrentStream.TorrentFile { + fileIndex: number; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/logger.ts b/src/node/consumer/src/lib/logger.ts deleted file mode 100644 index 2ea0d7a..0000000 --- a/src/node/consumer/src/lib/logger.ts +++ /dev/null @@ -1,5 +0,0 @@ -import pino from 'pino'; - -export const logger = pino({ - level: process.env.LOG_LEVEL || 'info' -}); \ No newline at end of file diff --git a/src/node/consumer/src/lib/metadata.ts b/src/node/consumer/src/lib/metadata.ts deleted file mode 100644 index 99ebc8d..0000000 --- a/src/node/consumer/src/lib/metadata.ts +++ /dev/null @@ -1,216 +0,0 @@ -import axios, {AxiosResponse} from 'axios'; -import {search, ResultTypes} from 'google-sr'; -import nameToImdb from 'name-to-imdb'; -import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache'; -import { TorrentType } from './enums/torrent_types'; -import {MetadataResponse} from "./interfaces/metadata_response"; -import {CinemetaJsonResponse} from "./interfaces/cinemeta_metadata"; -import {CommonVideoMetadata} from "./interfaces/common_video_metadata"; -import {KitsuJsonResponse} from "./interfaces/kitsu_metadata"; -import {MetaDataQuery} from "./interfaces/metadata_query"; -import {KitsuCatalogJsonResponse} from "./interfaces/kitsu_catalog_metadata"; - -const CINEMETA_URL = 'https://v3-cinemeta.strem.io'; -const KITSU_URL = 'https://anime-kitsu.strem.fun'; -const TIMEOUT = 20000; - -async function _requestMetadata(url: string): Promise { - let response: AxiosResponse = await axios.get(url, {timeout: TIMEOUT}); - let result : MetadataResponse; - const body = response.data; - if ('kitsu_id' in body.meta) { - result = handleKitsuResponse(body as KitsuJsonResponse); - } - else if ('imdb_id' in body.meta) { - result = handleCinemetaResponse(body as CinemetaJsonResponse); - } - else { - throw new Error('No valid metadata'); - } - - return result; -} - -function handleCinemetaResponse(body: CinemetaJsonResponse) : MetadataResponse { - return { - imdbId: parseInt(body.meta.imdb_id), - type: body.meta.type, - title: body.meta.name, - year: parseInt(body.meta.year), - country: body.meta.country, - genres: body.meta.genres, - status: body.meta.status, - videos: body.meta.videos - ? body.meta.videos.map(video => ({ - name: video.name, - season: video.season, - episode: video.episode, - imdbSeason: video.season, - imdbEpisode: video.episode, - })) - : [], - episodeCount: body.meta.videos - ? getEpisodeCount(body.meta.videos) - : [], - totalCount: body.meta.videos - ? body.meta.videos.filter( - entry => entry.season !== 0 && entry.episode !== 0 - ).length - : 0, - }; -} - -function handleKitsuResponse(body: KitsuJsonResponse) : MetadataResponse { - return { - kitsuId: parseInt(body.meta.kitsu_id), - type: body.meta.type, - title: body.meta.name, - year: parseInt(body.meta.year), - country: body.meta.country, - genres: body.meta.genres, - status: body.meta.status, - videos: body.meta.videos - ? body.meta.videos.map(video => ({ - name: video.title, - season: video.season, - episode: video.episode, - kitsuId: video.id, - kitsuEpisode: video.episode, - released: video.released, - })) - : [], - episodeCount: body.meta.videos - ? getEpisodeCount(body.meta.videos) - : [], - totalCount: body.meta.videos - ? body.meta.videos.filter( - entry => entry.season !== 0 && entry.episode !== 0 - ).length - : 0, - }; -} - -function getEpisodeCount(videos: CommonVideoMetadata[]) { - return Object.values( - videos - .filter(entry => entry.season !== 0 && entry.episode !== 0) - .sort((a, b) => a.season - b.season) - .reduce((map, next) => { - map[next.season] = map[next.season] + 1 || 1; - return map; - }, {}) - ); -} - - -export function escapeTitle(title: string): string { - return title.toLowerCase() - .normalize('NFKD') // normalize non-ASCII characters - .replace(/[\u0300-\u036F]/g, '') - .replace(/&/g, 'and') - .replace(/[;, ~./]+/g, ' ') // replace dots, commas or underscores with spaces - .replace(/[^\w \-()×+#@!'\u0400-\u04ff]+/g, '') // remove all non-alphanumeric chars - .replace(/^\d{1,2}[.#\s]+(?=(?:\d+[.\s]*)?[\u0400-\u04ff])/i, '') // remove russian movie numbering - .replace(/\s{2,}/, ' ') // replace multiple spaces - .trim(); -} - -function getIMDbIdFromNameToImdb(name: string, info: MetaDataQuery) : Promise { - const year = info.year; - const type = info.type; - return new Promise((resolve, reject) => { - nameToImdb({ name, year, type }, function(err: Error, res: string) { - if (res) { - resolve(res); - } else { - reject(err || new Error('Failed IMDbId search')); - } - }); - }); -} - -async function getIMDbIdFromGoogle(query: string): Promise{ - try { - const searchResults = await search({ query: query }); - for(const result of searchResults) { - if(result.type === ResultTypes.SearchResult) { - if(result.link.includes('imdb.com/title/')){ - const match = result.link.match(/imdb\.com\/title\/(tt\d+)/); - if(match){ - return match[1]; - } - } - } - } - return undefined; - } - catch (error) { - throw new Error('Failed to find IMDb ID from Google search'); - } -} - -export async function getKitsuId(info: MetaDataQuery): Promise { - const title = escapeTitle(info.title.replace(/\s\|\s.*/, '')); - const year = info.year ? ` ${info.year}` : ''; - const season = info.season > 1 ? ` S${info.season}` : ''; - const key = `${title}${year}${season}`; - const query = encodeURIComponent(key); - - return cacheWrapKitsuId(key, - () => axios.get(`${KITSU_URL}/catalog/series/kitsu-anime-list/search=${query}.json`, { timeout: 60000 }) - .then((response) => { - const body = response.data as KitsuCatalogJsonResponse; - if (body && body.metas && body.metas.length) { - return body.metas[0].id.replace('kitsu:', ''); - } else { - throw new Error('No search results'); - } - })); -} - -export async function getImdbId(info: MetaDataQuery): Promise { - const name = escapeTitle(info.title); - const year = info.year || (info.date && info.date.slice(0, 4)); - const key = `${name}_${year || 'NA'}_${info.type}`; - const query = `${name} ${year || ''} ${info.type} imdb`; - const fallbackQuery = `${name} ${info.type} imdb`; - const googleQuery = year ? query : fallbackQuery; - - try { - const imdbId = await cacheWrapImdbId(key, - () => getIMDbIdFromNameToImdb(name, info) - ); - return imdbId && 'tt' + imdbId.replace(/tt0*([1-9][0-9]*)$/, '$1').padStart(7, '0'); - } catch (error) { - const imdbIdFallback = await getIMDbIdFromGoogle(googleQuery); - return imdbIdFallback && 'tt' + imdbIdFallback.toString().replace(/tt0*([1-9][0-9]*)$/, '$1').padStart(7, '0'); - } -} - -export function getMetadata(id: string | number, type: TorrentType = TorrentType.SERIES): Promise { - if (!id) { - return Promise.reject("no valid id provided"); - } - - const key = Number.isInteger(id) || id.toString().match(/^\d+$/) ? `kitsu:${id}` : id; - const metaType = type === TorrentType.MOVIE ? TorrentType.MOVIE : TorrentType.SERIES; - return cacheWrapMetadata(key, () => _requestMetadata(`${KITSU_URL}/meta/${metaType}/${key}.json`) - .catch(() => _requestMetadata(`${CINEMETA_URL}/meta/${metaType}/${key}.json`)) - .catch(() => { - // try different type in case there was a mismatch - const otherType = metaType === TorrentType.MOVIE ? TorrentType.SERIES : TorrentType.MOVIE; - return _requestMetadata(`${CINEMETA_URL}/meta/${otherType}/${key}.json`) - }) - .catch((error) => { - throw new Error(`failed metadata query ${key} due: ${error.message}`); - })); -} - -export async function isEpisodeImdbId(imdbId: string | undefined): Promise { - if (!imdbId) { - return false; - } - return axios.get(`https://www.imdb.com/title/${imdbId}/`, { timeout: 10000 }) - .then(response => !!(response.data && response.data.includes('video.episode'))) - .catch(() => false); -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/parseHelper.ts b/src/node/consumer/src/lib/parseHelper.ts deleted file mode 100644 index d393186..0000000 --- a/src/node/consumer/src/lib/parseHelper.ts +++ /dev/null @@ -1,101 +0,0 @@ -import { parse } from 'parse-torrent-title'; -import { TorrentType } from './enums/torrent_types'; -import {ParseTorrentTitleResult} from "./interfaces/parse_torrent_title_result"; -import {ParsableTorrentVideo} from "./interfaces/parsable_torrent_video"; -import {ParsableTorrent} from "./interfaces/parsable_torrent"; - -const MULTIPLE_FILES_SIZE = 4 * 1024 * 1024 * 1024; // 4 GB - -export function parseSeriesVideos(torrent: ParsableTorrent, videos: ParsableTorrentVideo[]): ParseTorrentTitleResult[] { - const parsedTorrentName = parse(torrent.title); - const hasMovies = parsedTorrentName.complete || !!torrent.title.match(/movies?(?:\W|$)/i); - const parsedVideos = videos.map(video => parseSeriesVideo(video, parsedTorrentName)); - return parsedVideos.map(video => ({ ...video, isMovie: isMovieVideo(video, parsedVideos, torrent.type, hasMovies) })); -} - -function parseSeriesVideo(video: ParsableTorrentVideo, parsedTorrentName: ParseTorrentTitleResult): ParseTorrentTitleResult { - const videoInfo = parse(video.name); - // the episode may be in a folder containing season number - if (!Number.isInteger(videoInfo.season) && video.path.includes('/')) { - const folders = video.path.split('/'); - const pathInfo = parse(folders[folders.length - 2]); - videoInfo.season = pathInfo.season; - } - if (!Number.isInteger(videoInfo.season) && parsedTorrentName.season) { - videoInfo.season = parsedTorrentName.season; - } - if (!Number.isInteger(videoInfo.season) && videoInfo.seasons && videoInfo.seasons.length > 1) { - // in case single file was interpreted as having multiple seasons - videoInfo.season = videoInfo.seasons[0]; - } - if (!Number.isInteger(videoInfo.season) && video.path.includes('/') && parsedTorrentName.seasons - && parsedTorrentName.seasons.length > 1) { - // russian season are usually named with 'series name-2` i.e. Улицы разбитых фонарей-6/22. Одиночный выстрел.mkv - const folderPathSeasonMatch = video.path.match(/[\u0400-\u04ff]-(\d{1,2})(?=.*\/)/); - videoInfo.season = folderPathSeasonMatch && parseInt(folderPathSeasonMatch[1], 10) || undefined; - } - // sometimes video file does not have correct date format as in torrent title - if (!videoInfo.episodes && !videoInfo.date && parsedTorrentName.date) { - videoInfo.date = parsedTorrentName.date; - } - // limit number of episodes in case of incorrect parsing - if (videoInfo.episodes && videoInfo.episodes.length > 20) { - videoInfo.episodes = [videoInfo.episodes[0]]; - videoInfo.episode = videoInfo.episodes[0]; - } - // force episode to any found number if it was not parsed - if (!videoInfo.episodes && !videoInfo.date) { - const epMatcher = videoInfo.title.match( - /(? 3 - && otherVideos.filter(other => other.title === video.title && other.year === video.year).length < 3; -} - -export function isPackTorrent(torrent: ParsableTorrent): boolean { - if (torrent.pack) { - return true; - } - const parsedInfo = parse(torrent.title); - if (torrent.type === TorrentType.MOVIE) { - return parsedInfo.complete || typeof parsedInfo.year === 'string' || /movies/i.test(torrent.title); - } - const hasMultipleEpisodes = parsedInfo.complete || - torrent.size > MULTIPLE_FILES_SIZE || - (parsedInfo.seasons && parsedInfo.seasons.length > 1) || - (parsedInfo.episodes && parsedInfo.episodes.length > 1) || - (parsedInfo.seasons && !parsedInfo.episodes); - const hasSingleEpisode = Number.isInteger(parsedInfo.episode) || (!parsedInfo.episodes && parsedInfo.date); - return hasMultipleEpisodes && !hasSingleEpisode; -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/promises.ts b/src/node/consumer/src/lib/promises.ts deleted file mode 100644 index f7a8dd3..0000000 --- a/src/node/consumer/src/lib/promises.ts +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Execute promises in sequence one after another. - */ -export async function sequence(promises: Array<() => Promise>): Promise { - return promises.reduce((promise: Promise, func: () => Promise) => - promise.then(result => func().then(res => result.concat(res))), Promise.resolve([])); -} - -/** - * Return first resolved promise as the result. - */ -export async function first(promises: Array>): Promise { - return Promise.all(promises.map(p => { - // If a request fails, count that as a resolution so it will keep - // waiting for other possible successes. If a request succeeds, - // treat it as a rejection so Promise.all immediately bails out. - return p.then((val) => Promise.reject(val), (err) => Promise.resolve(err)); - })).then( - // If '.all' resolved, we've just got an array of errors. - (errors) => Promise.reject(errors), - // If '.all' rejected, we've got the result we wanted. - (val) => Promise.resolve(val) - ); -} - -/** - * Delay promise - */ -export async function delay(duration: number): Promise { - return new Promise((resolve) => setTimeout(() => resolve(), duration)); -} - -/** - * Timeout promise after a set time in ms - */ -export async function timeout(timeoutMs: number, promise: Promise, message = 'Timed out'): Promise { - return Promise.race([ - promise, - new Promise(function (resolve, reject) { - setTimeout(function () { - reject(message); - }, timeoutMs); - }) - ]); -} - -/** - * Return most common value from given array. - */ -export function mostCommonValue(array: any[]): any { - return array.sort((a, b) => array.filter(v => v === a).length - array.filter(v => v === b).length).pop(); -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/services/cache_service.ts b/src/node/consumer/src/lib/services/cache_service.ts new file mode 100644 index 0000000..5743d3c --- /dev/null +++ b/src/node/consumer/src/lib/services/cache_service.ts @@ -0,0 +1,104 @@ +import {Cache, createCache, memoryStore} from 'cache-manager'; +import { mongoDbStore } from '@tirke/node-cache-manager-mongodb' +import { cacheConfig } from '../config'; +import { logger } from './logging_service'; +import { CacheType } from "../enums/cache_types"; +import {CacheOptions} from "../interfaces/cache_options"; + +const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer'; +const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`; +const KITSU_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|kitsu_id`; +const METADATA_PREFIX = `${GLOBAL_KEY_PREFIX}|metadata`; +const TRACKERS_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|trackers`; + +const GLOBAL_TTL: number = Number(process.env.METADATA_TTL) || 7 * 24 * 60 * 60; // 7 days +const MEMORY_TTL: number = Number(process.env.METADATA_TTL) || 2 * 60 * 60; // 2 hours +const TRACKERS_TTL: number = 2 * 24 * 60 * 60; // 2 days + +type CacheMethod = () => any; + +class CacheService { + constructor() { + if (!cacheConfig.NO_CACHE) { + logger.info('Cache is disabled'); + return; + } + + this.memoryCache = this.initiateMemoryCache(); + this.remoteCache = this.initiateRemoteCache(); + } + + public cacheWrapImdbId = (key: string, method: CacheMethod): Promise => + this.cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); + + public cacheWrapKitsuId = (key: string, method: CacheMethod): Promise => + this.cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); + + public cacheWrapMetadata = (id: string, method: CacheMethod): Promise => + this.cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL }); + + public cacheTrackers = (method: CacheMethod): Promise => + this.cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL }); + + private initiateMemoryCache = () => + createCache(memoryStore(), { + ttl: MEMORY_TTL + }) as Cache; + + private initiateMongoCache = () => { + const store = mongoDbStore({ + collectionName: cacheConfig.COLLECTION_NAME, + ttl: GLOBAL_TTL, + url: cacheConfig.MONGO_URI, + mongoConfig:{ + socketTimeoutMS: 120000, + appName: 'knightcrawler-consumer', + } + }); + + return createCache(store, { + ttl: GLOBAL_TTL, + }); + } + + private initiateRemoteCache = (): Cache => { + if (cacheConfig.NO_CACHE) { + logger.debug('Cache is disabled'); + return null; + } + + return cacheConfig.MONGO_URI ? this.initiateMongoCache() : this.initiateMemoryCache(); + } + + private getCacheType = (cacheType: CacheType): typeof this.memoryCache | null => { + switch (cacheType) { + case CacheType.MEMORY: + return this.memoryCache; + case CacheType.MONGODB: + return this.remoteCache; + default: + return null; + } + } + + private readonly memoryCache: Cache; + private readonly remoteCache: Cache; + + private cacheWrap = async ( + cacheType: CacheType, key: string, method: CacheMethod, options: CacheOptions): Promise => { + const cache = this.getCacheType(cacheType); + + if (cacheConfig.NO_CACHE || !cache) { + return method(); + } + + logger.debug(`Cache type: ${cacheType}`); + logger.debug(`Cache key: ${key}`); + logger.debug(`Cache options: ${JSON.stringify(options)}`); + + return cache.wrap(key, method, options.ttl); + } +} + +export const cacheService: CacheService = new CacheService(); + diff --git a/src/node/consumer/src/lib/services/extension_service.ts b/src/node/consumer/src/lib/services/extension_service.ts new file mode 100644 index 0000000..4e270ad --- /dev/null +++ b/src/node/consumer/src/lib/services/extension_service.ts @@ -0,0 +1,69 @@ +class ExtensionService { + private readonly VIDEO_EXTENSIONS: string[] = [ + "3g2", + "3gp", + "avi", + "flv", + "mkv", + "mk3d", + "mov", + "mp2", + "mp4", + "m4v", + "mpe", + "mpeg", + "mpg", + "mpv", + "webm", + "wmv", + "ogm", + "divx" + ]; + private readonly SUBTITLE_EXTENSIONS: string[] = [ + "aqt", + "gsub", + "jss", + "sub", + "ttxt", + "pjs", + "psb", + "rt", + "smi", + "slt", + "ssf", + "srt", + "ssa", + "ass", + "usf", + "idx", + "vtt" + ]; + private readonly DISK_EXTENSIONS: string[] = [ + "iso", + "m2ts", + "ts", + "vob" + ] + + public isVideo(filename: string): boolean { + return this.isExtension(filename, this.VIDEO_EXTENSIONS); + } + + public isSubtitle(filename: string): boolean { + return this.isExtension(filename, this.SUBTITLE_EXTENSIONS); + } + + public isDisk(filename: string): boolean { + return this.isExtension(filename, this.DISK_EXTENSIONS); + } + + public isExtension(filename: string, extensions: string[]): boolean { + const extensionMatch = filename.match(/\.(\w{2,4})$/); + return extensionMatch !== null && extensions.includes(extensionMatch[1].toLowerCase()); + } +} + +export const extensionService = new ExtensionService(); + + + diff --git a/src/node/consumer/src/lib/services/logging_service.ts b/src/node/consumer/src/lib/services/logging_service.ts new file mode 100644 index 0000000..ac23ed8 --- /dev/null +++ b/src/node/consumer/src/lib/services/logging_service.ts @@ -0,0 +1,26 @@ +import {Logger, pino} from "pino"; + +class LoggingService { + public readonly logger: Logger = pino({ + level: process.env.LOG_LEVEL || 'info' + }); + + public info(message: string, ...args: any[]): void { + this.logger.info(message); + } + + public error(message: string, ...args: any[]): void { + this.logger.error(message); + } + + public debug(message: string, ...args: any[]): void { + this.logger.debug(message); + } + + public warn(message: string, ...args: any[]): void { + this.logger.warn(message); + } +} + +export const logger = new LoggingService(); + diff --git a/src/node/consumer/src/lib/services/metadata_service.ts b/src/node/consumer/src/lib/services/metadata_service.ts new file mode 100644 index 0000000..5667bb8 --- /dev/null +++ b/src/node/consumer/src/lib/services/metadata_service.ts @@ -0,0 +1,217 @@ +import axios, {AxiosResponse} from 'axios'; +import {search, ResultTypes} from 'google-sr'; +import nameToImdb from 'name-to-imdb'; +import { cacheService } from './cache_service'; +import { TorrentType } from '../enums/torrent_types'; +import {MetadataResponse} from "../interfaces/metadata_response"; +import {CinemetaJsonResponse} from "../interfaces/cinemeta_metadata"; +import {CommonVideoMetadata} from "../interfaces/common_video_metadata"; +import {KitsuJsonResponse} from "../interfaces/kitsu_metadata"; +import {MetaDataQuery} from "../interfaces/metadata_query"; +import {KitsuCatalogJsonResponse} from "../interfaces/kitsu_catalog_metadata"; + +const CINEMETA_URL = 'https://v3-cinemeta.strem.io'; +const KITSU_URL = 'https://anime-kitsu.strem.fun'; +const TIMEOUT = 20000; + +class MetadataService { + public async getKitsuId(info: MetaDataQuery): Promise { + const title = this.escapeTitle(info.title.replace(/\s\|\s.*/, '')); + const year = info.year ? ` ${info.year}` : ''; + const season = info.season > 1 ? ` S${info.season}` : ''; + const key = `${title}${year}${season}`; + const query = encodeURIComponent(key); + + return cacheService.cacheWrapKitsuId(key, + () => axios.get(`${KITSU_URL}/catalog/series/kitsu-anime-list/search=${query}.json`, { timeout: 60000 }) + .then((response) => { + const body = response.data as KitsuCatalogJsonResponse; + if (body && body.metas && body.metas.length) { + return body.metas[0].id.replace('kitsu:', ''); + } else { + throw new Error('No search results'); + } + })); + } + + public async getImdbId(info: MetaDataQuery): Promise { + const name = this.escapeTitle(info.title); + const year = info.year || (info.date && info.date.slice(0, 4)); + const key = `${name}_${year || 'NA'}_${info.type}`; + const query = `${name} ${year || ''} ${info.type} imdb`; + const fallbackQuery = `${name} ${info.type} imdb`; + const googleQuery = year ? query : fallbackQuery; + + try { + const imdbId = await cacheService.cacheWrapImdbId(key, + () => this.getIMDbIdFromNameToImdb(name, info) + ); + return imdbId && 'tt' + imdbId.replace(/tt0*([1-9][0-9]*)$/, '$1').padStart(7, '0'); + } catch (error) { + const imdbIdFallback = await this.getIMDbIdFromGoogle(googleQuery); + return imdbIdFallback && 'tt' + imdbIdFallback.toString().replace(/tt0*([1-9][0-9]*)$/, '$1').padStart(7, '0'); + } + } + + public getMetadata(id: string | number, type: TorrentType = TorrentType.SERIES): Promise { + if (!id) { + return Promise.reject("no valid id provided"); + } + + const key = Number.isInteger(id) || id.toString().match(/^\d+$/) ? `kitsu:${id}` : id; + const metaType = type === TorrentType.MOVIE ? TorrentType.MOVIE : TorrentType.SERIES; + return cacheService.cacheWrapMetadata(key.toString(), () => this.requestMetadata(`${KITSU_URL}/meta/${metaType}/${key}.json`) + .catch(() => this.requestMetadata(`${CINEMETA_URL}/meta/${metaType}/${key}.json`)) + .catch(() => { + // try different type in case there was a mismatch + const otherType = metaType === TorrentType.MOVIE ? TorrentType.SERIES : TorrentType.MOVIE; + return this.requestMetadata(`${CINEMETA_URL}/meta/${otherType}/${key}.json`) + }) + .catch((error) => { + throw new Error(`failed metadata query ${key} due: ${error.message}`); + })); + } + + public async isEpisodeImdbId(imdbId: string | undefined): Promise { + if (!imdbId) { + return false; + } + return axios.get(`https://www.imdb.com/title/${imdbId}/`, {timeout: 10000}) + .then(response => !!(response.data && response.data.includes('video.episode'))) + .catch(() => false); + } + + public escapeTitle(title: string): string { + return title.toLowerCase() + .normalize('NFKD') // normalize non-ASCII characters + .replace(/[\u0300-\u036F]/g, '') + .replace(/&/g, 'and') + .replace(/[;, ~./]+/g, ' ') // replace dots, commas or underscores with spaces + .replace(/[^\w \-()×+#@!'\u0400-\u04ff]+/g, '') // remove all non-alphanumeric chars + .replace(/^\d{1,2}[.#\s]+(?=(?:\d+[.\s]*)?[\u0400-\u04ff])/i, '') // remove russian movie numbering + .replace(/\s{2,}/, ' ') // replace multiple spaces + .trim(); + } + + private async requestMetadata(url: string): Promise { + let response: AxiosResponse = await axios.get(url, {timeout: TIMEOUT}); + let result: MetadataResponse; + const body = response.data; + if ('kitsu_id' in body.meta) { + result = this.handleKitsuResponse(body as KitsuJsonResponse); + } else if ('imdb_id' in body.meta) { + result = this.handleCinemetaResponse(body as CinemetaJsonResponse); + } else { + throw new Error('No valid metadata'); + } + + return result; + } + + private handleCinemetaResponse(body: CinemetaJsonResponse): MetadataResponse { + return { + imdbId: parseInt(body.meta.imdb_id), + type: body.meta.type, + title: body.meta.name, + year: parseInt(body.meta.year), + country: body.meta.country, + genres: body.meta.genres, + status: body.meta.status, + videos: body.meta.videos + ? body.meta.videos.map(video => ({ + name: video.name, + season: video.season, + episode: video.episode, + imdbSeason: video.season, + imdbEpisode: video.episode, + })) + : [], + episodeCount: body.meta.videos + ? this.getEpisodeCount(body.meta.videos) + : [], + totalCount: body.meta.videos + ? body.meta.videos.filter( + entry => entry.season !== 0 && entry.episode !== 0 + ).length + : 0, + }; + } + + private handleKitsuResponse(body: KitsuJsonResponse): MetadataResponse { + return { + kitsuId: parseInt(body.meta.kitsu_id), + type: body.meta.type, + title: body.meta.name, + year: parseInt(body.meta.year), + country: body.meta.country, + genres: body.meta.genres, + status: body.meta.status, + videos: body.meta.videos + ? body.meta.videos.map(video => ({ + name: video.title, + season: video.season, + episode: video.episode, + kitsuId: video.id, + kitsuEpisode: video.episode, + released: video.released, + })) + : [], + episodeCount: body.meta.videos + ? this.getEpisodeCount(body.meta.videos) + : [], + totalCount: body.meta.videos + ? body.meta.videos.filter( + entry => entry.season !== 0 && entry.episode !== 0 + ).length + : 0, + }; + } + + private getEpisodeCount(videos: CommonVideoMetadata[]) { + return Object.values( + videos + .filter(entry => entry.season !== 0 && entry.episode !== 0) + .sort((a, b) => a.season - b.season) + .reduce((map, next) => { + map[next.season] = map[next.season] + 1 || 1; + return map; + }, {}) + ); + } + + private getIMDbIdFromNameToImdb(name: string, info: MetaDataQuery): Promise { + const year = info.year; + const type = info.type; + return new Promise((resolve, reject) => { + nameToImdb({name, year, type}, function (err: Error, res: string) { + if (res) { + resolve(res); + } else { + reject(err || new Error('Failed IMDbId search')); + } + }); + }); + } + + private async getIMDbIdFromGoogle(query: string): Promise { + try { + const searchResults = await search({query: query}); + for (const result of searchResults) { + if (result.type === ResultTypes.SearchResult) { + if (result.link.includes('imdb.com/title/')) { + const match = result.link.match(/imdb\.com\/title\/(tt\d+)/); + if (match) { + return match[1]; + } + } + } + } + return undefined; + } catch (error) { + throw new Error('Failed to find IMDb ID from Google search'); + } + } +} + +export const metadataService: MetadataService = new MetadataService(); + diff --git a/src/node/consumer/src/lib/services/parsing_service.ts b/src/node/consumer/src/lib/services/parsing_service.ts new file mode 100644 index 0000000..28ea866 --- /dev/null +++ b/src/node/consumer/src/lib/services/parsing_service.ts @@ -0,0 +1,106 @@ +import { parse } from 'parse-torrent-title'; +import { TorrentType } from '../enums/torrent_types'; +import {ParseTorrentTitleResult} from "../interfaces/parse_torrent_title_result"; +import {ParsableTorrentVideo} from "../interfaces/parsable_torrent_video"; +import {ParsableTorrent} from "../interfaces/parsable_torrent"; + +class ParsingService { + private readonly MULTIPLE_FILES_SIZE = 4 * 1024 * 1024 * 1024; // 4 GB + + public parseSeriesVideos(torrent: ParsableTorrent, videos: ParsableTorrentVideo[]): ParseTorrentTitleResult[] { + const parsedTorrentName = parse(torrent.title); + const hasMovies = parsedTorrentName.complete || !!torrent.title.match(/movies?(?:\W|$)/i); + const parsedVideos = videos.map(video => this.parseSeriesVideo(video, parsedTorrentName)); + return parsedVideos.map(video => ({ ...video, isMovie: this.isMovieVideo(video, parsedVideos, torrent.type, hasMovies) })); + } + + public isPackTorrent(torrent: ParsableTorrent): boolean { + if (torrent.pack) { + return true; + } + const parsedInfo = parse(torrent.title); + if (torrent.type === TorrentType.MOVIE) { + return parsedInfo.complete || typeof parsedInfo.year === 'string' || /movies/i.test(torrent.title); + } + const hasMultipleEpisodes = parsedInfo.complete || + torrent.size > this.MULTIPLE_FILES_SIZE || + (parsedInfo.seasons && parsedInfo.seasons.length > 1) || + (parsedInfo.episodes && parsedInfo.episodes.length > 1) || + (parsedInfo.seasons && !parsedInfo.episodes); + const hasSingleEpisode = Number.isInteger(parsedInfo.episode) || (!parsedInfo.episodes && parsedInfo.date); + return hasMultipleEpisodes && !hasSingleEpisode; + } + + private parseSeriesVideo(video: ParsableTorrentVideo, parsedTorrentName: ParseTorrentTitleResult): ParseTorrentTitleResult { + const videoInfo = parse(video.name); + // the episode may be in a folder containing season number + if (!Number.isInteger(videoInfo.season) && video.path.includes('/')) { + const folders = video.path.split('/'); + const pathInfo = parse(folders[folders.length - 2]); + videoInfo.season = pathInfo.season; + } + if (!Number.isInteger(videoInfo.season) && parsedTorrentName.season) { + videoInfo.season = parsedTorrentName.season; + } + if (!Number.isInteger(videoInfo.season) && videoInfo.seasons && videoInfo.seasons.length > 1) { + // in case single file was interpreted as having multiple seasons + videoInfo.season = videoInfo.seasons[0]; + } + if (!Number.isInteger(videoInfo.season) && video.path.includes('/') && parsedTorrentName.seasons + && parsedTorrentName.seasons.length > 1) { + // russian season are usually named with 'series name-2` i.e. Улицы разбитых фонарей-6/22. Одиночный выстрел.mkv + const folderPathSeasonMatch = video.path.match(/[\u0400-\u04ff]-(\d{1,2})(?=.*\/)/); + videoInfo.season = folderPathSeasonMatch && parseInt(folderPathSeasonMatch[1], 10) || undefined; + } + // sometimes video file does not have correct date format as in torrent title + if (!videoInfo.episodes && !videoInfo.date && parsedTorrentName.date) { + videoInfo.date = parsedTorrentName.date; + } + // limit number of episodes in case of incorrect parsing + if (videoInfo.episodes && videoInfo.episodes.length > 20) { + videoInfo.episodes = [videoInfo.episodes[0]]; + videoInfo.episode = videoInfo.episodes[0]; + } + // force episode to any found number if it was not parsed + if (!videoInfo.episodes && !videoInfo.date) { + const epMatcher = videoInfo.title.match( + /(? 3 + && otherVideos.filter(other => other.title === video.title && other.year === video.year).length < 3; + } +} + +export const parsingService = new ParsingService(); + diff --git a/src/node/consumer/src/lib/services/torrent_download_service.ts b/src/node/consumer/src/lib/services/torrent_download_service.ts new file mode 100644 index 0000000..2cffccd --- /dev/null +++ b/src/node/consumer/src/lib/services/torrent_download_service.ts @@ -0,0 +1,82 @@ +import { encode } from 'magnet-uri'; +import torrentStream from 'torrent-stream'; +import { torrentConfig } from '../config'; +import {extensionService} from './extension_service'; +import {TorrentInfo} from "../interfaces/torrent_info"; +import {DownloadedTorrentFile} from "../interfaces/downloaded_torrent_file"; + +class TorrentDownloadService { + private engineOptions: TorrentStream.TorrentEngineOptions = { + connections: torrentConfig.MAX_CONNECTIONS_PER_TORRENT, + uploads: 0, + verify: false, + dht: false, + tracker: true, + }; + + public async getTorrentFiles(torrent: TorrentInfo, timeout: number = 30000): Promise<{ contents: Array; videos: Array; subtitles: Array }> { + return this.filesFromTorrentStream(torrent, timeout) + .then((files: Array) => ({ + contents: files, + videos: this.filterVideos(files), + subtitles: this.filterSubtitles(files) + })); + } + + private async filesFromTorrentStream(torrent: TorrentInfo, timeout: number): Promise> { + if (!torrent.infoHash) { + return Promise.reject(new Error("No infoHash...")); + } + const magnet = encode({ infoHash: torrent.infoHash, announce: torrent.trackers.split(',') }); + + return new Promise((resolve, reject) => { + let engine: TorrentStream.TorrentEngine; + + const timeoutId = setTimeout(() => { + engine.destroy(() => {}); + reject(new Error('No available connections for torrent!')); + }, timeout); + + engine = torrentStream(magnet, this.engineOptions); + + engine.on("ready", () => { + const files: DownloadedTorrentFile[] = engine.files.map((file, fileId) => ({ ...file, fileIndex: fileId })); + resolve(files); + + engine.destroy(() => {}); + clearTimeout(timeoutId); + }); + }); + } + + private filterVideos(files: Array): Array { + if (files.length === 1 && !Number.isInteger(files[0].fileIndex)) { + return files; + } + const videos = files.filter(file => extensionService.isVideo(file.path || '')); + const maxSize = Math.max(...videos.map((video: DownloadedTorrentFile) => video.length)); + const minSampleRatio = videos.length <= 3 ? 3 : 10; + const minAnimeExtraRatio = 5; + const minRedundantRatio = videos.length <= 3 ? 30 : Number.MAX_VALUE; + const isSample = (video: DownloadedTorrentFile) => video.path?.match(/sample|bonus|promo/i) && maxSize / parseInt(video.length.toString()) > minSampleRatio; + const isRedundant = (video: DownloadedTorrentFile) => maxSize / parseInt(video.length.toString()) > minRedundantRatio; + const isExtra = (video: DownloadedTorrentFile) => video.path?.match(/extras?\//i); + const isAnimeExtra = (video: DownloadedTorrentFile) => video.path?.match(/(?:\b|_)(?:NC)?(?:ED|OP|PV)(?:v?\d\d?)?(?:\b|_)/i) + && maxSize / parseInt(video.length.toString()) > minAnimeExtraRatio; + const isWatermark = (video: DownloadedTorrentFile) => video.path?.match(/^[A-Z-]+(?:\.[A-Z]+)?\.\w{3,4}$/) + && maxSize / parseInt(video.length.toString()) > minAnimeExtraRatio + return videos + .filter(video => !isSample(video)) + .filter(video => !isExtra(video)) + .filter(video => !isAnimeExtra(video)) + .filter(video => !isRedundant(video)) + .filter(video => !isWatermark(video)); + } + + private filterSubtitles(files: Array): Array { + return files.filter(file => extensionService.isSubtitle(file.path || '')); + } +} + +export const torrentDownloadService = new TorrentDownloadService(); + diff --git a/src/node/consumer/src/lib/services/torrent_processing_service.ts b/src/node/consumer/src/lib/services/torrent_processing_service.ts new file mode 100644 index 0000000..3374881 --- /dev/null +++ b/src/node/consumer/src/lib/services/torrent_processing_service.ts @@ -0,0 +1,54 @@ +import {TorrentInfo} from "../interfaces/torrent_info"; +import {TorrentType} from "../enums/torrent_types"; +import {logger} from "./logging_service"; +import {checkAndUpdateTorrent, createTorrentEntry} from "../torrentEntries.js"; +import {trackerService} from "./tracker_service"; +import {IngestedTorrentAttributes} from "../../repository/interfaces/ingested_torrent_attributes"; + +class TorrentProcessingService { + public async processTorrentRecord(torrent: IngestedTorrentAttributes): Promise { + const { category } = torrent; + const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE; + const torrentInfo: TorrentInfo = await this.parseTorrent(torrent, type); + + logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`); + + if (await checkAndUpdateTorrent(torrentInfo)) { + return; + } + + return createTorrentEntry(torrentInfo); + } + + private async assignTorrentTrackers(): Promise { + const trackers = await trackerService.getTrackers(); + return trackers.join(','); + } + + private async parseTorrent(torrent: IngestedTorrentAttributes, category: string): Promise { + const infoHash = torrent.info_hash?.trim().toLowerCase() + return { + title: torrent.name, + torrentId: `${torrent.name}_${infoHash}`, + infoHash: infoHash, + seeders: 100, + size: torrent.size, + uploadDate: torrent.createdAt, + imdbId: this.parseImdbId(torrent), + type: category, + provider: torrent.source, + trackers: await this.assignTorrentTrackers(), + } + } + + private parseImdbId(torrent: IngestedTorrentAttributes): string | undefined { + if (torrent.imdb === undefined || torrent.imdb === null) { + return undefined; + } + + return torrent.imdb; + } +} + +export const torrentProcessingService = new TorrentProcessingService(); + diff --git a/src/node/consumer/src/lib/services/tracker_service.ts b/src/node/consumer/src/lib/services/tracker_service.ts new file mode 100644 index 0000000..15438a5 --- /dev/null +++ b/src/node/consumer/src/lib/services/tracker_service.ts @@ -0,0 +1,32 @@ +import axios, { AxiosResponse } from 'axios'; +import { cacheService } from "./cache_service"; +import { trackerConfig } from '../config'; +import { logger } from "./logging_service"; + +class TrackerService { + public async getTrackers() : Promise { + return cacheService.cacheTrackers(this.downloadTrackers); + }; + + private async downloadTrackers(): Promise { + const response: AxiosResponse = await axios.get(trackerConfig.TRACKERS_URL); + const trackersListText: string = response.data; + // Trackers are separated by a newline character + let urlTrackers = trackersListText.split("\n"); + // remove blank lines + urlTrackers = urlTrackers.filter(line => line.trim() !== ''); + + if (!trackerConfig.UDP_ENABLED) { + // remove any udp trackers + urlTrackers = urlTrackers.filter(line => !line.startsWith('udp://')); + + } + + logger.info(`Trackers updated at ${Date.now()}: ${urlTrackers.length} trackers`); + + return urlTrackers; + }; +} + +export const trackerService = new TrackerService(); + diff --git a/src/node/consumer/src/lib/torrent.js b/src/node/consumer/src/lib/torrent.js deleted file mode 100644 index 4f78d05..0000000 --- a/src/node/consumer/src/lib/torrent.js +++ /dev/null @@ -1,82 +0,0 @@ -import { decode } from 'magnet-uri'; -import torrentStream from 'torrent-stream'; -import { torrentConfig } from './config'; -import {isSubtitle, isVideo} from './extension'; - -export async function torrentFiles(torrent, timeout) { - return filesFromTorrentStream(torrent, timeout) - .then(files => ({ - contents: files, - videos: filterVideos(files), - subtitles: filterSubtitles(files) - })); -} - -async function filesFromTorrentStream(torrent, timeout) { - return filesAndSizeFromTorrentStream(torrent, timeout).then(result => result.files); -} - -const engineOptions = { - connections: torrentConfig.MAX_CONNECTIONS_PER_TORRENT, - uploads: 0, - verify: false, - dht: false, - tracker: true -} - -function filesAndSizeFromTorrentStream(torrent, timeout = 30000) { - if (!torrent.infoHash) { - return Promise.reject(new Error("no infoHash...")); - } - const magnet = decode.encode({ infoHash: torrent.infoHash, announce: torrent.trackers }); - return new Promise((resolve, rejected) => { - const timeoutId = setTimeout(() => { - engine.destroy(); - rejected(new Error('No available connections for torrent!')); - }, timeout); - - const engine = new torrentStream(magnet, engineOptions); - - engine.ready(() => { - const files = engine.files - .map((file, fileId) => ({ - fileIndex: fileId, - name: file.name, - path: file.path.replace(/^[^/]+\//, ''), - size: file.length - })); - const size = engine.torrent.length; - resolve({ files, size }); - engine.destroy(); - clearTimeout(timeoutId); - }); - }); -} - -function filterVideos(files) { - if (files.length === 1 && !Number.isInteger(files[0].fileIndex)) { - return files; - } - const videos = files.filter(file => isVideo(file.path)); - const maxSize = Math.max(...videos.map(video => video.size)); - const minSampleRatio = videos.length <= 3 ? 3 : 10; - const minAnimeExtraRatio = 5; - const minRedundantRatio = videos.length <= 3 ? 30 : Number.MAX_VALUE; - const isSample = video => video.path.match(/sample|bonus|promo/i) && maxSize / parseInt(video.size) > minSampleRatio; - const isRedundant = video => maxSize / parseInt(video.size) > minRedundantRatio; - const isExtra = video => video.path.match(/extras?\//i); - const isAnimeExtra = video => video.path.match(/(?:\b|_)(?:NC)?(?:ED|OP|PV)(?:v?\d\d?)?(?:\b|_)/i) - && maxSize / parseInt(video.size) > minAnimeExtraRatio; - const isWatermark = video => video.path.match(/^[A-Z-]+(?:\.[A-Z]+)?\.\w{3,4}$/) - && maxSize / parseInt(video.size) > minAnimeExtraRatio - return videos - .filter(video => !isSample(video)) - .filter(video => !isExtra(video)) - .filter(video => !isAnimeExtra(video)) - .filter(video => !isRedundant(video)) - .filter(video => !isWatermark(video)); -} - -function filterSubtitles(files) { - return files.filter(file => isSubtitle(file.path)); -} diff --git a/src/node/consumer/src/lib/torrentEntries.js b/src/node/consumer/src/lib/torrentEntries.js index f341192..d4aeef2 100644 --- a/src/node/consumer/src/lib/torrentEntries.js +++ b/src/node/consumer/src/lib/torrentEntries.js @@ -1,12 +1,12 @@ import { parse } from 'parse-torrent-title'; -import { getImdbId, getKitsuId } from './metadata'; -import { isPackTorrent } from './parseHelper'; -import * as Promises from './promises'; +import { metadataService } from './services/metadata_service'; +import { parsingService } from './services/parsing_service'; +import {PromiseHelpers} from './helpers/promises_helpers.js'; import { repository } from '../repository/database_repository'; import { parseTorrentFiles } from './torrentFiles.js'; import { assignSubtitles } from './torrentSubtitles.js'; import { TorrentType } from './enums/torrent_types'; -import {logger} from "./logger"; +import {logger} from './services/logging_service'; export async function createTorrentEntry(torrent, overwrite = false) { const titleInfo = parse(torrent.title); @@ -17,7 +17,7 @@ export async function createTorrentEntry(torrent, overwrite = false) { year: titleInfo.year, type: torrent.type }; - torrent.imdbId = await getImdbId(imdbQuery) + torrent.imdbId = await metadataService.getImdbId(imdbQuery) .catch(() => undefined); } if (torrent.imdbId && torrent.imdbId.length < 9) { @@ -34,11 +34,11 @@ export async function createTorrentEntry(torrent, overwrite = false) { year: titleInfo.year, season: titleInfo.season, }; - torrent.kitsuId = await getKitsuId(kitsuQuery) + torrent.kitsuId = await metadataService.getKitsuId(kitsuQuery) .catch(() => undefined); } - if (!torrent.imdbId && !torrent.kitsuId && !isPackTorrent(torrent)) { + if (!torrent.imdbId && !torrent.kitsuId && !parsingService.isPackTorrent(torrent)) { logger.warn(`imdbId or kitsuId not found: ${torrent.provider} ${torrent.title}`); return; } @@ -56,7 +56,7 @@ export async function createTorrentEntry(torrent, overwrite = false) { } return repository.createTorrent({ ...torrent, contents, subtitles }) - .then(() => Promises.sequence(videos.map(video => () => repository.createFile(video)))) + .then(() => PromiseHelpers.sequence(videos.map(video => () => repository.createFile(video)))) .then(() => logger.info(`Created ${torrent.provider} entry for [${torrent.infoHash}] ${torrent.title}`)); } @@ -132,8 +132,8 @@ export async function createTorrentContents(torrent) { return; } const notOpenedVideo = storedVideos.length === 1 && !Number.isInteger(storedVideos[0].fileIndex); - const imdbId = Promises.mostCommonValue(storedVideos.map(stored => stored.imdbId)); - const kitsuId = Promises.mostCommonValue(storedVideos.map(stored => stored.kitsuId)); + const imdbId = PromiseHelpers.mostCommonValue(storedVideos.map(stored => stored.imdbId)); + const kitsuId = PromiseHelpers.mostCommonValue(storedVideos.map(stored => stored.kitsuId)); const { contents, videos, subtitles } = await parseTorrentFiles({ ...torrent, imdbId, kitsuId }) .then(torrentContents => notOpenedVideo ? torrentContents : { ...torrentContents, videos: storedVideos }) @@ -165,7 +165,7 @@ export async function createTorrentContents(torrent) { } return Promise.resolve(); }) - .then(() => Promises.sequence(videos.map(video => () => repository.createFile(video)))) + .then(() => PromiseHelpers.sequence(videos.map(video => () => repository.createFile(video)))) .then(() => logger.info(`Created contents for ${torrent.provider} [${torrent.infoHash}] ${torrent.title}`)) .catch(error => logger.error(`Failed saving contents for [${torrent.infoHash}] ${torrent.title}`, error)); } diff --git a/src/node/consumer/src/lib/torrentFiles.js b/src/node/consumer/src/lib/torrentFiles.js index 3539925..417f783 100644 --- a/src/node/consumer/src/lib/torrentFiles.js +++ b/src/node/consumer/src/lib/torrentFiles.js @@ -3,20 +3,20 @@ import distance from 'jaro-winkler'; import moment from 'moment'; import { parse } from 'parse-torrent-title'; import { metadataConfig } from './config'; -import { isDisk } from './extension'; -import { getMetadata, getImdbId, getKitsuId } from './metadata'; -import { parseSeriesVideos, isPackTorrent } from './parseHelper'; -import * as Promises from './promises'; -import {torrentFiles} from "./torrent.js"; +import { extensionService } from './services/extension_service'; +import { metadataService } from './services/metadata_service'; +import { parsingService } from './services/parsing_service'; +import {PromiseHelpers} from './helpers/promises_helpers.js'; +import {torrentDownloadService} from "./services/torrent_download_service"; import { TorrentType } from './enums/torrent_types'; -import {logger} from "./logger"; +import {logger} from "./services/logging_service"; const MIN_SIZE = 5 * 1024 * 1024; // 5 MB const imdb_limiter = new Bottleneck({ maxConcurrent: metadataConfig.IMDB_CONCURRENT, minTime: metadataConfig.IMDB_INTERVAL_MS }); export async function parseTorrentFiles(torrent) { const parsedTorrentName = parse(torrent.title); - const metadata = await getMetadata(torrent.kitsuId || torrent.imdbId, torrent.type || TorrentType.MOVIE) + const metadata = await metadataService.getMetadata(torrent.kitsuId || torrent.imdbId, torrent.type || TorrentType.MOVIE) .then(meta => Object.assign({}, meta)) .catch(() => undefined); @@ -53,7 +53,7 @@ async function parseMovieFiles(torrent, parsedName, metadata) { return { contents, videos: parsedVideos, subtitles }; } - const parsedVideos = await Promises.sequence(filteredVideos.map(video => () => isFeaturette(video) + const parsedVideos = await PromiseHelpers.sequence(filteredVideos.map(video => () => isFeaturette(video) ? Promise.resolve(video) : findMovieImdbId(video.name).then(imdbId => ({ ...video, imdbId })))) .then(videos => videos.map(video => ({ @@ -70,7 +70,7 @@ async function parseSeriesFiles(torrent, parsedName, metadata) { const { contents, videos, subtitles } = await getSeriesTorrentContent(torrent); const parsedVideos = await Promise.resolve(videos) .then(videos => videos.filter(video => videos.length === 1 || video.size > MIN_SIZE)) - .then(videos => parseSeriesVideos(torrent, videos)) + .then(videos => parsingService.parseSeriesVideos(torrent, videos)) .then(videos => decomposeEpisodes(torrent, videos, metadata)) .then(videos => assignKitsuOrImdbEpisodes(torrent, videos, metadata)) .then(videos => Promise.all(videos.map(video => video.isMovie @@ -97,9 +97,9 @@ async function getMoviesTorrentContent(torrent) { } async function getSeriesTorrentContent(torrent) { - return torrentFiles(torrent) + return torrentDownloadService.getTorrentFiles(torrent) .catch(error => { - if (!isPackTorrent(torrent)) { + if (!parsingService.isPackTorrent(torrent)) { return { videos: [{ name: torrent.title, path: torrent.title, size: torrent.size }] } } return Promise.reject(error); @@ -136,7 +136,7 @@ async function mapSeriesEpisode(file, torrent, files) { async function mapSeriesMovie(file, torrent) { const kitsuId = torrent.type === TorrentType.ANIME ? await findMovieKitsuId(file) : undefined; const imdbId = !kitsuId ? await findMovieImdbId(file) : undefined; - const metadata = await getMetadata(kitsuId || imdbId, TorrentType.MOVIE).catch(() => ({})); + const metadata = await metadataService.getMetadata(kitsuId || imdbId, TorrentType.MOVIE).catch(() => ({})); const hasEpisode = metadata.videos && metadata.videos.length && (file.episode || metadata.videos.length === 1); const episodeVideo = hasEpisode && metadata.videos[(file.episode || 1) - 1]; return [{ @@ -458,7 +458,7 @@ function needsCinemetaMetadataForAnime(files, metadata) { } async function updateToCinemetaMetadata(metadata) { - return getMetadata(metadata.imdbId, metadata.type) + return metadataService.getMetadata(metadata.imdbId, metadata.type) .then(newMetadata => !newMetadata.videos || !newMetadata.videos.length ? metadata : newMetadata) .then(newMetadata => { metadata.videos = newMetadata.videos; @@ -478,7 +478,7 @@ function findMovieImdbId(title) { year: parsedTitle.year, type: TorrentType.MOVIE }; - return getImdbId(imdbQuery).catch(() => undefined); + return metadataService.getImdbId(imdbQuery).catch(() => undefined); }); } @@ -490,11 +490,11 @@ function findMovieKitsuId(title) { season: parsedTitle.season, type: TorrentType.MOVIE }; - return getKitsuId(kitsuQuery).catch(() => undefined); + return metadataService.getKitsuId(kitsuQuery).catch(() => undefined); } function isDiskTorrent(contents) { - return contents.some(content => isDisk(content.path)); + return contents.some(content => extensionService.isDisk(content.path)); } function isSingleMovie(videos) { diff --git a/src/node/consumer/src/lib/torrent_processor.ts b/src/node/consumer/src/lib/torrent_processor.ts deleted file mode 100644 index 97387d1..0000000 --- a/src/node/consumer/src/lib/torrent_processor.ts +++ /dev/null @@ -1,49 +0,0 @@ -import {TorrentInfo} from "./interfaces/torrent_info"; -import {TorrentType} from "./enums/torrent_types"; -import {logger} from "./logger"; -import {checkAndUpdateTorrent, createTorrentEntry} from "./torrentEntries.js"; -import {getTrackers} from "./trackerService"; -import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; - -export async function processTorrentRecord(torrent: IngestedTorrentAttributes): Promise { - const { category } = torrent; - const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE; - const torrentInfo: TorrentInfo = await parseTorrent(torrent, type); - - logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`); - - if (await checkAndUpdateTorrent(torrentInfo)) { - return; - } - - return createTorrentEntry(torrentInfo); -} - -async function assignTorrentTrackers(): Promise { - const trackers = await getTrackers(); - return trackers.join(','); -} - -async function parseTorrent(torrent: IngestedTorrentAttributes, category: string): Promise { - const infoHash = torrent.info_hash?.trim().toLowerCase() - return { - title: torrent.name, - torrentId: `${torrent.name}_${infoHash}`, - infoHash: infoHash, - seeders: 100, - size: torrent.size, - uploadDate: torrent.createdAt, - imdbId: parseImdbId(torrent), - type: category, - provider: torrent.source, - trackers: await assignTorrentTrackers(), - } -} - -function parseImdbId(torrent: IngestedTorrentAttributes): string | undefined { - if (torrent.imdb === undefined || torrent.imdb === null) { - return undefined; - } - - return torrent.imdb; -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/trackerService.ts b/src/node/consumer/src/lib/trackerService.ts deleted file mode 100644 index f42e44e..0000000 --- a/src/node/consumer/src/lib/trackerService.ts +++ /dev/null @@ -1,27 +0,0 @@ -import axios, { AxiosResponse } from 'axios'; -import { cacheTrackers } from "./cache"; -import { trackerConfig } from './config'; -import { logger } from "./logger"; - -const downloadTrackers = async (): Promise => { - const response: AxiosResponse = await axios.get(trackerConfig.TRACKERS_URL); - const trackersListText: string = response.data; - // Trackers are separated by a newline character - let urlTrackers = trackersListText.split("\n"); - // remove blank lines - urlTrackers = urlTrackers.filter(line => line.trim() !== ''); - - if (!trackerConfig.UDP_ENABLED) { - // remove any udp trackers - urlTrackers = urlTrackers.filter(line => !line.startsWith('udp://')); - - } - - logger.info(`Trackers updated at ${Date.now()}: ${urlTrackers.length} trackers`); - - return urlTrackers; -}; - -export const getTrackers = async (): Promise => { - return cacheTrackers(downloadTrackers); -}; \ No newline at end of file diff --git a/src/node/consumer/src/repository/database_repository.ts b/src/node/consumer/src/repository/database_repository.ts index 023aa4c..c1772a2 100644 --- a/src/node/consumer/src/repository/database_repository.ts +++ b/src/node/consumer/src/repository/database_repository.ts @@ -2,7 +2,7 @@ import moment from 'moment'; import {literal, Op, WhereOptions} from "sequelize"; import {Model, Sequelize} from 'sequelize-typescript'; import {databaseConfig} from '../lib/config'; -import * as Promises from '../lib/promises'; +import {PromiseHelpers} from '../lib/helpers/promises_helpers'; import {Provider} from "./models/provider"; import {File} from "./models/file"; import {Torrent} from "./models/torrent"; @@ -13,7 +13,7 @@ import {SkipTorrent} from "./models/skipTorrent"; import {FileAttributes} from "./interfaces/file_attributes"; import {TorrentAttributes} from "./interfaces/torrent_attributes"; import {IngestedPage} from "./models/ingestedPage"; -import {logger} from "../lib/logger"; +import {logger} from "../lib/services/logging_service"; class DatabaseRepository { private readonly database: Sequelize; @@ -83,7 +83,7 @@ class DatabaseRepository { } public async setIngestedTorrentsProcessed(ingestedTorrents: IngestedTorrent[]): Promise { - await Promises.sequence(ingestedTorrents + await PromiseHelpers.sequence(ingestedTorrents .map(ingestedTorrent => async () => { ingestedTorrent.processed = true; await ingestedTorrent.save(); @@ -194,7 +194,7 @@ class DatabaseRepository { public async upsertSubtitles(file: File, subtitles: Subtitle[]): Promise { if (file.id && subtitles && subtitles.length) { - await Promises.sequence(subtitles + await PromiseHelpers.sequence(subtitles .map(subtitle => { subtitle.fileId = file.id; subtitle.infoHash = subtitle.infoHash || file.infoHash;