3 Commits

Author SHA1 Message Date
iPromKnight
3c8ffd5082 Fix Duplicates (#199)
* Fix Duplicates

* Version
2024-04-02 20:31:22 +01:00
iPromKnight
79e0a0f102 DMM Offline (#198)
* Process DMM all locally

single call to github to download the repo archive.
remove need for PAT
update RTN to 0.2.13
change to batch_parse for title parsing from RTN

* introduce concurrent dictionary, and parallelism
2024-04-02 17:01:22 +01:00
purple_emily
6181207513 Fix incorrect file index stored (#197)
* Fix incorrect file index stored

* Update `rank-torrent-name` to latest version

* Knight Crawler version update
2024-04-01 23:08:32 +01:00
31 changed files with 1550 additions and 1316 deletions

View File

@@ -94,7 +94,7 @@ services:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
hostname: knightcrawler-addon hostname: knightcrawler-addon
image: gabisonfire/knightcrawler-addon:2.0.18 image: gabisonfire/knightcrawler-addon:2.0.21
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -117,7 +117,7 @@ services:
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-consumer:2.0.18 image: gabisonfire/knightcrawler-consumer:2.0.21
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -138,7 +138,7 @@ services:
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-debrid-collector:2.0.18 image: gabisonfire/knightcrawler-debrid-collector:2.0.21
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -152,7 +152,7 @@ services:
migrator: migrator:
condition: service_completed_successfully condition: service_completed_successfully
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-metadata:2.0.18 image: gabisonfire/knightcrawler-metadata:2.0.21
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: "no" restart: "no"
@@ -163,7 +163,7 @@ services:
postgres: postgres:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-migrator:2.0.18 image: gabisonfire/knightcrawler-migrator:2.0.21
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: "no" restart: "no"
@@ -182,7 +182,7 @@ services:
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-producer:2.0.18 image: gabisonfire/knightcrawler-producer:2.0.21
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -207,7 +207,7 @@ services:
deploy: deploy:
replicas: ${QBIT_REPLICAS:-0} replicas: ${QBIT_REPLICAS:-0}
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-qbit-collector:2.0.18 image: gabisonfire/knightcrawler-qbit-collector:2.0.21
labels: labels:
logging: promtail logging: promtail
networks: networks:

View File

@@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends
services: services:
metadata: metadata:
image: gabisonfire/knightcrawler-metadata:2.0.18 image: gabisonfire/knightcrawler-metadata:2.0.21
env_file: ../../.env env_file: ../../.env
networks: networks:
- knightcrawler-network - knightcrawler-network
@@ -30,7 +30,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
migrator: migrator:
image: gabisonfire/knightcrawler-migrator:2.0.18 image: gabisonfire/knightcrawler-migrator:2.0.21
env_file: ../../.env env_file: ../../.env
networks: networks:
- knightcrawler-network - knightcrawler-network
@@ -40,7 +40,7 @@ services:
condition: service_healthy condition: service_healthy
addon: addon:
image: gabisonfire/knightcrawler-addon:2.0.18 image: gabisonfire/knightcrawler-addon:2.0.21
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
hostname: knightcrawler-addon hostname: knightcrawler-addon
@@ -48,22 +48,22 @@ services:
- "7000:7000" - "7000:7000"
consumer: consumer:
image: gabisonfire/knightcrawler-consumer:2.0.18 image: gabisonfire/knightcrawler-consumer:2.0.21
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
debridcollector: debridcollector:
image: gabisonfire/knightcrawler-debrid-collector:2.0.18 image: gabisonfire/knightcrawler-debrid-collector:2.0.21
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
producer: producer:
image: gabisonfire/knightcrawler-producer:2.0.18 image: gabisonfire/knightcrawler-producer:2.0.21
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
qbitcollector: qbitcollector:
image: gabisonfire/knightcrawler-qbit-collector:2.0.18 image: gabisonfire/knightcrawler-qbit-collector:2.0.21
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
depends_on: depends_on:

View File

@@ -38,6 +38,3 @@ QBIT_REPLICAS=0
# Addon # Addon
DEBUG_MODE=false DEBUG_MODE=false
# Producer
GITHUB_PAT=

View File

@@ -27,7 +27,7 @@ export async function getCachedStreams(streams, apiKey) {
const fileName = streamTitleParts[streamTitleParts.length - 1]; const fileName = streamTitleParts[streamTitleParts.length - 1];
const fileIndex = streamTitleParts.length === 2 ? stream.fileIdx : null; const fileIndex = streamTitleParts.length === 2 ? stream.fileIdx : null;
const encodedFileName = encodeURIComponent(fileName); const encodedFileName = encodeURIComponent(fileName);
mochStreams[`${stream.infoHash}@${stream.fileIdx}`] = { mochStreams[stream.infoHash] = {
url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${fileIndex}`, url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${fileIndex}`,
cached: cachedEntry?.instant cached: cachedEntry?.instant
} }
@@ -142,10 +142,10 @@ async function _createTorrent(AD, infoHash) {
async function _unrestrictLink(AD, torrent, encodedFileName, fileIndex) { async function _unrestrictLink(AD, torrent, encodedFileName, fileIndex) {
const targetFileName = decodeURIComponent(encodedFileName); const targetFileName = decodeURIComponent(encodedFileName);
const videos = torrent.links.filter(link => isVideo(link.filename)).sort((a, b) => b.size - a.size); const videos = torrent.links.filter(link => isVideo(link.filename));
const targetVideo = Number.isInteger(fileIndex) const targetVideo = Number.isInteger(fileIndex)
? videos.find(video => sameFilename(targetFileName, video.filename)) ? videos.find(video => sameFilename(targetFileName, video.filename))
: videos[0]; : videos.sort((a, b) => b.size - a.size)[0];
if (!targetVideo && torrent.links.every(link => isArchive(link.filename))) { if (!targetVideo && torrent.links.every(link => isArchive(link.filename))) {
console.log(`Only AllDebrid archive is available for [${torrent.hash}] ${encodedFileName}`) console.log(`Only AllDebrid archive is available for [${torrent.hash}] ${encodedFileName}`)

View File

@@ -25,7 +25,7 @@ export async function getCachedStreams(streams, apiKey) {
return available && streams return available && streams
.reduce((mochStreams, stream) => { .reduce((mochStreams, stream) => {
const cachedEntry = available[stream.infoHash]; const cachedEntry = available[stream.infoHash];
mochStreams[`${stream.infoHash}@${stream.fileIdx}`] = { mochStreams[stream.infoHash] = {
url: `${apiKey}/${stream.infoHash}/null/${stream.fileIdx}`, url: `${apiKey}/${stream.infoHash}/null/${stream.fileIdx}`,
cached: !!cachedEntry cached: !!cachedEntry
}; };

View File

@@ -173,7 +173,7 @@ function processMochResults(streams, config, results) {
function populateCachedLinks(streams, mochResult, config) { function populateCachedLinks(streams, mochResult, config) {
return streams.map(stream => { return streams.map(stream => {
const cachedEntry = stream.infoHash && mochResult.mochStreams[`${stream.infoHash}@${stream.fileIdx}`]; const cachedEntry = stream.infoHash && mochResult.mochStreams[stream.infoHash];
if (cachedEntry?.cached) { if (cachedEntry?.cached) {
return { return {
name: `[${mochResult.moch.shortName}+] ${stream.name}`, name: `[${mochResult.moch.shortName}+] ${stream.name}`,
@@ -190,7 +190,7 @@ function populateDownloadLinks(streams, mochResults, config) {
const torrentStreams = streams.filter(stream => stream.infoHash); const torrentStreams = streams.filter(stream => stream.infoHash);
const seededStreams = streams.filter(stream => !stream.title.includes('👤 0')); const seededStreams = streams.filter(stream => !stream.title.includes('👤 0'));
torrentStreams.forEach(stream => mochResults.forEach(mochResult => { torrentStreams.forEach(stream => mochResults.forEach(mochResult => {
const cachedEntry = mochResult.mochStreams[`${stream.infoHash}@${stream.fileIdx}`]; const cachedEntry = mochResult.mochStreams[stream.infoHash];
const isCached = cachedEntry?.cached; const isCached = cachedEntry?.cached;
if (!isCached && isHealthyStreamForDebrid(seededStreams, stream)) { if (!isCached && isHealthyStreamForDebrid(seededStreams, stream)) {
streams.push({ streams.push({

View File

@@ -27,9 +27,10 @@ export async function getCachedStreams(streams, apiKey) {
const isCached = available.includes(stream.infoHash); const isCached = available.includes(stream.infoHash);
const streamTitleParts = stream.title.replace(/\n👤.*/s, '').split('\n'); const streamTitleParts = stream.title.replace(/\n👤.*/s, '').split('\n');
const fileName = streamTitleParts[streamTitleParts.length - 1]; const fileName = streamTitleParts[streamTitleParts.length - 1];
const fileIndex = streamTitleParts.length === 2 ? stream.fileIdx : null;
const encodedFileName = encodeURIComponent(fileName); const encodedFileName = encodeURIComponent(fileName);
mochStreams[`${stream.infoHash}@${stream.fileIdx}`] = { mochStreams[stream.infoHash] = {
url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${stream.fileIdx}`, url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${fileIndex}`,
cached: isCached cached: isCached
}; };
return mochStreams; return mochStreams;
@@ -136,8 +137,7 @@ async function _retryCreateTorrent(OC, infoHash, cachedEntryInfo, fileIndex) {
async function _unrestrictLink(OC, infoHash, torrent, cachedEntryInfo, fileIndex) { async function _unrestrictLink(OC, infoHash, torrent, cachedEntryInfo, fileIndex) {
const targetFileName = decodeURIComponent(cachedEntryInfo); const targetFileName = decodeURIComponent(cachedEntryInfo);
const files = await _getFileUrls(OC, torrent) const files = await _getFileUrls(OC, torrent)
const targetFile = files.find(file => file.includes(`/${torrent.requestId}/${fileIndex}/`)) const targetFile = files.find(file => sameFilename(targetFileName, file.split('/').pop()))
|| files.find(file => sameFilename(targetFileName, file.split('/').pop()))
|| files.find(file => isVideo(file)) || files.find(file => isVideo(file))
|| files.pop(); || files.pop();

View File

@@ -124,10 +124,10 @@ async function _getCachedLink(PM, infoHash, encodedFileName, fileIndex, ip, isBr
const cachedTorrent = await PM.transfer.directDownload(magnet.encode({ infoHash }), ip); const cachedTorrent = await PM.transfer.directDownload(magnet.encode({ infoHash }), ip);
if (cachedTorrent?.content?.length) { if (cachedTorrent?.content?.length) {
const targetFileName = decodeURIComponent(encodedFileName); const targetFileName = decodeURIComponent(encodedFileName);
const videos = cachedTorrent.content.filter(file => isVideo(file.path)).sort((a, b) => b.size - a.size); const videos = cachedTorrent.content.filter(file => isVideo(file.path));
const targetVideo = Number.isInteger(fileIndex) const targetVideo = Number.isInteger(fileIndex)
? videos.find(video => sameFilename(video.path, targetFileName)) ? videos.find(video => sameFilename(video.path, targetFileName))
: videos[0]; : videos.sort((a, b) => b.size - a.size)[0];
if (!targetVideo && videos.every(video => isArchive(video.path))) { if (!targetVideo && videos.every(video => isArchive(video.path))) {
console.log(`Only Premiumize archive is available for [${infoHash}] ${fileIndex}`) console.log(`Only Premiumize archive is available for [${infoHash}] ${fileIndex}`)
return StaticResponse.FAILED_RAR; return StaticResponse.FAILED_RAR;

View File

@@ -17,7 +17,7 @@ export async function getCachedStreams(streams, apiKey) {
const fileName = streamTitleParts[streamTitleParts.length - 1]; const fileName = streamTitleParts[streamTitleParts.length - 1];
const fileIndex = streamTitleParts.length === 2 ? stream.fileIdx : null; const fileIndex = streamTitleParts.length === 2 ? stream.fileIdx : null;
const encodedFileName = encodeURIComponent(fileName); const encodedFileName = encodeURIComponent(fileName);
mochStreams[`${stream.infoHash}@${stream.fileIdx}`] = { mochStreams[stream.infoHash] = {
url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${fileIndex}`, url: `${apiKey}/${stream.infoHash}/${encodedFileName}/${fileIndex}`,
cached: false cached: false
}; };
@@ -168,12 +168,12 @@ async function _getTargetFile(Putio, torrent, encodedFileName, fileIndex) {
while (!targetFile && files.length) { while (!targetFile && files.length) {
const folders = files.filter(file => file.file_type === 'FOLDER'); const folders = files.filter(file => file.file_type === 'FOLDER');
videos = videos.concat(files.filter(file => isVideo(file.name))).sort((a, b) => b.size - a.size); videos = videos.concat(files.filter(file => isVideo(file.name)));
// when specific file index is defined search by filename // when specific file index is defined search by filename
// when it's not defined find all videos and take the largest one // when it's not defined find all videos and take the largest one
targetFile = Number.isInteger(fileIndex) targetFile = Number.isInteger(fileIndex)
? videos.find(video => sameFilename(targetFileName, video.name)) ? videos.find(video => sameFilename(targetFileName, video.name))
: !folders.length && videos[0]; : !folders.length && videos.sort((a, b) => b.size - a.size)[0];
files = !targetFile files = !targetFile
? await Promise.all(folders.map(folder => _getFiles(Putio, folder.id))) ? await Promise.all(folders.map(folder => _getFiles(Putio, folder.id)))
.then(results => results.reduce((a, b) => a.concat(b), [])) .then(results => results.reduce((a, b) => a.concat(b), []))

View File

@@ -21,7 +21,7 @@ export async function getCachedStreams(streams, apiKey) {
.reduce((mochStreams, stream) => { .reduce((mochStreams, stream) => {
const cachedEntry = available[stream.infoHash]; const cachedEntry = available[stream.infoHash];
const cachedIds = _getCachedFileIds(stream.fileIdx, cachedEntry); const cachedIds = _getCachedFileIds(stream.fileIdx, cachedEntry);
mochStreams[`${stream.infoHash}@${stream.fileIdx}`] = { mochStreams[stream.infoHash] = {
url: `${apiKey}/${stream.infoHash}/null/${stream.fileIdx}`, url: `${apiKey}/${stream.infoHash}/null/${stream.fileIdx}`,
cached: !!cachedIds.length cached: !!cachedIds.length
}; };
@@ -395,5 +395,5 @@ function infringingFile(error) {
} }
async function getDefaultOptions(ip) { async function getDefaultOptions(ip) {
return { ip, timeout: 15000 }; return { ip, timeout: 10000 };
} }

View File

@@ -16,13 +16,14 @@ public static class DebridMetaToTorrentMeta
foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext)))) foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{ {
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex); var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
var file = new TorrentFile var file = new TorrentFile
{ {
ImdbId = ImdbId, ImdbId = ImdbId,
KitsuId = 0, KitsuId = 0,
InfoHash = torrent.InfoHash, InfoHash = torrent.InfoHash,
FileIndex = validFileIndex ? fileIndex : 0, FileIndex = validFileIndex ? fileIndexMinusOne : 0,
Title = metadataEntry.Value.Filename, Title = metadataEntry.Value.Filename,
Size = metadataEntry.Value.Filesize.GetValueOrDefault(), Size = metadataEntry.Value.Filesize.GetValueOrDefault(),
}; };
@@ -66,13 +67,14 @@ public static class DebridMetaToTorrentMeta
foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext)))) foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{ {
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex); var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
var fileId = torrentFiles.FirstOrDefault( var fileId = torrentFiles.FirstOrDefault(
t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0; t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0;
var file = new SubtitleFile var file = new SubtitleFile
{ {
InfoHash = InfoHash, InfoHash = InfoHash,
FileIndex = validFileIndex ? fileIndex : 0, FileIndex = validFileIndex ? fileIndexMinusOne : 0,
FileId = fileId, FileId = fileId,
Title = metadataEntry.Value.Filename, Title = metadataEntry.Value.Filename,
}; };

View File

@@ -1 +1 @@
rank-torrent-name==0.2.5 rank-torrent-name==0.2.13

View File

@@ -0,0 +1,43 @@
-- Drop Duplicate Files in Files Table
DELETE FROM public.files
WHERE id NOT IN (
SELECT MAX(id)
FROM public.files
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to files table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'files_unique_infohash_fileindex'
) THEN
ALTER TABLE public.files
ADD CONSTRAINT files_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;
-- Drop Duplicate subtitles in Subtitles Table
DELETE FROM public.subtitles
WHERE id NOT IN (
SELECT MAX(id)
FROM public.subtitles
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to subtitles table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'subtitles_unique_infohash_fileindex'
) THEN
ALTER TABLE public.subtitles
ADD CONSTRAINT subtitles_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;

View File

@@ -28,7 +28,7 @@
}, },
{ {
"Name": "SyncDmmJob", "Name": "SyncDmmJob",
"IntervalSeconds": 1800, "IntervalSeconds": 10800,
"Enabled": true "Enabled": true
}, },
{ {

View File

@@ -0,0 +1,70 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMFileDownloader(HttpClient client, ILogger<DMMFileDownloader> logger) : IDMMFileDownloader
{
private const string Filename = "main.zip";
private readonly IReadOnlyCollection<string> _filesToIgnore = [
"index.html",
"404.html",
"dedupe.sh",
"CNAME",
];
public const string ClientName = "DmmFileDownloader";
public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
{
logger.LogInformation("Downloading DMM Hashlists");
var response = await client.GetAsync(Filename, cancellationToken);
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
EnsureDirectoryIsClean(tempDirectory);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var archive = new ZipArchive(stream);
logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory);
foreach (var entry in archive.Entries)
{
var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName));
if (!entry.FullName.EndsWith('/')) // It's a file
{
entry.ExtractToFile(entryPath, true);
}
}
foreach (var file in _filesToIgnore)
{
CleanRepoExtras(tempDirectory, file);
}
logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory);
return tempDirectory;
}
private static void CleanRepoExtras(string tempDirectory, string fileName)
{
var repoIndex = Path.Combine(tempDirectory, fileName);
if (File.Exists(repoIndex))
{
File.Delete(repoIndex);
}
}
private static void EnsureDirectoryIsClean(string tempDirectory)
{
if (Directory.Exists(tempDirectory))
{
Directory.Delete(tempDirectory, true);
}
Directory.CreateDirectory(tempDirectory);
}
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMHttpClient
{
}

View File

@@ -1,64 +1,99 @@
namespace Producer.Features.Crawlers.Dmm; namespace Producer.Features.Crawlers.Dmm;
public partial class DebridMediaManagerCrawler( public partial class DebridMediaManagerCrawler(
IHttpClientFactory httpClientFactory, IDMMFileDownloader dmmFileDownloader,
ILogger<DebridMediaManagerCrawler> logger, ILogger<DebridMediaManagerCrawler> logger,
IDataStorage storage, IDataStorage storage,
GithubConfiguration githubConfiguration,
IRankTorrentName rankTorrentName, IRankTorrentName rankTorrentName,
IDistributedCache cache) : BaseCrawler(logger, storage) IDistributedCache cache) : BaseCrawler(logger, storage)
{ {
[GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")] [GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
private static partial Regex HashCollectionMatcher(); private static partial Regex HashCollectionMatcher();
protected override string Url => "";
private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";
protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>(); protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
protected override string Source => "DMM"; protected override string Source => "DMM";
private const int ParallelismCount = 4;
public override async Task Execute() public override async Task Execute()
{ {
var client = httpClientFactory.CreateClient("Scraper"); var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
var jsonBody = await client.GetStringAsync(Url); var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories);
var json = JsonDocument.Parse(jsonBody); logger.LogInformation("Found {Files} files to parse", files.Length);
var entriesArray = json.RootElement.GetProperty("tree"); var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength()); await Parallel.ForEachAsync(files, options, async (file, token) =>
foreach (var entry in entriesArray.EnumerateArray())
{ {
await ParsePage(entry, client); var fileName = Path.GetFileName(file);
} var torrentDictionary = await ExtractPageContents(file, fileName);
}
private async Task ParsePage(JsonElement entry, HttpClient client) if (torrentDictionary == null)
{
var (pageIngested, name) = await IsAlreadyIngested(entry);
if (string.IsNullOrEmpty(name) || pageIngested)
{ {
return; return;
} }
var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}"); await ParseTitlesWithRtn(fileName, torrentDictionary);
var results = await ParseTorrents(torrentDictionary);
await ExtractPageContents(pageSource, name); if (results.Count <= 0)
{
return;
} }
private async Task ExtractPageContents(string pageSource, string name) await InsertTorrents(results);
await Storage.MarkPageAsIngested(fileName, token);
});
}
private async Task ParseTitlesWithRtn(string fileName, IDictionary<string, DmmContent> page)
{ {
logger.LogInformation("Parsing titles for {Page}", fileName);
var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList();
var parsedResponses = rankTorrentName.BatchParse(
batchProcessables.Select<RtnBatchProcessable, string>(bp => bp.Filename).ToList(), trashGarbage: false);
// Filter out unsuccessful responses and match RawTitle to requesting title
var successfulResponses = parsedResponses
.Where(response => response != null && response.Success)
.GroupBy(response => response.Response.RawTitle!)
.ToDictionary(group => group.Key, group => group.First());
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) =>
{
if (page.TryGetValue(infoHash, out var dmmContent) &&
successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
{
page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
}
return ValueTask.CompletedTask;
});
}
private async Task<ConcurrentDictionary<string, DmmContent>?> ExtractPageContents(string filePath, string filenameOnly)
{
var (pageIngested, name) = await IsAlreadyIngested(filenameOnly);
if (pageIngested)
{
return [];
}
var pageSource = await File.ReadAllTextAsync(filePath);
var match = HashCollectionMatcher().Match(pageSource); var match = HashCollectionMatcher().Match(pageSource);
if (!match.Success) if (!match.Success)
{ {
logger.LogWarning("Failed to match hash collection for {Name}", name); logger.LogWarning("Failed to match hash collection for {Name}", name);
await Storage.MarkPageAsIngested(name); await Storage.MarkPageAsIngested(filenameOnly);
return; return [];
} }
var encodedJson = match.Groups.Values.ElementAtOrDefault(1); var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
@@ -66,90 +101,92 @@ public partial class DebridMediaManagerCrawler(
if (string.IsNullOrEmpty(encodedJson?.Value)) if (string.IsNullOrEmpty(encodedJson?.Value))
{ {
logger.LogWarning("Failed to extract encoded json for {Name}", name); logger.LogWarning("Failed to extract encoded json for {Name}", name);
return; return [];
} }
await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name); var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);
}
private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
{
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);
var json = JsonDocument.Parse(decodedJson); var json = JsonDocument.Parse(decodedJson);
await InsertTorrentsForPage(json); var torrents = await json.RootElement.EnumerateArray()
.ToAsyncEnumerable()
.Select(ParsePageContent)
.Where(t => t is not null)
.ToListAsync();
var result = await Storage.MarkPageAsIngested(name); if (torrents.Count == 0)
{
if (!result.IsSuccess) logger.LogWarning("No torrents found in {Name}", name);
await Storage.MarkPageAsIngested(filenameOnly);
return [];
}
var torrentDictionary = torrents
.Where(x => x is not null)
.GroupBy(x => x.InfoHash)
.ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null));
logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name);
return torrentDictionary;
}
private async Task<List<IngestedTorrent>> ParseTorrents(IDictionary<string, DmmContent> page)
{
var ingestedTorrents = new List<IngestedTorrent>();
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
{
var (infoHash, dmmContent) = kvp;
var parsedTorrent = dmmContent.ParseResponse;
if (parsedTorrent is not {Success: true})
{ {
logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage);
return; return;
} }
logger.LogInformation("Successfully marked page as ingested");
}
private async Task<IngestedTorrent?> ParseTorrent(JsonElement item)
{
if (!item.TryGetProperty("filename", out var filenameElement) ||
!item.TryGetProperty("bytes", out var bytesElement) ||
!item.TryGetProperty("hash", out var hashElement))
{
return null;
}
var torrentTitle = filenameElement.GetString();
if (torrentTitle.IsNullOrEmpty())
{
return null;
}
var parsedTorrent = rankTorrentName.Parse(torrentTitle);
if (!parsedTorrent.Success)
{
return null;
}
var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries"; var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year); var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey); var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
if (cached) if (cached)
{ {
logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle); logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent); lock (ingestedTorrents)
{
ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent));
}
return;
} }
int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null; int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year); var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year, ct);
if (imdbEntry is null) if (imdbEntry is null)
{ {
return null; return;
} }
await AddToCache(cacheKey, imdbEntry); await AddToCache(cacheKey, imdbEntry);
logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score); logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
lock (ingestedTorrents)
{
ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent));
}
});
return MapToTorrent(imdbEntry, bytesElement, hashElement, parsedTorrent); return ingestedTorrents;
} }
private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) => private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) =>
new() new()
{ {
Source = Source, Source = Source,
Name = result.Title, Name = result.Title,
Imdb = result.ImdbId, Imdb = result.ImdbId,
Size = bytesElement.GetInt64().ToString(), Size = size.ToString(),
InfoHash = hashElement.ToString(), InfoHash = infoHash,
Seeders = 0, Seeders = 0,
Leechers = 0, Leechers = 0,
Category = AssignCategory(result), Category = AssignCategory(result),
@@ -179,35 +216,11 @@ public partial class DebridMediaManagerCrawler(
return (false, null); return (false, null);
} }
private async Task InsertTorrentsForPage(JsonDocument json) private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename)
{ {
var torrents = await json.RootElement.EnumerateArray() var pageIngested = await Storage.PageIngested(filename);
.ToAsyncEnumerable()
.SelectAwait(async x => await ParseTorrent(x))
.Where(t => t is not null)
.ToListAsync();
if (torrents.Count == 0) return (pageIngested, filename);
{
logger.LogWarning("No torrents found in {Source} response", Source);
return;
}
await InsertTorrents(torrents!);
}
private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
{
var name = entry.GetProperty("path").GetString();
if (string.IsNullOrEmpty(name))
{
return (false, null);
}
var pageIngested = await Storage.PageIngested(name);
return (pageIngested, name);
} }
private static string AssignCategory(ImdbEntry entry) => private static string AssignCategory(ImdbEntry entry) =>
@@ -219,4 +232,20 @@ public partial class DebridMediaManagerCrawler(
}; };
private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}"; private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";
private static ExtractedDMMContent? ParsePageContent(JsonElement item)
{
if (!item.TryGetProperty("filename", out var filenameElement) ||
!item.TryGetProperty("bytes", out var bytesElement) ||
!item.TryGetProperty("hash", out var hashElement))
{
return null;
}
return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
}
private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
private record RtnBatchProcessable(string InfoHash, string Filename);
} }

View File

@@ -1,9 +0,0 @@
namespace Producer.Features.Crawlers.Dmm;
public class GithubConfiguration
{
private const string Prefix = "GITHUB";
private const string PatVariable = "PAT";
public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public interface IDMMFileDownloader
{
Task<string> DownloadFileToTempPath(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,16 @@
namespace Producer.Features.Crawlers.Dmm;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddDmmSupport(this IServiceCollection services)
{
services.AddHttpClient<IDMMFileDownloader, DMMFileDownloader>(DMMFileDownloader.ClientName, client =>
{
client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/");
client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip");
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
});
return services;
}
}

View File

@@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions
internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration) internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
{ {
var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName); var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>(); var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
var jobTypes = Assembly.GetAssembly(typeof(BaseJob)) var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
@@ -19,18 +18,13 @@ internal static class ServiceCollectionExtensions
services.AddTransient(type); services.AddTransient(type);
} }
if (!string.IsNullOrEmpty(githubConfiguration.PAT))
{
services.AddTransient<SyncDmmJob>();
}
var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance); var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);
services.AddQuartz( services.AddQuartz(
quartz => quartz =>
{ {
RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration); RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration); RegisterDmmJob(quartz, scrapeConfiguration);
RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration); RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
RegisterPublisher(quartz, rabbitConfiguration); RegisterPublisher(quartz, rabbitConfiguration);
}); });
@@ -64,13 +58,8 @@ internal static class ServiceCollectionExtensions
} }
} }
private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) =>
{
if (!string.IsNullOrEmpty(githubConfiguration.PAT))
{
AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration); AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
}
}
private static void RegisterTorrentioJob( private static void RegisterTorrentioJob(
IServiceCollection services, IServiceCollection services,

View File

@@ -1,12 +1,12 @@
// Global using directives // Global using directives
global using System.Collections.Concurrent;
global using System.IO.Compression;
global using System.Reflection; global using System.Reflection;
global using System.Text; global using System.Text;
global using System.Text.Json; global using System.Text.Json;
global using System.Text.RegularExpressions; global using System.Text.RegularExpressions;
global using System.Xml.Linq; global using System.Xml.Linq;
global using FuzzySharp;
global using FuzzySharp.Extractor;
global using FuzzySharp.PreProcess; global using FuzzySharp.PreProcess;
global using FuzzySharp.SimilarityRatio.Scorer; global using FuzzySharp.SimilarityRatio.Scorer;
global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive; global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;

View File

@@ -12,6 +12,7 @@ builder.Services
.RegisterMassTransit() .RegisterMassTransit()
.AddDataStorage() .AddDataStorage()
.AddCrawlers() .AddCrawlers()
.AddDmmSupport()
.AddQuartz(builder.Configuration); .AddQuartz(builder.Configuration);
var app = builder.Build(); var app = builder.Build();

View File

@@ -1 +1 @@
rank-torrent-name==0.2.5 rank-torrent-name==0.2.13

View File

@@ -1 +1 @@
rank-torrent-name==0.2.5 rank-torrent-name==0.2.13

View File

@@ -152,7 +152,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
INSERT INTO files INSERT INTO files
("infoHash", "fileIndex", title, "size", "imdbId", "imdbSeason", "imdbEpisode", "kitsuId", "kitsuEpisode", "createdAt", "updatedAt") ("infoHash", "fileIndex", title, "size", "imdbId", "imdbSeason", "imdbEpisode", "kitsuId", "kitsuEpisode", "createdAt", "updatedAt")
VALUES VALUES
(@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now()); (@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now())
ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
"""; """;
await connection.ExecuteAsync(query, files); await connection.ExecuteAsync(query, files);
@@ -167,7 +168,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
INSERT INTO subtitles INSERT INTO subtitles
("infoHash", "fileIndex", "fileId", "title") ("infoHash", "fileIndex", "fileId", "title")
VALUES VALUES
(@InfoHash, @FileIndex, @FileId, @Title); (@InfoHash, @FileIndex, @FileId, @Title)
ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
"""; """;
await connection.ExecuteAsync(query, subtitles); await connection.ExecuteAsync(query, subtitles);

View File

@@ -0,0 +1,19 @@
namespace SharedContracts.Extensions;
public static class DictionaryExtensions
{
public static ConcurrentDictionary<TKey, TValue> ToConcurrentDictionary<TSource, TKey, TValue>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TValue> valueSelector) where TKey : notnull
{
var concurrentDictionary = new ConcurrentDictionary<TKey, TValue>();
foreach (var element in source)
{
concurrentDictionary.TryAdd(keySelector(element), valueSelector(element));
}
return concurrentDictionary;
}
}

View File

@@ -1,5 +1,6 @@
// Global using directives // Global using directives
global using System.Collections.Concurrent;
global using System.Text.Json; global using System.Text.Json;
global using System.Text.Json.Serialization; global using System.Text.Json.Serialization;
global using System.Text.RegularExpressions; global using System.Text.RegularExpressions;

View File

@@ -2,5 +2,6 @@ namespace SharedContracts.Python.RTN;
public interface IRankTorrentName public interface IRankTorrentName
{ {
ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true); ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
} }

View File

@@ -13,15 +13,71 @@ public class RankTorrentName : IRankTorrentName
InitModules(); InitModules();
} }
public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true) => public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
_pythonEngineService.ExecutePythonOperationWithDefault(
() =>
{ {
try
{
using var gil = Py.GIL();
var result = _rtn?.parse(title, trashGarbage); var result = _rtn?.parse(title, trashGarbage);
return ParseResult(result); return ParseResult(result);
}, new ParseTorrentTitleResponse(false, null), nameof(Parse), throwOnErrors: false, logErrors: false); }
catch (Exception ex)
{
if (logErrors)
{
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
}
private static ParseTorrentTitleResponse ParseResult(dynamic result) if (throwOnErrors)
{
throw;
}
return new(false, null);
}
}
public List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
{
var responses = new List<ParseTorrentTitleResponse?>();
try
{
if (titles.Count == 0)
{
return responses;
}
using var gil = Py.GIL();
var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
if (results == null)
{
return responses;
}
responses.AddRange(results.Select(ParseResult));
}
catch (Exception ex)
{
if (logErrors)
{
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
}
if (throwOnErrors)
{
throw;
}
}
return responses;
}
private static ParseTorrentTitleResponse? ParseResult(dynamic result)
{
try
{ {
if (result == null) if (result == null)
{ {
@@ -48,6 +104,11 @@ public class RankTorrentName : IRankTorrentName
return new(true, response); return new(true, response);
} }
catch
{
return new(false, null);
}
}
private void InitModules() => private void InitModules() =>
_rtn = _rtn =