diff --git a/src/producer/Configuration/torrentio.json b/src/producer/Configuration/torrentio.json index 8209dd7..0400ffe 100644 --- a/src/producer/Configuration/torrentio.json +++ b/src/producer/Configuration/torrentio.json @@ -6,7 +6,10 @@ "Url": "https://torrentio.strem.fun", "RateLimit": { "RequestLimit": 300, - "IntervalInSeconds": 3600 + "IntervalInSeconds": 3600, + "MongoBatchSize": 300, + "ExceptionLimit": 5, + "ExceptionIntervalInSeconds": 60 } } ] diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs index d135a95..b74de23 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs @@ -11,6 +11,7 @@ public partial class TorrentioCrawler( { [GeneratedRegex(@"(\d+(\.\d+)?) (GB|MB)")] private static partial Regex SizeMatcher(); + private const int MaximumEmptyItemsCount = 5; private const string MovieSlug = "movie/{0}.json"; protected override string Url => "sort=size%7Cqualityfilter=other,scr,cam,unknown/stream/{0}"; @@ -31,20 +32,33 @@ public partial class TorrentioCrawler( Task.Run( async () => { - while (instance.TotalProcessedRequests(_instanceStates) < totalRecordCount) + var emptyMongoDbItemsCount = 0; + + var state = instance.EnsureStateExists(_instanceStates); + + SetupResiliencyPolicyForInstance(instance, state); + + while (state.TotalProcessed < totalRecordCount) { logger.LogInformation("Processing {TorrentioInstance}", instance.Name); - logger.LogInformation("Current processed requests: {ProcessedRequests}", instance.TotalProcessedRequests(_instanceStates)); + logger.LogInformation("Current processed requests: {ProcessedRequests}", state.TotalProcessed); var items = await imdbDataService.GetImdbEntriesForRequests( DateTime.UtcNow.Year.ToString(), - instance.RateLimit.RequestLimit, - instance.LastProcessedImdbId(_instanceStates)); + instance.RateLimit.MongoBatchSize, + state.LastProcessedImdbId); if (items.Count == 0) { + emptyMongoDbItemsCount++; logger.LogInformation("No items to process for {TorrentioInstance}", instance.Name); await Task.Delay(10000); + if (emptyMongoDbItemsCount >= MaximumEmptyItemsCount) + { + logger.LogInformation("Maximum empty document count reached. Cancelling {TorrentioInstance}", instance.Name); + break; + } + continue; } @@ -55,29 +69,34 @@ public partial class TorrentioCrawler( { processedItemsCount++; - var waitTime = instance.CalculateWaitTime(_instanceStates); + var waitTime = instance.CalculateWaitTime(state); if (waitTime > TimeSpan.Zero) { logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name); - logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, waitTime); + logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0); await Task.Delay(waitTime); } if (processedItemsCount % 2 == 0) { - var randomWait = new Random().Next(1000, 5000); - logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, randomWait); + var randomWait = Random.Shared.Next(1000, 5000); + logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0); await Task.Delay(randomWait); } try { - var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client); - if (torrentInfo is not null) - { - newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!)); - } + await state.ResiliencyPolicy.ExecuteAsync( + async () => + { + var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client); + + if (torrentInfo is not null) + { + newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!)); + } + }); } catch (Exception error) { @@ -88,16 +107,31 @@ public partial class TorrentioCrawler( if (newTorrents.Count > 0) { await InsertTorrents(newTorrents); - - var currentState = _instanceStates[instance.Name]; - _instanceStates[instance.Name] = currentState with - { - LastProcessedImdbId = items[^1].ImdbId, - }; + + state.LastProcessedImdbId = items[^1].ImdbId; } } }); + private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state) + { + var policy = Policy + .Handle() + .CircuitBreakerAsync( + exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit, + durationOfBreak: TimeSpan.FromSeconds(instance.RateLimit.ExceptionIntervalInSeconds), + onBreak: (ex, breakDelay) => + { + logger.LogWarning(ex, "Breaking circuit for {TorrentioInstance} for {BreakDelay}ms due to {Exception}", instance.Name, breakDelay.TotalMilliseconds, ex.Message); + }, + onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name), + onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name)); + + var policyWrap = Policy.WrapAsync(policy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30))); + + state.ResiliencyPolicy = policyWrap; + } + private async Task?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client) { logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId); diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs index 7ffd316..114d698 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs @@ -2,47 +2,37 @@ namespace Producer.Features.Crawlers.Torrentio; public static class TorrentioInstancesExtensions { - public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, Dictionary scraperState) + public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, TorrentioScrapeInstance state) { - if (!scraperState.TryGetValue(instance.Name, out var state)) + if (state.RequestCount < instance.RateLimit.RequestLimit) { - state = new (DateTime.UtcNow, 0, 0, null); - scraperState[instance.Name] = state; - } - - var (startedAt, requestCount, totalProcessed, lastProcessedImdbId) = state; - - if (requestCount < instance.RateLimit.RequestLimit) - { - scraperState[instance.Name] = new (startedAt, requestCount + 1, totalProcessed + 1, lastProcessedImdbId); + state.RequestCount++; + state.TotalProcessed++; return TimeSpan.Zero; } - var elapsed = DateTime.UtcNow - startedAt; + var elapsed = DateTime.UtcNow - state.StartedAt; var interval = TimeSpan.FromSeconds(instance.RateLimit.IntervalInSeconds); var remaining = interval - elapsed; // reset the state for the next interval - scraperState[instance.Name] = new (DateTime.UtcNow, 0, totalProcessed, lastProcessedImdbId); + state.StartedAt = DateTime.UtcNow; + state.RequestCount = 0; return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero; } public static void SetPossiblyRateLimited(this TorrentioInstance instance, Dictionary scraperState, int minutesToWait = 5) { - if (!scraperState.TryGetValue(instance.Name, out var state)) - { - state = new (DateTime.UtcNow, 0, 0, null); - } - - var (_, _, totalProcessed, lastProcessedImdbId) = state; + var state = instance.EnsureStateExists(scraperState); // Set the start time to 15 minutes in the past so that the next check will result in a rate limit period of 15 minutes var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait); var requestCount = instance.RateLimit.RequestLimit; // Update the scraper state for the instance - scraperState[instance.Name] = new (startedAt, requestCount, totalProcessed, lastProcessedImdbId); + state.StartedAt = startedAt; + state.RequestCount = requestCount; } public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary scraperState) => @@ -50,4 +40,15 @@ public static class TorrentioInstancesExtensions public static string? LastProcessedImdbId(this TorrentioInstance instance, Dictionary scraperState) => !scraperState.TryGetValue(instance.Name, out var state) ? null : state.LastProcessedImdbId; + + public static TorrentioScrapeInstance EnsureStateExists(this TorrentioInstance instance, Dictionary scraperState) + { + if (!scraperState.TryGetValue(instance.Name, out var state)) + { + state = new(); + scraperState[instance.Name] = state; + } + + return state; + } } \ No newline at end of file diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioRateLimit.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioRateLimit.cs index fe89195..97b9c75 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioRateLimit.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioRateLimit.cs @@ -4,4 +4,10 @@ public class TorrentioRateLimit { public int RequestLimit { get; set; } public int IntervalInSeconds { get; set; } + + public int MongoBatchSize { get; set; } + + public int ExceptionLimit { get; set; } + + public int ExceptionIntervalInSeconds { get; set; } } \ No newline at end of file diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs index 9d6c23e..5724305 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs @@ -1,3 +1,10 @@ namespace Producer.Features.Crawlers.Torrentio; -public record TorrentioScrapeInstance(DateTime StartedAt, int RequestCount, int TotalProcessed, string? LastProcessedImdbId); \ No newline at end of file +public class TorrentioScrapeInstance +{ + public DateTime StartedAt { get; set; } = DateTime.UtcNow; + public int RequestCount { get; set; } + public int TotalProcessed { get; set; } + public string? LastProcessedImdbId { get; set; } + public IAsyncPolicy? ResiliencyPolicy { get; set; } +} \ No newline at end of file diff --git a/src/producer/GlobalUsings.cs b/src/producer/GlobalUsings.cs index 131bc36..a2b16b8 100644 --- a/src/producer/GlobalUsings.cs +++ b/src/producer/GlobalUsings.cs @@ -16,6 +16,7 @@ global using Microsoft.Extensions.Logging; global using MongoDB.Bson.Serialization.Attributes; global using MongoDB.Driver; global using Npgsql; +global using Polly; global using Quartz; global using Producer.Extensions; global using Producer.Features.Amqp; diff --git a/src/producer/Producer.csproj b/src/producer/Producer.csproj index 2405295..b85b0ee 100644 --- a/src/producer/Producer.csproj +++ b/src/producer/Producer.csproj @@ -17,6 +17,7 @@ +