Add pino as a logger, and update cache manager
Rewrites the cache service to use the latest implementation of cache manager, and bring in the new mongo package. Introduce Logger service Removes bluebird as a dependency
This commit is contained in:
2931
src/node/consumer/package-lock.json
generated
2931
src/node/consumer/package-lock.json
generated
File diff suppressed because it is too large
Load Diff
@@ -4,27 +4,26 @@
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"build": "node esbuild.js",
|
||||
"dev": "tsx watch --ignore node_modules src/index.js",
|
||||
"dev": "tsx watch --ignore node_modules src/index.js | pino-pretty",
|
||||
"start": "node dist/index.cjs",
|
||||
"lint": "eslint . --ext .ts,.js"
|
||||
},
|
||||
"author": "A Dude",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@tirke/node-cache-manager-mongodb": "^1.6.0",
|
||||
"amqplib": "^0.10.3",
|
||||
"axios": "^1.6.1",
|
||||
"bluebird": "^3.7.2",
|
||||
"bottleneck": "^2.19.5",
|
||||
"cache-manager": "^3.4.4",
|
||||
"cache-manager-mongodb": "^0.3.0",
|
||||
"cache-manager": "^5.4.0",
|
||||
"google-sr": "^3.2.1",
|
||||
"jaro-winkler": "^0.2.8",
|
||||
"magnet-uri": "^6.2.0",
|
||||
"moment": "^2.30.1",
|
||||
"name-to-imdb": "^3.0.4",
|
||||
"parse-torrent-title": "git://github.com/TheBeastLT/parse-torrent-title.git#022408972c2a040f846331a912a6a8487746a654",
|
||||
"parse-torrent-title": "https://github.com/TheBeastLT/parse-torrent-title.git#022408972c2a040f846331a912a6a8487746a654",
|
||||
"pg": "^8.11.3",
|
||||
"pg-hstore": "^2.3.4",
|
||||
"pino": "^8.18.0",
|
||||
"sequelize": "^6.31.1",
|
||||
"torrent-stream": "^1.2.1",
|
||||
"user-agents": "^1.0.1444"
|
||||
@@ -32,10 +31,11 @@
|
||||
"devDependencies": {
|
||||
"@types/node": "^20.11.6",
|
||||
"@types/stremio-addon-sdk": "^1.6.10",
|
||||
"esbuild": "^0.19.12",
|
||||
"esbuild": "^0.20.0",
|
||||
"eslint": "^8.56.0",
|
||||
"eslint-plugin-import": "^2.29.1",
|
||||
"eslint-plugin-import-helpers": "^1.3.1",
|
||||
"tsx": "^4.7.0"
|
||||
"tsx": "^4.7.0",
|
||||
"pino-pretty": "^10.3.1"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,34 +1,37 @@
|
||||
import amqp from 'amqplib'
|
||||
import Promise from 'bluebird'
|
||||
import { rabbitConfig, jobConfig } from '../lib/config.js'
|
||||
import { processTorrentRecord } from "../lib/ingestedTorrent.js";
|
||||
import {logger} from "../lib/logger.js";
|
||||
|
||||
const assertQueueOptions = { durable: true }
|
||||
const consumeQueueOptions = { noAck: false }
|
||||
|
||||
const processMessage = msg =>
|
||||
Promise.resolve(getMessageAsJson(msg))
|
||||
.then(torrent => processTorrentRecord(torrent))
|
||||
.then(() => Promise.resolve(msg));
|
||||
|
||||
const getMessageAsJson = msg => {
|
||||
const torrent = JSON.parse(msg.content.toString());
|
||||
return Promise.resolve(torrent.message);
|
||||
}
|
||||
const processMessage = msg => processTorrentRecord(getMessageAsJson(msg));
|
||||
|
||||
const assertAndConsumeQueue = channel => {
|
||||
console.log('Worker is running! Waiting for new torrents...')
|
||||
const getMessageAsJson = msg =>
|
||||
JSON.parse(msg.content.toString()).message;
|
||||
|
||||
const ackMsg = msg => Promise.resolve(msg)
|
||||
.then(msg => processMessage(msg))
|
||||
.then(msg => channel.ack(msg))
|
||||
.catch(error => console.error('Failed processing torrent', error));
|
||||
const assertAndConsumeQueue = async channel => {
|
||||
logger.info('Worker is running! Waiting for new torrents...')
|
||||
|
||||
return channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions)
|
||||
const ackMsg = msg =>
|
||||
processMessage(msg)
|
||||
.then(() => channel.ack(msg))
|
||||
.catch(error => logger.error('Failed processing torrent', error));
|
||||
|
||||
channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions)
|
||||
.then(() => channel.prefetch(jobConfig.JOB_CONCURRENCY))
|
||||
.then(() => channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions))
|
||||
.catch(error => logger.error('Failed to setup channel', error));
|
||||
}
|
||||
|
||||
export const listenToQueue = () => amqp.connect(rabbitConfig.URI)
|
||||
.then(connection => connection.createChannel())
|
||||
.then(channel => assertAndConsumeQueue(channel))
|
||||
export const listenToQueue = async () => {
|
||||
if (!jobConfig.JOBS_ENABLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
return amqp.connect(rabbitConfig.URI)
|
||||
.then(connection => connection.createChannel())
|
||||
.then(channel => assertAndConsumeQueue(channel))
|
||||
.catch(error => logger.error('Failed to connect and setup channel', error));
|
||||
};
|
||||
@@ -1,6 +1,8 @@
|
||||
import cacheManager from 'cache-manager';
|
||||
import mangodbStore from 'cache-manager-mongodb';
|
||||
import { createCache, memoryStore} from 'cache-manager';
|
||||
import { mongoDbStore } from '@tirke/node-cache-manager-mongodb'
|
||||
import { cacheConfig } from './config.js';
|
||||
import { logger } from './logger.js';
|
||||
import { CacheType } from "./types.js";
|
||||
|
||||
const GLOBAL_KEY_PREFIX = 'selfhostio-consumer';
|
||||
const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`;
|
||||
@@ -12,61 +14,71 @@ const GLOBAL_TTL = process.env.METADATA_TTL || 7 * 24 * 60 * 60; // 7 days
|
||||
const MEMORY_TTL = process.env.METADATA_TTL || 2 * 60 * 60; // 2 hours
|
||||
const TRACKERS_TTL = 2 * 24 * 60 * 60; // 2 days
|
||||
|
||||
const memoryCache = initiateMemoryCache();
|
||||
const remoteCache = initiateRemoteCache();
|
||||
const initiateMemoryCache = () =>
|
||||
createCache(memoryStore(), {
|
||||
ttl: parseInt(MEMORY_TTL)
|
||||
});
|
||||
|
||||
function initiateRemoteCache() {
|
||||
if (cacheConfig.NO_CACHE) {
|
||||
return null;
|
||||
} else if (cacheConfig.MONGO_URI) {
|
||||
return cacheManager.caching({
|
||||
store: mangodbStore,
|
||||
uri: cacheConfig.MONGO_URI,
|
||||
options: {
|
||||
collection: cacheConfig.COLLECTION_NAME,
|
||||
socketTimeoutMS: 120000,
|
||||
useNewUrlParser: true,
|
||||
useUnifiedTopology: false,
|
||||
ttl: GLOBAL_TTL
|
||||
},
|
||||
ttl: GLOBAL_TTL,
|
||||
ignoreCacheErrors: true
|
||||
});
|
||||
} else {
|
||||
return cacheManager.caching({
|
||||
store: 'memory',
|
||||
ttl: MEMORY_TTL
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function initiateMemoryCache() {
|
||||
return cacheManager.caching({
|
||||
store: 'memory',
|
||||
ttl: MEMORY_TTL,
|
||||
max: Infinity // infinite LRU cache size
|
||||
const initiateMongoCache = () => {
|
||||
const store = mongoDbStore({
|
||||
collectionName: cacheConfig.COLLECTION_NAME,
|
||||
ttl: parseInt(GLOBAL_TTL),
|
||||
url: cacheConfig.MONGO_URI,
|
||||
mongoConfig:{
|
||||
socketTimeoutMS: 120000,
|
||||
appName: 'selfhostio-consumer',
|
||||
}
|
||||
});
|
||||
|
||||
return createCache(store, {
|
||||
ttl: parseInt(GLOBAL_TTL),
|
||||
});
|
||||
}
|
||||
|
||||
function cacheWrap(cache, key, method, options) {
|
||||
const initiateRemoteCache = ()=> {
|
||||
if (cacheConfig.NO_CACHE) {
|
||||
logger.debug('Cache is disabled');
|
||||
return null;
|
||||
}
|
||||
return cacheConfig.MONGO_URI ? initiateMongoCache() : initiateMemoryCache();
|
||||
}
|
||||
|
||||
const getCacheType = (cacheType) => {
|
||||
switch (cacheType) {
|
||||
case CacheType.MEMORY:
|
||||
return memoryCache;
|
||||
case CacheType.MONGODB:
|
||||
return remoteCache;
|
||||
default:
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
const memoryCache = initiateMemoryCache()
|
||||
const remoteCache = initiateRemoteCache()
|
||||
|
||||
const cacheWrap = async (cacheType, key, method, options) => {
|
||||
const cache = getCacheType(cacheType);
|
||||
|
||||
if (cacheConfig.NO_CACHE || !cache) {
|
||||
return method();
|
||||
}
|
||||
return cache.wrap(key, method, options);
|
||||
|
||||
console.debug(`Cache type: ${cacheType}`);
|
||||
console.debug(`Cache key: ${key}`);
|
||||
console.debug(`Cache options: ${JSON.stringify(options)}`);
|
||||
|
||||
return cache.wrap(key, method, options.ttl);
|
||||
}
|
||||
|
||||
export function cacheWrapImdbId(key, method) {
|
||||
return cacheWrap(remoteCache, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
}
|
||||
export const cacheWrapImdbId = (key, method) =>
|
||||
cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) });
|
||||
|
||||
export function cacheWrapKitsuId(key, method) {
|
||||
return cacheWrap(remoteCache, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
}
|
||||
export const cacheWrapKitsuId = (key, method) =>
|
||||
cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: parseInt(GLOBAL_TTL) });
|
||||
|
||||
export function cacheWrapMetadata(id, method) {
|
||||
return cacheWrap(memoryCache, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL });
|
||||
}
|
||||
export const cacheWrapMetadata = (id, method) =>
|
||||
cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: parseInt(MEMORY_TTL) });
|
||||
|
||||
export function cacheTrackers(method) {
|
||||
return cacheWrap(memoryCache, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL });
|
||||
}
|
||||
export const cacheTrackers = (method) =>
|
||||
cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: parseInt(TRACKERS_TTL) });
|
||||
5
src/node/consumer/src/lib/logger.js
Normal file
5
src/node/consumer/src/lib/logger.js
Normal file
@@ -0,0 +1,5 @@
|
||||
import pino from 'pino';
|
||||
|
||||
export const logger = pino({
|
||||
level: process.env.LOG_LEVEL || 'info'
|
||||
});
|
||||
@@ -1,6 +1,11 @@
|
||||
export const Type = {
|
||||
export const TorrentType = {
|
||||
MOVIE: 'movie',
|
||||
SERIES: 'series',
|
||||
ANIME: 'anime',
|
||||
PORN: 'xxx',
|
||||
};
|
||||
|
||||
export const CacheType = {
|
||||
MEMORY: 'memory',
|
||||
MONGODB: 'mongodb',
|
||||
};
|
||||
Reference in New Issue
Block a user