Torrent processing orchestrator now TypeScript too
Will start to tackle some of the other services after work tomorrow.

src/node/consumer/package-lock.json (generated)
@@ -30,6 +30,7 @@
     "user-agents": "^1.0.1444"
   },
   "devDependencies": {
+    "@types/amqplib": "^0.10.4",
     "@types/node": "^20.11.16",
     "@types/stremio-addon-sdk": "^1.6.10",
     "@types/validator": "^13.11.8",
@@ -564,6 +565,15 @@
         "url": "https://github.com/sponsors/tirke"
       }
     },
+    "node_modules/@types/amqplib": {
+      "version": "0.10.4",
+      "resolved": "https://registry.npmjs.org/@types/amqplib/-/amqplib-0.10.4.tgz",
+      "integrity": "sha512-Y5Sqquh/LqDxSgxYaAAFNM0M7GyONtSDCcFMJk+DQwYEjibPyW6y+Yu9H9omdkKc3epyXULmFN3GTaeBHhn2Hg==",
+      "dev": true,
+      "dependencies": {
+        "@types/node": "*"
+      }
+    },
     "node_modules/@types/debug": {
       "version": "4.1.12",
       "license": "MIT",

src/node/consumer/package.json
@@ -31,6 +31,7 @@
     "user-agents": "^1.0.1444"
   },
   "devDependencies": {
+    "@types/amqplib": "^0.10.4",
    "@types/node": "^20.11.16",
    "@types/stremio-addon-sdk": "^1.6.10",
    "@types/validator": "^13.11.8",

@@ -1,4 +1,4 @@
-import { listenToQueue } from './jobs/processTorrents.js';
+import { listenToQueue } from './jobs/processTorrents';
 import { repository } from "./repository/database_repository";
 import { getTrackers } from "./lib/trackerService.js";
 

src/node/consumer/src/jobs/processTorrents.js (deleted)
@@ -1,37 +0,0 @@
-import amqp from 'amqplib'
-import { rabbitConfig, jobConfig } from '../lib/config.js'
-import { processTorrentRecord } from "../lib/ingestedTorrent.js";
-import {logger} from "../lib/logger";
-
-const assertQueueOptions = { durable: true }
-const consumeQueueOptions = { noAck: false }
-
-const processMessage = msg => processTorrentRecord(getMessageAsJson(msg));
-
-const getMessageAsJson = msg =>
-    JSON.parse(msg.content.toString()).message;
-
-const assertAndConsumeQueue = async channel => {
-    logger.info('Worker is running! Waiting for new torrents...')
-
-    const ackMsg = msg =>
-        processMessage(msg)
-            .then(() => channel.ack(msg))
-            .catch(error => logger.error('Failed processing torrent', error));
-
-    channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions)
-        .then(() => channel.prefetch(jobConfig.JOB_CONCURRENCY))
-        .then(() => channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions))
-        .catch(error => logger.error('Failed to setup channel', error));
-}
-
-export const listenToQueue = async () => {
-    if (!jobConfig.JOBS_ENABLED) {
-        return;
-    }
-
-    return amqp.connect(rabbitConfig.URI)
-        .then(connection => connection.createChannel())
-        .then(channel => assertAndConsumeQueue(channel))
-        .catch(error => logger.error('Failed to connect and setup channel', error));
-};

src/node/consumer/src/jobs/processTorrents.ts (new file)
@@ -0,0 +1,56 @@
+import client, {Channel, Connection, ConsumeMessage, Options} from 'amqplib'
+import {jobConfig, rabbitConfig} from '../lib/config.js';
+import {processTorrentRecord} from '../lib/torrent_processor';
+import {logger} from '../lib/logger';
+import {IngestedRabbitMessage, IngestedRabbitTorrent} from "../lib/interfaces/ingested_rabbit_message";
+import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes";
+
+const assertQueueOptions: Options.AssertQueue = { durable: true };
+const consumeQueueOptions: Options.Consume = { noAck: false };
+
+const processMessage = (msg: ConsumeMessage | null): Promise<void> => {
+    const ingestedTorrent: IngestedTorrentAttributes = getMessageAsJson(msg);
+    return processTorrentRecord(ingestedTorrent);
+};
+
+const getMessageAsJson = (msg: ConsumeMessage | null): IngestedTorrentAttributes => {
+    const content = msg ? msg?.content.toString('utf8') : "{}";
+    const receivedObject: IngestedRabbitMessage = JSON.parse(content) as IngestedRabbitMessage;
+    const receivedTorrent: IngestedRabbitTorrent = receivedObject.message;
+    const mappedObject: any = {...receivedTorrent, info_hash: receivedTorrent.infoHash};
+    delete mappedObject.infoHash;
+
+    return mappedObject as IngestedTorrentAttributes;
+};
+
+const assertAndConsumeQueue = async (channel: Channel): Promise<void> => {
+    logger.info('Worker is running! Waiting for new torrents...');
+
+    const ackMsg = (msg: ConsumeMessage): void => {
+        processMessage(msg)
+            .then(() => channel.ack(msg))
+            .catch((error: Error) => logger.error('Failed processing torrent', error));
+    }
+
+    try {
+        await channel.assertQueue(rabbitConfig.QUEUE_NAME, assertQueueOptions);
+        await channel.prefetch(jobConfig.JOB_CONCURRENCY);
+        await channel.consume(rabbitConfig.QUEUE_NAME, ackMsg, consumeQueueOptions);
+    } catch(error) {
+        logger.error('Failed to setup channel', error);
+    }
+}
+
+export const listenToQueue = async (): Promise<void> => {
+    if (!jobConfig.JOBS_ENABLED) {
+        return;
+    }
+
+    try {
+        const connection: Connection = await client.connect(rabbitConfig.URI);
+        const channel: Channel = await connection.createChannel();
+        await assertAndConsumeQueue(channel);
+    } catch (error) {
+        logger.error('Failed to connect and setup channel', error);
+    }
+}
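
For orientation, a minimal sketch of how this entry point gets invoked, mirroring the import change in the entry-file hunk above (the bootstrap call itself is hypothetical, not part of this commit):

import { listenToQueue } from './jobs/processTorrents';

// listenToQueue resolves immediately when jobConfig.JOBS_ENABLED is false,
// and logs (rather than rethrows) connection/setup failures.
await listenToQueue();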

@@ -2,7 +2,7 @@ import { createCache, memoryStore} from 'cache-manager';
 import { mongoDbStore } from '@tirke/node-cache-manager-mongodb'
 import { cacheConfig } from './config.js';
 import { logger } from './logger';
-import { CacheType } from "./types.js";
+import { CacheType } from "./enums/cache_types";
 
 const GLOBAL_KEY_PREFIX = 'knightcrawler-consumer';
 const IMDB_ID_PREFIX = `${GLOBAL_KEY_PREFIX}|imdb_id`;

src/node/consumer/src/lib/enums/cache_types.ts (new file)
@@ -0,0 +1,4 @@
+export enum CacheType {
+    MEMORY = 'memory',
+    MONGODB = 'mongodb'
+}

src/node/consumer/src/lib/enums/torrent_types.ts (new file)
@@ -0,0 +1,6 @@
+export enum TorrentType {
+    SERIES = 'SERIES',
+    MOVIE = 'MOVIE',
+    ANIME = 'anime',
+    PORN = 'xxx',
+}
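
These two enums replace the plain const objects from lib/types.js, deleted at the bottom of this diff. Note the casing change: the old TorrentType used 'movie'/'series' as values while the enum uses 'MOVIE'/'SERIES', which matters anywhere previously persisted type strings get compared. A small sketch of the runtime behaviour (string enum members are ordinary strings):

// String enum members are plain string literals at runtime, so values
// persisted under the old lowercase scheme ('series') will not equal the
// new TorrentType.SERIES ('SERIES') without a migration.
const t: string = TorrentType.SERIES;
console.log(t === 'SERIES'); // true
console.log(t === 'series'); // false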

src/node/consumer/src/lib/ingestedTorrent.js (deleted)
@@ -1,46 +0,0 @@
-import { createTorrentEntry, checkAndUpdateTorrent } from './torrentEntries.js';
-import {getTrackers} from "./trackerService.js";
-import { TorrentType } from './types.js';
-import {logger} from "./logger";
-
-export async function processTorrentRecord(torrent) {
-    const {category} = torrent;
-    const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE;
-    const torrentInfo = await parseTorrent(torrent, type);
-    logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`)
-
-    if (await checkAndUpdateTorrent(torrentInfo)) {
-        return torrentInfo;
-    }
-
-    return createTorrentEntry(torrentInfo);
-}
-
-async function assignTorrentTrackers() {
-    const trackers = await getTrackers();
-    return trackers.join(',');
-}
-
-async function parseTorrent(torrent, category) {
-    const infoHash = torrent.infoHash?.trim().toLowerCase()
-    return {
-        title: torrent.name,
-        torrentId: `${torrent.name}_${infoHash}`,
-        infoHash: infoHash,
-        seeders: 100,
-        size: torrent.size,
-        uploadDate: torrent.createdAt,
-        imdbId: parseImdbId(torrent),
-        type: category,
-        provider: torrent.source,
-        trackers: await assignTorrentTrackers(),
-    }
-}
-
-function parseImdbId(torrent) {
-    if (torrent.imdb === undefined || torrent.imdb === null) {
-        return undefined;
-    }
-
-    return torrent.imdb;
-}

src/node/consumer/src/lib/interfaces/ingested_rabbit_message.ts (new file)
@@ -0,0 +1,15 @@
+export interface IngestedRabbitTorrent {
+    name: string;
+    source: string;
+    category: string;
+    infoHash: string;
+    size: string;
+    seeders: number;
+    leechers: number;
+    imdb: string;
+    processed: boolean;
+}
+
+export interface IngestedRabbitMessage {
+    message: IngestedRabbitTorrent;
+}
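
For reference, a queue payload satisfying this shape (all values invented for the example) would look like the following; note the producer sends camelCase infoHash, which getMessageAsJson in processTorrents.ts remaps to the snake_case info_hash expected by the repository:

// Hypothetical example of a message that parses into IngestedRabbitMessage.
const example: IngestedRabbitMessage = {
    message: {
        name: 'Some.Movie.2023.1080p',
        source: 'example-indexer',
        category: 'movies',
        infoHash: '0123456789abcdef0123456789abcdef01234567',
        size: '2147483648',
        seeders: 42,
        leechers: 7,
        imdb: 'tt0000000',
        processed: false,
    },
};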

src/node/consumer/src/lib/interfaces/torrent_info.ts (new file)
@@ -0,0 +1,12 @@
+export interface TorrentInfo {
+    title: string | null;
+    torrentId: string;
+    infoHash: string | null;
+    seeders: number;
+    size: string | null;
+    uploadDate: Date;
+    imdbId: string | undefined;
+    type: string;
+    provider: string | null;
+    trackers: string;
+}

@@ -2,7 +2,7 @@ import axios from 'axios';
 import { search } from 'google-sr';
 import nameToImdb from 'name-to-imdb';
 import { cacheWrapImdbId, cacheWrapKitsuId, cacheWrapMetadata } from './cache.js';
-import { TorrentType } from './types.js';
+import { TorrentType } from './enums/torrent_types';
 
 const CINEMETA_URL = 'https://v3-cinemeta.strem.io';
 const KITSU_URL = 'https://anime-kitsu.strem.fun';

@@ -1,5 +1,5 @@
 import { parse } from 'parse-torrent-title';
-import { TorrentType } from './types.js';
+import { TorrentType } from './enums/torrent_types';
 
 const MULTIPLE_FILES_SIZE = 4 * 1024 * 1024 * 1024; // 4 GB

@@ -5,7 +5,7 @@ import * as Promises from './promises.js';
 import { repository } from '../repository/database_repository';
 import { parseTorrentFiles } from './torrentFiles.js';
 import { assignSubtitles } from './torrentSubtitles.js';
-import { TorrentType } from './types.js';
+import { TorrentType } from './enums/torrent_types';
 import {logger} from "./logger";
 
 export async function createTorrentEntry(torrent, overwrite = false) {

@@ -8,7 +8,7 @@ import { getMetadata, getImdbId, getKitsuId } from './metadata.js';
 import { parseSeriesVideos, isPackTorrent } from './parseHelper.js';
 import * as Promises from './promises.js';
 import {torrentFiles} from "./torrent.js";
-import { TorrentType } from './types.js';
+import { TorrentType } from './enums/torrent_types';
 import {logger} from "./logger";
 
 const MIN_SIZE = 5 * 1024 * 1024; // 5 MB

src/node/consumer/src/lib/torrent_processor.ts (new file)
@@ -0,0 +1,49 @@
+import {TorrentInfo} from "./interfaces/torrent_info";
+import {TorrentType} from "./enums/torrent_types";
+import {logger} from "./logger";
+import {checkAndUpdateTorrent, createTorrentEntry} from "./torrentEntries.js";
+import {getTrackers} from "./trackerService.js";
+import {IngestedTorrentAttributes} from "../repository/interfaces/ingested_torrent_attributes";
+
+export async function processTorrentRecord(torrent: IngestedTorrentAttributes): Promise<void> {
+    const { category } = torrent;
+    const type = category === 'tv' ? TorrentType.SERIES : TorrentType.MOVIE;
+    const torrentInfo: TorrentInfo = await parseTorrent(torrent, type);
+
+    logger.info(`Processing torrent ${torrentInfo.title} with infoHash ${torrentInfo.infoHash}`);
+
+    if (await checkAndUpdateTorrent(torrentInfo)) {
+        return;
+    }
+
+    return createTorrentEntry(torrentInfo);
+}
+
+async function assignTorrentTrackers(): Promise<string> {
+    const trackers = await getTrackers();
+    return trackers.join(',');
+}
+
+async function parseTorrent(torrent: IngestedTorrentAttributes, category: string): Promise<TorrentInfo> {
+    const infoHash = torrent.info_hash?.trim().toLowerCase()
+    return {
+        title: torrent.name,
+        torrentId: `${torrent.name}_${infoHash}`,
+        infoHash: infoHash,
+        seeders: 100,
+        size: torrent.size,
+        uploadDate: torrent.createdAt,
+        imdbId: parseImdbId(torrent),
+        type: category,
+        provider: torrent.source,
+        trackers: await assignTorrentTrackers(),
+    }
+}
+
+function parseImdbId(torrent: IngestedTorrentAttributes): string | undefined {
+    if (torrent.imdb === undefined || torrent.imdb === null) {
+        return undefined;
+    }
+
+    return torrent.imdb;
+}
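
As a side note, parseImdbId spells out the null/undefined check explicitly; a hypothetical equivalent using nullish coalescing would be:

// Equivalent (hypothetical) formulation: ?? maps both null and undefined
// to the fallback, matching the explicit checks above.
const parseImdbIdAlt = (torrent: IngestedTorrentAttributes): string | undefined =>
    torrent.imdb ?? undefined;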

src/node/consumer/src/lib/types.js (deleted)
@@ -1,11 +0,0 @@
-export const TorrentType = {
-    MOVIE: 'movie',
-    SERIES: 'series',
-    ANIME: 'anime',
-    PORN: 'xxx',
-};
-
-export const CacheType = {
-    MEMORY: 'memory',
-    MONGODB: 'mongodb',
-};

src/node/consumer/src/repository/interfaces/ingested_torrent_attributes.ts
@@ -10,6 +10,7 @@ export interface IngestedTorrentAttributes {
     leechers: number;
     imdb: string;
     processed: boolean;
+    createdAt: Date;
 }
 
 export interface IngestedTorrentCreationAttributes extends Optional<IngestedTorrentAttributes, 'processed'> {
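
For illustration, the Optional utility type (presumably sequelize's) makes 'processed' omissible when creating a record. A hypothetical creation payload, assuming the interface's remaining fields mirror IngestedRabbitTorrent with the info_hash rename, all values invented:

// Hypothetical: 'processed' can be omitted thanks to
// Optional<IngestedTorrentAttributes, 'processed'>.
const newRecord: IngestedTorrentCreationAttributes = {
    name: 'Some.Movie.2023.1080p',
    source: 'example-indexer',
    category: 'movies',
    info_hash: '0123456789abcdef0123456789abcdef01234567',
    size: '2147483648',
    seeders: 42,
    leechers: 7,
    imdb: 'tt0000000',
    createdAt: new Date(),
};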