From cf25f32cb619c209382d4b5663f8a029bfbe592c Mon Sep 17 00:00:00 2001 From: iPromKnight Date: Mon, 5 Feb 2024 03:35:52 +0000 Subject: [PATCH] Torrent processing orchestrator now typescript too Will start to tackle some of the other services after work tomorrow. --- src/node/consumer/package-lock.json | 10 ++++ src/node/consumer/package.json | 1 + src/node/consumer/src/index.js | 2 +- src/node/consumer/src/jobs/processTorrents.js | 37 ------------ src/node/consumer/src/jobs/processTorrents.ts | 56 +++++++++++++++++++ src/node/consumer/src/lib/cache.js | 2 +- .../consumer/src/lib/enums/cache_types.ts | 4 ++ .../consumer/src/lib/enums/torrent_types.ts | 6 ++ src/node/consumer/src/lib/ingestedTorrent.js | 46 --------------- .../lib/interfaces/ingested_rabbit_message.ts | 15 +++++ .../src/lib/interfaces/torrent_info.ts | 12 ++++ src/node/consumer/src/lib/metadata.js | 2 +- src/node/consumer/src/lib/parseHelper.js | 2 +- src/node/consumer/src/lib/torrentEntries.js | 2 +- src/node/consumer/src/lib/torrentFiles.js | 2 +- .../consumer/src/lib/torrent_processor.ts | 49 ++++++++++++++++ src/node/consumer/src/lib/types.js | 11 ---- .../interfaces/ingested_torrent_attributes.ts | 1 + 18 files changed, 160 insertions(+), 100 deletions(-) delete mode 100644 src/node/consumer/src/jobs/processTorrents.js create mode 100644 src/node/consumer/src/jobs/processTorrents.ts create mode 100644 src/node/consumer/src/lib/enums/cache_types.ts create mode 100644 src/node/consumer/src/lib/enums/torrent_types.ts delete mode 100644 src/node/consumer/src/lib/ingestedTorrent.js create mode 100644 src/node/consumer/src/lib/interfaces/ingested_rabbit_message.ts create mode 100644 src/node/consumer/src/lib/interfaces/torrent_info.ts create mode 100644 src/node/consumer/src/lib/torrent_processor.ts delete mode 100644 src/node/consumer/src/lib/types.js diff --git a/src/node/consumer/package-lock.json b/src/node/consumer/package-lock.json index 9a34802..3ad7ee7 100644 --- a/src/node/consumer/package-lock.json +++ b/src/node/consumer/package-lock.json @@ -30,6 +30,7 @@ "user-agents": "^1.0.1444" }, "devDependencies": { + "@types/amqplib": "^0.10.4", "@types/node": "^20.11.16", "@types/stremio-addon-sdk": "^1.6.10", "@types/validator": "^13.11.8", @@ -564,6 +565,15 @@ "url": "https://github.com/sponsors/tirke" } }, + "node_modules/@types/amqplib": { + "version": "0.10.4", + "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.4.tgz", + "integrity": "sha512-Y5Sqquh/LqDxSgxYaAAFNM0M7GyONtSDCcFMJk+DQwYEjibPyW6y+Yu9H9omdkKc3epyXULmFN3GTaeBHhn2Hg==", + "dev": true, + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/debug": { "version": "4.1.12", "license": "MIT", diff --git a/src/node/consumer/package.json b/src/node/consumer/package.json index 2639881..0c86906 100644 --- a/src/node/consumer/package.json +++ b/src/node/consumer/package.json @@ -31,6 +31,7 @@ "user-agents": "^1.0.1444" }, "devDependencies": { + "@types/amqplib": "^0.10.4", "@types/node": "^20.11.16", "@types/stremio-addon-sdk": "^1.6.10", "@types/validator": "^13.11.8", diff --git a/src/node/consumer/src/index.js b/src/node/consumer/src/index.js index e3ac0b7..92f994d 100644 --- a/src/node/consumer/src/index.js +++ b/src/node/consumer/src/index.js @@ -1,4 +1,4 @@ -import { listenToQueue } from './jobs/processTorrents.js'; +import { listenToQueue } from './jobs/processTorrents'; import { repository } from "./repository/database_repository"; import { getTrackers } from "./lib/trackerService.js"; diff --git a/src/node/consumer/src/jobs/processTorrents.js b/src/node/consumer/src/jobs/processTorrents.js deleted file mode 100644 index 1dd50b2..0000000 --- a/src/node/consumer/src/jobs/processTorrents.js +++ /dev/null @@ -1,37 +0,0 @@ -import amqp from 'amqplib' -import { rabbitConfig, jobConfig } from '../lib/config.js' -import { processTorrentRecord } from "../lib/ingestedTorrent.js"; -import {logger} from "../lib/logger"; - -const assertQueueOptions = { durable: true } -const consumeQueueOptions = { noAck: false } - -const processMessage = msg => processTorrentRecord(getMessageAsJson(msg)); - -const getMessageAsJson = msg => - JSON.parse(msg.content.toString()).message; - -const assertAndConsumeQueue = async channel => { - logger.info('Worker is running! Waiting for new torrents...') - - const ackMsg = msg => - processMessage(msg) - .then(() => channel.ack(msg)) - .catch(error => logger.error('Failed processing torrent', error)); - - channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions) - .then(() => channel.prefetch(jobConfig.JOB_CONCURRENCY)) - .then(() => channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions)) - .catch(error => logger.error('Failed to setup channel', error)); -} - -export const listenToQueue = async () => { - if (!jobConfig.JOBS_ENABLED) { - return; - } - - return amqp.connect(rabbitConfig.URI) - .then(connection => connection.createChannel()) - .then(channel => assertAndConsumeQueue(channel)) - .catch(error => logger.error('Failed to connect and setup channel', error)); - }; \ No newline at end of file diff --git a/src/node/consumer/src/jobs/processTorrents.ts b/src/node/consumer/src/jobs/processTorrents.ts new file mode 100644 index 0000000..200d84c --- /dev/null +++ b/src/node/consumer/src/jobs/processTorrents.ts @@ -0,0 +1,56 @@ +import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' +import {jobConfig, rabbitConfig} from '../lib/config.js'; +import {processTorrentRecord} from '../lib/torrent_processor'; +import {logger} from '../lib/logger'; +import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; +import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; + +const assertQueueOptions: Options.AssertQueue = { durable: true }; +const consumeQueueOptions: Options.Consume = { noAck: false }; + +const processMessage = (msg: ConsumeMessage | null): Promise => { + const ingestedTorrent: IngestedTorrentAttributes = getMessageAsJson(msg); + return processTorrentRecord(ingestedTorrent); +}; + +const getMessageAsJson = (msg: ConsumeMessage | null): IngestedTorrentAttributes => { + const content = msg ? msg?.content.toString('utf8') : "{}"; + const receivedObject: IngestedRabbitMessage = JSON.parse(content) as IngestedRabbitMessage; + const receivedTorrent:IngestedRabbitTorrent = receivedObject.message; + const mappedObject: any = {...receivedTorrent, info_hash: receivedTorrent.infoHash}; + delete mappedObject.infoHash; + + return mappedObject as IngestedTorrentAttributes; +}; + +const assertAndConsumeQueue = async (channel: Channel): Promise => { + logger.info('Worker is running! Waiting for new torrents...'); + + const ackMsg = (msg: ConsumeMessage): void => { + processMessage(msg) + .then(() => channel.ack(msg)) + .catch((error: Error) => logger.error('Failed processing torrent', error)); + } + + try { + await channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions); + await channel.prefetch(jobConfig.JOB_CONCURRENCY); + await channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions); + } catch(error) { + logger.error('Failed to setup channel', error); + } +} + +export const listenToQueue = async (): Promise => { + if (!jobConfig.JOBS_ENABLED) { + return; + } + + try { + const connection: Connection = await client.connect(rabbitConfig.URI); + const channel: Channel = await connection.createChannel(); + await assertAndConsumeQueue(channel); + } catch (error) { + logger.error('Failed to connect and setup channel', error); + } +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/cache.js b/src/node/consumer/src/lib/cache.js index a7c1128..3c9d69d 100644 --- a/src/node/consumer/src/lib/cache.js +++ b/src/node/consumer/src/lib/cache.js @@ -2,7 +2,7 @@ import { createCache, memoryStore} from 'cache-manager'; import { mongoDbStore } from '@tirke/node-cache-manager-mongodb' import { cacheConfig } from './config.js'; import { logger } from './logger'; -import { CacheType } from "./types.js"; +import { CacheType } from "./enums/cache_types"; const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer'; const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`; diff --git a/src/node/consumer/src/lib/enums/cache_types.ts b/src/node/consumer/src/lib/enums/cache_types.ts new file mode 100644 index 0000000..989f6bd --- /dev/null +++ b/src/node/consumer/src/lib/enums/cache_types.ts @@ -0,0 +1,4 @@ +export enum CacheType { + MEMORY = 'memory', + MONGODB = 'mongodb' +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/enums/torrent_types.ts b/src/node/consumer/src/lib/enums/torrent_types.ts new file mode 100644 index 0000000..7a20693 --- /dev/null +++ b/src/node/consumer/src/lib/enums/torrent_types.ts @@ -0,0 +1,6 @@ +export enum TorrentType { + SERIES = 'SERIES', + MOVIE = 'MOVIE', + ANIME = 'anime', + PORN = 'xxx', +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/ingestedTorrent.js b/src/node/consumer/src/lib/ingestedTorrent.js deleted file mode 100644 index fe8705d..0000000 --- a/src/node/consumer/src/lib/ingestedTorrent.js +++ /dev/null @@ -1,46 +0,0 @@ -import { createTorrentEntry, checkAndUpdateTorrent } from './torrentEntries.js'; -import {getTrackers} from "./trackerService.js"; -import { TorrentType } from './types.js'; -import {logger} from "./logger"; - -export async function processTorrentRecord(torrent) { - const {category} = torrent; - const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE; - const torrentInfo = await parseTorrent(torrent, type); - logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`) - - if (await checkAndUpdateTorrent(torrentInfo)) { - return torrentInfo; - } - - return createTorrentEntry(torrentInfo); -} - -async function assignTorrentTrackers() { - const trackers = await getTrackers(); - return trackers.join(','); -} - -async function parseTorrent(torrent, category) { - const infoHash = torrent.infoHash?.trim().toLowerCase() - return { - title: torrent.name, - torrentId: `${torrent.name}_${infoHash}`, - infoHash: infoHash, - seeders: 100, - size: torrent.size, - uploadDate: torrent.createdAt, - imdbId: parseImdbId(torrent), - type: category, - provider: torrent.source, - trackers: await assignTorrentTrackers(), - } -} - -function parseImdbId(torrent) { - if (torrent.imdb === undefined || torrent.imdb === null) { - return undefined; - } - - return torrent.imdb; -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/interfaces/ingested_rabbit_message.ts b/src/node/consumer/src/lib/interfaces/ingested_rabbit_message.ts new file mode 100644 index 0000000..28ef21c --- /dev/null +++ b/src/node/consumer/src/lib/interfaces/ingested_rabbit_message.ts @@ -0,0 +1,15 @@ +export interface IngestedRabbitTorrent { + name: string; + source: string; + category: string; + infoHash: string; + size: string; + seeders: number; + leechers: number; + imdb: string; + processed: boolean; +} + +export interface IngestedRabbitMessage { + message: IngestedRabbitTorrent; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/interfaces/torrent_info.ts b/src/node/consumer/src/lib/interfaces/torrent_info.ts new file mode 100644 index 0000000..31895bd --- /dev/null +++ b/src/node/consumer/src/lib/interfaces/torrent_info.ts @@ -0,0 +1,12 @@ +export interface TorrentInfo { + title: string | null; + torrentId: string; + infoHash: string | null; + seeders: number; + size: string | null; + uploadDate: Date; + imdbId: string | undefined; + type: string; + provider: string | null; + trackers: string; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/metadata.js b/src/node/consumer/src/lib/metadata.js index 874decb..09da123 100644 --- a/src/node/consumer/src/lib/metadata.js +++ b/src/node/consumer/src/lib/metadata.js @@ -2,7 +2,7 @@ import axios from 'axios'; import { search } from 'google-sr'; import nameToImdb from 'name-to-imdb'; import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache.js'; -import { TorrentType } from './types.js'; +import { TorrentType } from './enums/torrent_types'; const CINEMETA_URL = 'https://v3-cinemeta.strem.io'; const KITSU_URL = 'https://anime-kitsu.strem.fun'; diff --git a/src/node/consumer/src/lib/parseHelper.js b/src/node/consumer/src/lib/parseHelper.js index b088687..131e4f0 100644 --- a/src/node/consumer/src/lib/parseHelper.js +++ b/src/node/consumer/src/lib/parseHelper.js @@ -1,5 +1,5 @@ import { parse } from 'parse-torrent-title'; -import { TorrentType } from './types.js'; +import { TorrentType } from './enums/torrent_types'; const MULTIPLE_FILES_SIZE = 4 * 1024 * 1024 * 1024; // 4 GB diff --git a/src/node/consumer/src/lib/torrentEntries.js b/src/node/consumer/src/lib/torrentEntries.js index 9fb18a2..d5a629d 100644 --- a/src/node/consumer/src/lib/torrentEntries.js +++ b/src/node/consumer/src/lib/torrentEntries.js @@ -5,7 +5,7 @@ import * as Promises from './promises.js'; import { repository } from '../repository/database_repository'; import { parseTorrentFiles } from './torrentFiles.js'; import { assignSubtitles } from './torrentSubtitles.js'; -import { TorrentType } from './types.js'; +import { TorrentType } from './enums/torrent_types'; import {logger} from "./logger"; export async function createTorrentEntry(torrent, overwrite = false) { diff --git a/src/node/consumer/src/lib/torrentFiles.js b/src/node/consumer/src/lib/torrentFiles.js index 68913f9..0a59c56 100644 --- a/src/node/consumer/src/lib/torrentFiles.js +++ b/src/node/consumer/src/lib/torrentFiles.js @@ -8,7 +8,7 @@ import { getMetadata, getImdbId, getKitsuId } from './metadata.js'; import { parseSeriesVideos, isPackTorrent } from './parseHelper.js'; import * as Promises from './promises.js'; import {torrentFiles} from "./torrent.js"; -import { TorrentType } from './types.js'; +import { TorrentType } from './enums/torrent_types'; import {logger} from "./logger"; const MIN_SIZE = 5 * 1024 * 1024; // 5 MB diff --git a/src/node/consumer/src/lib/torrent_processor.ts b/src/node/consumer/src/lib/torrent_processor.ts new file mode 100644 index 0000000..49f842a --- /dev/null +++ b/src/node/consumer/src/lib/torrent_processor.ts @@ -0,0 +1,49 @@ +import {TorrentInfo} from "./interfaces/torrent_info"; +import {TorrentType} from "./enums/torrent_types"; +import {logger} from "./logger"; +import {checkAndUpdateTorrent, createTorrentEntry} from "./torrentEntries.js"; +import {getTrackers} from "./trackerService.js"; +import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes"; + +export async function processTorrentRecord(torrent: IngestedTorrentAttributes): Promise { + const { category } = torrent; + const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE; + const torrentInfo: TorrentInfo = await parseTorrent(torrent, type); + + logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`); + + if (await checkAndUpdateTorrent(torrentInfo)) { + return; + } + + return createTorrentEntry(torrentInfo); +} + +async function assignTorrentTrackers(): Promise { + const trackers = await getTrackers(); + return trackers.join(','); +} + +async function parseTorrent(torrent: IngestedTorrentAttributes, category: string): Promise { + const infoHash = torrent.info_hash?.trim().toLowerCase() + return { + title: torrent.name, + torrentId: `${torrent.name}_${infoHash}`, + infoHash: infoHash, + seeders: 100, + size: torrent.size, + uploadDate: torrent.createdAt, + imdbId: parseImdbId(torrent), + type: category, + provider: torrent.source, + trackers: await assignTorrentTrackers(), + } +} + +function parseImdbId(torrent: IngestedTorrentAttributes): string | undefined { + if (torrent.imdb === undefined || torrent.imdb === null) { + return undefined; + } + + return torrent.imdb; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/types.js b/src/node/consumer/src/lib/types.js deleted file mode 100644 index 17d37dc..0000000 --- a/src/node/consumer/src/lib/types.js +++ /dev/null @@ -1,11 +0,0 @@ -export const TorrentType = { - MOVIE: 'movie', - SERIES: 'series', - ANIME: 'anime', - PORN: 'xxx', -}; - -export const CacheType = { - MEMORY: 'memory', - MONGODB: 'mongodb', -}; \ No newline at end of file diff --git a/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts b/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts index 9dbe491..b54eec1 100644 --- a/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts +++ b/src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts @@ -10,6 +10,7 @@ export interface IngestedTorrentAttributes { leechers: number; imdb: string; processed: boolean; + createdAt: Date; } export interface IngestedTorrentCreationAttributes extends Optional {