mirror of
https://github.com/knightcrawler-stremio/knightcrawler.git
synced 2024-12-20 03:29:51 +00:00
cache now ts
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib'
|
import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib'
|
||||||
import {jobConfig, rabbitConfig} from '../lib/config.js';
|
import {jobConfig, rabbitConfig} from '../lib/config';
|
||||||
import {processTorrentRecord} from '../lib/torrent_processor';
|
import {processTorrentRecord} from '../lib/torrent_processor';
|
||||||
import {logger} from '../lib/logger';
|
import {logger} from '../lib/logger';
|
||||||
import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message";
|
import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message";
|
||||||
@@ -47,7 +47,7 @@ export const listenToQueue = async (): Promise<void> => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const connection: Connection = await client.connect(rabbitConfig.URI);
|
const connection: Connection = await client.connect(rabbitConfig.RABBIT_URI);
|
||||||
const channel: Channel = await connection.createChannel();
|
const channel: Channel = await connection.createChannel();
|
||||||
await assertAndConsumeQueue(channel);
|
await assertAndConsumeQueue(channel);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
import { createCache, memoryStore} from 'cache-manager';
|
import {Cache, createCache, memoryStore} from 'cache-manager';
|
||||||
import { mongoDbStore } from '@tirke/node-cache-manager-mongodb'
|
import { mongoDbStore } from '@tirke/node-cache-manager-mongodb'
|
||||||
import { cacheConfig } from './config.js';
|
import { cacheConfig } from './config';
|
||||||
import { logger } from './logger';
|
import { logger } from './logger';
|
||||||
import { CacheType } from "./enums/cache_types";
|
import { CacheType } from "./enums/cache_types";
|
||||||
|
import {CacheOptions} from "./interfaces/cache_options";
|
||||||
|
|
||||||
const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer';
|
const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer';
|
||||||
const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`;
|
const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`;
|
||||||
@@ -10,40 +11,43 @@ const KITSU_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|kitsu_id`;
|
|||||||
const METADATA_PREFIX = `${GLOBAL_KEY_PREFIX}|metadata`;
|
const METADATA_PREFIX = `${GLOBAL_KEY_PREFIX}|metadata`;
|
||||||
const TRACKERS_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|trackers`;
|
const TRACKERS_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|trackers`;
|
||||||
|
|
||||||
const GLOBAL_TTL = process.env.METADATA_TTL || 7 * 24 * 60 * 60; // 7 days
|
const GLOBAL_TTL: number = Number(process.env.METADATA_TTL) || 7 * 24 * 60 * 60; // 7 days
|
||||||
const MEMORY_TTL = process.env.METADATA_TTL || 2 * 60 * 60; // 2 hours
|
const MEMORY_TTL: number = Number(process.env.METADATA_TTL) || 2 * 60 * 60; // 2 hours
|
||||||
const TRACKERS_TTL = 2 * 24 * 60 * 60; // 2 days
|
const TRACKERS_TTL: number = 2 * 24 * 60 * 60; // 2 days
|
||||||
|
|
||||||
|
type CacheMethod = () => any;
|
||||||
|
|
||||||
const initiateMemoryCache = () =>
|
const initiateMemoryCache = () =>
|
||||||
createCache(memoryStore(), {
|
createCache(memoryStore(), {
|
||||||
ttl: parseInt(MEMORY_TTL)
|
ttl: MEMORY_TTL
|
||||||
});
|
}) as Cache;
|
||||||
|
|
||||||
const initiateMongoCache = () => {
|
const initiateMongoCache = () => {
|
||||||
const store = mongoDbStore({
|
const store = mongoDbStore({
|
||||||
collectionName: cacheConfig.COLLECTION_NAME,
|
collectionName: cacheConfig.COLLECTION_NAME,
|
||||||
ttl: parseInt(GLOBAL_TTL),
|
ttl: GLOBAL_TTL,
|
||||||
url: cacheConfig.MONGO_URI,
|
url: cacheConfig.MONGO_URI,
|
||||||
mongoConfig:{
|
mongoConfig:{
|
||||||
socketTimeoutMS: 120000,
|
socketTimeoutMS: 120000,
|
||||||
appName: 'knightcrawler-consumer',
|
appName: 'knightcrawler-consumer',
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
return createCache(store, {
|
return createCache(store, {
|
||||||
ttl: parseInt(GLOBAL_TTL),
|
ttl: GLOBAL_TTL,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const initiateRemoteCache = ()=> {
|
const initiateRemoteCache = (): Cache => {
|
||||||
if (cacheConfig.NO_CACHE) {
|
if (cacheConfig.NO_CACHE) {
|
||||||
logger.debug('Cache is disabled');
|
logger.debug('Cache is disabled');
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
return cacheConfig.MONGO_URI ? initiateMongoCache() : initiateMemoryCache();
|
return cacheConfig.MONGO_URI ? initiateMongoCache() : initiateMemoryCache();
|
||||||
}
|
}
|
||||||
|
|
||||||
const getCacheType = (cacheType) => {
|
const getCacheType = (cacheType: CacheType): typeof memoryCache | null => {
|
||||||
switch (cacheType) {
|
switch (cacheType) {
|
||||||
case CacheType.MEMORY:
|
case CacheType.MEMORY:
|
||||||
return memoryCache;
|
return memoryCache;
|
||||||
@@ -54,12 +58,13 @@ const getCacheType = (cacheType) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const memoryCache = initiateMemoryCache()
|
const memoryCache = initiateMemoryCache();
|
||||||
const remoteCache = initiateRemoteCache()
|
const remoteCache = initiateRemoteCache();
|
||||||
|
|
||||||
const cacheWrap = async (cacheType, key, method, options) => {
|
const cacheWrap = async (
|
||||||
|
cacheType: CacheType, key: string, method: CacheMethod, options: CacheOptions): Promise<any> => {
|
||||||
const cache = getCacheType(cacheType);
|
const cache = getCacheType(cacheType);
|
||||||
|
|
||||||
if (cacheConfig.NO_CACHE || !cache) {
|
if (cacheConfig.NO_CACHE || !cache) {
|
||||||
return method();
|
return method();
|
||||||
}
|
}
|
||||||
@@ -67,18 +72,18 @@ const cacheWrap = async (cacheType, key, method, options) => {
|
|||||||
logger.debug(`Cache type: ${cacheType}`);
|
logger.debug(`Cache type: ${cacheType}`);
|
||||||
logger.debug(`Cache key: ${key}`);
|
logger.debug(`Cache key: ${key}`);
|
||||||
logger.debug(`Cache options: ${JSON.stringify(options)}`);
|
logger.debug(`Cache options: ${JSON.stringify(options)}`);
|
||||||
|
|
||||||
return cache.wrap(key, method, options.ttl);
|
return cache.wrap(key, method, options.ttl);
|
||||||
}
|
}
|
||||||
|
|
||||||
export const cacheWrapImdbId = (key, method) =>
|
export const cacheWrapImdbId = (key: string, method: CacheMethod): Promise<any> =>
|
||||||
cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) });
|
cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||||
|
|
||||||
export const cacheWrapKitsuId = (key, method) =>
|
export const cacheWrapKitsuId = (key: string, method: CacheMethod): Promise<any> =>
|
||||||
cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) });
|
cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||||
|
|
||||||
export const cacheWrapMetadata = (id, method) =>
|
export const cacheWrapMetadata = (id: string, method: CacheMethod): Promise<any> =>
|
||||||
cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: parseInt(MEMORY_TTL) });
|
cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL });
|
||||||
|
|
||||||
export const cacheTrackers = (method) =>
|
export const cacheTrackers = (method: CacheMethod): Promise<any> =>
|
||||||
cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: parseInt(TRACKERS_TTL) });
|
cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL });
|
||||||
@@ -1,63 +0,0 @@
|
|||||||
export const rabbitConfig = {
|
|
||||||
URI: process.env.RABBIT_URI || 'amqp://localhost',
|
|
||||||
QUEUE_NAME: process.env.QUEUE_NAME || 'test-queue'
|
|
||||||
}
|
|
||||||
|
|
||||||
export const cacheConfig = {
|
|
||||||
MONGODB_HOST: process.env.MONGODB_HOST || 'mongodb',
|
|
||||||
MONGODB_PORT: process.env.MONGODB_PORT || '27017',
|
|
||||||
MONGODB_DB: process.env.MONGODB_DB || 'knightcrawler',
|
|
||||||
MONGO_INITDB_ROOT_USERNAME: process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo',
|
|
||||||
MONGO_INITDB_ROOT_PASSWORD: process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo',
|
|
||||||
NO_CACHE: parseBool(process.env.NO_CACHE, false),
|
|
||||||
COLLECTION_NAME: process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection'
|
|
||||||
}
|
|
||||||
|
|
||||||
// Combine the environment variables into a connection string
|
|
||||||
// The combined string will look something like:
|
|
||||||
// 'mongodb://mongo:mongo@localhost:27017/knightcrawler?authSource=admin'
|
|
||||||
cacheConfig.MONGO_URI = 'mongodb://' + cacheConfig.MONGO_INITDB_ROOT_USERNAME + ':' + cacheConfig.MONGO_INITDB_ROOT_PASSWORD + '@' + cacheConfig.MONGODB_HOST + ':' + cacheConfig.MONGODB_PORT + '/' + cacheConfig.MONGODB_DB + '?authSource=admin';
|
|
||||||
|
|
||||||
export const databaseConfig = {
|
|
||||||
POSTGRES_HOST: process.env.POSTGRES_HOST || 'postgres',
|
|
||||||
POSTGRES_PORT: process.env.POSTGRES_PORT || '5432',
|
|
||||||
POSTGRES_DB: process.env.POSTGRES_DB || 'knightcrawler',
|
|
||||||
POSTGRES_USER: process.env.POSTGRES_USER || 'postgres',
|
|
||||||
POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'postgres',
|
|
||||||
AUTO_CREATE_AND_APPLY_MIGRATIONS: parseBool(process.env.AUTO_CREATE_AND_APPLY_MIGRATIONS, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Combine the environment variables into a connection string
|
|
||||||
// The combined string will look something like:
|
|
||||||
// 'postgres://postgres:postgres@localhost:5432/knightcrawler'
|
|
||||||
databaseConfig.POSTGRES_URI = 'postgres://' + databaseConfig.POSTGRES_USER + ':' + databaseConfig.POSTGRES_PASSWORD + '@' + databaseConfig.POSTGRES_HOST + ':' + databaseConfig.POSTGRES_PORT + '/' + databaseConfig.POSTGRES_DB;
|
|
||||||
|
|
||||||
export const jobConfig = {
|
|
||||||
JOB_CONCURRENCY: parseInt(process.env.JOB_CONCURRENCY || 1),
|
|
||||||
JOBS_ENABLED: parseBool(process.env.JOBS_ENABLED || true)
|
|
||||||
}
|
|
||||||
|
|
||||||
export const metadataConfig = {
|
|
||||||
IMDB_CONCURRENT: parseInt(process.env.IMDB_CONCURRENT || 1),
|
|
||||||
IMDB_INTERVAL_MS: parseInt(process.env.IMDB_INTERVAL_MS || 1000),
|
|
||||||
}
|
|
||||||
|
|
||||||
export const trackerConfig = {
|
|
||||||
TRACKERS_URL: process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt',
|
|
||||||
UDP_ENABLED: parseBool(process.env.UDP_TRACKERS_ENABLED || false),
|
|
||||||
}
|
|
||||||
|
|
||||||
export const torrentConfig = {
|
|
||||||
MAX_CONNECTIONS_PER_TORRENT: parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || 20),
|
|
||||||
TIMEOUT: parseInt(process.env.TORRENT_TIMEOUT || 30000),
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseBool(boolString, defaultValue) {
|
|
||||||
const isString = typeof boolString === 'string' || boolString instanceof String;
|
|
||||||
|
|
||||||
if (!isString) {
|
|
||||||
return defaultValue;
|
|
||||||
}
|
|
||||||
|
|
||||||
return boolString.toLowerCase() === 'true' ? true : defaultValue;
|
|
||||||
}
|
|
||||||
51
src/node/consumer/src/lib/config.ts
Normal file
51
src/node/consumer/src/lib/config.ts
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
const parseBool = (boolString: string | undefined, defaultValue: boolean): boolean =>
|
||||||
|
boolString?.toLowerCase() === 'true' ? true : defaultValue;
|
||||||
|
|
||||||
|
export const rabbitConfig = {
|
||||||
|
RABBIT_URI: process.env.RABBIT_URI || 'amqp://localhost',
|
||||||
|
QUEUE_NAME: process.env.QUEUE_NAME || 'test-queue'
|
||||||
|
}
|
||||||
|
|
||||||
|
export const cacheConfig = {
|
||||||
|
MONGODB_HOST: process.env.MONGODB_HOST || 'mongodb',
|
||||||
|
MONGODB_PORT: process.env.MONGODB_PORT || '27017',
|
||||||
|
MONGODB_DB: process.env.MONGODB_DB || 'knightcrawler',
|
||||||
|
MONGO_INITDB_ROOT_USERNAME: process.env.MONGO_INITDB_ROOT_USERNAME || 'mongo',
|
||||||
|
MONGO_INITDB_ROOT_PASSWORD: process.env.MONGO_INITDB_ROOT_PASSWORD || 'mongo',
|
||||||
|
NO_CACHE: parseBool(process.env.NO_CACHE, false),
|
||||||
|
COLLECTION_NAME: process.env.MONGODB_COLLECTION || 'knightcrawler_consumer_collection',
|
||||||
|
MONGO_URI: '',
|
||||||
|
}
|
||||||
|
|
||||||
|
cacheConfig.MONGO_URI = `mongodb://${cacheConfig.MONGO_INITDB_ROOT_USERNAME}:${cacheConfig.MONGO_INITDB_ROOT_PASSWORD}@${cacheConfig.MONGODB_HOST}:${cacheConfig.MONGODB_PORT}/${cacheConfig.MONGODB_DB}?authSource=admin`;
|
||||||
|
|
||||||
|
export const databaseConfig = {
|
||||||
|
POSTGRES_HOST: process.env.POSTGRES_HOST || 'postgres',
|
||||||
|
POSTGRES_PORT: process.env.POSTGRES_PORT || '5432',
|
||||||
|
POSTGRES_DATABASE: process.env.POSTGRES_DATABASE || 'knightcrawler',
|
||||||
|
POSTGRES_USERNAME: process.env.POSTGRES_USERNAME || 'postgres',
|
||||||
|
POSTGRES_PASSWORD: process.env.POSTGRES_PASSWORD || 'postgres',
|
||||||
|
POSTGRES_URI: '',
|
||||||
|
}
|
||||||
|
|
||||||
|
databaseConfig.POSTGRES_URI = `postgres://${databaseConfig.POSTGRES_USERNAME}:${databaseConfig.POSTGRES_PASSWORD}@${databaseConfig.POSTGRES_HOST}:${databaseConfig.POSTGRES_PORT}/${databaseConfig.POSTGRES_DATABASE}`;
|
||||||
|
|
||||||
|
export const jobConfig = {
|
||||||
|
JOB_CONCURRENCY: Number.parseInt(process.env.JOB_CONCURRENCY || "1", 10),
|
||||||
|
JOBS_ENABLED: parseBool(process.env.JOBS_ENABLED, true),
|
||||||
|
}
|
||||||
|
|
||||||
|
export const metadataConfig = {
|
||||||
|
IMDB_CONCURRENT: Number.parseInt(process.env.IMDB_CONCURRENT || "1", 10),
|
||||||
|
IMDB_INTERVAL_MS: Number.parseInt(process.env.IMDB_INTERVAL_MS || "1000", 10),
|
||||||
|
}
|
||||||
|
|
||||||
|
export const trackerConfig = {
|
||||||
|
TRACKERS_URL: process.env.TRACKERS_URL || 'https://ngosang.github.io/trackerslist/trackers_all.txt',
|
||||||
|
UDP_ENABLED: parseBool(process.env.UDP_TRACKERS_ENABLED, false),
|
||||||
|
}
|
||||||
|
|
||||||
|
export const torrentConfig = {
|
||||||
|
MAX_CONNECTIONS_PER_TORRENT: Number.parseInt(process.env.MAX_SINGLE_TORRENT_CONNECTIONS || "20", 10),
|
||||||
|
TIMEOUT: Number.parseInt(process.env.TORRENT_TIMEOUT || "30000", 10),
|
||||||
|
}
|
||||||
3
src/node/consumer/src/lib/interfaces/cache_options.ts
Normal file
3
src/node/consumer/src/lib/interfaces/cache_options.ts
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
export interface CacheOptions {
|
||||||
|
ttl: number;
|
||||||
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
import axios, {AxiosResponse} from 'axios';
|
import axios, {AxiosResponse} from 'axios';
|
||||||
import {search, ResultTypes} from 'google-sr';
|
import {search, ResultTypes} from 'google-sr';
|
||||||
import nameToImdb from 'name-to-imdb';
|
import nameToImdb from 'name-to-imdb';
|
||||||
import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache.js';
|
import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache';
|
||||||
import { TorrentType } from './enums/torrent_types';
|
import { TorrentType } from './enums/torrent_types';
|
||||||
import {MetadataResponse} from "./interfaces/metadata_response";
|
import {MetadataResponse} from "./interfaces/metadata_response";
|
||||||
import {CinemetaJsonResponse} from "./interfaces/cinemeta_metadata";
|
import {CinemetaJsonResponse} from "./interfaces/cinemeta_metadata";
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import { decode } from 'magnet-uri';
|
import { decode } from 'magnet-uri';
|
||||||
import torrentStream from 'torrent-stream';
|
import torrentStream from 'torrent-stream';
|
||||||
import { torrentConfig } from './config.js';
|
import { torrentConfig } from './config';
|
||||||
import {isSubtitle, isVideo} from './extension';
|
import {isSubtitle, isVideo} from './extension';
|
||||||
|
|
||||||
export async function torrentFiles(torrent, timeout) {
|
export async function torrentFiles(torrent, timeout) {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ import Bottleneck from 'bottleneck';
|
|||||||
import distance from 'jaro-winkler';
|
import distance from 'jaro-winkler';
|
||||||
import moment from 'moment';
|
import moment from 'moment';
|
||||||
import { parse } from 'parse-torrent-title';
|
import { parse } from 'parse-torrent-title';
|
||||||
import { metadataConfig } from './config.js';
|
import { metadataConfig } from './config';
|
||||||
import { isDisk } from './extension';
|
import { isDisk } from './extension';
|
||||||
import { getMetadata, getImdbId, getKitsuId } from './metadata';
|
import { getMetadata, getImdbId, getKitsuId } from './metadata';
|
||||||
import { parseSeriesVideos, isPackTorrent } from './parseHelper';
|
import { parseSeriesVideos, isPackTorrent } from './parseHelper';
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import axios, { AxiosResponse } from 'axios';
|
import axios, { AxiosResponse } from 'axios';
|
||||||
import { cacheTrackers } from "./cache.js";
|
import { cacheTrackers } from "./cache";
|
||||||
import { trackerConfig } from './config.js';
|
import { trackerConfig } from './config';
|
||||||
import { logger } from "./logger";
|
import { logger } from "./logger";
|
||||||
|
|
||||||
const downloadTrackers = async (): Promise<string[]> => {
|
const downloadTrackers = async (): Promise<string[]> => {
|
||||||
|
|||||||
Reference in New Issue
Block a user