diff --git a/env/producer.env b/env/producer.env index c0ac48b..4d6a52f 100644 --- a/env/producer.env +++ b/env/producer.env @@ -4,7 +4,7 @@ RabbitMqConfiguration__QueueName=ingested RabbitMqConfiguration__Username=guest RabbitMqConfiguration__Password=guest RabbitMqConfiguration__Durable=true -RabbitMqConfiguration__MaxQueueSize=1000 -RabbitMqConfiguration__MaxPublishBatchSize=100 +RabbitMqConfiguration__MaxQueueSize=0 +RabbitMqConfiguration__MaxPublishBatchSize=500 RabbitMqConfiguration__PublishIntervalInSeconds=10 GithubSettings__PAT= \ No newline at end of file diff --git a/src/producer/Configuration/rabbitmq.json b/src/producer/Configuration/rabbitmq.json index 11b0d91..3544149 100644 --- a/src/producer/Configuration/rabbitmq.json +++ b/src/producer/Configuration/rabbitmq.json @@ -5,8 +5,8 @@ "Password": "guest", "QueueName": "test-queue", "Durable": true, - "MaxQueueSize": 1000, - "MaxPublishBatchSize": 100, + "MaxQueueSize": 0, + "MaxPublishBatchSize": 1, "PublishIntervalInSeconds": 10 } } \ No newline at end of file diff --git a/src/producer/Extensions/ServiceCollectionExtensions.cs b/src/producer/Extensions/ServiceCollectionExtensions.cs index 1fbeddb..7ce067a 100644 --- a/src/producer/Extensions/ServiceCollectionExtensions.cs +++ b/src/producer/Extensions/ServiceCollectionExtensions.cs @@ -103,13 +103,21 @@ public static class ServiceCollectionExtensions return githubConfiguration; } - + private static RabbitMqConfiguration LoadRabbitMQConfiguration(IServiceCollection services, IConfiguration configuration) { var rabbitConfiguration = configuration.GetSection(RabbitMqConfiguration.SectionName).Get(); - + ArgumentNullException.ThrowIfNull(rabbitConfiguration, nameof(rabbitConfiguration)); - + + if (rabbitConfiguration.MaxQueueSize > 0) + { + if (rabbitConfiguration.MaxPublishBatchSize > rabbitConfiguration.MaxQueueSize) + { + throw new InvalidOperationException("MaxPublishBatchSize cannot be greater than MaxQueueSize in RabbitMqConfiguration"); + } + } + services.TryAddSingleton(rabbitConfiguration); return rabbitConfiguration; diff --git a/src/producer/Interfaces/IMessagePublisher.cs b/src/producer/Interfaces/IMessagePublisher.cs index c6444ab..19c273c 100644 --- a/src/producer/Interfaces/IMessagePublisher.cs +++ b/src/producer/Interfaces/IMessagePublisher.cs @@ -2,5 +2,5 @@ public interface IMessagePublisher { - Task PublishAsync(IEnumerable torrents, CancellationToken cancellationToken = default); + Task PublishAsync(IReadOnlyCollection torrents, CancellationToken cancellationToken = default); } \ No newline at end of file diff --git a/src/producer/Jobs/PublisherJob.cs b/src/producer/Jobs/PublisherJob.cs index e6ca3ad..2e25294 100644 --- a/src/producer/Jobs/PublisherJob.cs +++ b/src/producer/Jobs/PublisherJob.cs @@ -17,7 +17,13 @@ public class PublisherJob(IMessagePublisher publisher, IDataStorage storage, ILo return; } - await publisher.PublishAsync(torrents, cancellationToken); + var published = await publisher.PublishAsync(torrents, cancellationToken); + + if (!published) + { + return; + } + var result = await storage.SetTorrentsProcessed(torrents, cancellationToken); if (!result.Success) diff --git a/src/producer/Models/RabbitMqConfiguration.cs b/src/producer/Models/RabbitMqConfiguration.cs index 4b86073..ccc0277 100644 --- a/src/producer/Models/RabbitMqConfiguration.cs +++ b/src/producer/Models/RabbitMqConfiguration.cs @@ -10,7 +10,7 @@ public class RabbitMqConfiguration public string? Password { get; set; } public string? QueueName { get; set; } public bool Durable { get; set; } - public int MaxQueueSize { get; set; } = 1000; - public int MaxPublishBatchSize { get; set; } = 100; + public int MaxQueueSize { get; set; } + public int MaxPublishBatchSize { get; set; } = 500; public int PublishIntervalInSeconds { get; set; } = 1000 * 10; } \ No newline at end of file diff --git a/src/producer/Services/TorrentPublisher.cs b/src/producer/Services/TorrentPublisher.cs index f0697ff..ac39745 100644 --- a/src/producer/Services/TorrentPublisher.cs +++ b/src/producer/Services/TorrentPublisher.cs @@ -1,12 +1,24 @@ namespace Producer.Services; -public class TorrentPublisher(ISendEndpointProvider sendEndpointProvider, RabbitMqConfiguration configuration) : IMessagePublisher +public class TorrentPublisher( + ISendEndpointProvider sendEndpointProvider, + RabbitMqConfiguration configuration, + IHttpClientFactory httpClientFactory, + ILogger logger) : IMessagePublisher { - public async Task PublishAsync(IEnumerable torrents, CancellationToken cancellationToken = default) + public async Task PublishAsync(IReadOnlyCollection torrents, CancellationToken cancellationToken = default) { var queueAddress = ConstructQueue(); var sendEndpoint = await sendEndpointProvider.GetSendEndpoint(new(queueAddress)); + + if (!await CanPublishToRabbitMq(torrents, cancellationToken)) + { + logger.LogWarning("Queue is full or not accessible, not publishing this time."); + return false; + } + await sendEndpoint.SendBatch(torrents, cancellationToken: cancellationToken); + return true; } private string ConstructQueue() @@ -19,4 +31,48 @@ public class TorrentPublisher(ISendEndpointProvider sendEndpointProvider, Rabbit return queueBuilder.ToString(); } + + private async Task CanPublishToRabbitMq(IReadOnlyCollection torrents, CancellationToken cancellationToken) + { + if (configuration.MaxQueueSize == 0) + { + return true; + } + + var client = httpClientFactory.CreateClient("RabbitMq"); + + client.DefaultRequestHeaders.Authorization = + new("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{configuration.Username}:{configuration.Password}"))); + + var url = $"http://{configuration.Host}:15672/api/queues/{Uri.EscapeDataString("/")}/{configuration.QueueName}"; + + var response = await client.GetAsync(url, cancellationToken); + + var body = await response.Content.ReadAsStringAsync(cancellationToken); + var doc = JsonDocument.Parse(body); + + if (!doc.RootElement.TryGetProperty("messages", out var messages)) + { + logger.LogWarning("Failed to get message count from RabbitMq"); + return false; + } + + if (!messages.TryGetInt32(out var messageCount)) + { + logger.LogWarning("Failed to get message count from RabbitMq"); + return false; + } + + logger.LogInformation("Current queue message count: {MessageCount}", messageCount); + + var canPublish = messageCount < configuration.MaxQueueSize + torrents.Count; + + if (!canPublish) + { + logger.LogWarning("You have configured a max queue size of {MaxQueueSize}.", configuration.MaxQueueSize); + logger.LogWarning("Not publishing this time."); + } + + return canPublish; + } } \ No newline at end of file