Producer / Consumer / Collector rewrite (#160)

* Converted metadata service to redis

* move to postgres instead

* fix global usings

* [skip ci] optimize wolverine by prebuilding static types

* [skip ci] Stop indexing mac folder indexes

* [skip ci] producer, metadata and migrations

removed mongodb
added redis cache
imdb meta in postgres
Enable pgtrm
Create trigrams index
Add search meta postgres function

* [skip ci] get rid of node folder, replace mongo with redis in consumer

also wire up postgres metadata searches

* [skip ci] change mongo to redis in the addon

* [skip ci] jackettio to redis

* Rest of mongo removed...

* Cleaner rerunning of metadata - without conflicts

* Add akas import as well as basic metadata

* Include episodes file too

* cascade truncate pre-import

* reverse order to avoid cascadeing

* separate out clean to separate handler

* Switch producer to use metadata matching pre-preocessing dmm

* More work

* Still porting PTN

* PTN port, adding tests

* [skip ci] Codec tests

* [skip ci] Complete Collection handler tests

* [skip ci] container tests

* [skip ci] Convert handlers tests

* [skip ci] DateHandler tests

* [skip ci] Dual Audio matching tests

* [skip ci] episode code tests

* [skip ci] Extended handler tests

* [skip ci] group handler tests

* [skip ci] some broken stuff right now

* [skip ci] more ptn

* [skip ci] PTN now in a separate nuget package, rebased this on the redis changes - i need them.

* [skip ci] Wire up PTN port. Tired - will test tomorrow

* [skip ci] Needs a lot of work - too many titles being missed now

* cleaner. done?

* Handle the date in the imdb search

- add integer function to confirm its a valid integer
- use the input date as a range of -+1 year

* [skip ci] Start of collector service for RD

[skip ci] WIP

Implemented metadata saga, along with channels to process up to a maximum of 100 infohashes each time
The saga will rety for each infohas by requeuing up to three times, before just marking as complete for that infoHash - meaning no data will be updated in the db for that torrent.

[skip ci] Ready to test with queue publishing

Will provision a fanout exchange if it doesn't exist, and create and bind a queue to it. Listens to the queue with 50 prefetch count.
Still needs PTN rewrite bringing in to parse the filename response from real debrid, and extract season and episode numbers if the file is a tvshow

[skip ci] Add Debrid Collector Build Job

Debrid Collector ready for testing

New consumer, new collector, producer has meta lookup and anti porn measures

[skip ci] WIP - moving from wolverine to MassTransit.

 not happy that wolverine cannot effectively control saga concurrency. we need to really.

[skip ci] Producer and new Consumer moved to MassTransit

Just the debrid collector to go now, then to write the optional qbit collector.

Collector now switched to mass transit too

hide porn titles in logs, clean up cache name in redis for imdb titles

[skip ci] Allow control of queues

[skip ci] Update deployment

Remove old consumer, fix deployment files, fix dockerfiles for shared project import

fix base deployment

* Add collector missing env var

* edits to kick off builds

* Add optional qbit deployment which qbit collector will use

* Qbit collector done

* reorder compose, and bring both qbit and qbitcollector into the compose, with 0 replicas as default

* Clean up compose file

* Ensure debrid collector errors if no debrid api key
This commit is contained in:
iPromKnight
2024-03-25 23:32:28 +00:00
committed by GitHub
parent 9c6c1ac249
commit 9a831e92d0
443 changed files with 4154 additions and 476262 deletions

View File

@@ -0,0 +1,36 @@
{
"Serilog": {
"Using": [ "Serilog.Sinks.Console" ],
"MinimumLevel": {
"Default": "Information",
"Override": {
"Microsoft": "Warning",
"System": "Warning",
"Npgsql.Command": "Warning",
"Marten.IDocumentStore": "Warning",
"Wolverine.Runtime.WolverineRuntime": "Warning",
"Wolverine.Runtime.Agents.NodeAgentController": "Warning",
"Oakton.Resources.ResourceSetupHostService": "Warning",
"System.Net.Http.HttpClient.Scraper.LogicalHandler": "Warning",
"System.Net.Http.HttpClient.Scraper.ClientHandler": "Warning",
"Quartz.Impl.StdSchedulerFactory": "Warning",
"Quartz.Core.QuartzScheduler": "Warning",
"Quartz.Simpl.RAMJobStore": "Warning",
"Quartz.Core.JobRunShell": "Warning",
"Quartz.Core.SchedulerSignalerImpl": "Warning"
}
},
"WriteTo": [
{
"Name": "Console",
"Args": {
"outputTemplate": "{Timestamp:HH:mm:ss} [{Level}] [{SourceContext}] {Message}{NewLine}{Exception}"
}
}
],
"Enrich": [ "FromLogContext", "WithMachineName", "WithThreadId" ],
"Properties": {
"Application": "Metadata"
}
}
}

View File

@@ -0,0 +1,38 @@
<Project Sdk="Microsoft.NET.Sdk.Worker">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<UserSecretsId>54cad2ee-57df-4bb2-a192-d5d501448e0a</UserSecretsId>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.1.35" />
<PackageReference Include="MassTransit" Version="8.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="8.2.0" />
<PackageReference Include="MassTransit.Redis" Version="8.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" />
<PackageReference Include="Polly" Version="8.3.1" />
<PackageReference Include="PromKnight.ParseTorrentTitle" Version="1.0.4" />
<PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
<PackageReference Include="System.Interactive.Async" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<Content Remove="Configuration\logging.json" />
<None Include="Configuration\logging.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\shared\SharedContracts.csproj" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,27 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DebridCollector", "DebridCollector.csproj", "{64C3253C-0638-4825-AC82-7D5600D1F9C9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedContracts", "..\shared\SharedContracts.csproj", "{C9BE500C-CE04-480B-874F-A85D33CAA821}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{2C0A0F53-28E6-404F-9EFE-DADFBEF8338B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{64C3253C-0638-4825-AC82-7D5600D1F9C9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64C3253C-0638-4825-AC82-7D5600D1F9C9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64C3253C-0638-4825-AC82-7D5600D1F9C9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64C3253C-0638-4825-AC82-7D5600D1F9C9}.Release|Any CPU.Build.0 = Release|Any CPU
{C9BE500C-CE04-480B-874F-A85D33CAA821}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C9BE500C-CE04-480B-874F-A85D33CAA821}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C9BE500C-CE04-480B-874F-A85D33CAA821}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C9BE500C-CE04-480B-874F-A85D33CAA821}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{C9BE500C-CE04-480B-874F-A85D33CAA821} = {2C0A0F53-28E6-404F-9EFE-DADFBEF8338B}
EndGlobalSection
EndGlobal

View File

@@ -0,0 +1,20 @@
FROM --platform=$BUILDPLATFORM mcr.microsoft.com/dotnet/sdk:8.0 AS build
ARG TARGETARCH
WORKDIR /src
COPY shared/ shared/
COPY debrid-collector/ debrid-collector/
WORKDIR /src/debrid-collector/
RUN dotnet restore -a $TARGETARCH
RUN dotnet publish -c Release --no-restore -o /src/out -a $TARGETARCH
FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine
WORKDIR /app
COPY --from=build /src/out .
RUN addgroup -S debrid && adduser -S -G debrid debrid
USER debrid
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD pgrep -f dotnet || exit 1
ENTRYPOINT ["dotnet", "DebridCollector.dll"]

View File

@@ -0,0 +1,73 @@
using DebridCollector.Features.Configuration;
namespace DebridCollector.Extensions;
public static class ServiceCollectionExtensions
{
internal static IServiceCollection AddDatabase(this IServiceCollection services)
{
services.LoadConfigurationFromEnv<PostgresConfiguration>();
services.AddTransient<IDataStorage, DapperDataStorage>();
return services;
}
internal static IServiceCollection AddServiceConfiguration(this IServiceCollection services)
{
var serviceConfiguration = services.LoadConfigurationFromEnv<DebridCollectorConfiguration>();
services.AddRealDebridClient(serviceConfiguration);
services.AddSingleton<IParseTorrentTitle, ParseTorrentTitle>();
services.AddHostedService<DebridRequestProcessor>();
return services;
}
internal static IServiceCollection RegisterMassTransit(this IServiceCollection services)
{
var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
var redisConfiguration = services.LoadConfigurationFromEnv<RedisConfiguration>();
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.AutoStart = true;
cfg.Host(
rabbitConfiguration.Host, h =>
{
h.Username(rabbitConfiguration.Username);
h.Password(rabbitConfiguration.Password);
});
cfg.Message<CollectMetadata>(e => e.SetEntityName(rabbitConfiguration.DebridCollectorQueueName));
cfg.ConfigureEndpoints(context);
});
x.AddConsumer<PerformMetadataRequestConsumer>();
x.AddConsumer<WriteMetadataConsumer>();
x.RegisterMetadataIngestionSaga(redisConfiguration, rabbitConfiguration);
});
return services;
}
private static void RegisterMetadataIngestionSaga(this IBusRegistrationConfigurator x, RedisConfiguration redisConfiguration, RabbitMqConfiguration rabbitMqConfiguration) =>
x.AddSagaStateMachine<InfohashMetadataSagaStateMachine, InfohashMetadataSagaState>(
cfg =>
{
cfg.UseMessageRetry(r => r.Intervals(1000,2000,5000));
cfg.UseInMemoryOutbox();
})
.RedisRepository(redisConfiguration.ConnectionString)
.Endpoint(
e =>
{
e.Name = rabbitMqConfiguration.DebridCollectorQueueName;
e.ConcurrentMessageLimit = 50;
e.PrefetchCount = 50;
});
}

View File

@@ -0,0 +1,8 @@
namespace DebridCollector.Features.Configuration;
public class DebridCollectorConfiguration
{
private const string Prefix = "COLLECTOR";
private const string RealDebridApiKeyVariable = "REAL_DEBRID_API_KEY";
public string RealDebridApiKey { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(RealDebridApiKeyVariable);
}

View File

@@ -0,0 +1,64 @@
namespace DebridCollector.Features.Debrid;
public class DebridRequestProcessor(IDebridHttpClient debridHttpClient, ILogger<DebridRequestProcessor> logger, IBus messageBus) : BackgroundService
{
private const int BatchDelay = 3000;
public const int MaxBatchSize = 100;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var requests = new List<PerformMetadataRequest>(MaxBatchSize);
var delay = TimeSpan.FromMilliseconds(BatchDelay);
while (!stoppingToken.IsCancellationRequested)
{
while (ProcessorChannel.Queue.Reader.TryRead(out var request))
{
if (requests.Count >= MaxBatchSize)
{
break;
}
if (requests.All(x => x.InfoHash != request.InfoHash))
{
requests.Add(request);
}
}
if (requests.Any())
{
await ProcessRequests(requests, stoppingToken);
requests.Clear();
}
await Task.Delay(delay, stoppingToken);
}
// After the loop ends, there may be remaining requests which were not processed. Let's process them:
if (requests.Count != 0)
{
await ProcessRequests(requests, stoppingToken);
requests.Clear();
}
}
private async Task ProcessRequests(IReadOnlyCollection<PerformMetadataRequest> requests, CancellationToken stoppingToken = default)
{
try
{
var results = await debridHttpClient.GetMetadataAsync(requests, stoppingToken);
await ProcessResponses(results);
logger.LogInformation("Processed: {Count} infoHashes", requests.Count);
}
catch (Exception e)
{
logger.LogError(e, "Failed to process infoHashes");
}
}
private async Task ProcessResponses(IEnumerable<TorrentMetadataResponse> results)
{
var messages = results.Select(response => new GotMetadata(response)).ToList();
await messageBus.PublishBatch(messages);
}
}

View File

@@ -0,0 +1,6 @@
namespace DebridCollector.Features.Debrid;
public interface IDebridHttpClient
{
public Task<IReadOnlyList<TorrentMetadataResponse>> GetMetadataAsync(IReadOnlyCollection<PerformMetadataRequest> infoHashes, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,12 @@
namespace DebridCollector.Features.Debrid;
public static class ProcessorChannel
{
public static Channel<PerformMetadataRequest> Queue = Channel.CreateUnbounded<PerformMetadataRequest>(new()
{
SingleReader = true,
SingleWriter = true,
});
public static bool AddToQueue(PerformMetadataRequest infoHash) => Queue.Writer.TryWrite(infoHash);
}

View File

@@ -0,0 +1,65 @@
namespace DebridCollector.Features.Debrid;
public class RealDebridClient(HttpClient client) : IDebridHttpClient
{
private const string TorrentsInstantAvailability = "torrents/instantAvailability/";
public async Task<IReadOnlyList<TorrentMetadataResponse>> GetMetadataAsync(IReadOnlyCollection<PerformMetadataRequest> requests, CancellationToken cancellationToken = default)
{
var responseAsString = await client.GetStringAsync($"{TorrentsInstantAvailability}{string.Join("/", requests.Select(x => x.InfoHash.ToLowerInvariant()))}", cancellationToken);
var document = JsonDocument.Parse(responseAsString);
var torrentMetadataResponses = new List<TorrentMetadataResponse>();
foreach (var request in requests)
{
if (document.RootElement.TryGetProperty(request.InfoHash.ToLowerInvariant(), out var dataElement) &&
dataElement.ValueKind == JsonValueKind.Object &&
dataElement.TryGetProperty("rd", out var rdDataElement) &&
rdDataElement.ValueKind == JsonValueKind.Array &&
rdDataElement.GetArrayLength() > 0)
{
MapResponseToMetadata(rdDataElement, torrentMetadataResponses, request);
continue;
}
torrentMetadataResponses.Add(new(request.CorrelationId, new()));
}
return torrentMetadataResponses;
}
private static void MapResponseToMetadata(JsonElement rdDataElement, List<TorrentMetadataResponse> torrentMetadataResponses, PerformMetadataRequest request)
{
var metaData = new FileDataDictionary();
foreach (var item in rdDataElement.EnumerateArray())
{
if (item.ValueKind == JsonValueKind.Object)
{
foreach (var property in item.EnumerateObject())
{
if (property.Value.ValueKind == JsonValueKind.Object)
{
var fileData = new FileData();
if (property.Value.TryGetProperty("filename", out var filenameElement) && filenameElement.ValueKind == JsonValueKind.String)
{
fileData.Filename = filenameElement.GetString();
}
if (property.Value.TryGetProperty("filesize", out var filesizeElement) && filesizeElement.ValueKind == JsonValueKind.Number)
{
fileData.Filesize = filesizeElement.GetInt64();
}
metaData[property.Name] = fileData;
}
}
}
}
torrentMetadataResponses.Add(new(request.CorrelationId, metaData));
}
}

View File

@@ -0,0 +1,24 @@
namespace DebridCollector.Features.Debrid;
public class RealDebridResponse : Dictionary<string, RdData?>
{
}
public class RdData
{
[JsonPropertyName("rd")]
public List<FileDataDictionary>? Rd { get; set; }
}
public class FileDataDictionary : Dictionary<string, FileData>
{
}
public class FileData
{
[JsonPropertyName("filename")]
public string? Filename { get; set; }
[JsonPropertyName("filesize")]
public long? Filesize { get; set; }
}

View File

@@ -0,0 +1,32 @@
using DebridCollector.Features.Configuration;
namespace DebridCollector.Features.Debrid;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddRealDebridClient(this IServiceCollection services, DebridCollectorConfiguration serviceConfiguration)
{
services.AddHttpClient<IDebridHttpClient, RealDebridClient>(
client =>
{
client.BaseAddress = new("https://api.real-debrid.com/rest/1.0/");
client.DefaultRequestHeaders.Add("Authorization", $"Bearer {serviceConfiguration.RealDebridApiKey}");
})
.AddPolicyHandler(GetRetryPolicy())
.AddPolicyHandler(GetCircuitBreakerPolicy());
return services;
}
private static AsyncPolicy<HttpResponseMessage> GetRetryPolicy(int MaxRetryCount = 5, int MaxJitterTime = 1000) =>
HttpPolicyExtensions
.HandleTransientHttpError()
.WaitAndRetryAsync(MaxRetryCount, RetryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, RetryAttempt)) +
TimeSpan.FromMilliseconds(Random.Shared.Next(0, MaxJitterTime)));
private static AsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy() =>
HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(handledEventsAllowedBeforeBreaking: 5, TimeSpan.FromSeconds(30));
}

View File

@@ -0,0 +1,82 @@
namespace DebridCollector.Features.Worker;
public static class DebridMetaToTorrentMeta
{
public static IReadOnlyList<TorrentFile> MapMetadataToFilesCollection(
IParseTorrentTitle torrentTitle,
Torrent torrent,
string ImdbId,
FileDataDictionary Metadata)
{
try
{
var files = new List<TorrentFile>();
foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var file = new TorrentFile
{
ImdbId = ImdbId,
KitsuId = 0,
InfoHash = torrent.InfoHash,
FileIndex = validFileIndex ? fileIndex : 0,
Title = metadataEntry.Value.Filename,
Size = metadataEntry.Value.Filesize.GetValueOrDefault(),
};
var parsedTitle = torrentTitle.Parse(file.Title);
file.ImdbSeason = parsedTitle.Seasons.FirstOrDefault();
file.ImdbEpisode = parsedTitle.Episodes.FirstOrDefault();
files.Add(file);
}
return files;
}
catch (Exception)
{
return [];
}
}
public static async Task<IReadOnlyList<SubtitleFile>> MapMetadataToSubtitlesCollection(IDataStorage storage, string InfoHash, FileDataDictionary Metadata)
{
try
{
var files = new List<SubtitleFile>();
var torrentFiles = await storage.GetTorrentFiles(InfoHash.ToLowerInvariant());
if (torrentFiles.Count == 0)
{
return files;
}
foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var fileId = torrentFiles.FirstOrDefault(
t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0;
var file = new SubtitleFile
{
InfoHash = InfoHash,
FileIndex = validFileIndex ? fileIndex : 0,
FileId = fileId,
Title = metadataEntry.Value.Filename,
};
files.Add(file);
}
return files;
}
catch (Exception)
{
return [];
}
}
}

View File

@@ -0,0 +1,104 @@
namespace DebridCollector.Features.Worker;
public static class Filetypes
{
public static IReadOnlyList<string> VideoFileExtensions =
[
".3g2",
".3gp",
".3gp2",
".3gpp",
".60d",
".ajp",
".asf",
".asx",
".avchd",
".avi",
".bik",
".bix",
".box",
".cam",
".dat",
".divx",
".dmf",
".dv",
".dvr-ms",
".evo",
".flc",
".fli",
".flic",
".flv",
".flx",
".gvi",
".gvp",
".h264",
".m1v",
".m2p",
".m2ts",
".m2v",
".m4e",
".m4v",
".mjp",
".mjpeg",
".mjpg",
".mkv",
".moov",
".mov",
".movhd",
".movie",
".movx",
".mp4",
".mpe",
".mpeg",
".mpg",
".mpv",
".mpv2",
".mxf",
".nsv",
".nut",
".ogg",
".ogm",
".omf",
".ps",
".qt",
".ram",
".rm",
".rmvb",
".swf",
".ts",
".vfw",
".vid",
".video",
".viv",
".vivo",
".vob",
".vro",
".wm",
".wmv",
".wmx",
".wrap",
".wvx",
".wx",
".x264",
".xvid",
];
public static IReadOnlyList<string> SubtitleFileExtensions =
[
".a",
".srt",
".ass",
".ssa",
".stl",
".scc",
".ttml",
".sbv",
".dks",
".qtx",
".jss",
".vtt",
".smi",
".usf",
".idx"
];
}

View File

@@ -0,0 +1,14 @@
namespace DebridCollector.Features.Worker;
public class InfohashMetadataSagaState : SagaStateMachineInstance, ISagaVersion
{
public Torrent? Torrent { get; set; }
public string? Title { get; set; }
public string? ImdbId { get; set; }
public TorrentMetadataResponse? Metadata { get; set; }
public int RetriesAllowed { get; set; } = 2;
public Guid CorrelationId { get; set; }
public int Version { get; set; }
public int CurrentState { get; set; }
}

View File

@@ -0,0 +1,63 @@
namespace DebridCollector.Features.Worker;
public class InfohashMetadataSagaStateMachine : MassTransitStateMachine<InfohashMetadataSagaState>
{
public State Ingesting { get; private set; } = null!;
public State Writing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public Event<CollectMetadata> CollectMetadata { get; private set; } = null!;
public Event<GotMetadata> GotMetadata { get; private set; } = null!;
public Event<MetadataWritten> MetadataWritten { get; private set; } = null!;
public InfohashMetadataSagaStateMachine(ILogger<InfohashMetadataSagaStateMachine> logger)
{
InstanceState(x => x.CurrentState);
Event(() => CollectMetadata, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => GotMetadata, x => x.CorrelateById(context => context.Message.CorrelationId));
Event(() => MetadataWritten, x => x.CorrelateById(context => context.Message.CorrelationId));
Initially(
When(CollectMetadata)
.ThenAsync(
async context =>
{
context.Saga.CorrelationId = context.Data.CorrelationId;
context.Saga.Torrent = context.Data.Torrent;
context.Saga.ImdbId = context.Data.ImdbId;
await context.Publish(new PerformMetadataRequest(context.Saga.CorrelationId, context.Saga.Torrent.InfoHash));
logger.LogInformation("Collecting Metadata for torrent {InfoHash} in Saga {SagaId}", context.Instance.Torrent.InfoHash, context.Instance.CorrelationId);
})
.TransitionTo(Ingesting));
During(
Ingesting,
When(GotMetadata)
.ThenAsync(
async context =>
{
context.Saga.Metadata = context.Data.Metadata;
await context.Publish(new WriteMetadata(context.Saga.Torrent, context.Saga.Metadata, context.Saga.ImdbId));
logger.LogInformation("Got Metadata for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
})
.TransitionTo(Writing));
During(
Writing,
When(MetadataWritten)
.Then(
context =>
{
logger.LogInformation("Metadata Written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
})
.TransitionTo(Completed)
.Finalize());
SetCompletedWhenFinalized();
}
}

View File

@@ -0,0 +1,10 @@
namespace DebridCollector.Features.Worker;
public class PerformMetadataRequestConsumer : IConsumer<PerformMetadataRequest>
{
public Task Consume(ConsumeContext<PerformMetadataRequest> context)
{
ProcessorChannel.AddToQueue(context.Message);
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,22 @@
namespace DebridCollector.Features.Worker;
[EntityName("perform-metadata-request")]
public record PerformMetadataRequest(Guid CorrelationId, string InfoHash) : CorrelatedBy<Guid>;
[EntityName("torrent-metadata-response")]
public record GotMetadata(TorrentMetadataResponse Metadata) : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
}
[EntityName("write-metadata")]
public record WriteMetadata(Torrent Torrent, TorrentMetadataResponse Metadata, string ImdbId) : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
}
[EntityName("metadata-written")]
public record MetadataWritten(TorrentMetadataResponse Metadata) : CorrelatedBy<Guid>
{
public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
}

View File

@@ -0,0 +1,4 @@
namespace DebridCollector.Features.Worker;
[EntityName("torrent-metadata-response")]
public record TorrentMetadataResponse(Guid CorrelationId, FileDataDictionary Metadata) : CorrelatedBy<Guid>;

View File

@@ -0,0 +1,25 @@
namespace DebridCollector.Features.Worker;
public class WriteMetadataConsumer(IParseTorrentTitle parseTorrentTitle, IDataStorage dataStorage) : IConsumer<WriteMetadata>
{
public async Task Consume(ConsumeContext<WriteMetadata> context)
{
var request = context.Message;
var torrentFiles = DebridMetaToTorrentMeta.MapMetadataToFilesCollection(parseTorrentTitle, request.Torrent, request.ImdbId, request.Metadata.Metadata);
if (torrentFiles.Any())
{
await dataStorage.InsertFiles(torrentFiles);
var subtitles = await DebridMetaToTorrentMeta.MapMetadataToSubtitlesCollection(dataStorage, request.Torrent.InfoHash, request.Metadata.Metadata);
if (subtitles.Any())
{
await dataStorage.InsertSubtitles(subtitles);
}
}
await context.Publish(new MetadataWritten(request.Metadata));
}
}

View File

@@ -0,0 +1,20 @@
// Global using directives
global using System.Text.Json;
global using System.Text.Json.Serialization;
global using System.Threading.Channels;
global using DebridCollector.Extensions;
global using DebridCollector.Features.Debrid;
global using DebridCollector.Features.Worker;
global using MassTransit;
global using MassTransit.Mediator;
global using Microsoft.AspNetCore.Builder;
global using Microsoft.Extensions.DependencyInjection;
global using Polly;
global using Polly.Extensions.Http;
global using PromKnight.ParseTorrentTitle;
global using SharedContracts.Configuration;
global using SharedContracts.Dapper;
global using SharedContracts.Extensions;
global using SharedContracts.Models;
global using SharedContracts.Requests;

View File

@@ -0,0 +1,17 @@
var builder = WebApplication.CreateBuilder();
builder.DisableIpPortBinding();
builder.Configuration
.AddServiceConfiguration();
builder.Host
.SetupSerilog(builder.Configuration);
builder.Services
.AddServiceConfiguration()
.AddDatabase()
.RegisterMassTransit();
var app = builder.Build();
app.Run();