diff --git a/addon/addon.js b/addon/addon.js index f7566e0..a017a4e 100644 --- a/addon/addon.js +++ b/addon/addon.js @@ -9,6 +9,7 @@ import applySorting from './lib/sort.js'; import applyFilters from './lib/filter.js'; import { applyMochs, getMochCatalog, getMochItemMeta } from './moch/moch.js'; import StaticLinks from './moch/static.js'; +import { createNamedQueue } from "./lib/namedQueue.js"; const CACHE_MAX_AGE = parseInt(process.env.CACHE_MAX_AGE) || 60 * 60; // 1 hour in seconds const CACHE_MAX_AGE_EMPTY = 60; // 60 seconds @@ -17,9 +18,10 @@ const STALE_REVALIDATE_AGE = 4 * 60 * 60; // 4 hours const STALE_ERROR_AGE = 7 * 24 * 60 * 60; // 7 days const builder = new addonBuilder(dummyManifest()); +const requestQueue = createNamedQueue(Infinity); const limiter = new Bottleneck({ - maxConcurrent: process.env.LIMIT_MAX_CONCURRENT || 100, - highWater: process.env.LIMIT_QUEUE_SIZE || 120, + maxConcurrent: process.env.LIMIT_MAX_CONCURRENT || 40, + highWater: process.env.LIMIT_QUEUE_SIZE || 100, strategy: Bottleneck.strategy.OVERFLOW }); const limiterOptions = { expiration: 2 * 60 * 1000 } @@ -29,10 +31,7 @@ builder.defineStreamHandler((args) => { return Promise.resolve({ streams: [] }); } - return cacheWrapStream(args.id, () => limiter.schedule(limiterOptions, () => streamHandler(args) - .then(records => records - .sort((a, b) => b.torrent.seeders - a.torrent.seeders || b.torrent.uploadDate - a.torrent.uploadDate) - .map(record => toStreamInfo(record))))) + return requestQueue.wrap(args.id, () => resolveStreams(args)) .then(streams => applyFilters(streams, args.extra)) .then(streams => applySorting(streams, args.extra, args.type)) .then(streams => applyStaticInfo(streams)) @@ -69,7 +68,15 @@ builder.defineMetaHandler((args) => { }); }) +async function resolveStreams(args) { + return cacheWrapStream(args.id, () => limiter.schedule(limiterOptions, () => streamHandler(args) + .then(records => records + .sort((a, b) => b.torrent.seeders - a.torrent.seeders || b.torrent.uploadDate - a.torrent.uploadDate) + .map(record => toStreamInfo(record))))); +} + async function streamHandler(args) { + console.log(`Current stats: `, limiter.counts()) if (args.type === Type.MOVIE) { return movieRecordsHandler(args); } else if (args.type === Type.SERIES) { diff --git a/addon/index.js b/addon/index.js index 73606ab..ab0f0a4 100644 --- a/addon/index.js +++ b/addon/index.js @@ -1,9 +1,22 @@ import express from 'express'; +import swStats from 'swagger-stats'; import serverless from './serverless.js'; +import { manifest } from './lib/manifest.js'; import { initBestTrackers } from './lib/magnetHelper.js'; const app = express(); app.enable('trust proxy'); +app.use(swStats.getMiddleware({ + name: manifest().name, + version: manifest().version, + timelineBucketDuration: 60 * 60 * 1000, + apdexThreshold: 100, + authentication: true, + onAuthenticate: (req, username, password) => { + return username === process.env.METRICS_USER + && password === process.env.METRICS_PASSWORD + }, +})) app.use(express.static('static', { maxAge: '1y' })); app.use((req, res, next) => serverless(req, res, next)); app.listen(process.env.PORT || 7000, () => { diff --git a/addon/lib/cache.js b/addon/lib/cache.js index 48a3d35..7f46d0a 100644 --- a/addon/lib/cache.js +++ b/addon/lib/cache.js @@ -7,7 +7,7 @@ const STREAM_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|stream`; const AVAILABILITY_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|availability`; const RESOLVED_URL_KEY_PREFIX = `${GLOBAL_KEY_PREFIX}|resolved`; -const STREAM_TTL = process.env.STREAM_TTL || 4 * 60 * 60; // 4 hours +const STREAM_TTL = process.env.STREAM_TTL || 24 * 60 * 60; // 24 hours const STREAM_EMPTY_TTL = process.env.STREAM_EMPTY_TTL || 60; // 1 minute const AVAILABILITY_TTL = 8 * 60 * 60; // 8 hours const AVAILABILITY_EMPTY_TTL = 30 * 60; // 30 minutes diff --git a/addon/lib/namedQueue.js b/addon/lib/namedQueue.js new file mode 100644 index 0000000..695ce05 --- /dev/null +++ b/addon/lib/namedQueue.js @@ -0,0 +1,11 @@ +import namedQueue from "named-queue"; + +export function createNamedQueue(concurrency) { + const queue = new namedQueue((task, callback) => task.method() + .then(result => callback(false, result)) + .catch((error => callback(error))), 200); + queue.wrap = (id, method) => new Promise(((resolve, reject) => { + queue.push({ id, method }, (error, result) => result ? resolve(result) : reject(error)); + })); + return queue; +} \ No newline at end of file diff --git a/addon/lib/repository.js b/addon/lib/repository.js index cdee1ac..9447514 100644 --- a/addon/lib/repository.js +++ b/addon/lib/repository.js @@ -3,7 +3,7 @@ const Op = Sequelize.Op; const DATABASE_URI = process.env.DATABASE_URI; -const database = new Sequelize(DATABASE_URI, { logging: false, pool: { max: 50 } }); +const database = new Sequelize(DATABASE_URI, { logging: false, pool: { max: 50, min: 5, idle: 60 * 60 * 1000 } }); const Torrent = database.define('torrent', { diff --git a/addon/moch/moch.js b/addon/moch/moch.js index 0dfb1e4..82376bf 100644 --- a/addon/moch/moch.js +++ b/addon/moch/moch.js @@ -10,6 +10,7 @@ import StaticResponse, { isStaticUrl } from './static.js'; import { cacheWrapResolvedUrl } from '../lib/cache.js'; import { timeout } from '../lib/promises.js'; import { BadTokenError, streamFilename, AccessDeniedError, enrichMeta } from './mochHelper.js'; +import { createNamedQueue } from "../lib/namedQueue.js"; const RESOLVE_TIMEOUT = 2 * 60 * 1000; // 2 minutes const MIN_API_KEY_SYMBOLS = 15; @@ -62,9 +63,7 @@ export const MochOptions = { const unrestrictQueues = {} Object.values(MochOptions) .map(moch => moch.key) - .forEach(mochKey => unrestrictQueues[mochKey] = new namedQueue((task, callback) => task.method() - .then(result => callback(false, result)) - .catch((error => callback(error))), 200)); + .forEach(mochKey => unrestrictQueues[mochKey] = createNamedQueue(50)); export function hasMochConfigured(config) { return Object.keys(MochOptions).find(moch => config?.[moch]) @@ -110,10 +109,7 @@ export async function resolve(parameters) { return StaticResponse.FAILED_UNEXPECTED; }) .then(url => isStaticUrl(url) ? `${parameters.host}/${url}` : url); - const unrestrictQueue = unrestrictQueues[moch.key]; - return new Promise(((resolve, reject) => { - unrestrictQueue.push({ id, method }, (error, result) => result ? resolve(result) : reject(error)); - })); + return unrestrictQueues[moch.key].wrap(id, method); } export async function getMochCatalog(mochKey, config) {