4 Commits

iPromKnight | e6a63fd72e | 2024-04-11 18:23:42 +01:00
Allow configuration of producer urls (#203)
    * Allow configuration of urls in scrapers by mounting the scrapers.json file over the one in the container
    * version bump

iPromKnight | 02101ac50a | 2024-04-11 18:02:29 +01:00
Allow qbit concurrency to be configurable (#200)

iPromKnight | 3c8ffd5082 | 2024-04-02 20:31:22 +01:00
Fix Duplicates (#199)
    * Fix Duplicates
    * Version

iPromKnight | 79e0a0f102 | 2024-04-02 17:01:22 +01:00
DMM Offline (#198)
    * Process DMM all locally:
        - single call to github to download the repo archive
        - remove need for PAT
        - update RTN to 0.2.13
        - change to batch_parse for title parsing from RTN
    * introduce concurrent dictionary, and parallelism
35 changed files with 489 additions and 224 deletions

View File

@@ -14,12 +14,14 @@ program=
[BitTorrent]
Session\AnonymousModeEnabled=true
Session\BTProtocol=TCP
Session\ConnectionSpeed=150
Session\DefaultSavePath=/downloads/
Session\ExcludedFileNames=
-Session\MaxActiveCheckingTorrents=5
-Session\MaxActiveDownloads=10
+Session\MaxActiveCheckingTorrents=20
+Session\MaxActiveDownloads=20
Session\MaxActiveTorrents=50
Session\MaxActiveUploads=50
Session\MaxConcurrentHTTPAnnounces=1000
Session\MaxConnections=2000
Session\Port=6881
Session\QueueingSystemEnabled=true

View File

@@ -94,7 +94,7 @@ services:
condition: service_healthy
env_file: stack.env
hostname: knightcrawler-addon
-image: gabisonfire/knightcrawler-addon:2.0.19
+image: gabisonfire/knightcrawler-addon:2.0.23
labels:
logging: promtail
networks:
@@ -117,7 +117,7 @@ services:
redis:
condition: service_healthy
env_file: stack.env
-image: gabisonfire/knightcrawler-consumer:2.0.19
+image: gabisonfire/knightcrawler-consumer:2.0.23
labels:
logging: promtail
networks:
@@ -138,7 +138,7 @@ services:
redis:
condition: service_healthy
env_file: stack.env
-image: gabisonfire/knightcrawler-debrid-collector:2.0.19
+image: gabisonfire/knightcrawler-debrid-collector:2.0.23
labels:
logging: promtail
networks:
@@ -152,7 +152,7 @@ services:
migrator:
condition: service_completed_successfully
env_file: stack.env
-image: gabisonfire/knightcrawler-metadata:2.0.19
+image: gabisonfire/knightcrawler-metadata:2.0.23
networks:
- knightcrawler-network
restart: "no"
@@ -163,7 +163,7 @@ services:
postgres:
condition: service_healthy
env_file: stack.env
-image: gabisonfire/knightcrawler-migrator:2.0.19
+image: gabisonfire/knightcrawler-migrator:2.0.23
networks:
- knightcrawler-network
restart: "no"
@@ -182,7 +182,7 @@ services:
redis:
condition: service_healthy
env_file: stack.env
-image: gabisonfire/knightcrawler-producer:2.0.19
+image: gabisonfire/knightcrawler-producer:2.0.23
labels:
logging: promtail
networks:
@@ -207,7 +207,7 @@ services:
deploy:
replicas: ${QBIT_REPLICAS:-0}
env_file: stack.env
-image: gabisonfire/knightcrawler-qbit-collector:2.0.19
+image: gabisonfire/knightcrawler-qbit-collector:2.0.23
labels:
logging: promtail
networks:

View File

@@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends
services:
metadata:
-image: gabisonfire/knightcrawler-metadata:2.0.18
+image: gabisonfire/knightcrawler-metadata:2.0.23
env_file: ../../.env
networks:
- knightcrawler-network
@@ -30,7 +30,7 @@ services:
condition: service_completed_successfully
migrator:
-image: gabisonfire/knightcrawler-migrator:2.0.18
+image: gabisonfire/knightcrawler-migrator:2.0.23
env_file: ../../.env
networks:
- knightcrawler-network
@@ -40,7 +40,7 @@ services:
condition: service_healthy
addon:
-image: gabisonfire/knightcrawler-addon:2.0.18
+image: gabisonfire/knightcrawler-addon:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped
hostname: knightcrawler-addon
@@ -48,22 +48,22 @@ services:
- "7000:7000"
consumer:
-image: gabisonfire/knightcrawler-consumer:2.0.18
+image: gabisonfire/knightcrawler-consumer:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped
debridcollector:
-image: gabisonfire/knightcrawler-debrid-collector:2.0.18
+image: gabisonfire/knightcrawler-debrid-collector:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped
producer:
-image: gabisonfire/knightcrawler-producer:2.0.18
+image: gabisonfire/knightcrawler-producer:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped
qbitcollector:
-image: gabisonfire/knightcrawler-qbit-collector:2.0.18
+image: gabisonfire/knightcrawler-qbit-collector:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped
depends_on:

View File

@@ -32,12 +32,10 @@ COLLECTOR_DEBRID_ENABLED=true
COLLECTOR_REAL_DEBRID_API_KEY=
QBIT_HOST=http://qbittorrent:8080
QBIT_TRACKERS_URL=https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_all_http.txt
+QBIT_CONCURRENCY=8
# Number of replicas for the qBittorrent collector and qBitTorrent client. Should be 0 or 1.
QBIT_REPLICAS=0
# Addon
DEBUG_MODE=false
-# Producer
-GITHUB_PAT=

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.11
+rank-torrent-name==0.2.13

View File

@@ -0,0 +1,43 @@
-- Drop Duplicate Files in Files Table
DELETE FROM public.files
WHERE id NOT IN (
SELECT MAX(id)
FROM public.files
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to files table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'files_unique_infohash_fileindex'
) THEN
ALTER TABLE public.files
ADD CONSTRAINT files_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;
-- Drop Duplicate subtitles in Subtitles Table
DELETE FROM public.subtitles
WHERE id NOT IN (
SELECT MAX(id)
FROM public.subtitles
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to subtitles table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'subtitles_unique_infohash_fileindex'
) THEN
ALTER TABLE public.subtitles
ADD CONSTRAINT subtitles_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;

View File

@@ -4,31 +4,38 @@
{
"Name": "SyncEzTvJob",
"IntervalSeconds": 60,
"Enabled": true
"Enabled": true,
"Url": "https://eztv1.xyz/ezrss.xml",
"XmlNamespace": "http://xmlns.ezrss.it/0.1/"
},
{
"Name": "SyncNyaaJob",
"IntervalSeconds": 60,
"Enabled": true
"Enabled": true,
"Url": "https://nyaa.si/?page=rss&c=1_2&f=0",
"XmlNamespace": "https://nyaa.si/xmlns/nyaa"
},
{
"Name": "SyncTpbJob",
"IntervalSeconds": 60,
"Enabled": true
"Enabled": true,
"Url": "https://apibay.org/precompiled/data_top100_recent.json"
},
{
"Name": "SyncYtsJob",
"IntervalSeconds": 60,
"Enabled": true
"Enabled": true,
"Url": "https://yts.am/rss"
},
{
"Name": "SyncTgxJob",
"IntervalSeconds": 60,
"Enabled": true
"Enabled": true,
"Url": "https://tgx.rs/rss"
},
{
"Name": "SyncDmmJob",
"IntervalSeconds": 1800,
"IntervalSeconds": 10800,
"Enabled": true
},
{

View File

@@ -6,6 +6,12 @@ public abstract class BaseJsonCrawler(IHttpClientFactory httpClientFactory, ILog
protected virtual async Task Execute(string collectionName)
{
+if (string.IsNullOrWhiteSpace(Url))
+{
+logger.LogWarning("No URL provided for {Source} crawl", Source);
+return;
+}
logger.LogInformation("Starting {Source} crawl", Source);
using var client = httpClientFactory.CreateClient("Scraper");

View File

@@ -4,6 +4,12 @@ public abstract class BaseXmlCrawler(IHttpClientFactory httpClientFactory, ILogg
{
public override async Task Execute()
{
+if (string.IsNullOrWhiteSpace(Url))
+{
+logger.LogWarning("No URL provided for {Source} crawl", Source);
+return;
+}
logger.LogInformation("Starting {Source} crawl", Source);
using var client = httpClientFactory.CreateClient(Literals.CrawlerClient);

View File

@@ -7,4 +7,8 @@ public class Scraper
public int IntervalSeconds { get; set; } = 60;
public bool Enabled { get; set; } = true;
+public string? Url { get; set; }
+public string? XmlNamespace { get; set; }
}
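
The crawlers resolve their settings from this class by job name, and #203 makes the backing scrapers.json overridable by mounting a replacement over the copy baked into the container. A minimal sketch of the options side, assuming (not shown in this diff) that ScrapeConfiguration is a plain options class whose SectionName constant and Scrapers list are the ones queried elsewhere in this PR:

public class ScrapeConfiguration
{
    public const string SectionName = "Scrapers"; // assumed value; only the constant's usage appears in this diff
    public List<Scraper> Scrapers { get; set; } = [];
}

// Typical lookup, as the crawlers below do it:
// var url = scrapeConfiguration.Scrapers
//     .FirstOrDefault(x => x.Name.Equals("SyncEzTvJob", StringComparison.OrdinalIgnoreCase))?.Url;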

View File

@@ -0,0 +1,70 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMFileDownloader(HttpClient client, ILogger<DMMFileDownloader> logger) : IDMMFileDownloader
{
private const string Filename = "main.zip";
private readonly IReadOnlyCollection<string> _filesToIgnore = [
"index.html",
"404.html",
"dedupe.sh",
"CNAME",
];
public const string ClientName = "DmmFileDownloader";
public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
{
logger.LogInformation("Downloading DMM Hashlists");
var response = await client.GetAsync(Filename, cancellationToken);
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
EnsureDirectoryIsClean(tempDirectory);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var archive = new ZipArchive(stream);
logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory);
foreach (var entry in archive.Entries)
{
var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName));
if (!entry.FullName.EndsWith('/')) // It's a file
{
entry.ExtractToFile(entryPath, true);
}
}
foreach (var file in _filesToIgnore)
{
CleanRepoExtras(tempDirectory, file);
}
logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory);
return tempDirectory;
}
private static void CleanRepoExtras(string tempDirectory, string fileName)
{
var repoIndex = Path.Combine(tempDirectory, fileName);
if (File.Exists(repoIndex))
{
File.Delete(repoIndex);
}
}
private static void EnsureDirectoryIsClean(string tempDirectory)
{
if (Directory.Exists(tempDirectory))
{
Directory.Delete(tempDirectory, true);
}
Directory.CreateDirectory(tempDirectory);
}
}
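
For orientation, a hedged usage sketch of the downloader: it is registered as a typed HttpClient (see AddDmmSupport below), so a job can take IDMMFileDownloader via constructor injection. SyncDmmSketch and its body are hypothetical; only IDMMFileDownloader and DownloadFileToTempPath come from this diff:

public class SyncDmmSketch(IDMMFileDownloader downloader, ILogger<SyncDmmSketch> logger)
{
    public async Task Run(CancellationToken ct)
    {
        // One archive download replaces the per-page GitHub API calls (and the PAT) used before #198.
        var dir = await downloader.DownloadFileToTempPath(ct);
        foreach (var page in Directory.GetFiles(dir, "*.html", SearchOption.AllDirectories))
        {
            logger.LogInformation("Would parse hashlist page {Page}", Path.GetFileName(page));
        }
    }
}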

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMHttpClient
{
}

View File

@@ -1,64 +1,99 @@
namespace Producer.Features.Crawlers.Dmm;
public partial class DebridMediaManagerCrawler(
IHttpClientFactory httpClientFactory,
IDMMFileDownloader dmmFileDownloader,
ILogger<DebridMediaManagerCrawler> logger,
IDataStorage storage,
GithubConfiguration githubConfiguration,
IRankTorrentName rankTorrentName,
IDistributedCache cache) : BaseCrawler(logger, storage)
{
[GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
private static partial Regex HashCollectionMatcher();
private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";
protected override string Url => "";
protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
protected override string Source => "DMM";
private const int ParallelismCount = 4;
public override async Task Execute()
{
var client = httpClientFactory.CreateClient("Scraper");
client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
var jsonBody = await client.GetStringAsync(Url);
var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories);
var json = JsonDocument.Parse(jsonBody);
logger.LogInformation("Found {Files} files to parse", files.Length);
var entriesArray = json.RootElement.GetProperty("tree");
logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength());
foreach (var entry in entriesArray.EnumerateArray())
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(files, options, async (file, token) =>
{
await ParsePage(entry, client);
}
var fileName = Path.GetFileName(file);
var torrentDictionary = await ExtractPageContents(file, fileName);
if (torrentDictionary == null)
{
return;
}
await ParseTitlesWithRtn(fileName, torrentDictionary);
var results = await ParseTorrents(torrentDictionary);
if (results.Count <= 0)
{
return;
}
await InsertTorrents(results);
await Storage.MarkPageAsIngested(fileName, token);
});
}
private async Task ParsePage(JsonElement entry, HttpClient client)
private async Task ParseTitlesWithRtn(string fileName, IDictionary<string, DmmContent> page)
{
var (pageIngested, name) = await IsAlreadyIngested(entry);
logger.LogInformation("Parsing titles for {Page}", fileName);
if (string.IsNullOrEmpty(name) || pageIngested)
var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList();
var parsedResponses = rankTorrentName.BatchParse(
batchProcessables.Select<RtnBatchProcessable, string>(bp => bp.Filename).ToList(), trashGarbage: false);
// Filter out unsuccessful responses and match RawTitle to requesting title
var successfulResponses = parsedResponses
.Where(response => response != null && response.Success)
.GroupBy(response => response.Response.RawTitle!)
.ToDictionary(group => group.Key, group => group.First());
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) =>
{
return;
}
var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}");
await ExtractPageContents(pageSource, name);
if (page.TryGetValue(infoHash, out var dmmContent) &&
successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
{
page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
}
return ValueTask.CompletedTask;
});
}
private async Task ExtractPageContents(string pageSource, string name)
private async Task<ConcurrentDictionary<string, DmmContent>?> ExtractPageContents(string filePath, string filenameOnly)
{
var (pageIngested, name) = await IsAlreadyIngested(filenameOnly);
if (pageIngested)
{
return null;
}
var pageSource = await File.ReadAllTextAsync(filePath);
var match = HashCollectionMatcher().Match(pageSource);
if (!match.Success)
{
logger.LogWarning("Failed to match hash collection for {Name}", name);
await Storage.MarkPageAsIngested(name);
return;
await Storage.MarkPageAsIngested(filenameOnly);
return null;
}
var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
@@ -66,90 +101,92 @@ public partial class DebridMediaManagerCrawler(
if (string.IsNullOrEmpty(encodedJson?.Value))
{
logger.LogWarning("Failed to extract encoded json for {Name}", name);
return;
return null;
}
await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name);
}
private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
{
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);
var json = JsonDocument.Parse(decodedJson);
var torrents = await json.RootElement.EnumerateArray()
.ToAsyncEnumerable()
.Select(ParsePageContent)
.Where(t => t is not null)
.ToListAsync();
await InsertTorrentsForPage(json);
var result = await Storage.MarkPageAsIngested(name);
if (!result.IsSuccess)
if (torrents.Count == 0)
{
logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage);
return;
logger.LogWarning("No torrents found in {Name}", name);
await Storage.MarkPageAsIngested(filenameOnly);
return null;
}
var torrentDictionary = torrents
.Where(x => x is not null)
.GroupBy(x => x.InfoHash)
.ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null));
logger.LogInformation("Successfully marked page as ingested");
logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name);
return torrentDictionary;
}
private async Task<IngestedTorrent?> ParseTorrent(JsonElement item)
private async Task<List<IngestedTorrent>> ParseTorrents(IDictionary<string, DmmContent> page)
{
var ingestedTorrents = new List<IngestedTorrent>();
if (!item.TryGetProperty("filename", out var filenameElement) ||
!item.TryGetProperty("bytes", out var bytesElement) ||
!item.TryGetProperty("hash", out var hashElement))
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
{
return null;
}
var (infoHash, dmmContent) = kvp;
var parsedTorrent = dmmContent.ParseResponse;
if (parsedTorrent is not {Success: true})
{
return;
}
var torrentTitle = filenameElement.GetString();
var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
if (torrentTitle.IsNullOrEmpty())
{
return null;
}
var parsedTorrent = rankTorrentName.Parse(torrentTitle);
if (!parsedTorrent.Success)
{
return null;
}
var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
if (cached)
{
logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent);
}
if (cached)
{
logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
lock (ingestedTorrents)
{
ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent));
}
return;
}
int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year);
int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year, ct);
if (imdbEntry is null)
{
return null;
}
await AddToCache(cacheKey, imdbEntry);
logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
if (imdbEntry is null)
{
return;
}
return MapToTorrent(imdbEntry, bytesElement, hashElement, parsedTorrent);
await AddToCache(cacheKey, imdbEntry);
logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
lock (ingestedTorrents)
{
ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent));
}
});
return ingestedTorrents;
}
private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) =>
private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) =>
new()
{
Source = Source,
Name = result.Title,
Imdb = result.ImdbId,
Size = bytesElement.GetInt64().ToString(),
InfoHash = hashElement.ToString(),
Size = size.ToString(),
InfoHash = infoHash,
Seeders = 0,
Leechers = 0,
Category = AssignCategory(result),
@@ -179,35 +216,11 @@ public partial class DebridMediaManagerCrawler(
return (false, null);
}
private async Task InsertTorrentsForPage(JsonDocument json)
private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename)
{
var torrents = await json.RootElement.EnumerateArray()
.ToAsyncEnumerable()
.SelectAwait(async x => await ParseTorrent(x))
.Where(t => t is not null)
.ToListAsync();
var pageIngested = await Storage.PageIngested(filename);
if (torrents.Count == 0)
{
logger.LogWarning("No torrents found in {Source} response", Source);
return;
}
await InsertTorrents(torrents!);
}
private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
{
var name = entry.GetProperty("path").GetString();
if (string.IsNullOrEmpty(name))
{
return (false, null);
}
var pageIngested = await Storage.PageIngested(name);
return (pageIngested, name);
return (pageIngested, filename);
}
private static string AssignCategory(ImdbEntry entry) =>
@@ -219,4 +232,20 @@ public partial class DebridMediaManagerCrawler(
};
private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";
private static ExtractedDMMContent? ParsePageContent(JsonElement item)
{
if (!item.TryGetProperty("filename", out var filenameElement) ||
!item.TryGetProperty("bytes", out var bytesElement) ||
!item.TryGetProperty("hash", out var hashElement))
{
return null;
}
return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
}
private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
private record RtnBatchProcessable(string InfoHash, string Filename);
}

View File

@@ -1,9 +0,0 @@
namespace Producer.Features.Crawlers.Dmm;
public class GithubConfiguration
{
private const string Prefix = "GITHUB";
private const string PatVariable = "PAT";
public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public interface IDMMFileDownloader
{
Task<string> DownloadFileToTempPath(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,16 @@
namespace Producer.Features.Crawlers.Dmm;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddDmmSupport(this IServiceCollection services)
{
services.AddHttpClient<IDMMFileDownloader, DMMFileDownloader>(DMMFileDownloader.ClientName, client =>
{
client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/");
client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip");
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
});
return services;
}
}
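
A note on the registration above: because BaseAddress is set on the typed client, DMMFileDownloader's relative request for "main.zip" resolves against the hashlists zipball URL, and the gzip Accept-Encoding and curl User-Agent headers are applied to every request that client makes. This is what lets the downloader stay free of any GitHub-specific configuration or PAT.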

View File

@@ -1,11 +1,10 @@
namespace Producer.Features.Crawlers.EzTv;
-public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
{
protected override string Url => "https://eztv1.xyz/ezrss.xml";
protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncEzTvJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
protected override string Source => "EZTV";
-private static readonly XNamespace XmlNamespace = "http://xmlns.ezrss.it/0.1/";
+private XNamespace XmlNamespace => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncEzTvJob", StringComparison.OrdinalIgnoreCase))?.XmlNamespace ?? string.Empty;
protected override IReadOnlyDictionary<string, string> Mappings =>
new Dictionary<string, string>

View File

@@ -1,11 +1,10 @@
namespace Producer.Features.Crawlers.Nyaa;
-public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
{
protected override string Url => "https://nyaa.si/?page=rss&c=1_2&f=0";
protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncNyaaJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
protected override string Source => "Nyaa";
-private static readonly XNamespace XmlNamespace = "https://nyaa.si/xmlns/nyaa";
+private XNamespace XmlNamespace => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncNyaaJob", StringComparison.OrdinalIgnoreCase))?.XmlNamespace ?? string.Empty;
protected override IReadOnlyDictionary<string, string> Mappings =>
new Dictionary<string, string>

View File

@@ -1,13 +1,13 @@
namespace Producer.Features.Crawlers.Tgx;
-public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<TgxCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<TgxCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
{
[GeneratedRegex(@"Size:\s+(.+?)\s+Added")]
private static partial Regex SizeStringExtractor();
[GeneratedRegex(@"(?i)\b(\d+(\.\d+)?)\s*([KMGT]?B)\b", RegexOptions.None, "en-GB")]
private static partial Regex SizeStringParser();
protected override string Url => "https://tgx.rs/rss";
protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncTgxJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
protected override string Source => "TorrentGalaxy";
protected override IReadOnlyDictionary<string, string> Mappings

View File

@@ -1,8 +1,8 @@
namespace Producer.Features.Crawlers.Tpb;
-public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler> logger, IDataStorage storage) : BaseJsonCrawler(httpClientFactory, logger, storage)
+public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseJsonCrawler(httpClientFactory, logger, storage)
{
protected override string Url => "https://apibay.org/precompiled/data_top100_recent.json";
protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncTpbJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
protected override string Source => "TPB";

View File

@@ -1,9 +1,8 @@
namespace Producer.Features.Crawlers.Yts;
-public class YtsCrawler(IHttpClientFactory httpClientFactory, ILogger<YtsCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class YtsCrawler(IHttpClientFactory httpClientFactory, ILogger<YtsCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
{
protected override string Url => "https://yts.am/rss";
protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncYtsJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
protected override string Source => "YTS";
protected override IReadOnlyDictionary<string, string> Mappings
=> new Dictionary<string, string>

View File

@@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions
internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
{
var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
-var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
@@ -19,18 +18,13 @@ internal static class ServiceCollectionExtensions
services.AddTransient(type);
}
-if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-{
-services.AddTransient<SyncDmmJob>();
-}
var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);
services.AddQuartz(
quartz =>
{
RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
-RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration);
+RegisterDmmJob(quartz, scrapeConfiguration);
RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
RegisterPublisher(quartz, rabbitConfiguration);
});
@@ -64,13 +58,8 @@ internal static class ServiceCollectionExtensions
}
}
-private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration)
-{
-if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-{
-AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
-}
-}
+private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) =>
+AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
private static void RegisterTorrentioJob(
IServiceCollection services,

View File

@@ -1,12 +1,12 @@
// Global using directives
+global using System.Collections.Concurrent;
+global using System.IO.Compression;
global using System.Reflection;
global using System.Text;
global using System.Text.Json;
global using System.Text.RegularExpressions;
global using System.Xml.Linq;
global using FuzzySharp;
global using FuzzySharp.Extractor;
global using FuzzySharp.PreProcess;
global using FuzzySharp.SimilarityRatio.Scorer;
global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;

View File

@@ -12,7 +12,8 @@ builder.Services
.RegisterMassTransit()
.AddDataStorage()
.AddCrawlers()
+.AddDmmSupport()
.AddQuartz(builder.Configuration);
var app = builder.Build();
app.Run();

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.11
+rank-torrent-name==0.2.13

View File

@@ -44,6 +44,7 @@ public static class ServiceCollectionExtensions
{
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
var redisConfiguration = services.LoadConfigurationFromEnv<RedisConfiguration>();
+var qbitConfiguration = services.LoadConfigurationFromEnv<QbitConfiguration>();
services.AddStackExchangeRedisCache(
option =>
@@ -80,8 +81,8 @@ public static class ServiceCollectionExtensions
e.ConfigureConsumer<WriteQbitMetadataConsumer>(context);
e.ConfigureConsumer<PerformQbitMetadataRequestConsumer>(context);
e.ConfigureSaga<QbitMetadataSagaState>(context);
-e.ConcurrentMessageLimit = 5;
-e.PrefetchCount = 5;
+e.ConcurrentMessageLimit = qbitConfiguration.Concurrency;
+e.PrefetchCount = qbitConfiguration.Concurrency;
});
});
});
@@ -98,7 +99,7 @@ public static class ServiceCollectionExtensions
cfg.UseTimeout(
timeout =>
{
-timeout.Timeout = TimeSpan.FromMinutes(1);
+timeout.Timeout = TimeSpan.FromMinutes(3);
});
})
.RedisRepository(redisConfiguration.ConnectionString, options =>
@@ -110,7 +111,7 @@ public static class ServiceCollectionExtensions
{
var qbitConfiguration = services.LoadConfigurationFromEnv<QbitConfiguration>();
var client = new QBittorrentClient(new(qbitConfiguration.Host));
-client.Timeout = TimeSpan.FromSeconds(10);
+client.Timeout = TimeSpan.FromSeconds(20);
services.AddSingleton<IQBittorrentClient>(client);
}

View File

@@ -1,6 +1,6 @@
namespace QBitCollector.Features.Qbit;
-public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService trackersService, ILogger<QbitRequestProcessor> logger)
+public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService trackersService, ILogger<QbitRequestProcessor> logger, QbitConfiguration configuration)
{
public async Task<IReadOnlyList<TorrentContent>?> ProcessAsync(string infoHash, CancellationToken cancellationToken = default)
{
@@ -14,7 +14,7 @@ public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService tr
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
+timeoutCts.CancelAfter(TimeSpan.FromSeconds(60));
try
{
@@ -30,7 +30,7 @@ public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService tr
break;
}
-await Task.Delay(TimeSpan.FromSeconds(1), timeoutCts.Token);
+await Task.Delay(TimeSpan.FromMilliseconds(200), timeoutCts.Token);
}
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
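
The changes in this hunk bound the metadata wait with a linked token while polling more aggressively: the caller's token is combined with a local budget that grows from 30 to 60 seconds, and the retry delay drops from 1 second to 200 ms. A standalone sketch of the pattern, where IsMetadataAvailable stands in for the real qBittorrent status check:

using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(TimeSpan.FromSeconds(60)); // cap the whole wait at 60 seconds

try
{
    // Poll until metadata arrives, the caller cancels, or the budget elapses.
    while (!await IsMetadataAvailable(infoHash, timeoutCts.Token)) // hypothetical check
    {
        await Task.Delay(TimeSpan.FromMilliseconds(200), timeoutCts.Token);
    }
}
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
{
    // Cancelled or timed out: give up on this info hash.
}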

View File

@@ -5,7 +5,10 @@ public class QbitConfiguration
private const string Prefix = "QBIT";
private const string HOST_VARIABLE = "HOST";
private const string TRACKERS_URL_VARIABLE = "TRACKERS_URL";
+private const string CONCURRENCY_VARIABLE = "CONCURRENCY";
public string? Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HOST_VARIABLE);
public string? TrackersUrl { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(TRACKERS_URL_VARIABLE);
+public int Concurrency { get; init; } = Prefix.GetEnvironmentVariableAsInt(CONCURRENCY_VARIABLE, 8);
}

View File

@@ -5,6 +5,12 @@ public class WriteQbitMetadataConsumer(IRankTorrentName rankTorrentName, IDataSt
public async Task Consume(ConsumeContext<WriteQbitMetadata> context)
{
var request = context.Message;
+if (request.Metadata.Metadata.Count == 0)
+{
+await context.Publish(new QbitMetadataWritten(request.Metadata, false));
+return;
+}
var torrentFiles = QbitMetaToTorrentMeta.MapMetadataToFilesCollection(
rankTorrentName, request.Torrent, request.ImdbId, request.Metadata.Metadata, logger);

View File

@@ -1 +1 @@
-rank-torrent-name==0.2.11
+rank-torrent-name==0.2.13

View File

@@ -152,7 +152,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
INSERT INTO files
("infoHash", "fileIndex", title, "size", "imdbId", "imdbSeason", "imdbEpisode", "kitsuId", "kitsuEpisode", "createdAt", "updatedAt")
VALUES
-(@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now());
+(@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now())
+ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
""";
await connection.ExecuteAsync(query, files);
@@ -167,7 +168,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
INSERT INTO subtitles
("infoHash", "fileIndex", "fileId", "title")
VALUES
-(@InfoHash, @FileIndex, @FileId, @Title);
+(@InfoHash, @FileIndex, @FileId, @Title)
+ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
""";
await connection.ExecuteAsync(query, subtitles);
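
Combined with the unique constraints created by the migration above, these ON CONFLICT clauses make ingestion idempotent: re-processing a page re-issues the same inserts and PostgreSQL silently skips any ("infoHash", "fileIndex") pair that already exists. Dapper's ExecuteAsync, when handed a collection, runs the statement once per element, so each row gets its own conflict check.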

View File

@@ -0,0 +1,19 @@
namespace SharedContracts.Extensions;
public static class DictionaryExtensions
{
public static ConcurrentDictionary<TKey, TValue> ToConcurrentDictionary<TSource, TKey, TValue>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TValue> valueSelector) where TKey : notnull
{
var concurrentDictionary = new ConcurrentDictionary<TKey, TValue>();
foreach (var element in source)
{
concurrentDictionary.TryAdd(keySelector(element), valueSelector(element));
}
return concurrentDictionary;
}
}
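
A quick usage note: TryAdd never overwrites, so the first value projected for a key wins, which matches the GroupBy(...).First() usage in the DMM crawler. A small illustration with made-up data:

var entries = new[]
{
    (Hash: "abc123", File: "Movie.2024.1080p.mkv"),
    (Hash: "abc123", File: "duplicate-listing.mkv"),
    (Hash: "def456", File: "Show.S01E01.mkv"),
};
var byHash = entries.ToConcurrentDictionary(e => e.Hash, e => e.File);
// byHash.Count == 2; byHash["abc123"] == "Movie.2024.1080p.mkv"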

View File

@@ -1,5 +1,6 @@
// Global using directives
+global using System.Collections.Concurrent;
global using System.Text.Json;
global using System.Text.Json.Serialization;
global using System.Text.RegularExpressions;

View File

@@ -2,5 +2,6 @@ namespace SharedContracts.Python.RTN;
public interface IRankTorrentName
{
-ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true);
+ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
+List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
}
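
A hedged sketch of calling the new batch API, mirroring how the DMM crawler uses it; note the crawler matches results back to inputs by RawTitle rather than by position:

// rankTorrentName is an injected IRankTorrentName; titles holds raw release names.
List<ParseTorrentTitleResponse?> parsed = rankTorrentName.BatchParse(titles, trashGarbage: false);
foreach (var response in parsed.Where(r => r is { Success: true }))
{
    Console.WriteLine($"{response!.Response.RawTitle} -> {response.Response.ParsedTitle}");
}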

View File

@@ -12,41 +12,102 @@ public class RankTorrentName : IRankTorrentName
_pythonEngineService = pythonEngineService;
InitModules();
}
public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true) =>
_pythonEngineService.ExecutePythonOperationWithDefault(
() =>
{
var result = _rtn?.parse(title, trashGarbage);
return ParseResult(result);
}, new ParseTorrentTitleResponse(false, null), nameof(Parse), throwOnErrors: false, logErrors: false);
private static ParseTorrentTitleResponse ParseResult(dynamic result)
public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
{
if (result == null)
try
{
using var gil = Py.GIL();
var result = _rtn?.parse(title, trashGarbage);
return ParseResult(result);
}
catch (Exception ex)
{
if (logErrors)
{
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
}
if (throwOnErrors)
{
throw;
}
return new(false, null);
}
}
public List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
{
var responses = new List<ParseTorrentTitleResponse?>();
try
{
if (titles.Count == 0)
{
return responses;
}
using var gil = Py.GIL();
var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
if (results == null)
{
return responses;
}
responses.AddRange(results.Select(ParseResult));
}
catch (Exception ex)
{
if (logErrors)
{
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(BatchParse));
}
if (throwOnErrors)
{
throw;
}
}
return responses;
}
private static ParseTorrentTitleResponse? ParseResult(dynamic result)
{
try
{
if (result == null)
{
return new(false, null);
}
var json = result.model_dump_json()?.As<string?>();
if (json is null || string.IsNullOrEmpty(json))
{
return new(false, null);
}
var mediaType = result.GetAttr("type")?.As<string>();
if (string.IsNullOrEmpty(mediaType))
{
return new(false, null);
}
var response = JsonSerializer.Deserialize<RtnResponse>(json);
response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
return new(true, response);
}
catch
{
return new(false, null);
}
var json = result.model_dump_json()?.As<string?>();
if (json is null || string.IsNullOrEmpty(json))
{
return new(false, null);
}
var mediaType = result.GetAttr("type")?.As<string>();
if (string.IsNullOrEmpty(mediaType))
{
return new(false, null);
}
var response = JsonSerializer.Deserialize<RtnResponse>(json);
response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
return new(true, response);
}
private void InitModules() =>