Merge pull request #107 from iPromKnight/circuit-breaker
Introduce a circuit breaker, also exit out of loop if mongo failures.
This commit is contained in:
@@ -6,7 +6,10 @@
|
||||
"Url": "https://torrentio.strem.fun",
|
||||
"RateLimit": {
|
||||
"RequestLimit": 300,
|
||||
"IntervalInSeconds": 3600
|
||||
"IntervalInSeconds": 3600,
|
||||
"MongoBatchSize": 300,
|
||||
"ExceptionLimit": 5,
|
||||
"ExceptionIntervalInSeconds": 60
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
@@ -11,6 +11,7 @@ public partial class TorrentioCrawler(
|
||||
{
|
||||
[GeneratedRegex(@"(\d+(\.\d+)?) (GB|MB)")]
|
||||
private static partial Regex SizeMatcher();
|
||||
private const int MaximumEmptyItemsCount = 5;
|
||||
|
||||
private const string MovieSlug = "movie/{0}.json";
|
||||
protected override string Url => "sort=size%7Cqualityfilter=other,scr,cam,unknown/stream/{0}";
|
||||
@@ -31,20 +32,33 @@ public partial class TorrentioCrawler(
|
||||
Task.Run(
|
||||
async () =>
|
||||
{
|
||||
while (instance.TotalProcessedRequests(_instanceStates) < totalRecordCount)
|
||||
var emptyMongoDbItemsCount = 0;
|
||||
|
||||
var state = instance.EnsureStateExists(_instanceStates);
|
||||
|
||||
SetupResiliencyPolicyForInstance(instance, state);
|
||||
|
||||
while (state.TotalProcessed < totalRecordCount)
|
||||
{
|
||||
logger.LogInformation("Processing {TorrentioInstance}", instance.Name);
|
||||
logger.LogInformation("Current processed requests: {ProcessedRequests}", instance.TotalProcessedRequests(_instanceStates));
|
||||
logger.LogInformation("Current processed requests: {ProcessedRequests}", state.TotalProcessed);
|
||||
|
||||
var items = await imdbDataService.GetImdbEntriesForRequests(
|
||||
DateTime.UtcNow.Year.ToString(),
|
||||
instance.RateLimit.RequestLimit,
|
||||
instance.LastProcessedImdbId(_instanceStates));
|
||||
instance.RateLimit.MongoBatchSize,
|
||||
state.LastProcessedImdbId);
|
||||
|
||||
if (items.Count == 0)
|
||||
{
|
||||
emptyMongoDbItemsCount++;
|
||||
logger.LogInformation("No items to process for {TorrentioInstance}", instance.Name);
|
||||
await Task.Delay(10000);
|
||||
if (emptyMongoDbItemsCount >= MaximumEmptyItemsCount)
|
||||
{
|
||||
logger.LogInformation("Maximum empty document count reached. Cancelling {TorrentioInstance}", instance.Name);
|
||||
break;
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -55,29 +69,34 @@ public partial class TorrentioCrawler(
|
||||
{
|
||||
processedItemsCount++;
|
||||
|
||||
var waitTime = instance.CalculateWaitTime(_instanceStates);
|
||||
var waitTime = instance.CalculateWaitTime(state);
|
||||
|
||||
if (waitTime > TimeSpan.Zero)
|
||||
{
|
||||
logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name);
|
||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, waitTime);
|
||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0);
|
||||
await Task.Delay(waitTime);
|
||||
}
|
||||
|
||||
if (processedItemsCount % 2 == 0)
|
||||
{
|
||||
var randomWait = new Random().Next(1000, 5000);
|
||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, randomWait);
|
||||
var randomWait = Random.Shared.Next(1000, 5000);
|
||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0);
|
||||
await Task.Delay(randomWait);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
||||
if (torrentInfo is not null)
|
||||
{
|
||||
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
||||
}
|
||||
await state.ResiliencyPolicy.ExecuteAsync(
|
||||
async () =>
|
||||
{
|
||||
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
||||
|
||||
if (torrentInfo is not null)
|
||||
{
|
||||
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception error)
|
||||
{
|
||||
@@ -88,16 +107,31 @@ public partial class TorrentioCrawler(
|
||||
if (newTorrents.Count > 0)
|
||||
{
|
||||
await InsertTorrents(newTorrents);
|
||||
|
||||
var currentState = _instanceStates[instance.Name];
|
||||
_instanceStates[instance.Name] = currentState with
|
||||
{
|
||||
LastProcessedImdbId = items[^1].ImdbId,
|
||||
};
|
||||
|
||||
state.LastProcessedImdbId = items[^1].ImdbId;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state)
|
||||
{
|
||||
var policy = Policy
|
||||
.Handle<Exception>()
|
||||
.CircuitBreakerAsync(
|
||||
exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit,
|
||||
durationOfBreak: TimeSpan.FromSeconds(instance.RateLimit.ExceptionIntervalInSeconds),
|
||||
onBreak: (ex, breakDelay) =>
|
||||
{
|
||||
logger.LogWarning(ex, "Breaking circuit for {TorrentioInstance} for {BreakDelay}ms due to {Exception}", instance.Name, breakDelay.TotalMilliseconds, ex.Message);
|
||||
},
|
||||
onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name),
|
||||
onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name));
|
||||
|
||||
var policyWrap = Policy.WrapAsync(policy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30)));
|
||||
|
||||
state.ResiliencyPolicy = policyWrap;
|
||||
}
|
||||
|
||||
private async Task<List<Torrent?>?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client)
|
||||
{
|
||||
logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
||||
|
||||
@@ -2,47 +2,37 @@ namespace Producer.Features.Crawlers.Torrentio;
|
||||
|
||||
public static class TorrentioInstancesExtensions
|
||||
{
|
||||
public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState)
|
||||
public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, TorrentioScrapeInstance state)
|
||||
{
|
||||
if (!scraperState.TryGetValue(instance.Name, out var state))
|
||||
if (state.RequestCount < instance.RateLimit.RequestLimit)
|
||||
{
|
||||
state = new (DateTime.UtcNow, 0, 0, null);
|
||||
scraperState[instance.Name] = state;
|
||||
}
|
||||
|
||||
var (startedAt, requestCount, totalProcessed, lastProcessedImdbId) = state;
|
||||
|
||||
if (requestCount < instance.RateLimit.RequestLimit)
|
||||
{
|
||||
scraperState[instance.Name] = new (startedAt, requestCount + 1, totalProcessed + 1, lastProcessedImdbId);
|
||||
state.RequestCount++;
|
||||
state.TotalProcessed++;
|
||||
return TimeSpan.Zero;
|
||||
}
|
||||
|
||||
var elapsed = DateTime.UtcNow - startedAt;
|
||||
var elapsed = DateTime.UtcNow - state.StartedAt;
|
||||
var interval = TimeSpan.FromSeconds(instance.RateLimit.IntervalInSeconds);
|
||||
var remaining = interval - elapsed;
|
||||
|
||||
// reset the state for the next interval
|
||||
scraperState[instance.Name] = new (DateTime.UtcNow, 0, totalProcessed, lastProcessedImdbId);
|
||||
state.StartedAt = DateTime.UtcNow;
|
||||
state.RequestCount = 0;
|
||||
|
||||
return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
|
||||
}
|
||||
|
||||
public static void SetPossiblyRateLimited(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState, int minutesToWait = 5)
|
||||
{
|
||||
if (!scraperState.TryGetValue(instance.Name, out var state))
|
||||
{
|
||||
state = new (DateTime.UtcNow, 0, 0, null);
|
||||
}
|
||||
|
||||
var (_, _, totalProcessed, lastProcessedImdbId) = state;
|
||||
var state = instance.EnsureStateExists(scraperState);
|
||||
|
||||
// Set the start time to 15 minutes in the past so that the next check will result in a rate limit period of 15 minutes
|
||||
var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait);
|
||||
var requestCount = instance.RateLimit.RequestLimit;
|
||||
|
||||
// Update the scraper state for the instance
|
||||
scraperState[instance.Name] = new (startedAt, requestCount, totalProcessed, lastProcessedImdbId);
|
||||
state.StartedAt = startedAt;
|
||||
state.RequestCount = requestCount;
|
||||
}
|
||||
|
||||
public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
||||
@@ -50,4 +40,15 @@ public static class TorrentioInstancesExtensions
|
||||
|
||||
public static string? LastProcessedImdbId(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
||||
!scraperState.TryGetValue(instance.Name, out var state) ? null : state.LastProcessedImdbId;
|
||||
|
||||
public static TorrentioScrapeInstance EnsureStateExists(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState)
|
||||
{
|
||||
if (!scraperState.TryGetValue(instance.Name, out var state))
|
||||
{
|
||||
state = new();
|
||||
scraperState[instance.Name] = state;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
}
|
||||
@@ -4,4 +4,10 @@ public class TorrentioRateLimit
|
||||
{
|
||||
public int RequestLimit { get; set; }
|
||||
public int IntervalInSeconds { get; set; }
|
||||
|
||||
public int MongoBatchSize { get; set; }
|
||||
|
||||
public int ExceptionLimit { get; set; }
|
||||
|
||||
public int ExceptionIntervalInSeconds { get; set; }
|
||||
}
|
||||
@@ -1,3 +1,10 @@
|
||||
namespace Producer.Features.Crawlers.Torrentio;
|
||||
|
||||
public record TorrentioScrapeInstance(DateTime StartedAt, int RequestCount, int TotalProcessed, string? LastProcessedImdbId);
|
||||
public class TorrentioScrapeInstance
|
||||
{
|
||||
public DateTime StartedAt { get; set; } = DateTime.UtcNow;
|
||||
public int RequestCount { get; set; }
|
||||
public int TotalProcessed { get; set; }
|
||||
public string? LastProcessedImdbId { get; set; }
|
||||
public IAsyncPolicy? ResiliencyPolicy { get; set; }
|
||||
}
|
||||
@@ -16,6 +16,7 @@ global using Microsoft.Extensions.Logging;
|
||||
global using MongoDB.Bson.Serialization.Attributes;
|
||||
global using MongoDB.Driver;
|
||||
global using Npgsql;
|
||||
global using Polly;
|
||||
global using Quartz;
|
||||
global using Producer.Extensions;
|
||||
global using Producer.Features.Amqp;
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
|
||||
<PackageReference Include="MongoDB.Driver" Version="2.24.0" />
|
||||
<PackageReference Include="Npgsql" Version="8.0.1" />
|
||||
<PackageReference Include="Polly" Version="8.3.0" />
|
||||
<PackageReference Include="Quartz.Extensions.DependencyInjection" Version="3.8.0" />
|
||||
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.8.0" />
|
||||
<PackageReference Include="Serilog" Version="3.1.1" />
|
||||
|
||||
Reference in New Issue
Block a user