diff --git a/src/node/consumer/src/jobs/processTorrents.ts b/src/node/consumer/src/jobs/processTorrents.ts index 200d84c..6f43426 100644 --- a/src/node/consumer/src/jobs/processTorrents.ts +++ b/src/node/consumer/src/jobs/processTorrents.ts @@ -1,5 +1,5 @@ import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' -import {jobConfig, rabbitConfig} from '../lib/config.js'; +import {jobConfig, rabbitConfig} from '../lib/config'; import {processTorrentRecord} from '../lib/torrent_processor'; import {logger} from '../lib/logger'; import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; @@ -47,7 +47,7 @@ export const listenToQueue = async (): Promise => { } try { - const connection: Connection = await client.connect(rabbitConfig.URI); + const connection: Connection = await client.connect(rabbitConfig.RABBIT_URI); const channel: Channel = await connection.createChannel(); await assertAndConsumeQueue(channel); } catch (error) { diff --git a/src/node/consumer/src/lib/cache.js b/src/node/consumer/src/lib/cache.ts similarity index 58% rename from src/node/consumer/src/lib/cache.js rename to src/node/consumer/src/lib/cache.ts index 3c9d69d..d389257 100644 --- a/src/node/consumer/src/lib/cache.js +++ b/src/node/consumer/src/lib/cache.ts @@ -1,8 +1,9 @@ -import { createCache, memoryStore} from 'cache-manager'; +import {Cache, createCache, memoryStore} from 'cache-manager'; import { mongoDbStore } from '@tirke/node-cache-manager-mongodb' -import { cacheConfig } from './config.js'; +import { cacheConfig } from './config'; import { logger } from './logger'; import { CacheType } from "./enums/cache_types"; +import {CacheOptions} from "./interfaces/cache_options"; const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer'; const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`; @@ -10,40 +11,43 @@ const KITSU_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|kitsu_id`; const METADATA_PREFIX = `${GLOBAL_KEY_PREFIX}|metadata`; const TRACKERS_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|trackers`; -const GLOBAL_TTL = process.env.METADATA_TTL || 7 * 24 * 60 * 60; // 7 days -const MEMORY_TTL = process.env.METADATA_TTL || 2 * 60 * 60; // 2 hours -const TRACKERS_TTL = 2 * 24 * 60 * 60; // 2 days +const GLOBAL_TTL: number = Number(process.env.METADATA_TTL) || 7 * 24 * 60 * 60; // 7 days +const MEMORY_TTL: number = Number(process.env.METADATA_TTL) || 2 * 60 * 60; // 2 hours +const TRACKERS_TTL: number = 2 * 24 * 60 * 60; // 2 days + +type CacheMethod = () => any; const initiateMemoryCache = () => createCache(memoryStore(), { - ttl: parseInt(MEMORY_TTL) - }); + ttl: MEMORY_TTL + }) as Cache; const initiateMongoCache = () => { const store = mongoDbStore({ collectionName: cacheConfig.COLLECTION_NAME, - ttl: parseInt(GLOBAL_TTL), + ttl: GLOBAL_TTL, url: cacheConfig.MONGO_URI, mongoConfig:{ socketTimeoutMS: 120000, appName: 'knightcrawler-consumer', } - }); - + }); + return createCache(store, { - ttl: parseInt(GLOBAL_TTL), + ttl: GLOBAL_TTL, }); } -const initiateRemoteCache = ()=> { +const initiateRemoteCache = (): Cache => { if (cacheConfig.NO_CACHE) { logger.debug('Cache is disabled'); return null; } + return cacheConfig.MONGO_URI ? initiateMongoCache() : initiateMemoryCache(); } -const getCacheType = (cacheType) => { +const getCacheType = (cacheType: CacheType): typeof memoryCache | null => { switch (cacheType) { case CacheType.MEMORY: return memoryCache; @@ -54,12 +58,13 @@ const getCacheType = (cacheType) => { } } -const memoryCache = initiateMemoryCache() -const remoteCache = initiateRemoteCache() +const memoryCache = initiateMemoryCache(); +const remoteCache = initiateRemoteCache(); -const cacheWrap = async (cacheType, key, method, options) => { +const cacheWrap = async ( + cacheType: CacheType, key: string, method: CacheMethod, options: CacheOptions): Promise => { const cache = getCacheType(cacheType); - + if (cacheConfig.NO_CACHE || !cache) { return method(); } @@ -67,18 +72,18 @@ const cacheWrap = async (cacheType, key, method, options) => { logger.debug(`Cache type: ${cacheType}`); logger.debug(`Cache key: ${key}`); logger.debug(`Cache options: ${JSON.stringify(options)}`); - + return cache.wrap(key, method, options.ttl); } -export const cacheWrapImdbId = (key, method) => - cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) }); +export const cacheWrapImdbId = (key: string, method: CacheMethod): Promise => + cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); -export const cacheWrapKitsuId = (key, method) => - cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) }); +export const cacheWrapKitsuId = (key: string, method: CacheMethod): Promise => + cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL }); -export const cacheWrapMetadata = (id, method) => - cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: parseInt(MEMORY_TTL) }); +export const cacheWrapMetadata = (id: string, method: CacheMethod): Promise => + cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL }); -export const cacheTrackers = (method) => - cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: parseInt(TRACKERS_TTL) }); \ No newline at end of file +export const cacheTrackers = (method: CacheMethod): Promise => + cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL }); \ No newline at end of file diff --git a/src/node/consumer/src/lib/config.js b/src/node/consumer/src/lib/config.js deleted file mode 100644 index 69769ea..0000000 --- a/src/node/consumer/src/lib/config.js +++ /dev/null @@ -1,63 +0,0 @@ -export const rabbitConfig = { - URI: process.env.RABBIT_URI || 'amqp://localhost', - QUEUE_NAME: process.env.QUEUE_NAME || 'test-queue' -} - -export const cacheConfig = { - MONGODB_HOST: process.env.MONGODB_HOST || 'mongodb', - MONGODB_PORT: process.env.MONGODB_PORT || '27017', - MONGODB_DB: process.env.MONGODB_DB || 'knightcrawler', - MONGO_INITDB_ROOT_USERNAME: process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo', - MONGO_INITDB_ROOT_PASSWORD: process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo', - NO_CACHE: parseBool(process.env.NO_CACHE, false), - COLLECTION_NAME: process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection' -} - -// Combine the environment variables into a connection string -// The combined string will look something like: -// 'mongodb://mongo:mongo@localhost:27017/knightcrawler?authSource=admin' -cacheConfig.MONGO_URI = 'mongodb://' + cacheConfig.MONGO_INITDB_ROOT_USERNAME + ':' + cacheConfig.MONGO_INITDB_ROOT_PASSWORD + '@' + cacheConfig.MONGODB_HOST + ':' + cacheConfig.MONGODB_PORT + '/' + cacheConfig.MONGODB_DB + '?authSource=admin'; - -export const databaseConfig = { - POSTGRES_HOST: process.env.POSTGRES_HOST || 'postgres', - POSTGRES_PORT: process.env.POSTGRES_PORT || '5432', - POSTGRES_DB: process.env.POSTGRES_DB || 'knightcrawler', - POSTGRES_USER: process.env.POSTGRES_USER || 'postgres', - POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'postgres', - AUTO_CREATE_AND_APPLY_MIGRATIONS: parseBool(process.env.AUTO_CREATE_AND_APPLY_MIGRATIONS, false) -} - -// Combine the environment variables into a connection string -// The combined string will look something like: -// 'postgres://postgres:postgres@localhost:5432/knightcrawler' -databaseConfig.POSTGRES_URI = 'postgres://' + databaseConfig.POSTGRES_USER + ':' + databaseConfig.POSTGRES_PASSWORD + '@' + databaseConfig.POSTGRES_HOST + ':' + databaseConfig.POSTGRES_PORT + '/' + databaseConfig.POSTGRES_DB; - -export const jobConfig = { - JOB_CONCURRENCY: parseInt(process.env.JOB_CONCURRENCY || 1), - JOBS_ENABLED: parseBool(process.env.JOBS_ENABLED || true) -} - -export const metadataConfig = { - IMDB_CONCURRENT: parseInt(process.env.IMDB_CONCURRENT || 1), - IMDB_INTERVAL_MS: parseInt(process.env.IMDB_INTERVAL_MS || 1000), -} - -export const trackerConfig = { - TRACKERS_URL: process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt', - UDP_ENABLED: parseBool(process.env.UDP_TRACKERS_ENABLED || false), -} - -export const torrentConfig = { - MAX_CONNECTIONS_PER_TORRENT: parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || 20), - TIMEOUT: parseInt(process.env.TORRENT_TIMEOUT || 30000), -} - -function parseBool(boolString, defaultValue) { - const isString = typeof boolString === 'string' || boolString instanceof String; - - if (!isString) { - return defaultValue; - } - - return boolString.toLowerCase() === 'true' ? true : defaultValue; -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/config.ts b/src/node/consumer/src/lib/config.ts new file mode 100644 index 0000000..ddd31d0 --- /dev/null +++ b/src/node/consumer/src/lib/config.ts @@ -0,0 +1,51 @@ +const parseBool = (boolString: string | undefined, defaultValue: boolean): boolean => + boolString?.toLowerCase() === 'true' ? true : defaultValue; + +export const rabbitConfig = { + RABBIT_URI: process.env.RABBIT_URI || 'amqp://localhost', + QUEUE_NAME: process.env.QUEUE_NAME || 'test-queue' +} + +export const cacheConfig = { + MONGODB_HOST: process.env.MONGODB_HOST || 'mongodb', + MONGODB_PORT: process.env.MONGODB_PORT || '27017', + MONGODB_DB: process.env.MONGODB_DB || 'knightcrawler', + MONGO_INITDB_ROOT_USERNAME: process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo', + MONGO_INITDB_ROOT_PASSWORD: process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo', + NO_CACHE: parseBool(process.env.NO_CACHE, false), + COLLECTION_NAME: process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection', + MONGO_URI: '', +} + +cacheConfig.MONGO_URI = `mongodb://${cacheConfig.MONGO_INITDB_ROOT_USERNAME}:${cacheConfig.MONGO_INITDB_ROOT_PASSWORD}@${cacheConfig.MONGODB_HOST}:${cacheConfig.MONGODB_PORT}/${cacheConfig.MONGODB_DB}?authSource=admin`; + +export const databaseConfig = { + POSTGRES_HOST: process.env.POSTGRES_HOST || 'postgres', + POSTGRES_PORT: process.env.POSTGRES_PORT || '5432', + POSTGRES_DATABASE: process.env.POSTGRES_DATABASE || 'knightcrawler', + POSTGRES_USERNAME: process.env.POSTGRES_USERNAME || 'postgres', + POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'postgres', + POSTGRES_URI: '', +} + +databaseConfig.POSTGRES_URI = `postgres://${databaseConfig.POSTGRES_USERNAME}:${databaseConfig.POSTGRES_PASSWORD}@${databaseConfig.POSTGRES_HOST}:${databaseConfig.POSTGRES_PORT}/${databaseConfig.POSTGRES_DATABASE}`; + +export const jobConfig = { + JOB_CONCURRENCY: Number.parseInt(process.env.JOB_CONCURRENCY || "1", 10), + JOBS_ENABLED: parseBool(process.env.JOBS_ENABLED, true), +} + +export const metadataConfig = { + IMDB_CONCURRENT: Number.parseInt(process.env.IMDB_CONCURRENT || "1", 10), + IMDB_INTERVAL_MS: Number.parseInt(process.env.IMDB_INTERVAL_MS || "1000", 10), +} + +export const trackerConfig = { + TRACKERS_URL: process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt', + UDP_ENABLED: parseBool(process.env.UDP_TRACKERS_ENABLED, false), +} + +export const torrentConfig = { + MAX_CONNECTIONS_PER_TORRENT: Number.parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || "20", 10), + TIMEOUT: Number.parseInt(process.env.TORRENT_TIMEOUT || "30000", 10), +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/interfaces/cache_options.ts b/src/node/consumer/src/lib/interfaces/cache_options.ts new file mode 100644 index 0000000..f8b25e6 --- /dev/null +++ b/src/node/consumer/src/lib/interfaces/cache_options.ts @@ -0,0 +1,3 @@ +export interface CacheOptions { + ttl: number; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/metadata.ts b/src/node/consumer/src/lib/metadata.ts index e29e80b..99ebc8d 100644 --- a/src/node/consumer/src/lib/metadata.ts +++ b/src/node/consumer/src/lib/metadata.ts @@ -1,7 +1,7 @@ import axios, {AxiosResponse} from 'axios'; import {search, ResultTypes} from 'google-sr'; import nameToImdb from 'name-to-imdb'; -import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache.js'; +import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache'; import { TorrentType } from './enums/torrent_types'; import {MetadataResponse} from "./interfaces/metadata_response"; import {CinemetaJsonResponse} from "./interfaces/cinemeta_metadata"; diff --git a/src/node/consumer/src/lib/torrent.js b/src/node/consumer/src/lib/torrent.js index 3a066b3..4f78d05 100644 --- a/src/node/consumer/src/lib/torrent.js +++ b/src/node/consumer/src/lib/torrent.js @@ -1,6 +1,6 @@ import { decode } from 'magnet-uri'; import torrentStream from 'torrent-stream'; -import { torrentConfig } from './config.js'; +import { torrentConfig } from './config'; import {isSubtitle, isVideo} from './extension'; export async function torrentFiles(torrent, timeout) { diff --git a/src/node/consumer/src/lib/torrentFiles.js b/src/node/consumer/src/lib/torrentFiles.js index 3a60556..3539925 100644 --- a/src/node/consumer/src/lib/torrentFiles.js +++ b/src/node/consumer/src/lib/torrentFiles.js @@ -2,7 +2,7 @@ import Bottleneck from 'bottleneck'; import distance from 'jaro-winkler'; import moment from 'moment'; import { parse } from 'parse-torrent-title'; -import { metadataConfig } from './config.js'; +import { metadataConfig } from './config'; import { isDisk } from './extension'; import { getMetadata, getImdbId, getKitsuId } from './metadata'; import { parseSeriesVideos, isPackTorrent } from './parseHelper'; diff --git a/src/node/consumer/src/lib/trackerService.ts b/src/node/consumer/src/lib/trackerService.ts index f2310c0..f42e44e 100644 --- a/src/node/consumer/src/lib/trackerService.ts +++ b/src/node/consumer/src/lib/trackerService.ts @@ -1,6 +1,6 @@ import axios, { AxiosResponse } from 'axios'; -import { cacheTrackers } from "./cache.js"; -import { trackerConfig } from './config.js'; +import { cacheTrackers } from "./cache"; +import { trackerConfig } from './config'; import { logger } from "./logger"; const downloadTrackers = async (): Promise => {