mirror of
https://github.com/knightcrawler-stremio/knightcrawler.git
synced 2024-12-20 03:29:51 +00:00
retry polic and circuit breaker policy
This commit is contained in:
@@ -67,43 +67,52 @@ public partial class TorrentioCrawler(
|
|||||||
|
|
||||||
foreach (var item in items)
|
foreach (var item in items)
|
||||||
{
|
{
|
||||||
processedItemsCount++;
|
|
||||||
|
|
||||||
var waitTime = instance.CalculateWaitTime(state);
|
|
||||||
|
|
||||||
if (waitTime > TimeSpan.Zero)
|
|
||||||
{
|
|
||||||
logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name);
|
|
||||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0);
|
|
||||||
await Task.Delay(waitTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (processedItemsCount % 2 == 0)
|
|
||||||
{
|
|
||||||
var randomWait = Random.Shared.Next(1000, 5000);
|
|
||||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0);
|
|
||||||
await Task.Delay(randomWait);
|
|
||||||
}
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
await state.ResiliencyPolicy.ExecuteAsync(
|
await state.RetryPolicy.ExecuteAsync(
|
||||||
async () =>
|
async () =>
|
||||||
{
|
{
|
||||||
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
processedItemsCount++;
|
||||||
|
|
||||||
if (torrentInfo is not null)
|
var waitTime = instance.CalculateWaitTime(state);
|
||||||
|
|
||||||
|
if (waitTime > TimeSpan.Zero)
|
||||||
{
|
{
|
||||||
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name);
|
||||||
|
logger.LogInformation(
|
||||||
|
"Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0);
|
||||||
|
await Task.Delay(waitTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (processedItemsCount % 2 == 0)
|
||||||
|
{
|
||||||
|
var randomWait = Random.Shared.Next(1000, 5000);
|
||||||
|
logger.LogInformation(
|
||||||
|
"Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0);
|
||||||
|
await Task.Delay(randomWait);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
await state.ResiliencyPolicy.ExecuteAsync(
|
||||||
|
async () =>
|
||||||
|
{
|
||||||
|
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
||||||
|
|
||||||
|
if (torrentInfo is not null)
|
||||||
|
{
|
||||||
|
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
||||||
|
}
|
||||||
|
});
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
catch (Exception error)
|
catch (Exception error)
|
||||||
{
|
{
|
||||||
logger.LogError(error, "page processing error in TorrentioCrawler");
|
logger.LogError(error, "page processing error in TorrentioCrawler");
|
||||||
|
logger.LogInformation("Setting possibly rate limited for {TorrentioInstance}", instance.Name);
|
||||||
|
instance.SetPossiblyRateLimited(state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newTorrents.Count > 0)
|
if (newTorrents.Count > 0)
|
||||||
{
|
{
|
||||||
await InsertTorrents(newTorrents);
|
await InsertTorrents(newTorrents);
|
||||||
@@ -115,7 +124,17 @@ public partial class TorrentioCrawler(
|
|||||||
|
|
||||||
private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state)
|
private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state)
|
||||||
{
|
{
|
||||||
var policy = Policy
|
var retryPolicy = Policy
|
||||||
|
.Handle<Exception>()
|
||||||
|
.WaitAndRetryAsync(
|
||||||
|
retryCount: 2, // initial attempt + 2 retries
|
||||||
|
sleepDurationProvider: retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
|
||||||
|
onRetry: (exception, timeSpan, retryCount, _) =>
|
||||||
|
{
|
||||||
|
logger.LogWarning($"Retry {retryCount} encountered an exception: {exception.Message}. Pausing for {timeSpan.Seconds} seconds instance {instance.Name}");
|
||||||
|
});
|
||||||
|
|
||||||
|
var circuitBreakerPolicy = Policy
|
||||||
.Handle<Exception>()
|
.Handle<Exception>()
|
||||||
.CircuitBreakerAsync(
|
.CircuitBreakerAsync(
|
||||||
exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit,
|
exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit,
|
||||||
@@ -127,25 +146,18 @@ public partial class TorrentioCrawler(
|
|||||||
onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name),
|
onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name),
|
||||||
onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name));
|
onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name));
|
||||||
|
|
||||||
var policyWrap = Policy.WrapAsync(policy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30)));
|
var policyWrap = Policy.WrapAsync(circuitBreakerPolicy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30)));
|
||||||
|
|
||||||
state.ResiliencyPolicy = policyWrap;
|
state.ResiliencyPolicy = policyWrap;
|
||||||
|
state.RetryPolicy = retryPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<List<Torrent?>?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client)
|
private async Task<List<Torrent?>?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client)
|
||||||
{
|
{
|
||||||
logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
||||||
try
|
var movieSlug = string.Format(MovieSlug, imdbId);
|
||||||
{
|
var urlSlug = string.Format(Url, movieSlug);
|
||||||
var movieSlug = string.Format(MovieSlug, imdbId);
|
return await RunRequest(instance, urlSlug, imdbId, client);
|
||||||
var urlSlug = string.Format(Url, movieSlug);
|
|
||||||
return await RunRequest(instance, urlSlug, imdbId, client);
|
|
||||||
}
|
|
||||||
catch (Exception error)
|
|
||||||
{
|
|
||||||
logger.LogError(error, "page processing error {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task<List<Torrent?>?> RunRequest(TorrentioInstance instance, string urlSlug, string imdbId, HttpClient client)
|
private async Task<List<Torrent?>?> RunRequest(TorrentioInstance instance, string urlSlug, string imdbId, HttpClient client)
|
||||||
@@ -180,45 +192,37 @@ public partial class TorrentioCrawler(
|
|||||||
|
|
||||||
private Torrent ParseTorrentDetails(string title, TorrentioInstance instance, string infoHash, string imdbId)
|
private Torrent ParseTorrentDetails(string title, TorrentioInstance instance, string infoHash, string imdbId)
|
||||||
{
|
{
|
||||||
try
|
var torrent = new Torrent
|
||||||
{
|
{
|
||||||
var torrent = new Torrent
|
Source = $"{Source}_{instance.Name}",
|
||||||
|
InfoHash = infoHash,
|
||||||
|
Category = "movies", // we only handle movies for now...
|
||||||
|
Imdb = imdbId,
|
||||||
|
};
|
||||||
|
|
||||||
|
var span = title.AsSpan();
|
||||||
|
var titleEnd = span.IndexOf('\n');
|
||||||
|
var titlePart = titleEnd >= 0 ? span[..titleEnd].ToString() : title;
|
||||||
|
|
||||||
|
torrent.Name = titlePart.Replace('.', ' ').TrimEnd('.');
|
||||||
|
|
||||||
|
var sizeMatch = SizeMatcher().Match(title);
|
||||||
|
|
||||||
|
if (sizeMatch.Success)
|
||||||
|
{
|
||||||
|
var size = double.Parse(sizeMatch.Groups[1].Value); // Size Value
|
||||||
|
var sizeUnit = sizeMatch.Groups[3].Value; // Size Unit (GB/MB)
|
||||||
|
|
||||||
|
var sizeInBytes = sizeUnit switch
|
||||||
{
|
{
|
||||||
Source = $"{Source}_{instance.Name}",
|
"GB" => (long) (size * 1073741824),
|
||||||
InfoHash = infoHash,
|
"MB" => (long) (size * 1048576),
|
||||||
Category = "movies", // we only handle movies for now...
|
_ => 0,
|
||||||
Imdb = imdbId,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
var span = title.AsSpan();
|
torrent.Size = sizeInBytes.ToString();
|
||||||
var titleEnd = span.IndexOf('\n');
|
|
||||||
var titlePart = titleEnd >= 0 ? span[..titleEnd].ToString() : title;
|
|
||||||
|
|
||||||
torrent.Name = titlePart.Replace('.', ' ').TrimEnd('.');
|
|
||||||
|
|
||||||
var sizeMatch = SizeMatcher().Match(title);
|
|
||||||
|
|
||||||
if (sizeMatch.Success)
|
|
||||||
{
|
|
||||||
var size = double.Parse(sizeMatch.Groups[1].Value); // Size Value
|
|
||||||
var sizeUnit = sizeMatch.Groups[3].Value; // Size Unit (GB/MB)
|
|
||||||
|
|
||||||
var sizeInBytes = sizeUnit switch
|
|
||||||
{
|
|
||||||
"GB" => (long) (size * 1073741824),
|
|
||||||
"MB" => (long) (size * 1048576),
|
|
||||||
_ => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
torrent.Size = sizeInBytes.ToString();
|
|
||||||
}
|
|
||||||
|
|
||||||
return torrent;
|
|
||||||
}
|
|
||||||
catch (Exception e)
|
|
||||||
{
|
|
||||||
logger.LogError(e, "Error parsing torrent details");
|
|
||||||
throw;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return torrent;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -21,6 +21,17 @@ public static class TorrentioInstancesExtensions
|
|||||||
|
|
||||||
return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
|
return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void SetPossiblyRateLimited(this TorrentioInstance instance, TorrentioScrapeInstance state, int minutesToWait = 5)
|
||||||
|
{
|
||||||
|
// Set the start time to 15 minutes in the past so that the next check will result in a rate limit period of 15 minutes
|
||||||
|
var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait);
|
||||||
|
var requestCount = instance.RateLimit.RequestLimit;
|
||||||
|
|
||||||
|
// Update the scraper state for the instance
|
||||||
|
state.StartedAt = startedAt;
|
||||||
|
state.RequestCount = requestCount;
|
||||||
|
}
|
||||||
|
|
||||||
public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
||||||
!scraperState.TryGetValue(instance.Name, out var state) ? 0 : state.TotalProcessed;
|
!scraperState.TryGetValue(instance.Name, out var state) ? 0 : state.TotalProcessed;
|
||||||
|
|||||||
@@ -7,4 +7,5 @@ public class TorrentioScrapeInstance
|
|||||||
public int TotalProcessed { get; set; }
|
public int TotalProcessed { get; set; }
|
||||||
public string? LastProcessedImdbId { get; set; }
|
public string? LastProcessedImdbId { get; set; }
|
||||||
public IAsyncPolicy? ResiliencyPolicy { get; set; }
|
public IAsyncPolicy? ResiliencyPolicy { get; set; }
|
||||||
|
public IAsyncPolicy? RetryPolicy { get; set; }
|
||||||
}
|
}
|
||||||
Reference in New Issue
Block a user