Introduce a circuit breaker; also exit the loop on Mongo failures.
This commit is contained in:
@@ -6,7 +6,10 @@
|
|||||||
"Url": "https://torrentio.strem.fun",
|
"Url": "https://torrentio.strem.fun",
|
||||||
"RateLimit": {
|
"RateLimit": {
|
||||||
"RequestLimit": 300,
|
"RequestLimit": 300,
|
||||||
"IntervalInSeconds": 3600
|
"IntervalInSeconds": 3600,
|
||||||
|
"MongoBatchSize": 300,
|
||||||
|
"ExceptionLimit": 5,
|
||||||
|
"ExceptionIntervalInSeconds": 60
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ public partial class TorrentioCrawler(
|
|||||||
{
|
{
|
||||||
[GeneratedRegex(@"(\d+(\.\d+)?) (GB|MB)")]
|
[GeneratedRegex(@"(\d+(\.\d+)?) (GB|MB)")]
|
||||||
private static partial Regex SizeMatcher();
|
private static partial Regex SizeMatcher();
|
||||||
|
private const int MaximumEmptyItemsCount = 5;
|
||||||
|
|
||||||
private const string MovieSlug = "movie/{0}.json";
|
private const string MovieSlug = "movie/{0}.json";
|
||||||
protected override string Url => "sort=size%7Cqualityfilter=other,scr,cam,unknown/stream/{0}";
|
protected override string Url => "sort=size%7Cqualityfilter=other,scr,cam,unknown/stream/{0}";
|
||||||
@@ -31,20 +32,33 @@ public partial class TorrentioCrawler(
|
|||||||
Task.Run(
|
Task.Run(
|
||||||
async () =>
|
async () =>
|
||||||
{
|
{
|
||||||
while (instance.TotalProcessedRequests(_instanceStates) < totalRecordCount)
|
var emptyMongoDbItemsCount = 0;
|
||||||
|
|
||||||
|
var state = instance.EnsureStateExists(_instanceStates);
|
||||||
|
|
||||||
|
SetupResiliencyPolicyForInstance(instance, state);
|
||||||
|
|
||||||
|
while (state.TotalProcessed < totalRecordCount)
|
||||||
{
|
{
|
||||||
logger.LogInformation("Processing {TorrentioInstance}", instance.Name);
|
logger.LogInformation("Processing {TorrentioInstance}", instance.Name);
|
||||||
logger.LogInformation("Current processed requests: {ProcessedRequests}", instance.TotalProcessedRequests(_instanceStates));
|
logger.LogInformation("Current processed requests: {ProcessedRequests}", state.TotalProcessed);
|
||||||
|
|
||||||
var items = await imdbDataService.GetImdbEntriesForRequests(
|
var items = await imdbDataService.GetImdbEntriesForRequests(
|
||||||
DateTime.UtcNow.Year.ToString(),
|
DateTime.UtcNow.Year.ToString(),
|
||||||
instance.RateLimit.RequestLimit,
|
instance.RateLimit.MongoBatchSize,
|
||||||
instance.LastProcessedImdbId(_instanceStates));
|
state.LastProcessedImdbId);
|
||||||
|
|
||||||
if (items.Count == 0)
|
if (items.Count == 0)
|
||||||
{
|
{
|
||||||
|
emptyMongoDbItemsCount++;
|
||||||
logger.LogInformation("No items to process for {TorrentioInstance}", instance.Name);
|
logger.LogInformation("No items to process for {TorrentioInstance}", instance.Name);
|
||||||
await Task.Delay(10000);
|
await Task.Delay(10000);
|
||||||
|
if (emptyMongoDbItemsCount >= MaximumEmptyItemsCount)
|
||||||
|
{
|
||||||
|
logger.LogInformation("Maximum empty document count reached. Cancelling {TorrentioInstance}", instance.Name);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -55,29 +69,34 @@ public partial class TorrentioCrawler(
|
|||||||
{
|
{
|
||||||
processedItemsCount++;
|
processedItemsCount++;
|
||||||
|
|
||||||
var waitTime = instance.CalculateWaitTime(_instanceStates);
|
var waitTime = instance.CalculateWaitTime(state);
|
||||||
|
|
||||||
if (waitTime > TimeSpan.Zero)
|
if (waitTime > TimeSpan.Zero)
|
||||||
{
|
{
|
||||||
logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name);
|
logger.LogInformation("Rate limit reached for {TorrentioInstance}", instance.Name);
|
||||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, waitTime);
|
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, waitTime / 1000.0);
|
||||||
await Task.Delay(waitTime);
|
await Task.Delay(waitTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (processedItemsCount % 2 == 0)
|
if (processedItemsCount % 2 == 0)
|
||||||
{
|
{
|
||||||
var randomWait = new Random().Next(1000, 5000);
|
var randomWait = Random.Shared.Next(1000, 5000);
|
||||||
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime}", instance.Name, randomWait);
|
logger.LogInformation("Waiting for {TorrentioInstance}: {WaitTime} seconds", instance.Name, randomWait / 1000.0);
|
||||||
await Task.Delay(randomWait);
|
await Task.Delay(randomWait);
|
||||||
}
|
}
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
await state.ResiliencyPolicy.ExecuteAsync(
|
||||||
if (torrentInfo is not null)
|
async () =>
|
||||||
{
|
{
|
||||||
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
var torrentInfo = await ScrapeInstance(instance, item.ImdbId, client);
|
||||||
}
|
|
||||||
|
if (torrentInfo is not null)
|
||||||
|
{
|
||||||
|
newTorrents.AddRange(torrentInfo.Where(x => x != null).Select(x => x!));
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
catch (Exception error)
|
catch (Exception error)
|
||||||
{
|
{
|
||||||
@@ -88,16 +107,31 @@ public partial class TorrentioCrawler(
|
|||||||
if (newTorrents.Count > 0)
|
if (newTorrents.Count > 0)
|
||||||
{
|
{
|
||||||
await InsertTorrents(newTorrents);
|
await InsertTorrents(newTorrents);
|
||||||
|
|
||||||
var currentState = _instanceStates[instance.Name];
|
state.LastProcessedImdbId = items[^1].ImdbId;
|
||||||
_instanceStates[instance.Name] = currentState with
|
|
||||||
{
|
|
||||||
LastProcessedImdbId = items[^1].ImdbId,
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
private void SetupResiliencyPolicyForInstance(TorrentioInstance instance, TorrentioScrapeInstance state)
|
||||||
|
{
|
||||||
|
var policy = Policy
|
||||||
|
.Handle<Exception>()
|
||||||
|
.CircuitBreakerAsync(
|
||||||
|
exceptionsAllowedBeforeBreaking: instance.RateLimit.ExceptionLimit,
|
||||||
|
durationOfBreak: TimeSpan.FromSeconds(instance.RateLimit.ExceptionIntervalInSeconds),
|
||||||
|
onBreak: (ex, breakDelay) =>
|
||||||
|
{
|
||||||
|
logger.LogWarning(ex, "Breaking circuit for {TorrentioInstance} for {BreakDelay}ms due to {Exception}", instance.Name, breakDelay.TotalMilliseconds, ex.Message);
|
||||||
|
},
|
||||||
|
onReset: () => logger.LogInformation("Circuit closed for {TorrentioInstance}, calls will flow again", instance.Name),
|
||||||
|
onHalfOpen: () => logger.LogInformation("Circuit is half-open for {TorrentioInstance}, next call is a trial if it should close or break again", instance.Name));
|
||||||
|
|
||||||
|
var policyWrap = Policy.WrapAsync(policy, Policy.TimeoutAsync(TimeSpan.FromSeconds(30)));
|
||||||
|
|
||||||
|
state.ResiliencyPolicy = policyWrap;
|
||||||
|
}
|
||||||
|
|
||||||
private async Task<List<Torrent?>?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client)
|
private async Task<List<Torrent?>?> ScrapeInstance(TorrentioInstance instance, string imdbId, HttpClient client)
|
||||||
{
|
{
|
||||||
logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
logger.LogInformation("Searching Torrentio {TorrentioInstance}: {ImdbId}", instance.Name, imdbId);
|
||||||
|
|||||||
@@ -2,47 +2,37 @@ namespace Producer.Features.Crawlers.Torrentio;
|
|||||||
|
|
||||||
public static class TorrentioInstancesExtensions
|
public static class TorrentioInstancesExtensions
|
||||||
{
|
{
|
||||||
public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState)
|
public static TimeSpan CalculateWaitTime(this TorrentioInstance instance, TorrentioScrapeInstance state)
|
||||||
{
|
{
|
||||||
if (!scraperState.TryGetValue(instance.Name, out var state))
|
if (state.RequestCount < instance.RateLimit.RequestLimit)
|
||||||
{
|
{
|
||||||
state = new (DateTime.UtcNow, 0, 0, null);
|
state.RequestCount++;
|
||||||
scraperState[instance.Name] = state;
|
state.TotalProcessed++;
|
||||||
}
|
|
||||||
|
|
||||||
var (startedAt, requestCount, totalProcessed, lastProcessedImdbId) = state;
|
|
||||||
|
|
||||||
if (requestCount < instance.RateLimit.RequestLimit)
|
|
||||||
{
|
|
||||||
scraperState[instance.Name] = new (startedAt, requestCount + 1, totalProcessed + 1, lastProcessedImdbId);
|
|
||||||
return TimeSpan.Zero;
|
return TimeSpan.Zero;
|
||||||
}
|
}
|
||||||
|
|
||||||
var elapsed = DateTime.UtcNow - startedAt;
|
var elapsed = DateTime.UtcNow - state.StartedAt;
|
||||||
var interval = TimeSpan.FromSeconds(instance.RateLimit.IntervalInSeconds);
|
var interval = TimeSpan.FromSeconds(instance.RateLimit.IntervalInSeconds);
|
||||||
var remaining = interval - elapsed;
|
var remaining = interval - elapsed;
|
||||||
|
|
||||||
// reset the state for the next interval
|
// reset the state for the next interval
|
||||||
scraperState[instance.Name] = new (DateTime.UtcNow, 0, totalProcessed, lastProcessedImdbId);
|
state.StartedAt = DateTime.UtcNow;
|
||||||
|
state.RequestCount = 0;
|
||||||
|
|
||||||
return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
|
return remaining > TimeSpan.Zero ? remaining : TimeSpan.Zero;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void SetPossiblyRateLimited(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState, int minutesToWait = 5)
|
public static void SetPossiblyRateLimited(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState, int minutesToWait = 5)
|
||||||
{
|
{
|
||||||
if (!scraperState.TryGetValue(instance.Name, out var state))
|
var state = instance.EnsureStateExists(scraperState);
|
||||||
{
|
|
||||||
state = new (DateTime.UtcNow, 0, 0, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
var (_, _, totalProcessed, lastProcessedImdbId) = state;
|
|
||||||
|
|
||||||
// Set the start time minutesToWait minutes in the past; with the request count at its limit, the next check waits for the rest of the interval
|
// Set the start time minutesToWait minutes in the past; with the request count at its limit, the next check waits for the rest of the interval
|
||||||
var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait);
|
var startedAt = DateTime.UtcNow.AddMinutes(-minutesToWait);
|
||||||
var requestCount = instance.RateLimit.RequestLimit;
|
var requestCount = instance.RateLimit.RequestLimit;
|
||||||
|
|
||||||
// Update the scraper state for the instance
|
// Update the scraper state for the instance
|
||||||
scraperState[instance.Name] = new (startedAt, requestCount, totalProcessed, lastProcessedImdbId);
|
state.StartedAt = startedAt;
|
||||||
|
state.RequestCount = requestCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
public static long TotalProcessedRequests(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
||||||
@@ -50,4 +40,15 @@ public static class TorrentioInstancesExtensions
|
|||||||
|
|
||||||
public static string? LastProcessedImdbId(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
public static string? LastProcessedImdbId(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState) =>
|
||||||
!scraperState.TryGetValue(instance.Name, out var state) ? null : state.LastProcessedImdbId;
|
!scraperState.TryGetValue(instance.Name, out var state) ? null : state.LastProcessedImdbId;
|
||||||
|
|
||||||
|
public static TorrentioScrapeInstance EnsureStateExists(this TorrentioInstance instance, Dictionary<string, TorrentioScrapeInstance> scraperState)
|
||||||
|
{
|
||||||
|
if (!scraperState.TryGetValue(instance.Name, out var state))
|
||||||
|
{
|
||||||
|
state = new();
|
||||||
|
scraperState[instance.Name] = state;
|
||||||
|
}
|
||||||
|
|
||||||
|
return state;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -4,4 +4,10 @@ public class TorrentioRateLimit
|
|||||||
{
|
{
|
||||||
public int RequestLimit { get; set; }
|
public int RequestLimit { get; set; }
|
||||||
public int IntervalInSeconds { get; set; }
|
public int IntervalInSeconds { get; set; }
|
||||||
|
|
||||||
|
public int MongoBatchSize { get; set; }
|
||||||
|
|
||||||
|
public int ExceptionLimit { get; set; }
|
||||||
|
|
||||||
|
public int ExceptionIntervalInSeconds { get; set; }
|
||||||
}
|
}
|
||||||
@@ -1,3 +1,10 @@
|
|||||||
namespace Producer.Features.Crawlers.Torrentio;
|
namespace Producer.Features.Crawlers.Torrentio;
|
||||||
|
|
||||||
public record TorrentioScrapeInstance(DateTime StartedAt, int RequestCount, int TotalProcessed, string? LastProcessedImdbId);
|
public class TorrentioScrapeInstance
|
||||||
|
{
|
||||||
|
public DateTime StartedAt { get; set; } = DateTime.UtcNow;
|
||||||
|
public int RequestCount { get; set; }
|
||||||
|
public int TotalProcessed { get; set; }
|
||||||
|
public string? LastProcessedImdbId { get; set; }
|
||||||
|
public IAsyncPolicy? ResiliencyPolicy { get; set; }
|
||||||
|
}
|
||||||
@@ -16,6 +16,7 @@ global using Microsoft.Extensions.Logging;
|
|||||||
global using MongoDB.Bson.Serialization.Attributes;
|
global using MongoDB.Bson.Serialization.Attributes;
|
||||||
global using MongoDB.Driver;
|
global using MongoDB.Driver;
|
||||||
global using Npgsql;
|
global using Npgsql;
|
||||||
|
global using Polly;
|
||||||
global using Quartz;
|
global using Quartz;
|
||||||
global using Producer.Extensions;
|
global using Producer.Extensions;
|
||||||
global using Producer.Features.Amqp;
|
global using Producer.Features.Amqp;
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
|
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
|
||||||
<PackageReference Include="MongoDB.Driver" Version="2.24.0" />
|
<PackageReference Include="MongoDB.Driver" Version="2.24.0" />
|
||||||
<PackageReference Include="Npgsql" Version="8.0.1" />
|
<PackageReference Include="Npgsql" Version="8.0.1" />
|
||||||
|
<PackageReference Include="Polly" Version="8.3.0" />
|
||||||
<PackageReference Include="Quartz.Extensions.DependencyInjection" Version="3.8.0" />
|
<PackageReference Include="Quartz.Extensions.DependencyInjection" Version="3.8.0" />
|
||||||
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.8.0" />
|
<PackageReference Include="Quartz.Extensions.Hosting" Version="3.8.0" />
|
||||||
<PackageReference Include="Serilog" Version="3.1.1" />
|
<PackageReference Include="Serilog" Version="3.1.1" />
|
||||||
|
|||||||
Reference in New Issue
Block a user