3 Commits

Author SHA1 Message Date
iPromKnight
3c8ffd5082 Fix Duplicates (#199)
* Fix Duplicates

* Version
2024-04-02 20:31:22 +01:00
iPromKnight
79e0a0f102 DMM Offline (#198)
* Process DMM all locally

single call to github to download the repo archive.
remove need for PAT
update RTN to 0.2.13
change to batch_parse for title parsing from RTN

* introduce concurrent dictionary, and parallelism
2024-04-02 17:01:22 +01:00
purple_emily
6181207513 Fix incorrect file index stored (#197)
* Fix incorrect file index stored

* Update `rank-torrent-name` to latest version

* Knight Crawler version update
2024-04-01 23:08:32 +01:00
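
The "DMM Offline" commit above replaces per-page GitHub API calls (and the PAT they required) with a single anonymous download of the repository archive. A minimal standalone sketch of that flow follows; the zipball URL and the "main.zip" entry name are taken from the diffs below, while the top-level program shape and temp-folder handling are illustrative only, not the project's exact code.

using System.IO.Compression;
using System.Linq;

// One unauthenticated request fetches the whole hashlists repository;
// no GitHub PAT and no per-page raw.githubusercontent.com calls.
using var client = new HttpClient
{
    BaseAddress = new Uri("https://github.com/debridmediamanager/hashlists/zipball/main/"),
};
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");

using var response = await client.GetAsync("main.zip");
response.EnsureSuccessStatusCode();

// Extract every file entry flat into a temp directory for local parsing.
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
Directory.CreateDirectory(tempDirectory);

await using var stream = await response.Content.ReadAsStreamAsync();
using var archive = new ZipArchive(stream);
foreach (var entry in archive.Entries.Where(e => !e.FullName.EndsWith('/')))
{
    entry.ExtractToFile(Path.Combine(tempDirectory, Path.GetFileName(entry.FullName)), overwrite: true);
}

Console.WriteLine($"Hashlists extracted to {tempDirectory}");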
23 changed files with 431 additions and 197 deletions

View File

@@ -94,7 +94,7 @@ services:
        condition: service_healthy
    env_file: stack.env
    hostname: knightcrawler-addon
-   image: gabisonfire/knightcrawler-addon:2.0.18
+   image: gabisonfire/knightcrawler-addon:2.0.21
    labels:
      logging: promtail
    networks:
@@ -117,7 +117,7 @@ services:
      redis:
        condition: service_healthy
    env_file: stack.env
-   image: gabisonfire/knightcrawler-consumer:2.0.18
+   image: gabisonfire/knightcrawler-consumer:2.0.21
    labels:
      logging: promtail
    networks:
@@ -138,7 +138,7 @@ services:
      redis:
        condition: service_healthy
    env_file: stack.env
-   image: gabisonfire/knightcrawler-debrid-collector:2.0.18
+   image: gabisonfire/knightcrawler-debrid-collector:2.0.21
    labels:
      logging: promtail
    networks:
@@ -152,7 +152,7 @@ services:
      migrator:
        condition: service_completed_successfully
    env_file: stack.env
-   image: gabisonfire/knightcrawler-metadata:2.0.18
+   image: gabisonfire/knightcrawler-metadata:2.0.21
    networks:
      - knightcrawler-network
    restart: "no"
@@ -163,7 +163,7 @@ services:
      postgres:
        condition: service_healthy
    env_file: stack.env
-   image: gabisonfire/knightcrawler-migrator:2.0.18
+   image: gabisonfire/knightcrawler-migrator:2.0.21
    networks:
      - knightcrawler-network
    restart: "no"
@@ -182,7 +182,7 @@ services:
      redis:
        condition: service_healthy
    env_file: stack.env
-   image: gabisonfire/knightcrawler-producer:2.0.18
+   image: gabisonfire/knightcrawler-producer:2.0.21
    labels:
      logging: promtail
    networks:
@@ -207,7 +207,7 @@ services:
    deploy:
      replicas: ${QBIT_REPLICAS:-0}
    env_file: stack.env
-   image: gabisonfire/knightcrawler-qbit-collector:2.0.18
+   image: gabisonfire/knightcrawler-qbit-collector:2.0.21
    labels:
      logging: promtail
    networks:

View File

@@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends
 services:
   metadata:
-    image: gabisonfire/knightcrawler-metadata:2.0.18
+    image: gabisonfire/knightcrawler-metadata:2.0.21
     env_file: ../../.env
     networks:
       - knightcrawler-network
@@ -30,7 +30,7 @@ services:
         condition: service_completed_successfully
   migrator:
-    image: gabisonfire/knightcrawler-migrator:2.0.18
+    image: gabisonfire/knightcrawler-migrator:2.0.21
     env_file: ../../.env
     networks:
       - knightcrawler-network
@@ -40,7 +40,7 @@ services:
         condition: service_healthy
   addon:
-    image: gabisonfire/knightcrawler-addon:2.0.18
+    image: gabisonfire/knightcrawler-addon:2.0.21
     <<: [*knightcrawler-app, *knightcrawler-app-depends]
     restart: unless-stopped
     hostname: knightcrawler-addon
@@ -48,22 +48,22 @@ services:
       - "7000:7000"
   consumer:
-    image: gabisonfire/knightcrawler-consumer:2.0.18
+    image: gabisonfire/knightcrawler-consumer:2.0.21
     <<: [*knightcrawler-app, *knightcrawler-app-depends]
     restart: unless-stopped
   debridcollector:
-    image: gabisonfire/knightcrawler-debrid-collector:2.0.18
+    image: gabisonfire/knightcrawler-debrid-collector:2.0.21
     <<: [*knightcrawler-app, *knightcrawler-app-depends]
     restart: unless-stopped
   producer:
-    image: gabisonfire/knightcrawler-producer:2.0.18
+    image: gabisonfire/knightcrawler-producer:2.0.21
     <<: [*knightcrawler-app, *knightcrawler-app-depends]
     restart: unless-stopped
   qbitcollector:
-    image: gabisonfire/knightcrawler-qbit-collector:2.0.18
+    image: gabisonfire/knightcrawler-qbit-collector:2.0.21
     <<: [*knightcrawler-app, *knightcrawler-app-depends]
     restart: unless-stopped
     depends_on:

View File

@@ -38,6 +38,3 @@ QBIT_REPLICAS=0
 # Addon
 DEBUG_MODE=false
-
-# Producer
-GITHUB_PAT=

View File

@@ -16,13 +16,14 @@ public static class DebridMetaToTorrentMeta
         foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
         {
             var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
+            var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
             var file = new TorrentFile
             {
                 ImdbId = ImdbId,
                 KitsuId = 0,
                 InfoHash = torrent.InfoHash,
-                FileIndex = validFileIndex ? fileIndex : 0,
+                FileIndex = validFileIndex ? fileIndexMinusOne : 0,
                 Title = metadataEntry.Value.Filename,
                 Size = metadataEntry.Value.Filesize.GetValueOrDefault(),
             };
@@ -66,13 +67,14 @@ public static class DebridMetaToTorrentMeta
         foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
         {
             var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
+            var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
             var fileId = torrentFiles.FirstOrDefault(
                 t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0;
             var file = new SubtitleFile
             {
                 InfoHash = InfoHash,
-                FileIndex = validFileIndex ? fileIndex : 0,
+                FileIndex = validFileIndex ? fileIndexMinusOne : 0,
                 FileId = fileId,
                 Title = metadataEntry.Value.Filename,
             };
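
The change above shifts the stored index down by one. Reading the diff, the debrid metadata keys appear to be 1-based while the stored FileIndex is 0-based, and the Math.Max clamp keeps an unparsable or zero key from producing a negative index. A tiny illustration with hypothetical keys:

// Hypothetical debrid metadata keys as they arrive ("1", "2", ..., plus one junk value).
string[] keys = ["1", "2", "3", "not-a-number"];

foreach (var key in keys)
{
    var validFileIndex = int.TryParse(key, out var fileIndex);
    var fileIndexMinusOne = Math.Max(0, fileIndex - 1); // clamp: never store a negative index
    Console.WriteLine($"{key} -> {(validFileIndex ? fileIndexMinusOne : 0)}");
}
// Output: 1 -> 0, 2 -> 1, 3 -> 2, not-a-number -> 0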

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.5
+rank-torrent-name==0.2.13

View File

@@ -0,0 +1,43 @@
-- Drop Duplicate Files in Files Table
DELETE FROM public.files
WHERE id NOT IN (
SELECT MAX(id)
FROM public.files
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to files table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'files_unique_infohash_fileindex'
) THEN
ALTER TABLE public.files
ADD CONSTRAINT files_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;
-- Drop Duplicate subtitles in Subtitles Table
DELETE FROM public.subtitles
WHERE id NOT IN (
SELECT MAX(id)
FROM public.subtitles
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to subtitles table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'subtitles_unique_infohash_fileindex'
) THEN
ALTER TABLE public.subtitles
ADD CONSTRAINT subtitles_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;
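
The migration first removes existing duplicates, keeping the highest id per ("infoHash", "fileIndex") pair, then guards both tables with unique constraints. Combined with the ON CONFLICT ... DO NOTHING clauses added to DapperDataStorage further down, re-ingesting the same file becomes a silent no-op. A sketch of that interaction, assuming Dapper and Npgsql with a hypothetical local connection string (the real column list and configuration live in DapperDataStorage):

using Dapper;
using Npgsql;

await using var connection = new NpgsqlConnection(
    "Host=localhost;Database=knightcrawler;Username=postgres;Password=postgres"); // hypothetical

const string query =
    """
    INSERT INTO files ("infoHash", "fileIndex", title, "size", "createdAt", "updatedAt")
    VALUES (@InfoHash, @FileIndex, @Title, @Size, Now(), Now())
    ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
    """;

var row = new { InfoHash = "abcdef0123456789", FileIndex = 0, Title = "Example.mkv", Size = 123L };

// First insert stores the row; the retry hits files_unique_infohash_fileindex
// and is silently skipped instead of creating a duplicate or throwing.
Console.WriteLine(await connection.ExecuteAsync(query, row)); // 1 row affected
Console.WriteLine(await connection.ExecuteAsync(query, row)); // 0 rows affected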

View File

@@ -28,7 +28,7 @@
   },
   {
     "Name": "SyncDmmJob",
-    "IntervalSeconds": 1800,
+    "IntervalSeconds": 10800,
     "Enabled": true
   },
   {

View File

@@ -0,0 +1,70 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMFileDownloader(HttpClient client, ILogger<DMMFileDownloader> logger) : IDMMFileDownloader
{
private const string Filename = "main.zip";
private readonly IReadOnlyCollection<string> _filesToIgnore = [
"index.html",
"404.html",
"dedupe.sh",
"CNAME",
];
public const string ClientName = "DmmFileDownloader";
public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
{
logger.LogInformation("Downloading DMM Hashlists");
var response = await client.GetAsync(Filename, cancellationToken);
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
EnsureDirectoryIsClean(tempDirectory);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var archive = new ZipArchive(stream);
logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory);
foreach (var entry in archive.Entries)
{
var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName));
if (!entry.FullName.EndsWith('/')) // It's a file
{
entry.ExtractToFile(entryPath, true);
}
}
foreach (var file in _filesToIgnore)
{
CleanRepoExtras(tempDirectory, file);
}
logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory);
return tempDirectory;
}
private static void CleanRepoExtras(string tempDirectory, string fileName)
{
var repoIndex = Path.Combine(tempDirectory, fileName);
if (File.Exists(repoIndex))
{
File.Delete(repoIndex);
}
}
private static void EnsureDirectoryIsClean(string tempDirectory)
{
if (Directory.Exists(tempDirectory))
{
Directory.Delete(tempDirectory, true);
}
Directory.CreateDirectory(tempDirectory);
}
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMHttpClient
{
}

View File

@@ -1,64 +1,99 @@
 namespace Producer.Features.Crawlers.Dmm;

 public partial class DebridMediaManagerCrawler(
-    IHttpClientFactory httpClientFactory,
+    IDMMFileDownloader dmmFileDownloader,
     ILogger<DebridMediaManagerCrawler> logger,
     IDataStorage storage,
-    GithubConfiguration githubConfiguration,
     IRankTorrentName rankTorrentName,
     IDistributedCache cache) : BaseCrawler(logger, storage)
 {
     [GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
     private static partial Regex HashCollectionMatcher();

-    private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";
+    protected override string Url => "";
     protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
-    protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
     protected override string Source => "DMM";
+    private const int ParallelismCount = 4;

     public override async Task Execute()
     {
-        var client = httpClientFactory.CreateClient("Scraper");
-        client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
-        client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
-        var jsonBody = await client.GetStringAsync(Url);
-        var json = JsonDocument.Parse(jsonBody);
-        var entriesArray = json.RootElement.GetProperty("tree");
-        logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength());
-        foreach (var entry in entriesArray.EnumerateArray())
-        {
-            await ParsePage(entry, client);
-        }
+        var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
+        var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories);
+        logger.LogInformation("Found {Files} files to parse", files.Length);
+        var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
+        await Parallel.ForEachAsync(files, options, async (file, token) =>
+        {
+            var fileName = Path.GetFileName(file);
+            var torrentDictionary = await ExtractPageContents(file, fileName);
+            if (torrentDictionary == null)
+            {
+                return;
+            }
+            await ParseTitlesWithRtn(fileName, torrentDictionary);
+            var results = await ParseTorrents(torrentDictionary);
+            if (results.Count <= 0)
+            {
+                return;
+            }
+            await InsertTorrents(results);
+            await Storage.MarkPageAsIngested(fileName, token);
+        });
     }

-    private async Task ParsePage(JsonElement entry, HttpClient client)
-    {
-        var (pageIngested, name) = await IsAlreadyIngested(entry);
-        if (string.IsNullOrEmpty(name) || pageIngested)
-        {
-            return;
-        }
-        var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}");
-        await ExtractPageContents(pageSource, name);
-    }
+    private async Task ParseTitlesWithRtn(string fileName, IDictionary<string, DmmContent> page)
+    {
+        logger.LogInformation("Parsing titles for {Page}", fileName);
+        var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList();
+        var parsedResponses = rankTorrentName.BatchParse(
+            batchProcessables.Select<RtnBatchProcessable, string>(bp => bp.Filename).ToList(), trashGarbage: false);
+        // Filter out unsuccessful responses and match RawTitle to requesting title
+        var successfulResponses = parsedResponses
+            .Where(response => response != null && response.Success)
+            .GroupBy(response => response.Response.RawTitle!)
+            .ToDictionary(group => group.Key, group => group.First());
+        var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
+        await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) =>
+        {
+            if (page.TryGetValue(infoHash, out var dmmContent) &&
+                successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
+            {
+                page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
+            }
+            return ValueTask.CompletedTask;
+        });
+    }

-    private async Task ExtractPageContents(string pageSource, string name)
+    private async Task<ConcurrentDictionary<string, DmmContent>?> ExtractPageContents(string filePath, string filenameOnly)
     {
+        var (pageIngested, name) = await IsAlreadyIngested(filenameOnly);
+        if (pageIngested)
+        {
+            return [];
+        }
+        var pageSource = await File.ReadAllTextAsync(filePath);
         var match = HashCollectionMatcher().Match(pageSource);
         if (!match.Success)
         {
             logger.LogWarning("Failed to match hash collection for {Name}", name);
-            await Storage.MarkPageAsIngested(name);
-            return;
+            await Storage.MarkPageAsIngested(filenameOnly);
+            return [];
         }
         var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
@@ -66,90 +101,92 @@ public partial class DebridMediaManagerCrawler(
         if (string.IsNullOrEmpty(encodedJson?.Value))
         {
             logger.LogWarning("Failed to extract encoded json for {Name}", name);
-            return;
+            return [];
         }
-        await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name);
-    }
-
-    private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
-    {
-        var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);
+        var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);
         var json = JsonDocument.Parse(decodedJson);
-        await InsertTorrentsForPage(json);
-        var result = await Storage.MarkPageAsIngested(name);
-        if (!result.IsSuccess)
+        var torrents = await json.RootElement.EnumerateArray()
+            .ToAsyncEnumerable()
+            .Select(ParsePageContent)
+            .Where(t => t is not null)
+            .ToListAsync();
+        if (torrents.Count == 0)
         {
-            logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage);
-            return;
+            logger.LogWarning("No torrents found in {Name}", name);
+            await Storage.MarkPageAsIngested(filenameOnly);
+            return [];
         }
-        logger.LogInformation("Successfully marked page as ingested");
+        var torrentDictionary = torrents
+            .Where(x => x is not null)
+            .GroupBy(x => x.InfoHash)
+            .ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null));
+        logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name);
+        return torrentDictionary;
     }

-    private async Task<IngestedTorrent?> ParseTorrent(JsonElement item)
-    {
-        if (!item.TryGetProperty("filename", out var filenameElement) ||
-            !item.TryGetProperty("bytes", out var bytesElement) ||
-            !item.TryGetProperty("hash", out var hashElement))
-        {
-            return null;
-        }
-        var torrentTitle = filenameElement.GetString();
-        if (torrentTitle.IsNullOrEmpty())
-        {
-            return null;
-        }
-        var parsedTorrent = rankTorrentName.Parse(torrentTitle);
-        if (!parsedTorrent.Success)
-        {
-            return null;
-        }
-        var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
-        var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
-        var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
-        if (cached)
-        {
-            logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
-            return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent);
-        }
-        int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
-        var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year);
-        if (imdbEntry is null)
-        {
-            return null;
-        }
-        await AddToCache(cacheKey, imdbEntry);
-        logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
-        return MapToTorrent(imdbEntry, bytesElement, hashElement, parsedTorrent);
-    }
+    private async Task<List<IngestedTorrent>> ParseTorrents(IDictionary<string, DmmContent> page)
+    {
+        var ingestedTorrents = new List<IngestedTorrent>();
+        var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
+        await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
+        {
+            var (infoHash, dmmContent) = kvp;
+            var parsedTorrent = dmmContent.ParseResponse;
+            if (parsedTorrent is not {Success: true})
+            {
+                return;
+            }
+            var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
+            var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
+            var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
+            if (cached)
+            {
+                logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
+                lock (ingestedTorrents)
+                {
+                    ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent));
+                }
+                return;
+            }
+            int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
+            var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year, ct);
+            if (imdbEntry is null)
+            {
+                return;
+            }
+            await AddToCache(cacheKey, imdbEntry);
+            logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
+            lock (ingestedTorrents)
+            {
+                ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent));
+            }
+        });
+        return ingestedTorrents;
+    }

-    private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) =>
+    private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) =>
         new()
         {
             Source = Source,
             Name = result.Title,
             Imdb = result.ImdbId,
-            Size = bytesElement.GetInt64().ToString(),
-            InfoHash = hashElement.ToString(),
+            Size = size.ToString(),
+            InfoHash = infoHash,
             Seeders = 0,
             Leechers = 0,
             Category = AssignCategory(result),
@@ -179,35 +216,11 @@ public partial class DebridMediaManagerCrawler(
         return (false, null);
     }

-    private async Task InsertTorrentsForPage(JsonDocument json)
-    {
-        var torrents = await json.RootElement.EnumerateArray()
-            .ToAsyncEnumerable()
-            .SelectAwait(async x => await ParseTorrent(x))
-            .Where(t => t is not null)
-            .ToListAsync();
-        if (torrents.Count == 0)
-        {
-            logger.LogWarning("No torrents found in {Source} response", Source);
-            return;
-        }
-        await InsertTorrents(torrents!);
-    }
-
-    private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
-    {
-        var name = entry.GetProperty("path").GetString();
-        if (string.IsNullOrEmpty(name))
-        {
-            return (false, null);
-        }
-        var pageIngested = await Storage.PageIngested(name);
-        return (pageIngested, name);
-    }
+    private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename)
+    {
+        var pageIngested = await Storage.PageIngested(filename);
+        return (pageIngested, filename);
+    }

     private static string AssignCategory(ImdbEntry entry) =>
@@ -219,4 +232,20 @@ public partial class DebridMediaManagerCrawler(
         };

     private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";

+    private static ExtractedDMMContent? ParsePageContent(JsonElement item)
+    {
+        if (!item.TryGetProperty("filename", out var filenameElement) ||
+            !item.TryGetProperty("bytes", out var bytesElement) ||
+            !item.TryGetProperty("hash", out var hashElement))
+        {
+            return null;
+        }
+        return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
+    }
+
+    private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
+    private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
+    private record RtnBatchProcessable(string InfoHash, string Filename);
 }

View File

@@ -1,9 +0,0 @@
-namespace Producer.Features.Crawlers.Dmm;
-
-public class GithubConfiguration
-{
-    private const string Prefix = "GITHUB";
-    private const string PatVariable = "PAT";
-
-    public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
-}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public interface IDMMFileDownloader
{
Task<string> DownloadFileToTempPath(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,16 @@
namespace Producer.Features.Crawlers.Dmm;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddDmmSupport(this IServiceCollection services)
{
services.AddHttpClient<IDMMFileDownloader, DMMFileDownloader>(DMMFileDownloader.ClientName, client =>
{
client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/");
client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip");
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
});
return services;
}
}

View File

@@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions
     internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
     {
         var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
-        var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
         var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();

         var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
@@ -19,18 +18,13 @@ internal static class ServiceCollectionExtensions
             services.AddTransient(type);
         }

-        if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-        {
-            services.AddTransient<SyncDmmJob>();
-        }
-
         var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);

         services.AddQuartz(
             quartz =>
             {
                 RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
-                RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration);
+                RegisterDmmJob(quartz, scrapeConfiguration);
                 RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
                 RegisterPublisher(quartz, rabbitConfiguration);
             });
@@ -64,13 +58,8 @@ internal static class ServiceCollectionExtensions
         }
     }

-    private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration)
-    {
-        if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-        {
-            AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
-        }
-    }
+    private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) =>
+        AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);

     private static void RegisterTorrentioJob(
         IServiceCollection services,

View File

@@ -1,12 +1,12 @@
 // Global using directives

+global using System.Collections.Concurrent;
+global using System.IO.Compression;
 global using System.Reflection;
 global using System.Text;
 global using System.Text.Json;
 global using System.Text.RegularExpressions;
 global using System.Xml.Linq;
-global using FuzzySharp;
-global using FuzzySharp.Extractor;
 global using FuzzySharp.PreProcess;
 global using FuzzySharp.SimilarityRatio.Scorer;
 global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;

View File

@@ -12,7 +12,8 @@ builder.Services
    .RegisterMassTransit()
    .AddDataStorage()
    .AddCrawlers()
+   .AddDmmSupport()
    .AddQuartz(builder.Configuration);

 var app = builder.Build();
 app.Run();

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.5
+rank-torrent-name==0.2.13

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.5
+rank-torrent-name==0.2.13

View File

@@ -152,7 +152,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
         INSERT INTO files
         ("infoHash", "fileIndex", title, "size", "imdbId", "imdbSeason", "imdbEpisode", "kitsuId", "kitsuEpisode", "createdAt", "updatedAt")
         VALUES
-        (@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now());
+        (@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now())
+        ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
         """;

         await connection.ExecuteAsync(query, files);
@@ -167,7 +168,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
         INSERT INTO subtitles
         ("infoHash", "fileIndex", "fileId", "title")
         VALUES
-        (@InfoHash, @FileIndex, @FileId, @Title);
+        (@InfoHash, @FileIndex, @FileId, @Title)
+        ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
         """;

         await connection.ExecuteAsync(query, subtitles);

View File

@@ -0,0 +1,19 @@
namespace SharedContracts.Extensions;
public static class DictionaryExtensions
{
public static ConcurrentDictionary<TKey, TValue> ToConcurrentDictionary<TSource, TKey, TValue>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TValue> valueSelector) where TKey : notnull
{
var concurrentDictionary = new ConcurrentDictionary<TKey, TValue>();
foreach (var element in source)
{
concurrentDictionary.TryAdd(keySelector(element), valueSelector(element));
}
return concurrentDictionary;
}
}
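
A quick usage sketch of the new helper with hypothetical data, mirroring how the crawler collapses duplicate info hashes before parallel processing. Because the helper uses TryAdd, a colliding key is dropped rather than throwing the way Dictionary.Add would:

using System.Linq;
using SharedContracts.Extensions; // namespace of the helper above

var torrents = new[]
{
    (InfoHash: "aaa", Filename: "Movie.2020.1080p.mkv", Bytes: 700L),
    (InfoHash: "aaa", Filename: "Movie.2020.1080p.mkv", Bytes: 700L), // duplicate hash
    (InfoHash: "bbb", Filename: "Show.S01E01.720p.mkv", Bytes: 350L),
};

var byHash = torrents
    .GroupBy(t => t.InfoHash)
    .ToConcurrentDictionary(g => g.Key, g => g.First());

Console.WriteLine(byHash.Count); // 2: the duplicate "aaa" entries collapse to one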

View File

@@ -1,5 +1,6 @@
 // Global using directives

+global using System.Collections.Concurrent;
 global using System.Text.Json;
 global using System.Text.Json.Serialization;
 global using System.Text.RegularExpressions;

View File

@@ -2,5 +2,6 @@ namespace SharedContracts.Python.RTN;
 public interface IRankTorrentName
 {
-    ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true);
+    ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
+    List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
 }

View File

@@ -12,41 +12,102 @@ public class RankTorrentName : IRankTorrentName
         _pythonEngineService = pythonEngineService;
         InitModules();
     }

-    public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true) =>
-        _pythonEngineService.ExecutePythonOperationWithDefault(
-            () =>
-            {
-                var result = _rtn?.parse(title, trashGarbage);
-                return ParseResult(result);
-            }, new ParseTorrentTitleResponse(false, null), nameof(Parse), throwOnErrors: false, logErrors: false);
+    public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
+    {
+        try
+        {
+            using var gil = Py.GIL();
+            var result = _rtn?.parse(title, trashGarbage);
+            return ParseResult(result);
+        }
+        catch (Exception ex)
+        {
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
+            }
+            if (throwOnErrors)
+            {
+                throw;
+            }
+            return new(false, null);
+        }
+    }
+
+    public List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
+    {
+        var responses = new List<ParseTorrentTitleResponse?>();
+        try
+        {
+            if (titles.Count == 0)
+            {
+                return responses;
+            }
+            using var gil = Py.GIL();
+            var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
+            PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
+            if (results == null)
+            {
+                return responses;
+            }
+            responses.AddRange(results.Select(ParseResult));
+        }
+        catch (Exception ex)
+        {
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
+            }
+            if (throwOnErrors)
+            {
+                throw;
+            }
+        }
+        return responses;
+    }

-    private static ParseTorrentTitleResponse ParseResult(dynamic result)
-    {
-        if (result == null)
-        {
-            return new(false, null);
-        }
-        var json = result.model_dump_json()?.As<string?>();
-        if (json is null || string.IsNullOrEmpty(json))
-        {
-            return new(false, null);
-        }
-        var mediaType = result.GetAttr("type")?.As<string>();
-        if (string.IsNullOrEmpty(mediaType))
-        {
-            return new(false, null);
-        }
-        var response = JsonSerializer.Deserialize<RtnResponse>(json);
-        response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
-        return new(true, response);
-    }
+    private static ParseTorrentTitleResponse? ParseResult(dynamic result)
+    {
+        try
+        {
+            if (result == null)
+            {
+                return new(false, null);
+            }
+            var json = result.model_dump_json()?.As<string?>();
+            if (json is null || string.IsNullOrEmpty(json))
+            {
+                return new(false, null);
+            }
+            var mediaType = result.GetAttr("type")?.As<string>();
+            if (string.IsNullOrEmpty(mediaType))
+            {
+                return new(false, null);
+            }
+            var response = JsonSerializer.Deserialize<RtnResponse>(json);
+            response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
+            return new(true, response);
+        }
+        catch
+        {
+            return new(false, null);
+        }
+    }

     private void InitModules() =>