DMM Offline (#198)
* Process DMM all locally single call to github to download the repo archive. remove need for PAT update RTN to 0.2.13 change to batch_parse for title parsing from RTN * introduce concurrent dictionary, and parallelism
This commit is contained in:
@@ -94,7 +94,7 @@ services:
|
|||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
hostname: knightcrawler-addon
|
hostname: knightcrawler-addon
|
||||||
image: gabisonfire/knightcrawler-addon:2.0.19
|
image: gabisonfire/knightcrawler-addon:2.0.20
|
||||||
labels:
|
labels:
|
||||||
logging: promtail
|
logging: promtail
|
||||||
networks:
|
networks:
|
||||||
@@ -117,7 +117,7 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-consumer:2.0.19
|
image: gabisonfire/knightcrawler-consumer:2.0.20
|
||||||
labels:
|
labels:
|
||||||
logging: promtail
|
logging: promtail
|
||||||
networks:
|
networks:
|
||||||
@@ -138,7 +138,7 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-debrid-collector:2.0.19
|
image: gabisonfire/knightcrawler-debrid-collector:2.0.20
|
||||||
labels:
|
labels:
|
||||||
logging: promtail
|
logging: promtail
|
||||||
networks:
|
networks:
|
||||||
@@ -152,7 +152,7 @@ services:
|
|||||||
migrator:
|
migrator:
|
||||||
condition: service_completed_successfully
|
condition: service_completed_successfully
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-metadata:2.0.19
|
image: gabisonfire/knightcrawler-metadata:2.0.20
|
||||||
networks:
|
networks:
|
||||||
- knightcrawler-network
|
- knightcrawler-network
|
||||||
restart: "no"
|
restart: "no"
|
||||||
@@ -163,7 +163,7 @@ services:
|
|||||||
postgres:
|
postgres:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-migrator:2.0.19
|
image: gabisonfire/knightcrawler-migrator:2.0.20
|
||||||
networks:
|
networks:
|
||||||
- knightcrawler-network
|
- knightcrawler-network
|
||||||
restart: "no"
|
restart: "no"
|
||||||
@@ -182,7 +182,7 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-producer:2.0.19
|
image: gabisonfire/knightcrawler-producer:2.0.20
|
||||||
labels:
|
labels:
|
||||||
logging: promtail
|
logging: promtail
|
||||||
networks:
|
networks:
|
||||||
@@ -207,7 +207,7 @@ services:
|
|||||||
deploy:
|
deploy:
|
||||||
replicas: ${QBIT_REPLICAS:-0}
|
replicas: ${QBIT_REPLICAS:-0}
|
||||||
env_file: stack.env
|
env_file: stack.env
|
||||||
image: gabisonfire/knightcrawler-qbit-collector:2.0.19
|
image: gabisonfire/knightcrawler-qbit-collector:2.0.20
|
||||||
labels:
|
labels:
|
||||||
logging: promtail
|
logging: promtail
|
||||||
networks:
|
networks:
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends
|
|||||||
|
|
||||||
services:
|
services:
|
||||||
metadata:
|
metadata:
|
||||||
image: gabisonfire/knightcrawler-metadata:2.0.18
|
image: gabisonfire/knightcrawler-metadata:2.0.20
|
||||||
env_file: ../../.env
|
env_file: ../../.env
|
||||||
networks:
|
networks:
|
||||||
- knightcrawler-network
|
- knightcrawler-network
|
||||||
@@ -30,7 +30,7 @@ services:
|
|||||||
condition: service_completed_successfully
|
condition: service_completed_successfully
|
||||||
|
|
||||||
migrator:
|
migrator:
|
||||||
image: gabisonfire/knightcrawler-migrator:2.0.18
|
image: gabisonfire/knightcrawler-migrator:2.0.20
|
||||||
env_file: ../../.env
|
env_file: ../../.env
|
||||||
networks:
|
networks:
|
||||||
- knightcrawler-network
|
- knightcrawler-network
|
||||||
@@ -40,7 +40,7 @@ services:
|
|||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
|
|
||||||
addon:
|
addon:
|
||||||
image: gabisonfire/knightcrawler-addon:2.0.18
|
image: gabisonfire/knightcrawler-addon:2.0.20
|
||||||
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
hostname: knightcrawler-addon
|
hostname: knightcrawler-addon
|
||||||
@@ -48,22 +48,22 @@ services:
|
|||||||
- "7000:7000"
|
- "7000:7000"
|
||||||
|
|
||||||
consumer:
|
consumer:
|
||||||
image: gabisonfire/knightcrawler-consumer:2.0.18
|
image: gabisonfire/knightcrawler-consumer:2.0.20
|
||||||
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
debridcollector:
|
debridcollector:
|
||||||
image: gabisonfire/knightcrawler-debrid-collector:2.0.18
|
image: gabisonfire/knightcrawler-debrid-collector:2.0.20
|
||||||
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
producer:
|
producer:
|
||||||
image: gabisonfire/knightcrawler-producer:2.0.18
|
image: gabisonfire/knightcrawler-producer:2.0.20
|
||||||
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|
||||||
qbitcollector:
|
qbitcollector:
|
||||||
image: gabisonfire/knightcrawler-qbit-collector:2.0.18
|
image: gabisonfire/knightcrawler-qbit-collector:2.0.20
|
||||||
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
<<: [*knightcrawler-app, *knightcrawler-app-depends]
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
depends_on:
|
depends_on:
|
||||||
|
|||||||
@@ -38,6 +38,3 @@ QBIT_REPLICAS=0
|
|||||||
|
|
||||||
# Addon
|
# Addon
|
||||||
DEBUG_MODE=false
|
DEBUG_MODE=false
|
||||||
|
|
||||||
# Producer
|
|
||||||
GITHUB_PAT=
|
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
rank-torrent-name==0.2.11
|
rank-torrent-name==0.2.13
|
||||||
@@ -28,7 +28,7 @@
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
"Name": "SyncDmmJob",
|
"Name": "SyncDmmJob",
|
||||||
"IntervalSeconds": 1800,
|
"IntervalSeconds": 10800,
|
||||||
"Enabled": true
|
"Enabled": true
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
70
src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs
Normal file
70
src/producer/src/Features/Crawlers/Dmm/DMMFileDownloader.cs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
namespace Producer.Features.Crawlers.Dmm;
|
||||||
|
|
||||||
|
public class DMMFileDownloader(HttpClient client, ILogger<DMMFileDownloader> logger) : IDMMFileDownloader
|
||||||
|
{
|
||||||
|
private const string Filename = "main.zip";
|
||||||
|
private readonly IReadOnlyCollection<string> _filesToIgnore = [
|
||||||
|
"index.html",
|
||||||
|
"404.html",
|
||||||
|
"dedupe.sh",
|
||||||
|
"CNAME",
|
||||||
|
];
|
||||||
|
|
||||||
|
public const string ClientName = "DmmFileDownloader";
|
||||||
|
|
||||||
|
public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
logger.LogInformation("Downloading DMM Hashlists");
|
||||||
|
|
||||||
|
var response = await client.GetAsync(Filename, cancellationToken);
|
||||||
|
|
||||||
|
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
|
||||||
|
|
||||||
|
EnsureDirectoryIsClean(tempDirectory);
|
||||||
|
|
||||||
|
response.EnsureSuccessStatusCode();
|
||||||
|
|
||||||
|
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
|
||||||
|
using var archive = new ZipArchive(stream);
|
||||||
|
|
||||||
|
logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory);
|
||||||
|
|
||||||
|
foreach (var entry in archive.Entries)
|
||||||
|
{
|
||||||
|
var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName));
|
||||||
|
if (!entry.FullName.EndsWith('/')) // It's a file
|
||||||
|
{
|
||||||
|
entry.ExtractToFile(entryPath, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var file in _filesToIgnore)
|
||||||
|
{
|
||||||
|
CleanRepoExtras(tempDirectory, file);
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory);
|
||||||
|
|
||||||
|
return tempDirectory;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void CleanRepoExtras(string tempDirectory, string fileName)
|
||||||
|
{
|
||||||
|
var repoIndex = Path.Combine(tempDirectory, fileName);
|
||||||
|
|
||||||
|
if (File.Exists(repoIndex))
|
||||||
|
{
|
||||||
|
File.Delete(repoIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void EnsureDirectoryIsClean(string tempDirectory)
|
||||||
|
{
|
||||||
|
if (Directory.Exists(tempDirectory))
|
||||||
|
{
|
||||||
|
Directory.Delete(tempDirectory, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
Directory.CreateDirectory(tempDirectory);
|
||||||
|
}
|
||||||
|
}
|
||||||
6
src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs
Normal file
6
src/producer/src/Features/Crawlers/Dmm/DMMHttpClient.cs
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
namespace Producer.Features.Crawlers.Dmm;
|
||||||
|
|
||||||
|
public class DMMHttpClient
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
@@ -1,64 +1,99 @@
|
|||||||
namespace Producer.Features.Crawlers.Dmm;
|
namespace Producer.Features.Crawlers.Dmm;
|
||||||
|
|
||||||
public partial class DebridMediaManagerCrawler(
|
public partial class DebridMediaManagerCrawler(
|
||||||
IHttpClientFactory httpClientFactory,
|
IDMMFileDownloader dmmFileDownloader,
|
||||||
ILogger<DebridMediaManagerCrawler> logger,
|
ILogger<DebridMediaManagerCrawler> logger,
|
||||||
IDataStorage storage,
|
IDataStorage storage,
|
||||||
GithubConfiguration githubConfiguration,
|
|
||||||
IRankTorrentName rankTorrentName,
|
IRankTorrentName rankTorrentName,
|
||||||
IDistributedCache cache) : BaseCrawler(logger, storage)
|
IDistributedCache cache) : BaseCrawler(logger, storage)
|
||||||
{
|
{
|
||||||
[GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
|
[GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
|
||||||
private static partial Regex HashCollectionMatcher();
|
private static partial Regex HashCollectionMatcher();
|
||||||
|
protected override string Url => "";
|
||||||
private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";
|
|
||||||
protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
|
protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
|
||||||
protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
|
|
||||||
protected override string Source => "DMM";
|
protected override string Source => "DMM";
|
||||||
|
|
||||||
|
private const int ParallelismCount = 4;
|
||||||
|
|
||||||
public override async Task Execute()
|
public override async Task Execute()
|
||||||
{
|
{
|
||||||
var client = httpClientFactory.CreateClient("Scraper");
|
var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
|
||||||
client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
|
|
||||||
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
|
|
||||||
|
|
||||||
var jsonBody = await client.GetStringAsync(Url);
|
var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories);
|
||||||
|
|
||||||
var json = JsonDocument.Parse(jsonBody);
|
logger.LogInformation("Found {Files} files to parse", files.Length);
|
||||||
|
|
||||||
var entriesArray = json.RootElement.GetProperty("tree");
|
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
|
||||||
|
|
||||||
logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength());
|
await Parallel.ForEachAsync(files, options, async (file, token) =>
|
||||||
|
|
||||||
foreach (var entry in entriesArray.EnumerateArray())
|
|
||||||
{
|
{
|
||||||
await ParsePage(entry, client);
|
var fileName = Path.GetFileName(file);
|
||||||
}
|
var torrentDictionary = await ExtractPageContents(file, fileName);
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ParsePage(JsonElement entry, HttpClient client)
|
if (torrentDictionary == null)
|
||||||
{
|
|
||||||
var (pageIngested, name) = await IsAlreadyIngested(entry);
|
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(name) || pageIngested)
|
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}");
|
await ParseTitlesWithRtn(fileName, torrentDictionary);
|
||||||
|
var results = await ParseTorrents(torrentDictionary);
|
||||||
|
|
||||||
await ExtractPageContents(pageSource, name);
|
if (results.Count <= 0)
|
||||||
|
{
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task ExtractPageContents(string pageSource, string name)
|
await InsertTorrents(results);
|
||||||
|
await Storage.MarkPageAsIngested(fileName, token);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task ParseTitlesWithRtn(string fileName, IDictionary<string, DmmContent> page)
|
||||||
{
|
{
|
||||||
|
logger.LogInformation("Parsing titles for {Page}", fileName);
|
||||||
|
|
||||||
|
var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList();
|
||||||
|
var parsedResponses = rankTorrentName.BatchParse(
|
||||||
|
batchProcessables.Select<RtnBatchProcessable, string>(bp => bp.Filename).ToList(), trashGarbage: false);
|
||||||
|
|
||||||
|
// Filter out unsuccessful responses and match RawTitle to requesting title
|
||||||
|
var successfulResponses = parsedResponses
|
||||||
|
.Where(response => response != null && response.Success)
|
||||||
|
.GroupBy(response => response.Response.RawTitle!)
|
||||||
|
.ToDictionary(group => group.Key, group => group.First());
|
||||||
|
|
||||||
|
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
|
||||||
|
|
||||||
|
await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) =>
|
||||||
|
{
|
||||||
|
if (page.TryGetValue(infoHash, out var dmmContent) &&
|
||||||
|
successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
|
||||||
|
{
|
||||||
|
page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
|
||||||
|
}
|
||||||
|
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<ConcurrentDictionary<string, DmmContent>?> ExtractPageContents(string filePath, string filenameOnly)
|
||||||
|
{
|
||||||
|
var (pageIngested, name) = await IsAlreadyIngested(filenameOnly);
|
||||||
|
|
||||||
|
if (pageIngested)
|
||||||
|
{
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
var pageSource = await File.ReadAllTextAsync(filePath);
|
||||||
|
|
||||||
var match = HashCollectionMatcher().Match(pageSource);
|
var match = HashCollectionMatcher().Match(pageSource);
|
||||||
|
|
||||||
if (!match.Success)
|
if (!match.Success)
|
||||||
{
|
{
|
||||||
logger.LogWarning("Failed to match hash collection for {Name}", name);
|
logger.LogWarning("Failed to match hash collection for {Name}", name);
|
||||||
await Storage.MarkPageAsIngested(name);
|
await Storage.MarkPageAsIngested(filenameOnly);
|
||||||
return;
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
|
var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
|
||||||
@@ -66,90 +101,92 @@ public partial class DebridMediaManagerCrawler(
|
|||||||
if (string.IsNullOrEmpty(encodedJson?.Value))
|
if (string.IsNullOrEmpty(encodedJson?.Value))
|
||||||
{
|
{
|
||||||
logger.LogWarning("Failed to extract encoded json for {Name}", name);
|
logger.LogWarning("Failed to extract encoded json for {Name}", name);
|
||||||
return;
|
return [];
|
||||||
}
|
}
|
||||||
|
|
||||||
await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name);
|
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);
|
||||||
}
|
|
||||||
|
|
||||||
private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
|
|
||||||
{
|
|
||||||
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);
|
|
||||||
|
|
||||||
var json = JsonDocument.Parse(decodedJson);
|
var json = JsonDocument.Parse(decodedJson);
|
||||||
|
|
||||||
await InsertTorrentsForPage(json);
|
var torrents = await json.RootElement.EnumerateArray()
|
||||||
|
.ToAsyncEnumerable()
|
||||||
|
.Select(ParsePageContent)
|
||||||
|
.Where(t => t is not null)
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
var result = await Storage.MarkPageAsIngested(name);
|
if (torrents.Count == 0)
|
||||||
|
{
|
||||||
if (!result.IsSuccess)
|
logger.LogWarning("No torrents found in {Name}", name);
|
||||||
|
await Storage.MarkPageAsIngested(filenameOnly);
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
|
||||||
|
var torrentDictionary = torrents
|
||||||
|
.Where(x => x is not null)
|
||||||
|
.GroupBy(x => x.InfoHash)
|
||||||
|
.ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null));
|
||||||
|
|
||||||
|
logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name);
|
||||||
|
|
||||||
|
return torrentDictionary;
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task<List<IngestedTorrent>> ParseTorrents(IDictionary<string, DmmContent> page)
|
||||||
|
{
|
||||||
|
var ingestedTorrents = new List<IngestedTorrent>();
|
||||||
|
|
||||||
|
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
|
||||||
|
|
||||||
|
await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
|
||||||
|
{
|
||||||
|
var (infoHash, dmmContent) = kvp;
|
||||||
|
var parsedTorrent = dmmContent.ParseResponse;
|
||||||
|
if (parsedTorrent is not {Success: true})
|
||||||
{
|
{
|
||||||
logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.LogInformation("Successfully marked page as ingested");
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<IngestedTorrent?> ParseTorrent(JsonElement item)
|
|
||||||
{
|
|
||||||
|
|
||||||
if (!item.TryGetProperty("filename", out var filenameElement) ||
|
|
||||||
!item.TryGetProperty("bytes", out var bytesElement) ||
|
|
||||||
!item.TryGetProperty("hash", out var hashElement))
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var torrentTitle = filenameElement.GetString();
|
|
||||||
|
|
||||||
if (torrentTitle.IsNullOrEmpty())
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var parsedTorrent = rankTorrentName.Parse(torrentTitle);
|
|
||||||
|
|
||||||
if (!parsedTorrent.Success)
|
|
||||||
{
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
|
var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
|
||||||
|
|
||||||
var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
|
var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
|
||||||
|
|
||||||
var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
|
var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
|
||||||
|
|
||||||
if (cached)
|
if (cached)
|
||||||
{
|
{
|
||||||
logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
|
logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
|
||||||
return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent);
|
lock (ingestedTorrents)
|
||||||
|
{
|
||||||
|
ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent));
|
||||||
|
}
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
|
int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
|
||||||
var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year);
|
var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year, ct);
|
||||||
|
|
||||||
if (imdbEntry is null)
|
if (imdbEntry is null)
|
||||||
{
|
{
|
||||||
return null;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await AddToCache(cacheKey, imdbEntry);
|
await AddToCache(cacheKey, imdbEntry);
|
||||||
|
|
||||||
logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
|
logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
|
||||||
|
lock (ingestedTorrents)
|
||||||
|
{
|
||||||
|
ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
return MapToTorrent(imdbEntry, bytesElement, hashElement, parsedTorrent);
|
return ingestedTorrents;
|
||||||
}
|
}
|
||||||
|
|
||||||
private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) =>
|
private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) =>
|
||||||
new()
|
new()
|
||||||
{
|
{
|
||||||
Source = Source,
|
Source = Source,
|
||||||
Name = result.Title,
|
Name = result.Title,
|
||||||
Imdb = result.ImdbId,
|
Imdb = result.ImdbId,
|
||||||
Size = bytesElement.GetInt64().ToString(),
|
Size = size.ToString(),
|
||||||
InfoHash = hashElement.ToString(),
|
InfoHash = infoHash,
|
||||||
Seeders = 0,
|
Seeders = 0,
|
||||||
Leechers = 0,
|
Leechers = 0,
|
||||||
Category = AssignCategory(result),
|
Category = AssignCategory(result),
|
||||||
@@ -179,35 +216,11 @@ public partial class DebridMediaManagerCrawler(
|
|||||||
return (false, null);
|
return (false, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task InsertTorrentsForPage(JsonDocument json)
|
private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename)
|
||||||
{
|
{
|
||||||
var torrents = await json.RootElement.EnumerateArray()
|
var pageIngested = await Storage.PageIngested(filename);
|
||||||
.ToAsyncEnumerable()
|
|
||||||
.SelectAwait(async x => await ParseTorrent(x))
|
|
||||||
.Where(t => t is not null)
|
|
||||||
.ToListAsync();
|
|
||||||
|
|
||||||
if (torrents.Count == 0)
|
return (pageIngested, filename);
|
||||||
{
|
|
||||||
logger.LogWarning("No torrents found in {Source} response", Source);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
await InsertTorrents(torrents!);
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
|
|
||||||
{
|
|
||||||
var name = entry.GetProperty("path").GetString();
|
|
||||||
|
|
||||||
if (string.IsNullOrEmpty(name))
|
|
||||||
{
|
|
||||||
return (false, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
var pageIngested = await Storage.PageIngested(name);
|
|
||||||
|
|
||||||
return (pageIngested, name);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static string AssignCategory(ImdbEntry entry) =>
|
private static string AssignCategory(ImdbEntry entry) =>
|
||||||
@@ -219,4 +232,20 @@ public partial class DebridMediaManagerCrawler(
|
|||||||
};
|
};
|
||||||
|
|
||||||
private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";
|
private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";
|
||||||
|
|
||||||
|
private static ExtractedDMMContent? ParsePageContent(JsonElement item)
|
||||||
|
{
|
||||||
|
if (!item.TryGetProperty("filename", out var filenameElement) ||
|
||||||
|
!item.TryGetProperty("bytes", out var bytesElement) ||
|
||||||
|
!item.TryGetProperty("hash", out var hashElement))
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
|
||||||
|
}
|
||||||
|
|
||||||
|
private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
|
||||||
|
private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
|
||||||
|
private record RtnBatchProcessable(string InfoHash, string Filename);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,9 +0,0 @@
|
|||||||
namespace Producer.Features.Crawlers.Dmm;
|
|
||||||
|
|
||||||
public class GithubConfiguration
|
|
||||||
{
|
|
||||||
private const string Prefix = "GITHUB";
|
|
||||||
private const string PatVariable = "PAT";
|
|
||||||
|
|
||||||
public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
|
|
||||||
}
|
|
||||||
@@ -0,0 +1,6 @@
|
|||||||
|
namespace Producer.Features.Crawlers.Dmm;
|
||||||
|
|
||||||
|
public interface IDMMFileDownloader
|
||||||
|
{
|
||||||
|
Task<string> DownloadFileToTempPath(CancellationToken cancellationToken);
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
namespace Producer.Features.Crawlers.Dmm;
|
||||||
|
|
||||||
|
public static class ServiceCollectionExtensions
|
||||||
|
{
|
||||||
|
public static IServiceCollection AddDmmSupport(this IServiceCollection services)
|
||||||
|
{
|
||||||
|
services.AddHttpClient<IDMMFileDownloader, DMMFileDownloader>(DMMFileDownloader.ClientName, client =>
|
||||||
|
{
|
||||||
|
client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/");
|
||||||
|
client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip");
|
||||||
|
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
|
||||||
|
});
|
||||||
|
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions
|
|||||||
internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
|
internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
|
||||||
{
|
{
|
||||||
var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
|
var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
|
||||||
var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
|
|
||||||
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
|
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
|
||||||
|
|
||||||
var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
|
var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
|
||||||
@@ -19,18 +18,13 @@ internal static class ServiceCollectionExtensions
|
|||||||
services.AddTransient(type);
|
services.AddTransient(type);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!string.IsNullOrEmpty(githubConfiguration.PAT))
|
|
||||||
{
|
|
||||||
services.AddTransient<SyncDmmJob>();
|
|
||||||
}
|
|
||||||
|
|
||||||
var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);
|
var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);
|
||||||
|
|
||||||
services.AddQuartz(
|
services.AddQuartz(
|
||||||
quartz =>
|
quartz =>
|
||||||
{
|
{
|
||||||
RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
|
RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
|
||||||
RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration);
|
RegisterDmmJob(quartz, scrapeConfiguration);
|
||||||
RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
|
RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
|
||||||
RegisterPublisher(quartz, rabbitConfiguration);
|
RegisterPublisher(quartz, rabbitConfiguration);
|
||||||
});
|
});
|
||||||
@@ -64,13 +58,8 @@ internal static class ServiceCollectionExtensions
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration)
|
private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) =>
|
||||||
{
|
|
||||||
if (!string.IsNullOrEmpty(githubConfiguration.PAT))
|
|
||||||
{
|
|
||||||
AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
|
AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void RegisterTorrentioJob(
|
private static void RegisterTorrentioJob(
|
||||||
IServiceCollection services,
|
IServiceCollection services,
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
// Global using directives
|
// Global using directives
|
||||||
|
|
||||||
|
global using System.Collections.Concurrent;
|
||||||
|
global using System.IO.Compression;
|
||||||
global using System.Reflection;
|
global using System.Reflection;
|
||||||
global using System.Text;
|
global using System.Text;
|
||||||
global using System.Text.Json;
|
global using System.Text.Json;
|
||||||
global using System.Text.RegularExpressions;
|
global using System.Text.RegularExpressions;
|
||||||
global using System.Xml.Linq;
|
global using System.Xml.Linq;
|
||||||
global using FuzzySharp;
|
|
||||||
global using FuzzySharp.Extractor;
|
|
||||||
global using FuzzySharp.PreProcess;
|
global using FuzzySharp.PreProcess;
|
||||||
global using FuzzySharp.SimilarityRatio.Scorer;
|
global using FuzzySharp.SimilarityRatio.Scorer;
|
||||||
global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;
|
global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ builder.Services
|
|||||||
.RegisterMassTransit()
|
.RegisterMassTransit()
|
||||||
.AddDataStorage()
|
.AddDataStorage()
|
||||||
.AddCrawlers()
|
.AddCrawlers()
|
||||||
|
.AddDmmSupport()
|
||||||
.AddQuartz(builder.Configuration);
|
.AddQuartz(builder.Configuration);
|
||||||
|
|
||||||
var app = builder.Build();
|
var app = builder.Build();
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
rank-torrent-name==0.2.11
|
rank-torrent-name==0.2.13
|
||||||
@@ -1 +1 @@
|
|||||||
rank-torrent-name==0.2.11
|
rank-torrent-name==0.2.13
|
||||||
19
src/shared/Extensions/DictionaryExtensions.cs
Normal file
19
src/shared/Extensions/DictionaryExtensions.cs
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
namespace SharedContracts.Extensions;
|
||||||
|
|
||||||
|
public static class DictionaryExtensions
|
||||||
|
{
|
||||||
|
public static ConcurrentDictionary<TKey, TValue> ToConcurrentDictionary<TSource, TKey, TValue>(
|
||||||
|
this IEnumerable<TSource> source,
|
||||||
|
Func<TSource, TKey> keySelector,
|
||||||
|
Func<TSource, TValue> valueSelector) where TKey : notnull
|
||||||
|
{
|
||||||
|
var concurrentDictionary = new ConcurrentDictionary<TKey, TValue>();
|
||||||
|
|
||||||
|
foreach (var element in source)
|
||||||
|
{
|
||||||
|
concurrentDictionary.TryAdd(keySelector(element), valueSelector(element));
|
||||||
|
}
|
||||||
|
|
||||||
|
return concurrentDictionary;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
// Global using directives
|
// Global using directives
|
||||||
|
|
||||||
|
global using System.Collections.Concurrent;
|
||||||
global using System.Text.Json;
|
global using System.Text.Json;
|
||||||
global using System.Text.Json.Serialization;
|
global using System.Text.Json.Serialization;
|
||||||
global using System.Text.RegularExpressions;
|
global using System.Text.RegularExpressions;
|
||||||
|
|||||||
@@ -2,5 +2,6 @@ namespace SharedContracts.Python.RTN;
|
|||||||
|
|
||||||
public interface IRankTorrentName
|
public interface IRankTorrentName
|
||||||
{
|
{
|
||||||
ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true);
|
ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
|
||||||
|
List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
|
||||||
}
|
}
|
||||||
@@ -13,15 +13,71 @@ public class RankTorrentName : IRankTorrentName
|
|||||||
InitModules();
|
InitModules();
|
||||||
}
|
}
|
||||||
|
|
||||||
public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true) =>
|
public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
|
||||||
_pythonEngineService.ExecutePythonOperationWithDefault(
|
|
||||||
() =>
|
|
||||||
{
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
using var gil = Py.GIL();
|
||||||
var result = _rtn?.parse(title, trashGarbage);
|
var result = _rtn?.parse(title, trashGarbage);
|
||||||
return ParseResult(result);
|
return ParseResult(result);
|
||||||
}, new ParseTorrentTitleResponse(false, null), nameof(Parse), throwOnErrors: false, logErrors: false);
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
if (logErrors)
|
||||||
|
{
|
||||||
|
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
|
||||||
|
}
|
||||||
|
|
||||||
private static ParseTorrentTitleResponse ParseResult(dynamic result)
|
if (throwOnErrors)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
return new(false, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
|
||||||
|
{
|
||||||
|
var responses = new List<ParseTorrentTitleResponse?>();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
if (titles.Count == 0)
|
||||||
|
{
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
using var gil = Py.GIL();
|
||||||
|
var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
|
||||||
|
PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
|
||||||
|
|
||||||
|
if (results == null)
|
||||||
|
{
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
responses.AddRange(results.Select(ParseResult));
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
if (logErrors)
|
||||||
|
{
|
||||||
|
_pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (throwOnErrors)
|
||||||
|
{
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return responses;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ParseTorrentTitleResponse? ParseResult(dynamic result)
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
if (result == null)
|
if (result == null)
|
||||||
{
|
{
|
||||||
@@ -48,6 +104,11 @@ public class RankTorrentName : IRankTorrentName
|
|||||||
|
|
||||||
return new(true, response);
|
return new(true, response);
|
||||||
}
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
return new(false, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void InitModules() =>
|
private void InitModules() =>
|
||||||
_rtn =
|
_rtn =
|
||||||
|
|||||||
Reference in New Issue
Block a user