Implement Max Queue and Max Batch size when publishing

MaxPublishBatchSize must be set, but MaxQueueSize can be set to 0 to disable check of the rabbitmq queue size
This commit is contained in:
iPromKnight
2024-02-02 14:43:29 +00:00
parent 8ad6cf731c
commit 57f4757541
7 changed files with 83 additions and 13 deletions

4
env/producer.env vendored
View File

@@ -4,7 +4,7 @@ RabbitMqConfiguration__QueueName=ingested
RabbitMqConfiguration__Username=guest
RabbitMqConfiguration__Password=guest
RabbitMqConfiguration__Durable=true
RabbitMqConfiguration__MaxQueueSize=1000
RabbitMqConfiguration__MaxPublishBatchSize=100
RabbitMqConfiguration__MaxQueueSize=0
RabbitMqConfiguration__MaxPublishBatchSize=500
RabbitMqConfiguration__PublishIntervalInSeconds=10
GithubSettings__PAT=

View File

@@ -5,8 +5,8 @@
"Password": "guest",
"QueueName": "test-queue",
"Durable": true,
"MaxQueueSize": 1000,
"MaxPublishBatchSize": 100,
"MaxQueueSize": 0,
"MaxPublishBatchSize": 1,
"PublishIntervalInSeconds": 10
}
}

View File

@@ -103,13 +103,21 @@ public static class ServiceCollectionExtensions
return githubConfiguration;
}
private static RabbitMqConfiguration LoadRabbitMQConfiguration(IServiceCollection services, IConfiguration configuration)
{
var rabbitConfiguration = configuration.GetSection(RabbitMqConfiguration.SectionName).Get<RabbitMqConfiguration>();
ArgumentNullException.ThrowIfNull(rabbitConfiguration, nameof(rabbitConfiguration));
if (rabbitConfiguration.MaxQueueSize > 0)
{
if (rabbitConfiguration.MaxPublishBatchSize > rabbitConfiguration.MaxQueueSize)
{
throw new InvalidOperationException("MaxPublishBatchSize cannot be greater than MaxQueueSize in RabbitMqConfiguration");
}
}
services.TryAddSingleton(rabbitConfiguration);
return rabbitConfiguration;

View File

@@ -2,5 +2,5 @@
public interface IMessagePublisher
{
Task PublishAsync(IEnumerable<Torrent> torrents, CancellationToken cancellationToken = default);
Task<bool> PublishAsync(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default);
}

View File

@@ -17,7 +17,13 @@ public class PublisherJob(IMessagePublisher publisher, IDataStorage storage, ILo
return;
}
await publisher.PublishAsync(torrents, cancellationToken);
var published = await publisher.PublishAsync(torrents, cancellationToken);
if (!published)
{
return;
}
var result = await storage.SetTorrentsProcessed(torrents, cancellationToken);
if (!result.Success)

View File

@@ -10,7 +10,7 @@ public class RabbitMqConfiguration
public string? Password { get; set; }
public string? QueueName { get; set; }
public bool Durable { get; set; }
public int MaxQueueSize { get; set; } = 1000;
public int MaxPublishBatchSize { get; set; } = 100;
public int MaxQueueSize { get; set; }
public int MaxPublishBatchSize { get; set; } = 500;
public int PublishIntervalInSeconds { get; set; } = 1000 * 10;
}

View File

@@ -1,12 +1,24 @@
namespace Producer.Services;
public class TorrentPublisher(ISendEndpointProvider sendEndpointProvider, RabbitMqConfiguration configuration) : IMessagePublisher
public class TorrentPublisher(
ISendEndpointProvider sendEndpointProvider,
RabbitMqConfiguration configuration,
IHttpClientFactory httpClientFactory,
ILogger<TorrentPublisher> logger) : IMessagePublisher
{
public async Task PublishAsync(IEnumerable<Torrent> torrents, CancellationToken cancellationToken = default)
public async Task<bool> PublishAsync(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default)
{
var queueAddress = ConstructQueue();
var sendEndpoint = await sendEndpointProvider.GetSendEndpoint(new(queueAddress));
if (!await CanPublishToRabbitMq(torrents, cancellationToken))
{
logger.LogWarning("Queue is full or not accessible, not publishing this time.");
return false;
}
await sendEndpoint.SendBatch(torrents, cancellationToken: cancellationToken);
return true;
}
private string ConstructQueue()
@@ -19,4 +31,48 @@ public class TorrentPublisher(ISendEndpointProvider sendEndpointProvider, Rabbit
return queueBuilder.ToString();
}
private async Task<bool> CanPublishToRabbitMq(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken)
{
if (configuration.MaxQueueSize == 0)
{
return true;
}
var client = httpClientFactory.CreateClient("RabbitMq");
client.DefaultRequestHeaders.Authorization =
new("Basic", Convert.ToBase64String(Encoding.UTF8.GetBytes($"{configuration.Username}:{configuration.Password}")));
var url = $"http://{configuration.Host}:15672/api/queues/{Uri.EscapeDataString("/")}/{configuration.QueueName}";
var response = await client.GetAsync(url, cancellationToken);
var body = await response.Content.ReadAsStringAsync(cancellationToken);
var doc = JsonDocument.Parse(body);
if (!doc.RootElement.TryGetProperty("messages", out var messages))
{
logger.LogWarning("Failed to get message count from RabbitMq");
return false;
}
if (!messages.TryGetInt32(out var messageCount))
{
logger.LogWarning("Failed to get message count from RabbitMq");
return false;
}
logger.LogInformation("Current queue message count: {MessageCount}", messageCount);
var canPublish = messageCount < configuration.MaxQueueSize + torrents.Count;
if (!canPublish)
{
logger.LogWarning("You have configured a max queue size of {MaxQueueSize}.", configuration.MaxQueueSize);
logger.LogWarning("Not publishing this time.");
}
return canPublish;
}
}