Upgrade RTN to 0.1.8, replace rabbitmq with drop in replacement lavinmq - better performance, lower resource usage. (#182)

This commit is contained in:
iPromKnight
2024-03-28 23:35:41 +00:00
committed by GitHub
parent bb260d78d6
commit 527d6cdf15
18 changed files with 196 additions and 146 deletions

View File

@@ -9,7 +9,7 @@ networks:
volumes: volumes:
postgres: postgres:
rabbitmq: lavinmq:
redis: redis:
services: services:
@@ -55,28 +55,29 @@ services:
volumes: volumes:
- redis:/data - redis:/data
## RabbitMQ is used as a message broker for the services. ## LavinMQ is used as a message broker for the services.
## It is a high performance drop in replacement for RabbitMQ.
## It is used to communicate between the services. ## It is used to communicate between the services.
rabbitmq: lavinmq:
env_file: stack.env env_file: stack.env
healthcheck:
test: ["CMD-SHELL", "rabbitmq-diagnostics -q ping"]
timeout: 10s
interval: 10s
retries: 3
start_period: 10s
# # If you need the database to be accessible from outside, please open the below port. # # If you need the database to be accessible from outside, please open the below port.
# # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service.
# ports: # ports:
# - "5672:5672" # - "5672:5672"
# - "15672:15672" # - "15672:15672"
# - "15692:15692" # - "15692:15692"
image: rabbitmq:3-management image: cloudamqp/lavinmq:latest
healthcheck:
test: ["CMD-SHELL", "lavinmqctl status"]
timeout: 10s
interval: 10s
retries: 3
start_period: 10s
restart: unless-stopped
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: unless-stopped
volumes: volumes:
- rabbitmq:/var/lib/rabbitmq - lavinmq:/var/lib/lavinmq/
## The addon. This is what is used in stremio ## The addon. This is what is used in stremio
addon: addon:
@@ -87,7 +88,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
@@ -111,7 +112,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
@@ -132,7 +133,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
@@ -176,7 +177,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
@@ -191,6 +192,16 @@ services:
## QBit collector utilizes QBitTorrent to download metadata. ## QBit collector utilizes QBitTorrent to download metadata.
qbitcollector: qbitcollector:
depends_on: depends_on:
metadata:
condition: service_completed_successfully
migrator:
condition: service_completed_successfully
postgres:
condition: service_healthy
lavinmq:
condition: service_healthy
redis:
condition: service_healthy
qbittorrent: qbittorrent:
condition: service_healthy condition: service_healthy
deploy: deploy:

View File

@@ -16,7 +16,7 @@ rule_files:
scrape_configs: scrape_configs:
- job_name: "rabbitmq" - job_name: "rabbitmq"
static_configs: static_configs:
- targets: ["rabbitmq:15692"] - targets: ["lavinmq:15692"]
- job_name: "postgres-exporter" - job_name: "postgres-exporter"
static_configs: static_configs:
- targets: ["postgres-exporter:9187"] - targets: ["postgres-exporter:9187"]

View File

@@ -4,8 +4,8 @@ x-basehealth: &base-health
retries: 3 retries: 3
start_period: 10s start_period: 10s
x-rabbithealth: &rabbitmq-health x-lavinhealth: &lavinmq-health
test: rabbitmq-diagnostics -q ping test: [ "CMD-SHELL", "lavinmqctl status" ]
<<: *base-health <<: *base-health
x-redishealth: &redis-health x-redishealth: &redis-health
@@ -52,21 +52,19 @@ services:
networks: networks:
- knightcrawler-network - knightcrawler-network
rabbitmq: lavinmq:
image: rabbitmq:3-management env_file: stack.env
# # If you need the database to be accessible from outside, please open the below port. # # If you need the database to be accessible from outside, please open the below port.
# # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service.
# ports: # ports:
# - "5672:5672" # - "5672:5672"
# - "15672:15672" # - "15672:15672"
# - "15692:15692" # - "15692:15692"
volumes: image: cloudamqp/lavinmq:latest
- rabbitmq:/var/lib/rabbitmq healthcheck: *lavinmq-health
restart: unless-stopped restart: unless-stopped
healthcheck: *rabbitmq-health volumes:
env_file: ../../.env - lavinmq:/var/lib/lavinmq/
networks:
- knightcrawler-network
## QBitTorrent is a torrent client that can be used to download torrents. In this case its used to download metadata. ## QBitTorrent is a torrent client that can be used to download torrents. In this case its used to download metadata.
## The QBit collector requires this. ## The QBit collector requires this.

View File

@@ -11,7 +11,7 @@ x-depends: &knightcrawler-app-depends
condition: service_healthy condition: service_healthy
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
migrator: migrator:
condition: service_completed_successfully condition: service_completed_successfully

View File

@@ -1,4 +1,4 @@
volumes: volumes:
postgres: postgres:
redis: redis:
rabbitmq: lavinmq:

View File

@@ -13,8 +13,8 @@ REDIS_HOST=redis
REDIS_PORT=6379 REDIS_PORT=6379
REDIS_EXTRA=abortConnect=false,allowAdmin=true REDIS_EXTRA=abortConnect=false,allowAdmin=true
# RabbitMQ # AMQP
RABBITMQ_HOST=rabbitmq RABBITMQ_HOST=lavinmq
RABBITMQ_USER=guest RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest RABBITMQ_PASSWORD=guest
RABBITMQ_CONSUMER_QUEUE_NAME=ingested RABBITMQ_CONSUMER_QUEUE_NAME=ingested

View File

@@ -1,2 +1,3 @@
remove-item -recurse -force ../src/python
mkdir -p ../src/python mkdir -p ../src/python
pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ pip install -r ../src/requirements.txt -t ../src/python/

View File

@@ -1,4 +1,5 @@
#!/bin/bash #!/bin/bash
rm -rf ../src/python
mkdir -p ../src/python mkdir -p ../src/python
pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ python3 -m pip install -r ../src/requirements.txt -t ../src/python/

View File

@@ -13,13 +13,19 @@ FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine3.19
WORKDIR /app WORKDIR /app
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python
COPY --from=build /src/out . COPY --from=build /src/out .
RUN rm -rf /app/python && mkdir -p /app/python RUN rm -rf /app/python && mkdir -p /app/python
RUN pip3 install --force-reinstall rank-torrent-name==0.1.6 -t /app/python
RUN pip3 install -r /app/requirements.txt -t /app/python
RUN addgroup -S producer && adduser -S -G producer producer RUN addgroup -S producer && adduser -S -G producer producer
USER producer USER producer
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD pgrep -f dotnet || exit 1 CMD pgrep -f dotnet || exit 1

View File

@@ -33,6 +33,9 @@
<None Include="Configuration\*.json"> <None Include="Configuration\*.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
<None Update="requirements.txt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup> </ItemGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'"> <ItemGroup Condition="'$(Configuration)' == 'Debug'">

View File

@@ -0,0 +1 @@
rank-torrent-name==0.1.8

View File

@@ -1,5 +1,8 @@
// Global using directives // Global using directives
global using System.Collections.Concurrent;
global using System.Globalization;
global using System.Text;
global using System.Text.Json; global using System.Text.Json;
global using Dapper; global using Dapper;
global using MassTransit; global using MassTransit;

View File

@@ -0,0 +1,13 @@
namespace SharedContracts.Python;
public interface IPythonEngineService
{
ILogger<PythonEngineService> Logger { get; }
Task InitializePythonEngine(CancellationToken cancellationToken);
T ExecuteCommandOrScript<T>(string command, PyModule module, bool throwOnErrors);
T ExecutePythonOperation<T>(Func<T> operation, string operationName, bool throwOnErrors);
T ExecutePythonOperationWithDefault<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors);
Task StopPythonEngine(CancellationToken cancellationToken);
dynamic? Sys { get; }
}

View File

@@ -0,0 +1,8 @@
namespace SharedContracts.Python;
public class PythonEngineManager(IPythonEngineService pythonEngineService) : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken) => pythonEngineService.InitializePythonEngine(cancellationToken);
public Task StopAsync(CancellationToken cancellationToken) => pythonEngineService.StopPythonEngine(cancellationToken);
}

View File

@@ -1,24 +1,28 @@
namespace SharedContracts.Python; namespace SharedContracts.Python;
public class PythonEngineService(ILogger<PythonEngineService> logger) : IHostedService public class PythonEngineService(ILogger<PythonEngineService> logger) : IPythonEngineService
{ {
private IntPtr _mainThreadState; private IntPtr _mainThreadState;
private bool _isInitialized; private bool _isInitialized;
public Task StartAsync(CancellationToken cancellationToken) public ILogger<PythonEngineService> Logger { get; } = logger;
public dynamic? Sys { get; private set; }
public Task InitializePythonEngine(CancellationToken cancellationToken)
{ {
if (_isInitialized) if (_isInitialized)
{ {
return Task.CompletedTask; return Task.CompletedTask;
} }
try try
{ {
var pythonDllEnv = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL"); var pythonDllEnv = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL");
if (string.IsNullOrWhiteSpace(pythonDllEnv)) if (string.IsNullOrWhiteSpace(pythonDllEnv))
{ {
logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application"); Logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application");
Environment.Exit(1); Environment.Exit(1);
return Task.CompletedTask; return Task.CompletedTask;
} }
@@ -26,24 +30,92 @@ public class PythonEngineService(ILogger<PythonEngineService> logger) : IHostedS
Runtime.PythonDLL = pythonDllEnv; Runtime.PythonDLL = pythonDllEnv;
PythonEngine.Initialize(); PythonEngine.Initialize();
_mainThreadState = PythonEngine.BeginAllowThreads(); _mainThreadState = PythonEngine.BeginAllowThreads();
_isInitialized = true; _isInitialized = true;
logger.LogInformation("Python engine initialized"); Logger.LogInformation("Python engine initialized");
} }
catch (Exception e) catch (Exception e)
{ {
logger.LogWarning(e, "Failed to initialize Python engine"); Logger.LogError(e, $"Failed to initialize Python engine: {e.Message}");
Environment.Exit(1); Environment.Exit(1);
} }
return Task.CompletedTask; return Task.CompletedTask;
} }
public Task StopAsync(CancellationToken cancellationToken) public T ExecuteCommandOrScript<T>(string command, PyModule module, bool throwOnErrors) =>
ExecutePythonOperation(
() =>
{
var pyCompile = PythonEngine.Compile(command);
var nativeResult = module.Execute(pyCompile);
return nativeResult.As<T>();
}, nameof(ExecuteCommandOrScript), throwOnErrors);
public T ExecutePythonOperation<T>(Func<T> operation, string operationName, bool throwOnErrors) =>
ExecutePythonOperationWithDefault(operation, default, operationName, throwOnErrors);
public T ExecutePythonOperationWithDefault<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors) =>
ExecutePythonOperationInternal(operation, defaultValue, operationName, throwOnErrors);
public void ExecuteOnGIL(Action act, bool throwOnErrors)
{
Sys ??= LoadSys();
try
{
using var gil = Py.GIL();
act();
}
catch (Exception ex)
{
Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(ExecuteOnGIL));
if (throwOnErrors)
{
throw;
}
}
}
public Task StopPythonEngine(CancellationToken cancellationToken)
{ {
PythonEngine.EndAllowThreads(_mainThreadState); PythonEngine.EndAllowThreads(_mainThreadState);
PythonEngine.Shutdown(); PythonEngine.Shutdown();
return Task.CompletedTask; return Task.CompletedTask;
} }
private static dynamic LoadSys()
{
using var gil = Py.GIL();
var sys = Py.Import("sys");
return sys;
}
// ReSharper disable once EntityNameCapturedOnly.Local
private T ExecutePythonOperationInternal<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors)
{
Sys ??= LoadSys();
var result = defaultValue;
try
{
using var gil = Py.GIL();
result = operation();
}
catch (Exception ex)
{
Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(operationName));
if (throwOnErrors)
{
throw;
}
}
return result;
}
} }

View File

@@ -3,6 +3,4 @@ namespace SharedContracts.Python.RTN;
public interface IRankTorrentName public interface IRankTorrentName
{ {
ParseTorrentTitleResponse Parse(string title); ParseTorrentTitleResponse Parse(string title);
bool IsTrash(string title);
bool TitleMatch(string title, string checkTitle);
} }

View File

@@ -2,117 +2,53 @@ namespace SharedContracts.Python.RTN;
public class RankTorrentName : IRankTorrentName public class RankTorrentName : IRankTorrentName
{ {
private const string SysModuleName = "sys"; private readonly IPythonEngineService _pythonEngineService;
private const string RtnModuleName = "RTN"; private const string RtnModuleName = "RTN";
private readonly ILogger<RankTorrentName> _logger;
private dynamic? _sys;
private dynamic? _rtn; private dynamic? _rtn;
public RankTorrentName(ILogger<RankTorrentName> logger) public RankTorrentName(IPythonEngineService pythonEngineService)
{ {
_logger = logger; _pythonEngineService = pythonEngineService;
InitModules(); InitModules();
} }
public ParseTorrentTitleResponse Parse(string title) public ParseTorrentTitleResponse Parse(string title) =>
{ _pythonEngineService.ExecutePythonOperation(
try () =>
{
using var py = Py.GIL();
var result = _rtn?.parse(title);
if (result == null)
{ {
return new(false, string.Empty, 0); var result = _rtn?.parse(title);
} return ParseResult(result);
}, nameof(Parse), throwOnErrors: false);
return ParseResult(result);
}
catch (Exception e)
{
_logger.LogError(e, "Failed to parse title");
return new(false, string.Empty, 0);
}
}
public bool IsTrash(string title)
{
try
{
using var py = Py.GIL();
var result = _rtn?.check_trash(title);
if (result == null)
{
return false;
}
var response = result.As<bool>() ?? false;
return response;
}
catch (Exception e)
{
_logger.LogError(e, "Failed to parse title");
return false;
}
}
public bool TitleMatch(string title, string checkTitle)
{
try
{
using var py = Py.GIL();
var result = _rtn?.title_match(title, checkTitle);
if (result == null)
{
return false;
}
var response = result.As<bool>() ?? false;
return response;
}
catch (Exception e)
{
_logger.LogError(e, "Failed to parse title");
return false;
}
}
private static ParseTorrentTitleResponse ParseResult(dynamic result) private static ParseTorrentTitleResponse ParseResult(dynamic result)
{ {
if (result == null)
{
return new(false, string.Empty, 0);
}
var parsedTitle = result.GetAttr("parsed_title")?.As<string>() ?? string.Empty; var parsedTitle = result.GetAttr("parsed_title")?.As<string>() ?? string.Empty;
var year = result.GetAttr("year")?.As<int>() ?? 0; var year = result.GetAttr("year")?.As<int>() ?? 0;
var seasonList = result.GetAttr("season")?.As<PyList>(); var seasons = GetIntArray(result, "season");
var episodeList = result.GetAttr("episode")?.As<PyList>(); var episodes = GetIntArray(result, "episode");
int[]? seasons = seasonList?.Length() > 0 ? seasonList.As<int[]>() : null;
int[]? episodes = episodeList?.Length() > 0 ? episodeList.As<int[]>() : null;
return new ParseTorrentTitleResponse(true, parsedTitle, year, seasons, episodes); return new ParseTorrentTitleResponse(true, parsedTitle, year, seasons, episodes);
} }
private void InitModules() private static int[]? GetIntArray(dynamic result, string field)
{ {
using var py = Py.GIL(); var theList = result.GetAttr(field)?.As<PyList>();
_sys = Py.Import(SysModuleName); int[]? results = theList?.Length() > 0 ? theList.As<int[]>() : null;
if (_sys == null) return results;
{
_logger.LogError($"Failed to import Python module: {SysModuleName}");
return;
}
_sys.path.append(Path.Combine(AppContext.BaseDirectory, "python"));
_rtn = Py.Import(RtnModuleName);
if (_rtn == null)
{
_logger.LogError($"Failed to import Python module: {RtnModuleName}");
}
} }
private void InitModules() =>
_rtn =
_pythonEngineService.ExecutePythonOperation(() =>
{
_pythonEngineService.Sys.path.append(Path.Combine(AppContext.BaseDirectory, "python"));
return Py.Import(RtnModuleName);
}, nameof(InitModules), throwOnErrors: false);
} }

View File

@@ -4,9 +4,8 @@ public static class ServiceCollectionExtensions
{ {
public static IServiceCollection RegisterPythonEngine(this IServiceCollection services) public static IServiceCollection RegisterPythonEngine(this IServiceCollection services)
{ {
services.AddSingleton<PythonEngineService>(); services.AddSingleton<IPythonEngineService, PythonEngineService>();
services.AddHostedService<PythonEngineManager>();
services.AddHostedService(p => p.GetRequiredService<PythonEngineService>());
return services; return services;
} }