diff --git a/src/node/consumer/esbuild.js b/src/node/consumer/esbuild.js index 63b1d16..9aea0be 100644 --- a/src/node/consumer/esbuild.js +++ b/src/node/consumer/esbuild.js @@ -13,7 +13,7 @@ try { build({ bundle: true, entryPoints: [ - "./src/index.js", + "./src/index.ts", ], external: [...(devDependencies && Object.keys(devDependencies))], keepNames: true, diff --git a/src/node/consumer/package.json b/src/node/consumer/package.json index 78c9826..a1e5129 100644 --- a/src/node/consumer/package.json +++ b/src/node/consumer/package.json @@ -4,7 +4,7 @@ "type": "module", "scripts": { "build": "node esbuild.js", - "dev": "tsx watch --ignore node_modules src/index.js | pino-pretty", + "dev": "tsx watch --ignore node_modules src/index.ts | pino-pretty", "start": "node dist/index.cjs", "lint": "eslint . --ext .ts,.js" }, diff --git a/src/node/consumer/src/index.js b/src/node/consumer/src/index.ts similarity index 65% rename from src/node/consumer/src/index.js rename to src/node/consumer/src/index.ts index d4a163b..f022264 100644 --- a/src/node/consumer/src/index.js +++ b/src/node/consumer/src/index.ts @@ -1,9 +1,9 @@ -import { listenToQueue } from './jobs/processTorrents'; +import { processTorrentsJob } from './jobs/process_torrents_job.js'; import { repository } from "./repository/database_repository"; import { trackerService } from "./lib/services/tracker_service"; (async () => { await trackerService.getTrackers(); await repository.connect(); - await listenToQueue(); + await processTorrentsJob.listenToQueue(); })(); \ No newline at end of file diff --git a/src/node/consumer/src/jobs/processTorrents.ts b/src/node/consumer/src/jobs/processTorrents.ts deleted file mode 100644 index 9c7cd9b..0000000 --- a/src/node/consumer/src/jobs/processTorrents.ts +++ /dev/null @@ -1,56 +0,0 @@ -import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' -import {configurationService} from '../lib/services/configuration_service'; -import {torrentProcessingService} from '../lib/services/torrent_processing_service'; -import {logger} from '../lib/services/logging_service'; -import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; -import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; - -const assertQueueOptions: Options.AssertQueue = { durable: true }; -const consumeQueueOptions: Options.Consume = { noAck: false }; - -const processMessage = (msg: ConsumeMessage | null): Promise => { - const ingestedTorrent: IngestedTorrentAttributes = getMessageAsJson(msg); - return torrentProcessingService.processTorrentRecord(ingestedTorrent); -}; - -const getMessageAsJson = (msg: ConsumeMessage | null): IngestedTorrentAttributes => { - const content = msg ? msg?.content.toString('utf8') : "{}"; - const receivedObject: IngestedRabbitMessage = JSON.parse(content) as IngestedRabbitMessage; - const receivedTorrent:IngestedRabbitTorrent = receivedObject.message; - const mappedObject: any = {...receivedTorrent, info_hash: receivedTorrent.infoHash}; - delete mappedObject.infoHash; - - return mappedObject as IngestedTorrentAttributes; -}; - -const assertAndConsumeQueue = async (channel: Channel): Promise => { - logger.info('Worker is running! Waiting for new torrents...'); - - const ackMsg = (msg: ConsumeMessage): void => { - processMessage(msg) - .then(() => channel.ack(msg)) - .catch((error: Error) => logger.error('Failed processing torrent', error)); - } - - try { - await channel.assertQueue(configurationService.rabbitConfig.QUEUE_NAME, assertQueueOptions); - await channel.prefetch(configurationService.jobConfig.JOB_CONCURRENCY); - await channel.consume(configurationService.rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions); - } catch(error) { - logger.error('Failed to setup channel', error); - } -} - -export const listenToQueue = async (): Promise => { - if (!configurationService.jobConfig.JOBS_ENABLED) { - return; - } - - try { - const connection: Connection = await client.connect(configurationService.rabbitConfig.RABBIT_URI); - const channel: Channel = await connection.createChannel(); - await assertAndConsumeQueue(channel); - } catch (error) { - logger.error('Failed to connect and setup channel', error); - } -} \ No newline at end of file diff --git a/src/node/consumer/src/jobs/process_torrents_job.ts b/src/node/consumer/src/jobs/process_torrents_job.ts new file mode 100644 index 0000000..f655d31 --- /dev/null +++ b/src/node/consumer/src/jobs/process_torrents_job.ts @@ -0,0 +1,57 @@ +import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' +import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; +import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; +import {configurationService} from '../lib/services/configuration_service'; +import {torrentProcessingService} from '../lib/services/torrent_processing_service'; +import {logger} from '../lib/services/logging_service'; + +class ProcessTorrentsJob { + private readonly assertQueueOptions: Options.AssertQueue = {durable: true}; + private readonly consumeQueueOptions: Options.Consume = {noAck: false}; + + public listenToQueue = async (): Promise => { + if (!configurationService.jobConfig.JOBS_ENABLED) { + return; + } + + try { + const connection: Connection = await client.connect(configurationService.rabbitConfig.RABBIT_URI); + const channel: Channel = await connection.createChannel(); + await this.assertAndConsumeQueue(channel); + } catch (error) { + logger.error('Failed to connect and setup channel', error); + } + } + private processMessage = (msg: ConsumeMessage) => { + const ingestedTorrent: IngestedTorrentAttributes = this.getMessageAsJson(msg); + return torrentProcessingService.processTorrentRecord(ingestedTorrent); + }; + private getMessageAsJson = (msg: ConsumeMessage): IngestedTorrentAttributes => { + const content = msg?.content.toString('utf8') ?? "{}"; + const receivedObject: IngestedRabbitMessage = JSON.parse(content); + const receivedTorrent: IngestedRabbitTorrent = receivedObject.message; + return {...receivedTorrent, info_hash: receivedTorrent.infoHash}; + }; + private async assertAndConsumeQueue(channel: Channel) { + logger.info('Worker is running! Waiting for new torrents...'); + + const ackMsg = async (msg: ConsumeMessage) => { + try { + await this.processMessage(msg); + channel.ack(msg); + } catch (error) { + logger.error('Failed processing torrent', error); + } + } + + try { + await channel.assertQueue(configurationService.rabbitConfig.QUEUE_NAME, this.assertQueueOptions); + await channel.prefetch(configurationService.jobConfig.JOB_CONCURRENCY); + await channel.consume(configurationService.rabbitConfig.QUEUE_NAME, ackMsg, this.consumeQueueOptions); + } catch (error) { + logger.error('Failed to setup channel', error); + } + } +} + +export const processTorrentsJob = new ProcessTorrentsJob(); \ No newline at end of file diff --git a/src/node/consumer/src/lib/enums/cache_types.ts b/src/node/consumer/src/lib/enums/cache_types.ts index 989f6bd..d1cf197 100644 --- a/src/node/consumer/src/lib/enums/cache_types.ts +++ b/src/node/consumer/src/lib/enums/cache_types.ts @@ -1,4 +1,4 @@ export enum CacheType { - MEMORY = 'memory', - MONGODB = 'mongodb' + Memory = 'memory', + MongoDb = 'mongodb' } \ No newline at end of file diff --git a/src/node/consumer/src/lib/enums/torrent_types.ts b/src/node/consumer/src/lib/enums/torrent_types.ts index 7a20693..47743c6 100644 --- a/src/node/consumer/src/lib/enums/torrent_types.ts +++ b/src/node/consumer/src/lib/enums/torrent_types.ts @@ -1,6 +1,5 @@ export enum TorrentType { - SERIES = 'SERIES', - MOVIE = 'MOVIE', - ANIME = 'anime', - PORN = 'xxx', + Series = 'Series', + Movie = 'Movie', + Anime = 'anime', } \ No newline at end of file diff --git a/src/node/consumer/src/lib/services/cache_service.ts b/src/node/consumer/src/lib/services/cache_service.ts index 88aad32..1e1e673 100644 --- a/src/node/consumer/src/lib/services/cache_service.ts +++ b/src/node/consumer/src/lib/services/cache_service.ts @@ -29,16 +29,16 @@ class CacheService { } public cacheWrapImdbId = (key: string, method: CacheMethod): Promise => - this.cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); + this.cacheWrap(CacheType.MongoDb, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); public cacheWrapKitsuId = (key: string, method: CacheMethod): Promise => - this.cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); + this.cacheWrap(CacheType.MongoDb, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); public cacheWrapMetadata = (id: string, method: CacheMethod): Promise => - this.cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL }); + this.cacheWrap(CacheType.Memory, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL }); public cacheTrackers = (method: CacheMethod): Promise => - this.cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL }); + this.cacheWrap(CacheType.Memory, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL }); private initiateMemoryCache = () => createCache(memoryStore(), { @@ -72,9 +72,9 @@ class CacheService { private getCacheType = (cacheType: CacheType): typeof this.memoryCache | null => { switch (cacheType) { - case CacheType.MEMORY: + case CacheType.Memory: return this.memoryCache; - case CacheType.MONGODB: + case CacheType.MongoDb: return this.remoteCache; default: return null; diff --git a/src/node/consumer/src/lib/services/metadata_service.ts b/src/node/consumer/src/lib/services/metadata_service.ts index e564de2..3a14b23 100644 --- a/src/node/consumer/src/lib/services/metadata_service.ts +++ b/src/node/consumer/src/lib/services/metadata_service.ts @@ -59,12 +59,12 @@ class MetadataService { } const key = Number.isInteger(query.id) || query.id.toString().match(/^\d+$/) ? `kitsu:${query.id}` : query.id; - const metaType = query.type === TorrentType.MOVIE ? TorrentType.MOVIE : TorrentType.SERIES; + const metaType = query.type === TorrentType.Movie ? TorrentType.Movie : TorrentType.Series; return cacheService.cacheWrapMetadata(key.toString(), () => this.requestMetadata(`${KITSU_URL}/meta/${metaType}/${key}.json`) .catch(() => this.requestMetadata(`${CINEMETA_URL}/meta/${metaType}/${key}.json`)) .catch(() => { // try different type in case there was a mismatch - const otherType = metaType === TorrentType.MOVIE ? TorrentType.SERIES : TorrentType.MOVIE; + const otherType = metaType === TorrentType.Movie ? TorrentType.Series : TorrentType.Movie; return this.requestMetadata(`${CINEMETA_URL}/meta/${otherType}/${key}.json`) }) .catch((error) => { diff --git a/src/node/consumer/src/lib/services/parsing_service.ts b/src/node/consumer/src/lib/services/parsing_service.ts index dc0b2a6..bb8da27 100644 --- a/src/node/consumer/src/lib/services/parsing_service.ts +++ b/src/node/consumer/src/lib/services/parsing_service.ts @@ -19,7 +19,7 @@ class ParsingService { return true; } const parsedInfo = parse(torrent.title); - if (torrent.type === TorrentType.MOVIE) { + if (torrent.type === TorrentType.Movie) { return parsedInfo.complete || typeof parsedInfo.year === 'string' || /movies/i.test(torrent.title); } const hasMultipleEpisodes = parsedInfo.complete || @@ -86,7 +86,7 @@ class ParsingService { // movie if video explicitly has numbered movie keyword in the name, ie. 1 Movie or Movie 1 return true; } - if (!hasMovies && type !== TorrentType.ANIME) { + if (!hasMovies && type !== TorrentType.Anime) { // not movie if torrent name does not contain movies keyword or is not a pack torrent and is not anime return false; } diff --git a/src/node/consumer/src/lib/services/torrent_file_service.ts b/src/node/consumer/src/lib/services/torrent_file_service.ts index 6744d99..29aa558 100644 --- a/src/node/consumer/src/lib/services/torrent_file_service.ts +++ b/src/node/consumer/src/lib/services/torrent_file_service.ts @@ -28,18 +28,18 @@ class TorrentFileService { const parsedTorrentName = parse(torrent.title); const query: MetaDataQuery = { id: torrent.kitsuId || torrent.imdbId, - type: torrent.type || TorrentType.MOVIE, + type: torrent.type || TorrentType.Movie, }; const metadata = await metadataService.getMetadata(query) .then(meta => Object.assign({}, meta)) .catch(() => undefined); - if (torrent.type !== TorrentType.ANIME && metadata && metadata.type && metadata.type !== torrent.type) { + if (torrent.type !== TorrentType.Anime && metadata && metadata.type && metadata.type !== torrent.type) { // it's actually a movie/series torrent.type = metadata.type; } - if (torrent.type === TorrentType.MOVIE && (!parsedTorrentName.seasons || + if (torrent.type === TorrentType.Movie && (!parsedTorrentName.seasons || parsedTorrentName.season === 5 && [1, 5].includes(parsedTorrentName.episode))) { return this.parseMovieFiles(torrent, metadata); } @@ -145,7 +145,7 @@ class TorrentFileService { } private async mapSeriesMovie(file: ParsableTorrentFile, torrent: TorrentInfo): Promise { - const kitsuId= torrent.type === TorrentType.ANIME ? await this.findMovieKitsuId(file) + const kitsuId= torrent.type === TorrentType.Anime ? await this.findMovieKitsuId(file) .then(result => { if (result instanceof Error) { logger.warn(`Failed to retrieve kitsuId due to error: ${result.message}`); @@ -158,7 +158,7 @@ class TorrentFileService { const query: MetaDataQuery = { id: kitsuId || imdbId, - type: TorrentType.MOVIE + type: TorrentType.Movie }; const metadataOrError = await metadataService.getMetadata(query); @@ -201,7 +201,7 @@ class TorrentFileService { this.preprocessEpisodes(files); - if (torrent.type === TorrentType.ANIME && torrent.kitsuId) { + if (torrent.type === TorrentType.Anime && torrent.kitsuId) { if (this.needsCinemetaMetadataForAnime(files, metadata)) { // In some cases anime could be resolved to wrong kitsuId // because of imdb season naming/absolute per series naming/multiple seasons @@ -283,7 +283,7 @@ class TorrentFileService { private isAbsoluteEpisodeFiles(torrent: TorrentInfo, files: ParsableTorrentFile[], metadata: MetadataResponse) { const threshold = Math.ceil(files.length / 5); - const isAnime = torrent.type === TorrentType.ANIME && torrent.kitsuId; + const isAnime = torrent.type === TorrentType.Anime && torrent.kitsuId; const nonMovieEpisodes = files .filter(file => !file.isMovie && file.episodes); const absoluteEpisodes = files @@ -298,7 +298,7 @@ class TorrentFileService { // new episode might not yet been indexed by cinemeta. // detect this if episode number is larger than the last episode or season is larger than the last one // only for non anime metas - const isAnime = torrent.type === TorrentType.ANIME && torrent.kitsuId; + const isAnime = torrent.type === TorrentType.Anime && torrent.kitsuId; return !isAnime && !file.isMovie && file.episodes && file.season !== 1 && /continuing|current/i.test(metadata.status) && file.season >= metadata.episodeCount.length @@ -377,7 +377,7 @@ class TorrentFileService { private assignKitsuOrImdbEpisodes(torrent: TorrentInfo, files: ParsableTorrentFile[], metadata: MetadataResponse) { if (!metadata || !metadata.videos || !metadata.videos.length) { - if (torrent.type === TorrentType.ANIME) { + if (torrent.type === TorrentType.Anime) { // assign episodes as kitsu episodes for anime when no metadata available for imdb mapping files .filter(file => file.season && file.episodes) @@ -385,7 +385,7 @@ class TorrentFileService { file.season = undefined; file.episodes = undefined; }) - if (metadata.type === TorrentType.MOVIE && files.every(file => !file.imdbId)) { + if (metadata.type === TorrentType.Movie && files.every(file => !file.imdbId)) { // sometimes a movie has episode naming, thus not recognized as a movie and imdbId not assigned files.forEach(file => file.imdbId = metadata.imdbId); } @@ -507,7 +507,7 @@ class TorrentFileService { const imdbQuery = { title: parsedTitle.title, year: parsedTitle.year, - type: TorrentType.MOVIE + type: TorrentType.Movie }; try { return await metadataService.getImdbId(imdbQuery); @@ -523,7 +523,7 @@ class TorrentFileService { title: parsedTitle.title, year: parsedTitle.year, season: parsedTitle.season, - type: TorrentType.MOVIE + type: TorrentType.Movie }; try { return await metadataService.getKitsuId(kitsuQuery); diff --git a/src/node/consumer/src/lib/services/torrent_processing_service.ts b/src/node/consumer/src/lib/services/torrent_processing_service.ts index 3374881..c74a951 100644 --- a/src/node/consumer/src/lib/services/torrent_processing_service.ts +++ b/src/node/consumer/src/lib/services/torrent_processing_service.ts @@ -8,7 +8,7 @@ import {IngestedTorrentAttributes} from "../../repository/interfaces/ingested_to class TorrentProcessingService { public async processTorrentRecord(torrent: IngestedTorrentAttributes): Promise { const { category } = torrent; - const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE; + const type = category === 'tv' ? TorrentType.Series : TorrentType.Movie; const torrentInfo: TorrentInfo = await this.parseTorrent(torrent, type); logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`); diff --git a/src/node/consumer/src/lib/torrentEntries.js b/src/node/consumer/src/lib/torrentEntries.js index ed4933e..ec3b1ce 100644 --- a/src/node/consumer/src/lib/torrentEntries.js +++ b/src/node/consumer/src/lib/torrentEntries.js @@ -11,7 +11,7 @@ import {logger} from './services/logging_service'; export async function createTorrentEntry(torrent, overwrite = false) { const titleInfo = parse(torrent.title); - if (!torrent.imdbId && torrent.type !== TorrentType.ANIME) { + if (!torrent.imdbId && torrent.type !== TorrentType.Anime) { const imdbQuery = { title: titleInfo.title, year: titleInfo.year, @@ -28,7 +28,7 @@ export async function createTorrentEntry(torrent, overwrite = false) { // sanitize imdbId from redundant zeros torrent.imdbId = torrent.imdbId.replace(/tt0+([0-9]{7,})$/, 'tt$1'); } - if (!torrent.kitsuId && torrent.type === TorrentType.ANIME) { + if (!torrent.kitsuId && torrent.type === TorrentType.Anime) { const kitsuQuery = { title: titleInfo.title, year: titleInfo.year, diff --git a/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts b/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts index b54eec1..c29ba9e 100644 --- a/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts +++ b/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts @@ -10,7 +10,7 @@ export interface IngestedTorrentAttributes { leechers: number; imdb: string; processed: boolean; - createdAt: Date; + createdAt?: Date; } export interface IngestedTorrentCreationAttributes extends Optional { diff --git a/src/node/consumer/tsconfig.json b/src/node/consumer/tsconfig.json index c6f83d1..3c3b81b 100644 --- a/src/node/consumer/tsconfig.json +++ b/src/node/consumer/tsconfig.json @@ -2,6 +2,7 @@ "compilerOptions": { "baseUrl": "./src", "checkJs": true, + "allowJs": true, "isolatedModules": true, "lib": ["ESNext"], "module": "ESNext",