mirror of
https://github.com/knightcrawler-stremio/knightcrawler.git
synced 2024-12-20 03:29:51 +00:00
add named queue for stream requests
This commit is contained in:
@@ -9,6 +9,7 @@ import applySorting from './lib/sort.js';
|
|||||||
import applyFilters from './lib/filter.js';
|
import applyFilters from './lib/filter.js';
|
||||||
import { applyMochs, getMochCatalog, getMochItemMeta } from './moch/moch.js';
|
import { applyMochs, getMochCatalog, getMochItemMeta } from './moch/moch.js';
|
||||||
import StaticLinks from './moch/static.js';
|
import StaticLinks from './moch/static.js';
|
||||||
|
import { createNamedQueue } from "./lib/namedQueue.js";
|
||||||
|
|
||||||
const CACHE_MAX_AGE = parseInt(process.env.CACHE_MAX_AGE) || 60 * 60; // 1 hour in seconds
|
const CACHE_MAX_AGE = parseInt(process.env.CACHE_MAX_AGE) || 60 * 60; // 1 hour in seconds
|
||||||
const CACHE_MAX_AGE_EMPTY = 60; // 60 seconds
|
const CACHE_MAX_AGE_EMPTY = 60; // 60 seconds
|
||||||
@@ -17,9 +18,10 @@ const STALE_REVALIDATE_AGE = 4 * 60 * 60; // 4 hours
|
|||||||
const STALE_ERROR_AGE = 7 * 24 * 60 * 60; // 7 days
|
const STALE_ERROR_AGE = 7 * 24 * 60 * 60; // 7 days
|
||||||
|
|
||||||
const builder = new addonBuilder(dummyManifest());
|
const builder = new addonBuilder(dummyManifest());
|
||||||
|
const requestQueue = createNamedQueue(200);
|
||||||
const limiter = new Bottleneck({
|
const limiter = new Bottleneck({
|
||||||
maxConcurrent: process.env.LIMIT_MAX_CONCURRENT || 40,
|
maxConcurrent: process.env.LIMIT_MAX_CONCURRENT || 40,
|
||||||
highWater: process.env.LIMIT_QUEUE_SIZE || 60,
|
highWater: process.env.LIMIT_QUEUE_SIZE || 100,
|
||||||
strategy: Bottleneck.strategy.OVERFLOW
|
strategy: Bottleneck.strategy.OVERFLOW
|
||||||
});
|
});
|
||||||
const limiterOptions = { expiration: 2 * 60 * 1000 }
|
const limiterOptions = { expiration: 2 * 60 * 1000 }
|
||||||
@@ -29,10 +31,7 @@ builder.defineStreamHandler((args) => {
|
|||||||
return Promise.resolve({ streams: [] });
|
return Promise.resolve({ streams: [] });
|
||||||
}
|
}
|
||||||
|
|
||||||
return cacheWrapStream(args.id, () => limiter.schedule(limiterOptions, () => streamHandler(args)
|
return requestQueue.wrap(args, () => resolveStreams(args))
|
||||||
.then(records => records
|
|
||||||
.sort((a, b) => b.torrent.seeders - a.torrent.seeders || b.torrent.uploadDate - a.torrent.uploadDate)
|
|
||||||
.map(record => toStreamInfo(record)))))
|
|
||||||
.then(streams => applyFilters(streams, args.extra))
|
.then(streams => applyFilters(streams, args.extra))
|
||||||
.then(streams => applySorting(streams, args.extra, args.type))
|
.then(streams => applySorting(streams, args.extra, args.type))
|
||||||
.then(streams => applyStaticInfo(streams))
|
.then(streams => applyStaticInfo(streams))
|
||||||
@@ -69,19 +68,19 @@ builder.defineMetaHandler((args) => {
|
|||||||
});
|
});
|
||||||
})
|
})
|
||||||
|
|
||||||
|
async function resolveStreams(args) {
|
||||||
|
return cacheWrapStream(args.id, () => limiter.schedule(limiterOptions, () => streamHandler(args)
|
||||||
|
.then(records => records
|
||||||
|
.sort((a, b) => b.torrent.seeders - a.torrent.seeders || b.torrent.uploadDate - a.torrent.uploadDate)
|
||||||
|
.map(record => toStreamInfo(record)))));
|
||||||
|
}
|
||||||
|
|
||||||
async function streamHandler(args) {
|
async function streamHandler(args) {
|
||||||
console.log(`Current stats: `, limiter.counts())
|
console.log(`Current stats: `, limiter.counts())
|
||||||
const start = Date.now();
|
|
||||||
if (args.type === Type.MOVIE) {
|
if (args.type === Type.MOVIE) {
|
||||||
return movieRecordsHandler(args).then(result => {
|
return movieRecordsHandler(args);
|
||||||
console.log(`Execution time: ${Date.now() - start} ms`);
|
|
||||||
return result;
|
|
||||||
});
|
|
||||||
} else if (args.type === Type.SERIES) {
|
} else if (args.type === Type.SERIES) {
|
||||||
return seriesRecordsHandler(args).then(result => {
|
return seriesRecordsHandler(args);
|
||||||
console.log(`Execution time: ${Date.now() - start} ms`);
|
|
||||||
return result;
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
return Promise.reject('not supported type');
|
return Promise.reject('not supported type');
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user