From c3a281c39f227d68ffcd7900bc2b74cd88e05433 Mon Sep 17 00:00:00 2001 From: iPromKnight Date: Sun, 3 Mar 2024 19:54:32 +0000 Subject: [PATCH] retry polic and circuit breaker policy --- .../Crawlers/Torrentio/TorrentioCrawler.cs | 146 +++++++++--------- .../Torrentio/TorrentioInstanceExtensions.cs | 11 ++ .../Torrentio/TorrentioScrapeInstance.cs | 1 + 3 files changed, 87 insertions(+), 71 deletions(-) diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs index acb33bd..166f3c2 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioCrawler.cs @@ -67,43 +67,52 @@ public partial class TorrentioCrawler( foreach (var item in items) { - processedItemsCount++; - - var waitTime = instance.CalculateWaitTime(state); - - if (waitTime > TimeSpan.Zero) - { - logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name); - logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0); - await Task.Delay(waitTime); - } - - if (processedItemsCount % 2 == 0) - { - var randomWait = Random.Shared.Next(1000, 5000); - logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0); - await Task.Delay(randomWait); - } - try { - await state.ResiliencyPolicy.ExecuteAsync( + await state.RetryPolicy.ExecuteAsync( async () => { - var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client); + processedItemsCount++; - if (torrentInfo is not null) + var waitTime = instance.CalculateWaitTime(state); + + if (waitTime > TimeSpan.Zero) { - newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!)); + logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name); + logger.LogInformation( + "Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0); + await Task.Delay(waitTime); } + + if (processedItemsCount % 2 == 0) + { + var randomWait = Random.Shared.Next(1000, 5000); + logger.LogInformation( + "Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0); + await Task.Delay(randomWait); + } + + + await state.ResiliencyPolicy.ExecuteAsync( + async () => + { + var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client); + + if (torrentInfo is not null) + { + newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!)); + } + }); }); } catch (Exception error) { logger.LogError(error, "page processing error in TorrentioCrawler"); + logger.LogInformation("Setting possibly rate limited for {TorrentioInstance}", instance.Name); + instance.SetPossiblyRateLimited(state); } } - + if (newTorrents.Count > 0) { await InsertTorrents(newTorrents); @@ -115,7 +124,17 @@ public partial class TorrentioCrawler( private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state) { - var policy = Policy + var retryPolicy = Policy + .Handle() + .WaitAndRetryAsync( + retryCount: 2, // initial attempt + 2 retries + sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), + onRetry: (exception, timeSpan, retryCount, _) => + { + logger.LogWarning($"Retry {retryCount} encountered an exception: {exception.Message}. Pausing for {timeSpan.Seconds} seconds instance {instance.Name}"); + }); + + var circuitBreakerPolicy = Policy .Handle() .CircuitBreakerAsync( exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit, @@ -127,25 +146,18 @@ public partial class TorrentioCrawler( onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name), onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name)); - var policyWrap = Policy.WrapAsync(policy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30))); + var policyWrap = Policy.WrapAsync(circuitBreakerPolicy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30))); state.ResiliencyPolicy = policyWrap; + state.RetryPolicy = retryPolicy; } private async Task?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client) { logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId); - try - { - var movieSlug = string.Format(MovieSlug, imdbId); - var urlSlug = string.Format(Url, movieSlug); - return await RunRequest(instance, urlSlug, imdbId, client); - } - catch (Exception error) - { - logger.LogError(error, "page processing error {TorrentioInstance}: {ImdbId}", instance.Name, imdbId); - throw; - } + var movieSlug = string.Format(MovieSlug, imdbId); + var urlSlug = string.Format(Url, movieSlug); + return await RunRequest(instance, urlSlug, imdbId, client); } private async Task?> RunRequest(TorrentioInstance instance, string urlSlug, string imdbId, HttpClient client) @@ -180,45 +192,37 @@ public partial class TorrentioCrawler( private Torrent ParseTorrentDetails(string title, TorrentioInstance instance, string infoHash, string imdbId) { - try + var torrent = new Torrent { - var torrent = new Torrent + Source = $"{Source}_{instance.Name}", + InfoHash = infoHash, + Category = "movies", // we only handle movies for now... + Imdb = imdbId, + }; + + var span = title.AsSpan(); + var titleEnd = span.IndexOf('\n'); + var titlePart = titleEnd >= 0 ? span[..titleEnd].ToString() : title; + + torrent.Name = titlePart.Replace('.', ' ').TrimEnd('.'); + + var sizeMatch = SizeMatcher().Match(title); + + if (sizeMatch.Success) + { + var size = double.Parse(sizeMatch.Groups[1].Value); // Size Value + var sizeUnit = sizeMatch.Groups[3].Value; // Size Unit (GB/MB) + + var sizeInBytes = sizeUnit switch { - Source = $"{Source}_{instance.Name}", - InfoHash = infoHash, - Category = "movies", // we only handle movies for now... - Imdb = imdbId, + "GB" => (long) (size * 1073741824), + "MB" => (long) (size * 1048576), + _ => 0, }; - var span = title.AsSpan(); - var titleEnd = span.IndexOf('\n'); - var titlePart = titleEnd >= 0 ? span[..titleEnd].ToString() : title; - - torrent.Name = titlePart.Replace('.', ' ').TrimEnd('.'); - - var sizeMatch = SizeMatcher().Match(title); - - if (sizeMatch.Success) - { - var size = double.Parse(sizeMatch.Groups[1].Value); // Size Value - var sizeUnit = sizeMatch.Groups[3].Value; // Size Unit (GB/MB) - - var sizeInBytes = sizeUnit switch - { - "GB" => (long) (size * 1073741824), - "MB" => (long) (size * 1048576), - _ => 0, - }; - - torrent.Size = sizeInBytes.ToString(); - } - - return torrent; - } - catch (Exception e) - { - logger.LogError(e, "Error parsing torrent details"); - throw; + torrent.Size = sizeInBytes.ToString(); } + + return torrent; } } \ No newline at end of file diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs index e7a4f95..263496d 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioInstanceExtensions.cs @@ -21,6 +21,17 @@ public static class TorrentioInstancesExtensions return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero; } + + public static void SetPossiblyRateLimited(this TorrentioInstance instance, TorrentioScrapeInstance state, int minutesToWait = 5) + { + // Set the start time to 15 minutes in the past so that the next check will result in a rate limit period of 15 minutes + var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait); + var requestCount = instance.RateLimit.RequestLimit; + + // Update the scraper state for the instance + state.StartedAt = startedAt; + state.RequestCount = requestCount; + } public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary scraperState) => !scraperState.TryGetValue(instance.Name, out var state) ? 0 : state.TotalProcessed; diff --git a/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs b/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs index 5724305..e904960 100644 --- a/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs +++ b/src/producer/Features/Crawlers/Torrentio/TorrentioScrapeInstance.cs @@ -7,4 +7,5 @@ public class TorrentioScrapeInstance public int TotalProcessed { get; set; } public string? LastProcessedImdbId { get; set; } public IAsyncPolicy? ResiliencyPolicy { get; set; } + public IAsyncPolicy? RetryPolicy { get; set; } } \ No newline at end of file