diff --git a/src/node/consumer/src/index.js b/src/node/consumer/src/index.js index 7d3e675..d4a163b 100644 --- a/src/node/consumer/src/index.js +++ b/src/node/consumer/src/index.js @@ -1,6 +1,6 @@ import { listenToQueue } from './jobs/processTorrents'; import { repository } from "./repository/database_repository"; -import { trackerService } from "./lib/services/tracker_service.js"; +import { trackerService } from "./lib/services/tracker_service"; (async () => { await trackerService.getTrackers(); diff --git a/src/node/consumer/src/jobs/processTorrents.ts b/src/node/consumer/src/jobs/processTorrents.ts index 54a3a6b..9c7cd9b 100644 --- a/src/node/consumer/src/jobs/processTorrents.ts +++ b/src/node/consumer/src/jobs/processTorrents.ts @@ -1,5 +1,5 @@ import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib' -import {jobConfig, rabbitConfig} from '../lib/config'; +import {configurationService} from '../lib/services/configuration_service'; import {torrentProcessingService} from '../lib/services/torrent_processing_service'; import {logger} from '../lib/services/logging_service'; import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message"; @@ -33,21 +33,21 @@ const assertAndConsumeQueue = async (channel: Channel): Promise => { } try { - await channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions); - await channel.prefetch(jobConfig.JOB_CONCURRENCY); - await channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions); + await channel.assertQueue(configurationService.rabbitConfig.QUEUE_NAME, assertQueueOptions); + await channel.prefetch(configurationService.jobConfig.JOB_CONCURRENCY); + await channel.consume(configurationService.rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions); } catch(error) { logger.error('Failed to setup channel', error); } } export const listenToQueue = async (): Promise => { - if (!jobConfig.JOBS_ENABLED) { + if (!configurationService.jobConfig.JOBS_ENABLED) { return; } try { - const connection: Connection = await client.connect(rabbitConfig.RABBIT_URI); + const connection: Connection = await client.connect(configurationService.rabbitConfig.RABBIT_URI); const channel: Channel = await connection.createChannel(); await assertAndConsumeQueue(channel); } catch (error) { diff --git a/src/node/consumer/src/lib/config.ts b/src/node/consumer/src/lib/config.ts deleted file mode 100644 index ddd31d0..0000000 --- a/src/node/consumer/src/lib/config.ts +++ /dev/null @@ -1,51 +0,0 @@ -const parseBool = (boolString: string | undefined, defaultValue: boolean): boolean => - boolString?.toLowerCase() === 'true' ? true : defaultValue; - -export const rabbitConfig = { - RABBIT_URI: process.env.RABBIT_URI || 'amqp://localhost', - QUEUE_NAME: process.env.QUEUE_NAME || 'test-queue' -} - -export const cacheConfig = { - MONGODB_HOST: process.env.MONGODB_HOST || 'mongodb', - MONGODB_PORT: process.env.MONGODB_PORT || '27017', - MONGODB_DB: process.env.MONGODB_DB || 'knightcrawler', - MONGO_INITDB_ROOT_USERNAME: process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo', - MONGO_INITDB_ROOT_PASSWORD: process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo', - NO_CACHE: parseBool(process.env.NO_CACHE, false), - COLLECTION_NAME: process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection', - MONGO_URI: '', -} - -cacheConfig.MONGO_URI = `mongodb://${cacheConfig.MONGO_INITDB_ROOT_USERNAME}:${cacheConfig.MONGO_INITDB_ROOT_PASSWORD}@${cacheConfig.MONGODB_HOST}:${cacheConfig.MONGODB_PORT}/${cacheConfig.MONGODB_DB}?authSource=admin`; - -export const databaseConfig = { - POSTGRES_HOST: process.env.POSTGRES_HOST || 'postgres', - POSTGRES_PORT: process.env.POSTGRES_PORT || '5432', - POSTGRES_DATABASE: process.env.POSTGRES_DATABASE || 'knightcrawler', - POSTGRES_USERNAME: process.env.POSTGRES_USERNAME || 'postgres', - POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'postgres', - POSTGRES_URI: '', -} - -databaseConfig.POSTGRES_URI = `postgres://${databaseConfig.POSTGRES_USERNAME}:${databaseConfig.POSTGRES_PASSWORD}@${databaseConfig.POSTGRES_HOST}:${databaseConfig.POSTGRES_PORT}/${databaseConfig.POSTGRES_DATABASE}`; - -export const jobConfig = { - JOB_CONCURRENCY: Number.parseInt(process.env.JOB_CONCURRENCY || "1", 10), - JOBS_ENABLED: parseBool(process.env.JOBS_ENABLED, true), -} - -export const metadataConfig = { - IMDB_CONCURRENT: Number.parseInt(process.env.IMDB_CONCURRENT || "1", 10), - IMDB_INTERVAL_MS: Number.parseInt(process.env.IMDB_INTERVAL_MS || "1000", 10), -} - -export const trackerConfig = { - TRACKERS_URL: process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt', - UDP_ENABLED: parseBool(process.env.UDP_TRACKERS_ENABLED, false), -} - -export const torrentConfig = { - MAX_CONNECTIONS_PER_TORRENT: Number.parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || "20", 10), - TIMEOUT: Number.parseInt(process.env.TORRENT_TIMEOUT || "30000", 10), -} \ No newline at end of file diff --git a/src/node/consumer/src/lib/helpers/boolean_helpers.ts b/src/node/consumer/src/lib/helpers/boolean_helpers.ts new file mode 100644 index 0000000..9b608c0 --- /dev/null +++ b/src/node/consumer/src/lib/helpers/boolean_helpers.ts @@ -0,0 +1,8 @@ +export class BooleanHelpers { + public static parseBool(value: string, defaultValue: boolean): boolean { + if (value === undefined) { + return defaultValue; + } + return value.toLowerCase() === 'true'; + } +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/cache_config.ts b/src/node/consumer/src/lib/models/configuration/cache_config.ts new file mode 100644 index 0000000..1ca735b --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/cache_config.ts @@ -0,0 +1,15 @@ +import {BooleanHelpers} from "../../helpers/boolean_helpers"; + +export class CacheConfig { + public MONGODB_HOST: string = process.env.MONGODB_HOST || 'mongodb'; + public MONGODB_PORT: string = process.env.MONGODB_PORT || '27017'; + public MONGODB_DB: string = process.env.MONGODB_DB || 'knightcrawler'; + public MONGO_INITDB_ROOT_USERNAME: string = process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo'; + public MONGO_INITDB_ROOT_PASSWORD: string = process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo'; + public NO_CACHE: boolean = BooleanHelpers.parseBool(process.env.NO_CACHE, false); + public COLLECTION_NAME: string = process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection'; + + public get MONGO_URI() { + return `mongodb://${this.MONGO_INITDB_ROOT_USERNAME}:${this.MONGO_INITDB_ROOT_PASSWORD}@${this.MONGODB_HOST}:${this.MONGODB_PORT}/${this.MONGODB_DB}?authSource=admin`; + } +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/database_config.ts b/src/node/consumer/src/lib/models/configuration/database_config.ts new file mode 100644 index 0000000..bc47c9b --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/database_config.ts @@ -0,0 +1,11 @@ +export class DatabaseConfig { + public POSTGRES_HOST: string = process.env.POSTGRES_HOST || 'postgres'; + public POSTGRES_PORT: number = parseInt(process.env.POSTGRES_PORT || '5432'); + public POSTGRES_DATABASE: string = process.env.POSTGRES_DATABASE || 'knightcrawler'; + public POSTGRES_USER: string = process.env.POSTGRES_USER || 'postgres'; + public POSTGRES_PASSWORD: string = process.env.POSTGRES_PASSWORD || 'postgres'; + + public get POSTGRES_URI() { + return `postgres://${this.POSTGRES_USER}:${this.POSTGRES_PASSWORD}@${this.POSTGRES_HOST}:${this.POSTGRES_PORT}/${this.POSTGRES_DATABASE}`; + } +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/job_config.ts b/src/node/consumer/src/lib/models/configuration/job_config.ts new file mode 100644 index 0000000..9ebbf90 --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/job_config.ts @@ -0,0 +1,6 @@ +import {BooleanHelpers} from "../../helpers/boolean_helpers"; + +export class JobConfig { + public JOB_CONCURRENCY: number = parseInt(process.env.JOB_CONCURRENCY || "1", 10); + public JOBS_ENABLED: boolean = BooleanHelpers.parseBool(process.env.JOBS_ENABLED, true); +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/metadata_config.ts b/src/node/consumer/src/lib/models/configuration/metadata_config.ts new file mode 100644 index 0000000..2debe48 --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/metadata_config.ts @@ -0,0 +1,4 @@ +export class MetadataConfig { + public IMDB_CONCURRENT: number = parseInt(process.env.IMDB_CONCURRENT || "1", 10); + public IMDB_INTERVAL_MS: number = parseInt(process.env.IMDB_INTERVAL_MS || "1000", 10); +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/rabbit_config.ts b/src/node/consumer/src/lib/models/configuration/rabbit_config.ts new file mode 100644 index 0000000..d864f46 --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/rabbit_config.ts @@ -0,0 +1,4 @@ +export class RabbitConfig { + public RABBIT_URI: string = process.env.RABBIT_URI || 'amqp://localhost'; + public QUEUE_NAME: string = process.env.QUEUE_NAME || 'test-queue'; +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/torrent_config.ts b/src/node/consumer/src/lib/models/configuration/torrent_config.ts new file mode 100644 index 0000000..5f0f6c7 --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/torrent_config.ts @@ -0,0 +1,4 @@ +export class TorrentConfig { + public MAX_CONNECTIONS_PER_TORRENT: number = parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || "20", 10); + public TIMEOUT: number = parseInt(process.env.TORRENT_TIMEOUT || "30000", 10); +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/models/configuration/tracker_config.ts b/src/node/consumer/src/lib/models/configuration/tracker_config.ts new file mode 100644 index 0000000..275dfa3 --- /dev/null +++ b/src/node/consumer/src/lib/models/configuration/tracker_config.ts @@ -0,0 +1,6 @@ +import {BooleanHelpers} from "../../helpers/boolean_helpers"; + +export class TrackerConfig { + public TRACKERS_URL: string = process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt'; + public UDP_ENABLED: boolean = BooleanHelpers.parseBool(process.env.UDP_TRACKERS_ENABLED, false); +} \ No newline at end of file diff --git a/src/node/consumer/src/lib/services/cache_service.ts b/src/node/consumer/src/lib/services/cache_service.ts index 5743d3c..88aad32 100644 --- a/src/node/consumer/src/lib/services/cache_service.ts +++ b/src/node/consumer/src/lib/services/cache_service.ts @@ -1,6 +1,6 @@ import {Cache, createCache, memoryStore} from 'cache-manager'; import { mongoDbStore } from '@tirke/node-cache-manager-mongodb' -import { cacheConfig } from '../config'; +import { configurationService } from './configuration_service'; import { logger } from './logging_service'; import { CacheType } from "../enums/cache_types"; import {CacheOptions} from "../interfaces/cache_options"; @@ -19,7 +19,7 @@ type CacheMethod = () => any; class CacheService { constructor() { - if (!cacheConfig.NO_CACHE) { + if (!configurationService.cacheConfig.NO_CACHE) { logger.info('Cache is disabled'); return; } @@ -47,9 +47,9 @@ class CacheService { private initiateMongoCache = () => { const store = mongoDbStore({ - collectionName: cacheConfig.COLLECTION_NAME, + collectionName: configurationService.cacheConfig.COLLECTION_NAME, ttl: GLOBAL_TTL, - url: cacheConfig.MONGO_URI, + url: configurationService.cacheConfig.MONGO_URI, mongoConfig:{ socketTimeoutMS: 120000, appName: 'knightcrawler-consumer', @@ -62,12 +62,12 @@ class CacheService { } private initiateRemoteCache = (): Cache => { - if (cacheConfig.NO_CACHE) { + if (configurationService.cacheConfig.NO_CACHE) { logger.debug('Cache is disabled'); return null; } - return cacheConfig.MONGO_URI ? this.initiateMongoCache() : this.initiateMemoryCache(); + return configurationService.cacheConfig.MONGO_URI ? this.initiateMongoCache() : this.initiateMemoryCache(); } private getCacheType = (cacheType: CacheType): typeof this.memoryCache | null => { @@ -88,7 +88,7 @@ class CacheService { cacheType: CacheType, key: string, method: CacheMethod, options: CacheOptions): Promise => { const cache = this.getCacheType(cacheType); - if (cacheConfig.NO_CACHE || !cache) { + if (configurationService.cacheConfig.NO_CACHE || !cache) { return method(); } diff --git a/src/node/consumer/src/lib/services/configuration_service.ts b/src/node/consumer/src/lib/services/configuration_service.ts new file mode 100644 index 0000000..9bef6d3 --- /dev/null +++ b/src/node/consumer/src/lib/services/configuration_service.ts @@ -0,0 +1,19 @@ +import {RabbitConfig} from "../models/configuration/rabbit_config"; +import {CacheConfig} from "../models/configuration/cache_config"; +import {DatabaseConfig} from "../models/configuration/database_config"; +import {JobConfig} from "../models/configuration/job_config"; +import {MetadataConfig} from "../models/configuration/metadata_config"; +import {TrackerConfig} from "../models/configuration/tracker_config"; +import {TorrentConfig} from "../models/configuration/torrent_config"; + +class ConfigurationService { + public readonly rabbitConfig = new RabbitConfig(); + public readonly cacheConfig = new CacheConfig(); + public readonly databaseConfig = new DatabaseConfig(); + public readonly jobConfig = new JobConfig(); + public readonly metadataConfig = new MetadataConfig(); + public readonly trackerConfig = new TrackerConfig(); + public readonly torrentConfig = new TorrentConfig(); +} + +export const configurationService = new ConfigurationService(); diff --git a/src/node/consumer/src/lib/services/torrent_download_service.ts b/src/node/consumer/src/lib/services/torrent_download_service.ts index 2cffccd..3af4a26 100644 --- a/src/node/consumer/src/lib/services/torrent_download_service.ts +++ b/src/node/consumer/src/lib/services/torrent_download_service.ts @@ -1,13 +1,13 @@ import { encode } from 'magnet-uri'; import torrentStream from 'torrent-stream'; -import { torrentConfig } from '../config'; +import { configurationService } from './configuration_service'; import {extensionService} from './extension_service'; import {TorrentInfo} from "../interfaces/torrent_info"; import {DownloadedTorrentFile} from "../interfaces/downloaded_torrent_file"; class TorrentDownloadService { private engineOptions: TorrentStream.TorrentEngineOptions = { - connections: torrentConfig.MAX_CONNECTIONS_PER_TORRENT, + connections: configurationService.torrentConfig.MAX_CONNECTIONS_PER_TORRENT, uploads: 0, verify: false, dht: false, diff --git a/src/node/consumer/src/lib/services/tracker_service.ts b/src/node/consumer/src/lib/services/tracker_service.ts index 15438a5..d17c7bb 100644 --- a/src/node/consumer/src/lib/services/tracker_service.ts +++ b/src/node/consumer/src/lib/services/tracker_service.ts @@ -1,6 +1,6 @@ import axios, { AxiosResponse } from 'axios'; import { cacheService } from "./cache_service"; -import { trackerConfig } from '../config'; +import { configurationService } from './configuration_service'; import { logger } from "./logging_service"; class TrackerService { @@ -9,14 +9,14 @@ class TrackerService { }; private async downloadTrackers(): Promise { - const response: AxiosResponse = await axios.get(trackerConfig.TRACKERS_URL); + const response: AxiosResponse = await axios.get(configurationService.trackerConfig.TRACKERS_URL); const trackersListText: string = response.data; // Trackers are separated by a newline character let urlTrackers = trackersListText.split("\n"); // remove blank lines urlTrackers = urlTrackers.filter(line => line.trim() !== ''); - if (!trackerConfig.UDP_ENABLED) { + if (!configurationService.trackerConfig.UDP_ENABLED) { // remove any udp trackers urlTrackers = urlTrackers.filter(line => !line.startsWith('udp://')); diff --git a/src/node/consumer/src/lib/torrentEntries.js b/src/node/consumer/src/lib/torrentEntries.js index d4aeef2..0212c1a 100644 --- a/src/node/consumer/src/lib/torrentEntries.js +++ b/src/node/consumer/src/lib/torrentEntries.js @@ -1,7 +1,7 @@ import { parse } from 'parse-torrent-title'; import { metadataService } from './services/metadata_service'; import { parsingService } from './services/parsing_service'; -import {PromiseHelpers} from './helpers/promises_helpers.js'; +import {PromiseHelpers} from './helpers/promises_helpers'; import { repository } from '../repository/database_repository'; import { parseTorrentFiles } from './torrentFiles.js'; import { assignSubtitles } from './torrentSubtitles.js'; diff --git a/src/node/consumer/src/lib/torrentFiles.js b/src/node/consumer/src/lib/torrentFiles.js index 417f783..c2fda7c 100644 --- a/src/node/consumer/src/lib/torrentFiles.js +++ b/src/node/consumer/src/lib/torrentFiles.js @@ -2,17 +2,17 @@ import Bottleneck from 'bottleneck'; import distance from 'jaro-winkler'; import moment from 'moment'; import { parse } from 'parse-torrent-title'; -import { metadataConfig } from './config'; +import { configurationService } from './services/configuration_service'; import { extensionService } from './services/extension_service'; import { metadataService } from './services/metadata_service'; import { parsingService } from './services/parsing_service'; -import {PromiseHelpers} from './helpers/promises_helpers.js'; +import {PromiseHelpers} from './helpers/promises_helpers'; import {torrentDownloadService} from "./services/torrent_download_service"; import { TorrentType } from './enums/torrent_types'; import {logger} from "./services/logging_service"; const MIN_SIZE = 5 * 1024 * 1024; // 5 MB -const imdb_limiter = new Bottleneck({ maxConcurrent: metadataConfig.IMDB_CONCURRENT, minTime: metadataConfig.IMDB_INTERVAL_MS }); +const imdb_limiter = new Bottleneck({ maxConcurrent: configurationService.metadataConfig.IMDB_CONCURRENT, minTime: configurationService.metadataConfig.IMDB_INTERVAL_MS }); export async function parseTorrentFiles(torrent) { const parsedTorrentName = parse(torrent.title); diff --git a/src/node/consumer/src/repository/database_repository.ts b/src/node/consumer/src/repository/database_repository.ts index c1772a2..d6e4455 100644 --- a/src/node/consumer/src/repository/database_repository.ts +++ b/src/node/consumer/src/repository/database_repository.ts @@ -1,7 +1,7 @@ import moment from 'moment'; import {literal, Op, WhereOptions} from "sequelize"; import {Model, Sequelize} from 'sequelize-typescript'; -import {databaseConfig} from '../lib/config'; +import {configurationService} from '../lib/services/configuration_service'; import {PromiseHelpers} from '../lib/helpers/promises_helpers'; import {Provider} from "./models/provider"; import {File} from "./models/file"; @@ -244,7 +244,7 @@ class DatabaseRepository { private createDatabase(): Sequelize { const newDatabase = new Sequelize( - databaseConfig.POSTGRES_URI, + configurationService.databaseConfig.POSTGRES_URI, { logging: false }