some comment resolution - enums, process_torrents_job
This commit is contained in:
@@ -13,7 +13,7 @@ try {
|
||||
build({
|
||||
bundle: true,
|
||||
entryPoints: [
|
||||
"./src/index.js",
|
||||
"./src/index.ts",
|
||||
],
|
||||
external: [...(devDependencies && Object.keys(devDependencies))],
|
||||
keepNames: true,
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"build": "node esbuild.js",
|
||||
"dev": "tsx watch --ignore node_modules src/index.js | pino-pretty",
|
||||
"dev": "tsx watch --ignore node_modules src/index.ts | pino-pretty",
|
||||
"start": "node dist/index.cjs",
|
||||
"lint": "eslint . --ext .ts,.js"
|
||||
},
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { listenToQueue } from './jobs/processTorrents';
|
||||
import { processTorrentsJob } from './jobs/process_torrents_job.js';
|
||||
import { repository } from "./repository/database_repository";
|
||||
import { trackerService } from "./lib/services/tracker_service";
|
||||
|
||||
(async () => {
|
||||
await trackerService.getTrackers();
|
||||
await repository.connect();
|
||||
await listenToQueue();
|
||||
await processTorrentsJob.listenToQueue();
|
||||
})();
|
||||
@@ -1,56 +0,0 @@
|
||||
import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib'
|
||||
import {configurationService} from '../lib/services/configuration_service';
|
||||
import {torrentProcessingService} from '../lib/services/torrent_processing_service';
|
||||
import {logger} from '../lib/services/logging_service';
|
||||
import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message";
|
||||
import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes";
|
||||
|
||||
const assertQueueOptions: Options.AssertQueue = { durable: true };
|
||||
const consumeQueueOptions: Options.Consume = { noAck: false };
|
||||
|
||||
const processMessage = (msg: ConsumeMessage | null): Promise<void> => {
|
||||
const ingestedTorrent: IngestedTorrentAttributes = getMessageAsJson(msg);
|
||||
return torrentProcessingService.processTorrentRecord(ingestedTorrent);
|
||||
};
|
||||
|
||||
const getMessageAsJson = (msg: ConsumeMessage | null): IngestedTorrentAttributes => {
|
||||
const content = msg ? msg?.content.toString('utf8') : "{}";
|
||||
const receivedObject: IngestedRabbitMessage = JSON.parse(content) as IngestedRabbitMessage;
|
||||
const receivedTorrent:IngestedRabbitTorrent = receivedObject.message;
|
||||
const mappedObject: any = {...receivedTorrent, info_hash: receivedTorrent.infoHash};
|
||||
delete mappedObject.infoHash;
|
||||
|
||||
return mappedObject as IngestedTorrentAttributes;
|
||||
};
|
||||
|
||||
const assertAndConsumeQueue = async (channel: Channel): Promise<void> => {
|
||||
logger.info('Worker is running! Waiting for new torrents...');
|
||||
|
||||
const ackMsg = (msg: ConsumeMessage): void => {
|
||||
processMessage(msg)
|
||||
.then(() => channel.ack(msg))
|
||||
.catch((error: Error) => logger.error('Failed processing torrent', error));
|
||||
}
|
||||
|
||||
try {
|
||||
await channel.assertQueue(configurationService.rabbitConfig.QUEUE_NAME, assertQueueOptions);
|
||||
await channel.prefetch(configurationService.jobConfig.JOB_CONCURRENCY);
|
||||
await channel.consume(configurationService.rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions);
|
||||
} catch(error) {
|
||||
logger.error('Failed to setup channel', error);
|
||||
}
|
||||
}
|
||||
|
||||
export const listenToQueue = async (): Promise<void> => {
|
||||
if (!configurationService.jobConfig.JOBS_ENABLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const connection: Connection = await client.connect(configurationService.rabbitConfig.RABBIT_URI);
|
||||
const channel: Channel = await connection.createChannel();
|
||||
await assertAndConsumeQueue(channel);
|
||||
} catch (error) {
|
||||
logger.error('Failed to connect and setup channel', error);
|
||||
}
|
||||
}
|
||||
57
src/node/consumer/src/jobs/process_torrents_job.ts
Normal file
57
src/node/consumer/src/jobs/process_torrents_job.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib'
|
||||
import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message";
|
||||
import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes";
|
||||
import {configurationService} from '../lib/services/configuration_service';
|
||||
import {torrentProcessingService} from '../lib/services/torrent_processing_service';
|
||||
import {logger} from '../lib/services/logging_service';
|
||||
|
||||
class ProcessTorrentsJob {
|
||||
private readonly assertQueueOptions: Options.AssertQueue = {durable: true};
|
||||
private readonly consumeQueueOptions: Options.Consume = {noAck: false};
|
||||
|
||||
public listenToQueue = async (): Promise<void> => {
|
||||
if (!configurationService.jobConfig.JOBS_ENABLED) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
const connection: Connection = await client.connect(configurationService.rabbitConfig.RABBIT_URI);
|
||||
const channel: Channel = await connection.createChannel();
|
||||
await this.assertAndConsumeQueue(channel);
|
||||
} catch (error) {
|
||||
logger.error('Failed to connect and setup channel', error);
|
||||
}
|
||||
}
|
||||
private processMessage = (msg: ConsumeMessage) => {
|
||||
const ingestedTorrent: IngestedTorrentAttributes = this.getMessageAsJson(msg);
|
||||
return torrentProcessingService.processTorrentRecord(ingestedTorrent);
|
||||
};
|
||||
private getMessageAsJson = (msg: ConsumeMessage): IngestedTorrentAttributes => {
|
||||
const content = msg?.content.toString('utf8') ?? "{}";
|
||||
const receivedObject: IngestedRabbitMessage = JSON.parse(content);
|
||||
const receivedTorrent: IngestedRabbitTorrent = receivedObject.message;
|
||||
return {...receivedTorrent, info_hash: receivedTorrent.infoHash};
|
||||
};
|
||||
private async assertAndConsumeQueue(channel: Channel) {
|
||||
logger.info('Worker is running! Waiting for new torrents...');
|
||||
|
||||
const ackMsg = async (msg: ConsumeMessage) => {
|
||||
try {
|
||||
await this.processMessage(msg);
|
||||
channel.ack(msg);
|
||||
} catch (error) {
|
||||
logger.error('Failed processing torrent', error);
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await channel.assertQueue(configurationService.rabbitConfig.QUEUE_NAME, this.assertQueueOptions);
|
||||
await channel.prefetch(configurationService.jobConfig.JOB_CONCURRENCY);
|
||||
await channel.consume(configurationService.rabbitConfig.QUEUE_NAME, ackMsg, this.consumeQueueOptions);
|
||||
} catch (error) {
|
||||
logger.error('Failed to setup channel', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const processTorrentsJob = new ProcessTorrentsJob();
|
||||
@@ -1,4 +1,4 @@
|
||||
export enum CacheType {
|
||||
MEMORY = 'memory',
|
||||
MONGODB = 'mongodb'
|
||||
Memory = 'memory',
|
||||
MongoDb = 'mongodb'
|
||||
}
|
||||
@@ -1,6 +1,5 @@
|
||||
export enum TorrentType {
|
||||
SERIES = 'SERIES',
|
||||
MOVIE = 'MOVIE',
|
||||
ANIME = 'anime',
|
||||
PORN = 'xxx',
|
||||
Series = 'Series',
|
||||
Movie = 'Movie',
|
||||
Anime = 'anime',
|
||||
}
|
||||
@@ -29,16 +29,16 @@ class CacheService {
|
||||
}
|
||||
|
||||
public cacheWrapImdbId = (key: string, method: CacheMethod): Promise<any> =>
|
||||
this.cacheWrap(CacheType.MONGODB, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
this.cacheWrap(CacheType.MongoDb, `${IMDB_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
|
||||
public cacheWrapKitsuId = (key: string, method: CacheMethod): Promise<any> =>
|
||||
this.cacheWrap(CacheType.MONGODB, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
this.cacheWrap(CacheType.MongoDb, `${KITSU_ID_PREFIX}:${key}`, method, { ttl: GLOBAL_TTL });
|
||||
|
||||
public cacheWrapMetadata = (id: string, method: CacheMethod): Promise<any> =>
|
||||
this.cacheWrap(CacheType.MEMORY, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL });
|
||||
this.cacheWrap(CacheType.Memory, `${METADATA_PREFIX}:${id}`, method, { ttl: MEMORY_TTL });
|
||||
|
||||
public cacheTrackers = (method: CacheMethod): Promise<any> =>
|
||||
this.cacheWrap(CacheType.MEMORY, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL });
|
||||
this.cacheWrap(CacheType.Memory, `${TRACKERS_KEY_PREFIX}`, method, { ttl: TRACKERS_TTL });
|
||||
|
||||
private initiateMemoryCache = () =>
|
||||
createCache(memoryStore(), {
|
||||
@@ -72,9 +72,9 @@ class CacheService {
|
||||
|
||||
private getCacheType = (cacheType: CacheType): typeof this.memoryCache | null => {
|
||||
switch (cacheType) {
|
||||
case CacheType.MEMORY:
|
||||
case CacheType.Memory:
|
||||
return this.memoryCache;
|
||||
case CacheType.MONGODB:
|
||||
case CacheType.MongoDb:
|
||||
return this.remoteCache;
|
||||
default:
|
||||
return null;
|
||||
|
||||
@@ -59,12 +59,12 @@ class MetadataService {
|
||||
}
|
||||
|
||||
const key = Number.isInteger(query.id) || query.id.toString().match(/^\d+$/) ? `kitsu:${query.id}` : query.id;
|
||||
const metaType = query.type === TorrentType.MOVIE ? TorrentType.MOVIE : TorrentType.SERIES;
|
||||
const metaType = query.type === TorrentType.Movie ? TorrentType.Movie : TorrentType.Series;
|
||||
return cacheService.cacheWrapMetadata(key.toString(), () => this.requestMetadata(`${KITSU_URL}/meta/${metaType}/${key}.json`)
|
||||
.catch(() => this.requestMetadata(`${CINEMETA_URL}/meta/${metaType}/${key}.json`))
|
||||
.catch(() => {
|
||||
// try different type in case there was a mismatch
|
||||
const otherType = metaType === TorrentType.MOVIE ? TorrentType.SERIES : TorrentType.MOVIE;
|
||||
const otherType = metaType === TorrentType.Movie ? TorrentType.Series : TorrentType.Movie;
|
||||
return this.requestMetadata(`${CINEMETA_URL}/meta/${otherType}/${key}.json`)
|
||||
})
|
||||
.catch((error) => {
|
||||
|
||||
@@ -19,7 +19,7 @@ class ParsingService {
|
||||
return true;
|
||||
}
|
||||
const parsedInfo = parse(torrent.title);
|
||||
if (torrent.type === TorrentType.MOVIE) {
|
||||
if (torrent.type === TorrentType.Movie) {
|
||||
return parsedInfo.complete || typeof parsedInfo.year === 'string' || /movies/i.test(torrent.title);
|
||||
}
|
||||
const hasMultipleEpisodes = parsedInfo.complete ||
|
||||
@@ -86,7 +86,7 @@ class ParsingService {
|
||||
// movie if video explicitly has numbered movie keyword in the name, ie. 1 Movie or Movie 1
|
||||
return true;
|
||||
}
|
||||
if (!hasMovies && type !== TorrentType.ANIME) {
|
||||
if (!hasMovies && type !== TorrentType.Anime) {
|
||||
// not movie if torrent name does not contain movies keyword or is not a pack torrent and is not anime
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -28,18 +28,18 @@ class TorrentFileService {
|
||||
const parsedTorrentName = parse(torrent.title);
|
||||
const query: MetaDataQuery = {
|
||||
id: torrent.kitsuId || torrent.imdbId,
|
||||
type: torrent.type || TorrentType.MOVIE,
|
||||
type: torrent.type || TorrentType.Movie,
|
||||
};
|
||||
const metadata = await metadataService.getMetadata(query)
|
||||
.then(meta => Object.assign({}, meta))
|
||||
.catch(() => undefined);
|
||||
|
||||
if (torrent.type !== TorrentType.ANIME && metadata && metadata.type && metadata.type !== torrent.type) {
|
||||
if (torrent.type !== TorrentType.Anime && metadata && metadata.type && metadata.type !== torrent.type) {
|
||||
// it's actually a movie/series
|
||||
torrent.type = metadata.type;
|
||||
}
|
||||
|
||||
if (torrent.type === TorrentType.MOVIE && (!parsedTorrentName.seasons ||
|
||||
if (torrent.type === TorrentType.Movie && (!parsedTorrentName.seasons ||
|
||||
parsedTorrentName.season === 5 && [1, 5].includes(parsedTorrentName.episode))) {
|
||||
return this.parseMovieFiles(torrent, metadata);
|
||||
}
|
||||
@@ -145,7 +145,7 @@ class TorrentFileService {
|
||||
}
|
||||
|
||||
private async mapSeriesMovie(file: ParsableTorrentFile, torrent: TorrentInfo): Promise<ParsableTorrentFile> {
|
||||
const kitsuId= torrent.type === TorrentType.ANIME ? await this.findMovieKitsuId(file)
|
||||
const kitsuId= torrent.type === TorrentType.Anime ? await this.findMovieKitsuId(file)
|
||||
.then(result => {
|
||||
if (result instanceof Error) {
|
||||
logger.warn(`Failed to retrieve kitsuId due to error: ${result.message}`);
|
||||
@@ -158,7 +158,7 @@ class TorrentFileService {
|
||||
|
||||
const query: MetaDataQuery = {
|
||||
id: kitsuId || imdbId,
|
||||
type: TorrentType.MOVIE
|
||||
type: TorrentType.Movie
|
||||
};
|
||||
|
||||
const metadataOrError = await metadataService.getMetadata(query);
|
||||
@@ -201,7 +201,7 @@ class TorrentFileService {
|
||||
|
||||
this.preprocessEpisodes(files);
|
||||
|
||||
if (torrent.type === TorrentType.ANIME && torrent.kitsuId) {
|
||||
if (torrent.type === TorrentType.Anime && torrent.kitsuId) {
|
||||
if (this.needsCinemetaMetadataForAnime(files, metadata)) {
|
||||
// In some cases anime could be resolved to wrong kitsuId
|
||||
// because of imdb season naming/absolute per series naming/multiple seasons
|
||||
@@ -283,7 +283,7 @@ class TorrentFileService {
|
||||
|
||||
private isAbsoluteEpisodeFiles(torrent: TorrentInfo, files: ParsableTorrentFile[], metadata: MetadataResponse) {
|
||||
const threshold = Math.ceil(files.length / 5);
|
||||
const isAnime = torrent.type === TorrentType.ANIME && torrent.kitsuId;
|
||||
const isAnime = torrent.type === TorrentType.Anime && torrent.kitsuId;
|
||||
const nonMovieEpisodes = files
|
||||
.filter(file => !file.isMovie && file.episodes);
|
||||
const absoluteEpisodes = files
|
||||
@@ -298,7 +298,7 @@ class TorrentFileService {
|
||||
// new episode might not yet been indexed by cinemeta.
|
||||
// detect this if episode number is larger than the last episode or season is larger than the last one
|
||||
// only for non anime metas
|
||||
const isAnime = torrent.type === TorrentType.ANIME && torrent.kitsuId;
|
||||
const isAnime = torrent.type === TorrentType.Anime && torrent.kitsuId;
|
||||
return !isAnime && !file.isMovie && file.episodes && file.season !== 1
|
||||
&& /continuing|current/i.test(metadata.status)
|
||||
&& file.season >= metadata.episodeCount.length
|
||||
@@ -377,7 +377,7 @@ class TorrentFileService {
|
||||
|
||||
private assignKitsuOrImdbEpisodes(torrent: TorrentInfo, files: ParsableTorrentFile[], metadata: MetadataResponse) {
|
||||
if (!metadata || !metadata.videos || !metadata.videos.length) {
|
||||
if (torrent.type === TorrentType.ANIME) {
|
||||
if (torrent.type === TorrentType.Anime) {
|
||||
// assign episodes as kitsu episodes for anime when no metadata available for imdb mapping
|
||||
files
|
||||
.filter(file => file.season && file.episodes)
|
||||
@@ -385,7 +385,7 @@ class TorrentFileService {
|
||||
file.season = undefined;
|
||||
file.episodes = undefined;
|
||||
})
|
||||
if (metadata.type === TorrentType.MOVIE && files.every(file => !file.imdbId)) {
|
||||
if (metadata.type === TorrentType.Movie && files.every(file => !file.imdbId)) {
|
||||
// sometimes a movie has episode naming, thus not recognized as a movie and imdbId not assigned
|
||||
files.forEach(file => file.imdbId = metadata.imdbId);
|
||||
}
|
||||
@@ -507,7 +507,7 @@ class TorrentFileService {
|
||||
const imdbQuery = {
|
||||
title: parsedTitle.title,
|
||||
year: parsedTitle.year,
|
||||
type: TorrentType.MOVIE
|
||||
type: TorrentType.Movie
|
||||
};
|
||||
try {
|
||||
return await metadataService.getImdbId(imdbQuery);
|
||||
@@ -523,7 +523,7 @@ class TorrentFileService {
|
||||
title: parsedTitle.title,
|
||||
year: parsedTitle.year,
|
||||
season: parsedTitle.season,
|
||||
type: TorrentType.MOVIE
|
||||
type: TorrentType.Movie
|
||||
};
|
||||
try {
|
||||
return await metadataService.getKitsuId(kitsuQuery);
|
||||
|
||||
@@ -8,7 +8,7 @@ import {IngestedTorrentAttributes} from "../../repository/interfaces/ingested_to
|
||||
class TorrentProcessingService {
|
||||
public async processTorrentRecord(torrent: IngestedTorrentAttributes): Promise<void> {
|
||||
const { category } = torrent;
|
||||
const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE;
|
||||
const type = category === 'tv' ? TorrentType.Series : TorrentType.Movie;
|
||||
const torrentInfo: TorrentInfo = await this.parseTorrent(torrent, type);
|
||||
|
||||
logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`);
|
||||
|
||||
@@ -11,7 +11,7 @@ import {logger} from './services/logging_service';
|
||||
export async function createTorrentEntry(torrent, overwrite = false) {
|
||||
const titleInfo = parse(torrent.title);
|
||||
|
||||
if (!torrent.imdbId && torrent.type !== TorrentType.ANIME) {
|
||||
if (!torrent.imdbId && torrent.type !== TorrentType.Anime) {
|
||||
const imdbQuery = {
|
||||
title: titleInfo.title,
|
||||
year: titleInfo.year,
|
||||
@@ -28,7 +28,7 @@ export async function createTorrentEntry(torrent, overwrite = false) {
|
||||
// sanitize imdbId from redundant zeros
|
||||
torrent.imdbId = torrent.imdbId.replace(/tt0+([0-9]{7,})$/, 'tt$1');
|
||||
}
|
||||
if (!torrent.kitsuId && torrent.type === TorrentType.ANIME) {
|
||||
if (!torrent.kitsuId && torrent.type === TorrentType.Anime) {
|
||||
const kitsuQuery = {
|
||||
title: titleInfo.title,
|
||||
year: titleInfo.year,
|
||||
|
||||
@@ -10,7 +10,7 @@ export interface IngestedTorrentAttributes {
|
||||
leechers: number;
|
||||
imdb: string;
|
||||
processed: boolean;
|
||||
createdAt: Date;
|
||||
createdAt?: Date;
|
||||
}
|
||||
|
||||
export interface IngestedTorrentCreationAttributes extends Optional<IngestedTorrentAttributes, 'processed'> {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
"compilerOptions": {
|
||||
"baseUrl": "./src",
|
||||
"checkJs": true,
|
||||
"allowJs": true,
|
||||
"isolatedModules": true,
|
||||
"lib": ["ESNext"],
|
||||
"module": "ESNext",
|
||||
|
||||
Reference in New Issue
Block a user