mirror of
https://github.com/knightcrawler-stremio/knightcrawler.git
synced 2024-12-20 03:29:51 +00:00
Run pre-commit
@@ -3,4 +3,4 @@
public interface IMessagePublisher
{
    Task<bool> PublishAsync(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default);
}

@@ -9,32 +9,32 @@ public class PublisherJob(IMessagePublisher publisher, IDataStorage storage, ILo
    private const string JobName = nameof(PublisherJob);
    public static readonly JobKey Key = new(JobName, nameof(Literals.PublishingJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.PublishingJobs));

    public async Task Execute(IJobExecutionContext context)
    {
        var cancellationToken = context.CancellationToken;
        var torrents = await storage.GetPublishableTorrents(cancellationToken);

        if (torrents.Count == 0)
        {
            return;
        }

        var published = await publisher.PublishAsync(torrents, cancellationToken);

        if (!published)
        {
            return;
        }

        var result = await storage.SetTorrentsProcessed(torrents, cancellationToken);

        if (!result.Success)
        {
            logger.LogWarning("Failed to set torrents as processed: [{Error}]", result.ErrorMessage);
            return;
        }

        logger.LogInformation("Successfully set {Count} torrents as processed", result.UpdatedCount);
    }
}

@@ -11,7 +11,7 @@ public class RabbitMqConfiguration
    private const string MaxQueueSizeVariable = "MAX_QUEUE_SIZE";
    private const string MaxPublishBatchSizeVariable = "MAX_PUBLISH_BATCH_SIZE";
    private const string PublishIntervalInSecondsVariable = "PUBLISH_INTERVAL_IN_SECONDS";

    public string? Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
    public string? Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
    public string? Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);

@@ -27,12 +27,12 @@ public class RabbitMqConfiguration
        {
            return;
        }

        if (MaxQueueSize < 0)
        {
            throw new InvalidOperationException("MaxQueueSize cannot be less than 0 in RabbitMqConfiguration");
        }

        if (MaxPublishBatchSize < 0)
        {
            throw new InvalidOperationException("MaxPublishBatchSize cannot be less than 0 in RabbitMqConfiguration");

@@ -43,4 +43,4 @@ public class RabbitMqConfiguration
            throw new InvalidOperationException("MaxPublishBatchSize cannot be greater than MaxQueueSize in RabbitMqConfiguration");
        }
    }
}

@@ -5,7 +5,7 @@ internal static class ServiceCollectionExtensions
    internal static IServiceCollection RegisterMassTransit(this IServiceCollection services)
    {
        var rabbitConfig = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();

        services.AddMassTransit(busConfigurator =>
        {
            busConfigurator.SetKebabCaseEndpointNameFormatter();

@@ -21,4 +21,4 @@ internal static class ServiceCollectionExtensions

        return services;
    }
}

@@ -13,14 +13,14 @@ public class TorrentPublisher(

        if (!await CanPublishToRabbitMq(torrents, cancellationToken))
        {
            logger.LogWarning("Queue is full or not accessible, not publishing this time.");
            return false;
        }

        await sendEndpoint.SendBatch(torrents, cancellationToken: cancellationToken);
        return true;
    }

    private string ConstructQueue()
    {
        var queueBuilder = new StringBuilder();

@@ -31,23 +31,23 @@ public class TorrentPublisher(

        return queueBuilder.ToString();
    }

    private async Task<bool> CanPublishToRabbitMq(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken)
    {
        if (configuration.MaxQueueSize == 0)
        {
            return true;
        }

        var client = httpClientFactory.CreateClient("RabbitMq");

        client.DefaultRequestHeaders.Authorization =
            new("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{configuration.Username}:{configuration.Password}")));

        var url = $"http://{configuration.Host}:15672/api/queues/{Uri.EscapeDataString("/")}/{configuration.QueueName}";

        var response = await client.GetAsync(url, cancellationToken);

        var body = await response.Content.ReadAsStringAsync(cancellationToken);
        var doc = JsonDocument.Parse(body);

@@ -62,17 +62,17 @@ public class TorrentPublisher(
            logger.LogWarning("Failed to get message count from RabbitMq");
            return false;
        }

        logger.LogInformation("Current queue message count: {MessageCount}", messageCount);

        var canPublish = messageCount < configuration.MaxQueueSize + torrents.Count;

        if (!canPublish)
        {
            logger.LogWarning("You have configured a max queue size of {MaxQueueSize}.", configuration.MaxQueueSize);
            logger.LogWarning("Not publishing this time.");
        }

        return canPublish;
    }
}
@@ -12,14 +12,14 @@ public abstract class BaseCrawler(ILogger<BaseCrawler> logger, IDataStorage stor
    protected async Task<InsertTorrentResult> InsertTorrents(IReadOnlyCollection<Torrent> torrent)
    {
        var result = await storage.InsertTorrents(torrent);

        if (!result.Success)
        {
            logger.LogWarning("Ingestion Failed: [{Error}]", result.ErrorMessage);
            return result;
        }

        logger.LogInformation("Ingestion Successful - Wrote {Count} new torrents", result.InsertedCount);
        return result;
    }
}

@@ -3,7 +3,7 @@ namespace Producer.Features.CrawlerSupport;
public abstract class BaseJsonCrawler(IHttpClientFactory httpClientFactory, ILogger<BaseJsonCrawler> logger, IDataStorage storage) : BaseCrawler(logger, storage)
{
    private readonly HttpClient _client = httpClientFactory.CreateClient(Literals.CrawlerClient);

    protected virtual async Task Execute(string collectionName)
    {
        logger.LogInformation("Starting {Source} crawl", Source);

@@ -16,13 +16,13 @@ public abstract class BaseJsonCrawler(IHttpClientFactory httpClientFactory, ILog
            .Select(ParseTorrent)
            .Where(x => x is not null)
            .ToList();

        if (torrents.Count == 0)
        {
            logger.LogWarning("No torrents found in {Source} response", Source);
            return;
        }

        await InsertTorrents(torrents!);
    }

@@ -40,6 +40,6 @@ public abstract class BaseJsonCrawler(IHttpClientFactory httpClientFactory, ILog
            torrent.InfoHash = infoHash;
        }
    }

    protected abstract Torrent? ParseTorrent(JsonElement item);
}

@@ -5,22 +5,22 @@ public abstract class BaseXmlCrawler(IHttpClientFactory httpClientFactory, ILogg
    public override async Task Execute()
    {
        logger.LogInformation("Starting {Source} crawl", Source);

        using var client = httpClientFactory.CreateClient(Literals.CrawlerClient);
        var xml = await client.GetStringAsync(Url);
        var xmlRoot = XElement.Parse(xml);

        var torrents = xmlRoot.Descendants("item")
            .Select(ParseTorrent)
            .Where(x => x is not null)
            .ToList();

        if (torrents.Count == 0)
        {
            logger.LogWarning("No torrents found in {Source} response", Source);
            return;
        }

        await InsertTorrents(torrents!);
    }

@@ -40,4 +40,4 @@ public abstract class BaseXmlCrawler(IHttpClientFactory httpClientFactory, ILogg
    }

    protected abstract Torrent? ParseTorrent(XElement itemNode);
}

@@ -4,8 +4,8 @@ public class CrawlerProvider(IServiceProvider serviceProvider) : ICrawlerProvide
{
    public IEnumerable<ICrawler> GetAll() =>
        serviceProvider.GetServices<ICrawler>();

    public ICrawler Get(string name) =>
        serviceProvider.GetRequiredKeyedService<ICrawler>(name);
}

@@ -3,4 +3,4 @@ namespace Producer.Features.CrawlerSupport;
public interface ICrawler
{
    Task Execute();
}

@@ -3,6 +3,6 @@ namespace Producer.Features.CrawlerSupport;
public interface ICrawlerProvider
{
    IEnumerable<ICrawler> GetAll();

    ICrawler Get(string name);
}

@@ -3,4 +3,4 @@ namespace Producer.Features.CrawlerSupport;
public static class Literals
{
    public const string CrawlerClient = "Scraper";
}

@@ -2,4 +2,4 @@ namespace Producer.Features.CrawlerSupport;

public record InsertTorrentResult(bool Success, int InsertedCount = 0, string? ErrorMessage = null);
public record UpdatedTorrentResult(bool Success, int UpdatedCount = 0, string? ErrorMessage = null);
public record PageIngestedResult(bool Success, string? ErrorMessage = null);

@@ -7,4 +7,4 @@ public class Scraper
    public int IntervalSeconds { get; set; } = 60;

    public bool Enabled { get; set; } = true;
}

@@ -5,11 +5,11 @@ internal static class ServiceCollectionExtensions
    internal static IServiceCollection AddCrawlers(this IServiceCollection services)
    {
        services.AddHttpClient(Literals.CrawlerClient);

        var crawlerTypes = Assembly.GetAssembly(typeof(ICrawler))
            .GetTypes()
            .Where(t => t is {IsClass: true, IsAbstract: false} && typeof(ICrawler).IsAssignableFrom(t));

        foreach (var type in crawlerTypes)
        {
            services.AddKeyedTransient(typeof(ICrawler), type.Name, type);

@@ -20,4 +20,4 @@ internal static class ServiceCollectionExtensions

        return services;
    }
}

@@ -17,4 +17,4 @@ public class Torrent
    public bool Processed { get; set; } = false;
    public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
    public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
}
@@ -8,12 +8,12 @@ public partial class DebridMediaManagerCrawler(
{
    [GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
    private static partial Regex HashCollectionMatcher();

    [GeneratedRegex(@"[sS]([0-9]{1,2})|seasons?[\s-]?([0-9]{1,2})", RegexOptions.IgnoreCase, "en-GB")]
    private static partial Regex SeasonMatcher();

    private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";

    protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
    protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
    protected override string Source => "DMM";

@@ -23,13 +23,13 @@ public partial class DebridMediaManagerCrawler(
        var client = httpClientFactory.CreateClient("Scraper");
        client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
        client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");

        var jsonBody = await client.GetStringAsync(Url);

        var json = JsonDocument.Parse(jsonBody);

        var entriesArray = json.RootElement.GetProperty("tree");

        logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength());

        foreach (var entry in entriesArray.EnumerateArray())

@@ -41,21 +41,21 @@ public partial class DebridMediaManagerCrawler(
    private async Task ParsePage(JsonElement entry, HttpClient client)
    {
        var (pageIngested, name) = await IsAlreadyIngested(entry);

        if (string.IsNullOrEmpty(name) || pageIngested)
        {
            return;
        }

        var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}");

        await ExtractPageContents(pageSource, name);
    }

    private async Task ExtractPageContents(string pageSource, string name)
    {
        var match = HashCollectionMatcher().Match(pageSource);

        if (!match.Success)
        {
            logger.LogWarning("Failed to match hash collection for {Name}", name);

@@ -64,32 +64,32 @@ public partial class DebridMediaManagerCrawler(
        }

        var encodedJson = match.Groups.Values.ElementAtOrDefault(1);

        if (string.IsNullOrEmpty(encodedJson?.Value))
        {
            logger.LogWarning("Failed to extract encoded json for {Name}", name);
            return;
        }

        await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name);
    }

    private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
    {
        var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);

        var json = JsonDocument.Parse(decodedJson);

        await InsertTorrentsForPage(json);

        var result = await Storage.MarkPageAsIngested(name);

        if (!result.Success)
        {
            logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.ErrorMessage);
            return;
        }

        logger.LogInformation("Successfully marked page as ingested");
    }

@@ -109,7 +109,7 @@ public partial class DebridMediaManagerCrawler(
        {
            return null;
        }

        torrent.Category = SeasonMatcher().IsMatch(torrent.Name) ? "tv" : "movies";

        return torrent;

@@ -120,16 +120,16 @@ public partial class DebridMediaManagerCrawler(
        var torrents = json.RootElement.EnumerateArray()
            .Select(ParseTorrent)
            .ToList();

        if (torrents.Count == 0)
        {
            logger.LogWarning("No torrents found in {Source} response", Source);
            return;
        }

        await InsertTorrents(torrents!);
    }

    private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
    {
        var name = entry.GetProperty("path").GetString();

@@ -138,9 +138,9 @@ public partial class DebridMediaManagerCrawler(
        {
            return (false, null);
        }

        var pageIngested = await Storage.PageIngested(name);

        return (pageIngested, name);
    }
}

@@ -4,6 +4,6 @@ public class GithubConfiguration
{
    private const string Prefix = "GITHUB";
    private const string PatVariable = "PAT";

    public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
}

@@ -10,4 +10,4 @@ public class SyncDmmJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProvi
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(DebridMediaManagerCrawler);
}
@@ -4,8 +4,8 @@ public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawl
{
    protected override string Url => "https://eztv1.xyz/ezrss.xml";
    protected override string Source => "EZTV";

    private static readonly XNamespace XmlNamespace = "http://xmlns.ezrss.it/0.1/";

    protected override IReadOnlyDictionary<string, string> Mappings =>
        new Dictionary<string, string>

@@ -29,4 +29,4 @@ public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawl
            InfoHash = itemNode.Element(XmlNamespace + Mappings[nameof(Torrent.InfoHash)])?.Value,
            Category = itemNode.Element(Mappings[nameof(Torrent.Category)])?.Value.ToLowerInvariant(),
        };
}

@@ -9,4 +9,4 @@ public class SyncEzTvJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProv
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(EzTvCrawler);
}

@@ -4,8 +4,8 @@ public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawl
{
    protected override string Url => "https://nyaa.si/?page=rss&c=1_2&f=0";
    protected override string Source => "Nyaa";

    private static readonly XNamespace XmlNamespace = "https://nyaa.si/xmlns/nyaa";

    protected override IReadOnlyDictionary<string, string> Mappings =>
        new Dictionary<string, string>

@@ -29,4 +29,4 @@ public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawl
            InfoHash = itemNode.Element(XmlNamespace + Mappings[nameof(Torrent.InfoHash)])?.Value,
            Category = itemNode.Element(Mappings[nameof(Torrent.Category)])?.Value.ToLowerInvariant(),
        };
}

@@ -9,4 +9,4 @@ public class SyncNyaaJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProv
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(NyaaCrawler);
}

@@ -9,4 +9,4 @@ public class SyncTgxJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProvi
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(TgxCrawler);
}

@@ -6,7 +6,7 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
    private static partial Regex SizeStringExtractor();
    [GeneratedRegex(@"(?i)\b(\d+(\.\d+)?)\s*([KMGT]?B)\b", RegexOptions.None, "en-GB")]
    private static partial Regex SizeStringParser();

    protected override string Url => "https://tgx.rs/rss";

    protected override string Source => "TorrentGalaxy";

@@ -18,8 +18,8 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
            [nameof(Torrent.InfoHash)] = "guid",
            [nameof(Torrent.Category)] = "category",
        };

    private static readonly HashSet<string> AllowedCategories =
    [
        "movies",
        "tv",

@@ -28,18 +28,18 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
    protected override Torrent? ParseTorrent(XElement itemNode)
    {
        var category = itemNode.Element(Mappings["Category"])?.Value.ToLowerInvariant();

        if (category is null)
        {
            return null;
        }

        if (!IsAllowedCategory(category))
        {
            return null;
        }

        var torrent = new Torrent
        {
            Source = Source,

@@ -49,11 +49,11 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
            Seeders = 0,
            Leechers = 0,
        };

        HandleSize(itemNode, torrent, "Size");

        torrent.Category = SetCategory(category);

        return torrent;
    }

@@ -88,12 +88,12 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
    private long? ExtractSizeFromDescription(string input)
    {
        var sizeMatch = SizeStringExtractor().Match(input);

        if (!sizeMatch.Success)
        {
            throw new FormatException("Unable to parse size from the input.");
        }

        var sizeString = sizeMatch.Groups[1].Value;

        var units = new Dictionary<string, long>

@@ -106,7 +106,7 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
        };

        var match = SizeStringParser().Match(sizeString);

        if (match.Success)
        {
            var val = double.Parse(match.Groups[1].Value);

@@ -137,7 +137,7 @@ public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<Tg
    private static bool IsAllowedCategory(string category)
    {
        var parsedCategory = category.Split(':').ElementAtOrDefault(0)?.Trim().ToLower();

        return parsedCategory is not null && AllowedCategories.Contains(parsedCategory);
    }
}
@@ -10,4 +10,4 @@ public class SyncTorrentioJob(ICrawlerProvider crawlerProvider) : BaseJob(crawle
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(TorrentioCrawler);
}

@@ -4,6 +4,6 @@ public class TorrentioConfiguration
{
    public const string SectionName = "TorrentioConfiguration";
    public const string Filename = "torrentio.json";

    public List<TorrentioInstance> Instances { get; set; } = [];
}

@@ -12,7 +12,7 @@ public partial class TorrentioCrawler(
    [GeneratedRegex(@"(\d+(\.\d+)?) (GB|MB)")]
    private static partial Regex SizeMatcher();
    private const int MaximumEmptyItemsCount = 5;

    private const string MovieSlug = "movie/{0}.json";
    protected override string Url => "sort=size%7Cqualityfilter=other,scr,cam,unknown/stream/{0}";
    protected override IReadOnlyDictionary<string, string> Mappings { get; } = new Dictionary<string, string>();

@@ -33,21 +33,21 @@ public partial class TorrentioCrawler(
            async () =>
            {
                var emptyMongoDbItemsCount = 0;

                var state = instance.EnsureStateExists(_instanceStates);

                SetupResiliencyPolicyForInstance(instance, state);

                while (state.TotalProcessed < totalRecordCount)
                {
                    logger.LogInformation("Processing {TorrentioInstance}", instance.Name);
                    logger.LogInformation("Current processed requests: {ProcessedRequests}", state.TotalProcessed);

                    var items = await imdbDataService.GetImdbEntriesForRequests(
                        DateTime.UtcNow.Year.ToString(),
                        instance.RateLimit.MongoBatchSize,
                        state.LastProcessedImdbId);

                    if (items.Count == 0)
                    {
                        emptyMongoDbItemsCount++;

@@ -58,10 +58,10 @@ public partial class TorrentioCrawler(
                            logger.LogInformation("Maximum empty document count reached. Cancelling {TorrentioInstance}", instance.Name);
                            break;
                        }

                        continue;
                    }

                    var newTorrents = new List<Torrent>();
                    var processedItemsCount = 0;

@@ -70,7 +70,7 @@ public partial class TorrentioCrawler(
                        try
                        {
                            var currentCount = processedItemsCount;

                            await state.ResiliencyPolicy.ExecuteAsync(
                                async () =>
                                {

@@ -97,7 +97,7 @@ public partial class TorrentioCrawler(
                                        newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
                                    }
                                });

                            processedItemsCount++;
                        }
                        catch (Exception)

@@ -127,7 +127,7 @@ public partial class TorrentioCrawler(
                {
                    logger.LogWarning("Retry {RetryCount} encountered an exception: {Message}. Pausing for {Timespan} seconds instance {TorrentioInstance}", retryCount, exception.Message, timeSpan.Seconds, instance.Name);
                });

        var circuitBreakerPolicy = Policy
            .Handle<Exception>()
            .CircuitBreakerAsync(

@@ -139,9 +139,9 @@ public partial class TorrentioCrawler(
                },
                onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name),
                onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name));

        var policyWrap = Policy.WrapAsync(retryPolicy, circuitBreakerPolicy);

        state.ResiliencyPolicy = policyWrap;
    }

@@ -162,24 +162,24 @@ public partial class TorrentioCrawler(
        {
            throw new("Failed to fetch " + requestUrl);
        }

        var json = JsonDocument.Parse(await response.Content.ReadAsStringAsync());
        var streams = json.RootElement.GetProperty("streams").EnumerateArray();
        return streams.Select(x => ParseTorrent(instance, x, imdbId)).Where(x => x != null).ToList();
    }

    private Torrent? ParseTorrent(TorrentioInstance instance, JsonElement item, string imdId)
    {
        var title = item.GetProperty("title").GetString();
        var infoHash = item.GetProperty("infoHash").GetString();

        if (string.IsNullOrEmpty(title) || string.IsNullOrEmpty(infoHash))
        {
            return null;
        }

        var torrent = ParseTorrentDetails(title, instance, infoHash, imdId);

        return string.IsNullOrEmpty(torrent.Name) ? null : torrent;
    }

@@ -218,4 +218,4 @@ public partial class TorrentioCrawler(

        return torrent;
    }
}

@@ -7,4 +7,4 @@ public class TorrentioInstance
    public string Url { get; init; } = default!;

    public TorrentioRateLimit RateLimit { get; init; } = default!;
}

@@ -21,7 +21,7 @@ public static class TorrentioInstancesExtensions

        return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
    }

    public static void SetPossiblyRateLimited(this TorrentioInstance instance, TorrentioScrapeInstance state, int minutesToWait = 5)
    {
        // Set the start time to 15 minutes in the past so that the next check will result in a rate limit period of 15 minutes

@@ -33,12 +33,12 @@ public static class TorrentioInstancesExtensions
        state.RequestCount = requestCount;
    }

    public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
        !scraperState.TryGetValue(instance.Name, out var state) ? 0 : state.TotalProcessed;

    public static string? LastProcessedImdbId(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
        !scraperState.TryGetValue(instance.Name, out var state) ? null : state.LastProcessedImdbId;

    public static TorrentioScrapeInstance EnsureStateExists(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState)
    {
        if (!scraperState.TryGetValue(instance.Name, out var state))

@@ -49,4 +49,4 @@ public static class TorrentioInstancesExtensions

        return state;
    }
}

@@ -4,10 +4,10 @@ public class TorrentioRateLimit
{
    public int RequestLimit { get; set; }
    public int IntervalInSeconds { get; set; }

    public int MongoBatchSize { get; set; }

    public int ExceptionLimit { get; set; }

    public int ExceptionIntervalInSeconds { get; set; }
}

@@ -7,4 +7,4 @@ public class TorrentioScrapeInstance
    public int TotalProcessed { get; set; }
    public string? LastProcessedImdbId { get; set; }
    public IAsyncPolicy? ResiliencyPolicy { get; set; }
}

@@ -9,4 +9,4 @@ public class SyncTpbJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProvi
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(TpbCrawler);
}
@@ -5,7 +5,7 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
    protected override string Url => "https://apibay.org/precompiled/data_top100_recent.json";

    protected override string Source => "TPB";

    // ReSharper disable once UnusedMember.Local
    private readonly Dictionary<string, Dictionary<string, int>> TpbCategories = new()
    {

@@ -33,12 +33,12 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
            {"OTHER", 599},
        }},
    };

    private static readonly HashSet<int> TvSeriesCategories = [ 205, 208 ];
    private static readonly HashSet<int> MovieCategories = [ 201, 202, 207, 209 ];
    private static readonly HashSet<int> PornCategories = [ 500, 501, 502, 505, 506 ];
    private static readonly HashSet<int> AllowedCategories = [ ..MovieCategories, ..TvSeriesCategories ];

    protected override IReadOnlyDictionary<string, string> Mappings
        => new Dictionary<string, string>
        {

@@ -54,12 +54,12 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
    protected override Torrent? ParseTorrent(JsonElement item)
    {
        var incomingCategory = item.GetProperty(Mappings["Category"]).GetInt32();

        if (!AllowedCategories.Contains(incomingCategory))
        {
            return null;
        }

        var torrent = new Torrent
        {
            Source = Source,

@@ -69,11 +69,11 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
            Leechers = item.GetProperty(Mappings["Leechers"]).GetInt32(),
            Imdb = item.GetProperty(Mappings["Imdb"]).GetString(),
        };

        HandleInfoHash(item, torrent, "InfoHash");

        torrent.Category = HandleCategory(incomingCategory);

        return torrent;
    }

@@ -81,7 +81,7 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
        MovieCategories.Contains(category) switch
        {
            true => "movies",
            _ => TvSeriesCategories.Contains(category) switch
            {
                true => "tv",
                _ => "xxx",

@@ -89,4 +89,4 @@ public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler
        };

    public override Task Execute() => Execute("items");
}

@@ -9,4 +9,4 @@ public class SyncYtsJob(ICrawlerProvider crawlerProvider) : BaseJob(crawlerProvi
    public static readonly JobKey Key = new(JobName, nameof(Literals.CrawlersJobs));
    public static readonly TriggerKey Trigger = new($"{JobName}-trigger", nameof(Literals.CrawlersJobs));
    protected override string Crawler => nameof(YtsCrawler);
}

@@ -26,19 +26,19 @@ public class YtsCrawler(IHttpClientFactory httpClientFactory, ILogger<YtsCrawler
            Seeders = 0,
            Leechers = 0,
        };

        HandleInfoHash(itemNode, torrent, "InfoHash");

        return torrent;
    }

    protected override void HandleInfoHash(XElement itemNode, Torrent torrent, string infoHashKey)
    {
        var infoHash = itemNode.Element(Mappings[infoHashKey])?.Attribute("url")?.Value.Split("/download/").ElementAtOrDefault(1);

        if (infoHash is not null)
        {
            torrent.InfoHash = infoHash;
        }
    }
}
@@ -8,16 +8,16 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
        VALUES (@Name, @Source, @Category, @InfoHash, @Size, @Seeders, @Leechers, @Imdb, @Processed, @CreatedAt, @UpdatedAt)
        ON CONFLICT (source, info_hash) DO NOTHING
        """;

    private const string InsertIngestedPageSql =
        """
        INSERT INTO ingested_pages (url, "createdAt", "updatedAt")
        VALUES (@Url, @CreatedAt, @UpdatedAt)
        """;

    private const string GetMovieAndSeriesTorrentsNotProcessedSql =
        """
        SELECT
            id as "Id",
            name as "Name",
            source as "Source",

@@ -29,20 +29,20 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
            imdb as "Imdb",
            processed as "Processed",
            "createdAt" as "CreatedAt",
            "updatedAt" as "UpdatedAt"
        FROM ingested_torrents
        WHERE processed = false AND category != 'xxx'
        """;

    private const string UpdateProcessedSql =
        """
        UPDATE ingested_torrents
        Set
            processed = true,
            "updatedAt" = @UpdatedAt
        WHERE id = @Id
        """;

    public async Task<InsertTorrentResult> InsertTorrents(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default)
    {
        try

@@ -73,19 +73,19 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
            return new List<Torrent>();
        }
    }

    public async Task<UpdatedTorrentResult> SetTorrentsProcessed(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default)
    {
        try
        {
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);

            foreach (var torrent in torrents)
            {
                torrent.UpdatedAt = DateTime.UtcNow;
            }

            var updated = await connection.ExecuteAsync(UpdateProcessedSql, torrents);
            return new(true, updated);
        }

@@ -99,11 +99,11 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
        {
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);

            const string query = "SELECT EXISTS (SELECT 1 FROM ingested_pages WHERE url = @Url)";

            var result = await connection.ExecuteScalarAsync<bool>(query, new { Url = pageId });

            return result;
        }

@@ -115,14 +115,14 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
        try
        {
            var date = DateTime.UtcNow;

            await connection.ExecuteAsync(InsertIngestedPageSql, new
            {
                Url = pageId,
                CreatedAt = date,
                UpdatedAt = date,
            });

            return new(true, "Page successfully marked as ingested");
        }
        catch (Exception e)

@@ -130,4 +130,4 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
            return new(false, $"Failed to mark page as ingested: {e.Message}");
        }
    }
}

@@ -7,4 +7,4 @@ public interface IDataStorage
    Task<UpdatedTorrentResult> SetTorrentsProcessed(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default);
    Task<bool> PageIngested(string pageId, CancellationToken cancellationToken = default);
    Task<PageIngestedResult> MarkPageAsIngested(string pageId, CancellationToken cancellationToken = default);
}

@@ -12,4 +12,4 @@ public class ImdbEntry
    public string? EndYear { get; set; }
    public string? RuntimeMinutes { get; set; }
    public string? Genres { get; set; }
}

@@ -8,30 +8,30 @@ public class ImdbMongoDbService
    public ImdbMongoDbService(MongoConfiguration configuration, ILogger<ImdbMongoDbService> logger)
    {
        _logger = logger;

        var client = new MongoClient(configuration.ConnectionString);
        var database = client.GetDatabase(configuration.DbName);
        _imdbCollection = database.GetCollection<ImdbEntry>("imdb-entries");
    }

    public async Task<IReadOnlyList<ImdbEntry>> GetImdbEntriesForRequests(string startYear, int requestLimit, string? startingId = null)
    {
        _logger.LogInformation("Getting IMDB entries for requests");
        _logger.LogInformation("Start year: {StartYear}", startYear);
        _logger.LogInformation("Request limit: {RequestLimit}", requestLimit);
        _logger.LogInformation("Starting ID: {StartingId}", startingId);

        var sort = Builders<ImdbEntry>.Sort
            .Descending(e => e.StartYear)
            .Descending(e => e.ImdbId);

        var filter = Builders<ImdbEntry>.Filter
            .And(
                Builders<ImdbEntry>.Filter.Eq(e => e.TitleType, "movie"),
                Builders<ImdbEntry>.Filter.Lte(e => e.StartYear, startYear)
            );

        if (!string.IsNullOrWhiteSpace(startingId))
        {
            filter = Builders<ImdbEntry>.Filter.And(filter, Builders<ImdbEntry>.Filter.Lt(e => e.ImdbId, startingId));

@@ -39,7 +39,7 @@ public class ImdbMongoDbService

        return await _imdbCollection.Find(filter).Limit(requestLimit).Sort(sort).ToListAsync();
    }

    public async Task<long> GetTotalCountAsync()
    {
        var filter = Builders<ImdbEntry>.Filter.Eq(x => x.TitleType, "movie");

@@ -55,14 +55,14 @@ public class ImdbMongoDbService
            .Text(e => e.PrimaryTitle)
            .Ascending(e => e.TitleType)
            .Ascending(e => e.StartYear);

        CreateIndex(index1KeysDefinition);

        // Compound index for StartYear and _id in descending order
        var index2KeysDefinition = Builders<ImdbEntry>.IndexKeys
            .Descending(e => e.StartYear)
            .Descending(e => e.ImdbId);

        CreateIndex(index2KeysDefinition);

        return true;

@@ -73,11 +73,11 @@ public class ImdbMongoDbService
            return false;
        }
    }

    private void CreateIndex(IndexKeysDefinition<ImdbEntry> keysDefinition)
    {
        var createIndexOptions = new CreateIndexOptions { Background = true };
        var indexModel = new CreateIndexModel<ImdbEntry>(keysDefinition, createIndexOptions);
        _imdbCollection.Indexes.CreateOne(indexModel);
    }
}

@@ -8,13 +8,13 @@ public class MongoConfiguration
    private const string DbVariable = "DB";
    private const string UsernameVariable = "USER";
    private const string PasswordVariable = "PASSWORD";

    private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
    private int Port { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 27017);
    private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
    private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
    public string DbName { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DbVariable);

    public string ConnectionString => $"mongodb://{Username}:{Password}@{Host}:{Port}/{DbName}?tls=false&directConnection=true&authSource=admin";
}

@@ -16,4 +16,4 @@ public class PostgresConfiguration
    private int PORT { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 5432);

    public string StorageConnectionString => $"Host={Host};Port={PORT};Username={Username};Password={Password};Database={Database};";
}
@@ -11,4 +11,4 @@ internal static class ServiceCollectionExtensions
        services.AddSingleton<ImdbMongoDbService>();
        return services;
    }
}

@@ -20,4 +20,4 @@ public abstract class BaseJob(ICrawlerProvider crawlerProvider) : IJob
    }

    protected abstract string Crawler { get; }
}

@@ -4,4 +4,4 @@ public interface ICrawlerJob<out TCrawler> : IJob
    where TCrawler : ICrawler
{
    TCrawler CrawlerType { get; }
}

@@ -4,4 +4,4 @@ public static class Literals
{
    public const string CrawlersJobs = "CrawlersJobs";
    public const string PublishingJobs = "PublishingJobs";
}

@@ -3,4 +3,4 @@ namespace Producer.Features.JobSupport;
[AttributeUsage(AttributeTargets.Class)]
public class ManualJobRegistrationAttribute : Attribute
{
}

@@ -4,6 +4,6 @@ public class ScrapeConfiguration
{
    public const string SectionName = "ScrapeConfiguration";
    public const string Filename = "scrapers.json";

    public List<Scraper> Scrapers { get; set; } = [];
}

@@ -7,7 +7,7 @@ internal static class ServiceCollectionExtensions
        var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
        var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
        var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();

        var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
            .GetTypes()
            .Where(t => t is {IsClass: true, IsAbstract: false} && typeof(IJob).IsAssignableFrom(t) &&

@@ -23,7 +23,7 @@ internal static class ServiceCollectionExtensions
        {
            services.AddTransient<SyncDmmJob>();
        }

        var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);

        services.AddQuartz(

@@ -51,18 +51,18 @@ internal static class ServiceCollectionExtensions
        {
            var key = jobType.GetField("Key")?.GetValue(jobType);
            var trigger = jobType.GetField("Trigger")?.GetValue(jobType);

            if (key is null || trigger is null)
            {
                Console.WriteLine($"Job {jobType.Name} does not have a JobKey or TriggerKey property");
                continue;
            }

            var method = openMethod.MakeGenericMethod(jobType);
            method.Invoke(null, [quartz, key, trigger, scrapeConfiguration]);
        }
    }

    private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration)
    {
        if (!string.IsNullOrEmpty(githubConfiguration.PAT))

@@ -70,7 +70,7 @@ internal static class ServiceCollectionExtensions
            AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
        }
    }

    private static void RegisterTorrentioJob(
        IServiceCollection services,
        IServiceCollectionQuartzConfigurator quartz,

@@ -78,14 +78,14 @@ internal static class ServiceCollectionExtensions
        ScrapeConfiguration scrapeConfiguration)
    {
        var torrentioConfiguration = services.LoadConfigurationFromConfig<TorrentioConfiguration>(configuration, TorrentioConfiguration.SectionName);

        if (torrentioConfiguration.Instances.Count != 0)
        {
            AddJobWithTrigger<SyncTorrentioJob>(quartz, SyncTorrentioJob.Key, SyncTorrentioJob.Trigger, scrapeConfiguration);
        }
    }

    private static void RegisterPublisher(IServiceCollectionQuartzConfigurator quartz, RabbitMqConfiguration rabbitConfig) =>
        AddJobWithTriggerAndInterval<PublisherJob>(quartz, PublisherJob.Key, PublisherJob.Trigger, rabbitConfig.PublishIntervalInSeconds);

    private static void AddJobWithTrigger<TJobType>(

@@ -95,14 +95,14 @@ internal static class ServiceCollectionExtensions
        ScrapeConfiguration scrapeConfiguration) where TJobType : IJob
    {
        var scraper = scrapeConfiguration.Scrapers
            .FirstOrDefault(x => x.Name != null &&
                                 x.Name.Equals(typeof(TJobType).Name, StringComparison.OrdinalIgnoreCase));

        if (scraper is null || !scraper.Enabled)
        {
            return;
        }

        quartz.AddJob<TJobType>(opts => opts.WithIdentity(key).StoreDurably());

        quartz.AddTrigger(

@@ -112,7 +112,7 @@ internal static class ServiceCollectionExtensions
                .StartAt(DateTimeOffset.Now.AddSeconds(20))
                .WithSimpleSchedule(x => x.WithInterval(TimeSpan.FromSeconds(scraper.IntervalSeconds)).RepeatForever()));
    }

    private static void AddJobWithTriggerAndInterval<TJobType>(
        IServiceCollectionQuartzConfigurator quartz,
        JobKey key,

@@ -120,7 +120,7 @@ internal static class ServiceCollectionExtensions
        int interval) where TJobType : IJob
    {
        quartz.AddJob<TJobType>(opts => opts.WithIdentity(key).StoreDurably());

        quartz.AddTrigger(
            opts => opts
                .ForJob(key)

@@ -128,4 +128,4 @@ internal static class ServiceCollectionExtensions
                .StartAt(DateTimeOffset.Now.AddSeconds(20))
                .WithSimpleSchedule(x => x.WithInterval(TimeSpan.FromSeconds(interval)).RepeatForever()));
    }
}