diff --git a/deployment/docker/docker-compose.yaml b/deployment/docker/docker-compose.yaml index bba9e74..e713137 100644 --- a/deployment/docker/docker-compose.yaml +++ b/deployment/docker/docker-compose.yaml @@ -9,7 +9,7 @@ networks: volumes: postgres: - rabbitmq: + lavinmq: redis: services: @@ -55,28 +55,29 @@ services: volumes: - redis:/data - ## RabbitMQ is used as a message broker for the services. + ## LavinMQ is used as a message broker for the services. + ## It is a high performance drop in replacement for RabbitMQ. ## It is used to communicate between the services. - rabbitmq: + lavinmq: env_file: stack.env - healthcheck: - test: ["CMD-SHELL", "rabbitmq-diagnostics -q ping"] - timeout: 10s - interval: 10s - retries: 3 - start_period: 10s # # If you need the database to be accessible from outside, please open the below port. - # # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. + # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service. # ports: # - "5672:5672" # - "15672:15672" # - "15692:15692" - image: rabbitmq:3-management + image: cloudamqp/lavinmq:latest + healthcheck: + test: ["CMD-SHELL", "lavinmqctl status"] + timeout: 10s + interval: 10s + retries: 3 + start_period: 10s + restart: unless-stopped networks: - knightcrawler-network - restart: unless-stopped volumes: - - rabbitmq:/var/lib/rabbitmq + - lavinmq:/var/lib/lavinmq/ ## The addon. This is what is used in stremio addon: @@ -87,7 +88,7 @@ services: condition: service_completed_successfully postgres: condition: service_healthy - rabbitmq: + lavinmq: condition: service_healthy redis: condition: service_healthy @@ -111,7 +112,7 @@ services: condition: service_completed_successfully postgres: condition: service_healthy - rabbitmq: + lavinmq: condition: service_healthy redis: condition: service_healthy @@ -132,7 +133,7 @@ services: condition: service_completed_successfully postgres: condition: service_healthy - rabbitmq: + lavinmq: condition: service_healthy redis: condition: service_healthy @@ -176,7 +177,7 @@ services: condition: service_completed_successfully postgres: condition: service_healthy - rabbitmq: + lavinmq: condition: service_healthy redis: condition: service_healthy @@ -191,6 +192,16 @@ services: ## QBit collector utilizes QBitTorrent to download metadata. qbitcollector: depends_on: + metadata: + condition: service_completed_successfully + migrator: + condition: service_completed_successfully + postgres: + condition: service_healthy + lavinmq: + condition: service_healthy + redis: + condition: service_healthy qbittorrent: condition: service_healthy deploy: diff --git a/deployment/docker/optional-services/grafana-metrics/config/prometheus/config.yml b/deployment/docker/optional-services/grafana-metrics/config/prometheus/config.yml index 8a9d923..e0b0159 100644 --- a/deployment/docker/optional-services/grafana-metrics/config/prometheus/config.yml +++ b/deployment/docker/optional-services/grafana-metrics/config/prometheus/config.yml @@ -16,7 +16,7 @@ rule_files: scrape_configs: - job_name: "rabbitmq" static_configs: - - targets: ["rabbitmq:15692"] + - targets: ["lavinmq:15692"] - job_name: "postgres-exporter" static_configs: - targets: ["postgres-exporter:9187"] diff --git a/deployment/docker/src/components/infrastructure.yaml b/deployment/docker/src/components/infrastructure.yaml index e4043b3..21295a1 100644 --- a/deployment/docker/src/components/infrastructure.yaml +++ b/deployment/docker/src/components/infrastructure.yaml @@ -4,8 +4,8 @@ x-basehealth: &base-health retries: 3 start_period: 10s -x-rabbithealth: &rabbitmq-health - test: rabbitmq-diagnostics -q ping +x-lavinhealth: &lavinmq-health + test: [ "CMD-SHELL", "lavinmqctl status" ] <<: *base-health x-redishealth: &redis-health @@ -52,21 +52,19 @@ services: networks: - knightcrawler-network - rabbitmq: - image: rabbitmq:3-management + lavinmq: + env_file: stack.env # # If you need the database to be accessible from outside, please open the below port. - # # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. + # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service. # ports: # - "5672:5672" # - "15672:15672" # - "15692:15692" - volumes: - - rabbitmq:/var/lib/rabbitmq + image: cloudamqp/lavinmq:latest + healthcheck: *lavinmq-health restart: unless-stopped - healthcheck: *rabbitmq-health - env_file: ../../.env - networks: - - knightcrawler-network + volumes: + - lavinmq:/var/lib/lavinmq/ ## QBitTorrent is a torrent client that can be used to download torrents. In this case its used to download metadata. ## The QBit collector requires this. diff --git a/deployment/docker/src/components/knightcrawler.yaml b/deployment/docker/src/components/knightcrawler.yaml index 6d4bcbb..7c68ee5 100644 --- a/deployment/docker/src/components/knightcrawler.yaml +++ b/deployment/docker/src/components/knightcrawler.yaml @@ -11,7 +11,7 @@ x-depends: &knightcrawler-app-depends condition: service_healthy postgres: condition: service_healthy - rabbitmq: + lavinmq: condition: service_healthy migrator: condition: service_completed_successfully diff --git a/deployment/docker/src/components/volumes.yaml b/deployment/docker/src/components/volumes.yaml index b6fa654..8497a6c 100644 --- a/deployment/docker/src/components/volumes.yaml +++ b/deployment/docker/src/components/volumes.yaml @@ -1,4 +1,4 @@ volumes: postgres: redis: - rabbitmq: \ No newline at end of file + lavinmq: \ No newline at end of file diff --git a/deployment/docker/stack.env b/deployment/docker/stack.env index 6897e52..a2b6bd0 100644 --- a/deployment/docker/stack.env +++ b/deployment/docker/stack.env @@ -13,8 +13,8 @@ REDIS_HOST=redis REDIS_PORT=6379 REDIS_EXTRA=abortConnect=false,allowAdmin=true -# RabbitMQ -RABBITMQ_HOST=rabbitmq +# AMQP +RABBITMQ_HOST=lavinmq RABBITMQ_USER=guest RABBITMQ_PASSWORD=guest RABBITMQ_CONSUMER_QUEUE_NAME=ingested diff --git a/src/producer/eng/install-python-reqs.ps1 b/src/producer/eng/install-python-reqs.ps1 index cb1275b..6b164eb 100644 --- a/src/producer/eng/install-python-reqs.ps1 +++ b/src/producer/eng/install-python-reqs.ps1 @@ -1,2 +1,3 @@ +remove-item -recurse -force ../src/python mkdir -p ../src/python -pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ \ No newline at end of file +pip install -r ../src/requirements.txt -t ../src/python/ \ No newline at end of file diff --git a/src/producer/eng/install-python-reqs.sh b/src/producer/eng/install-python-reqs.sh index 16beb5c..94c43f9 100644 --- a/src/producer/eng/install-python-reqs.sh +++ b/src/producer/eng/install-python-reqs.sh @@ -1,4 +1,5 @@ #!/bin/bash +rm -rf ../src/python mkdir -p ../src/python -pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ \ No newline at end of file +python3 -m pip install -r ../src/requirements.txt -t ../src/python/ \ No newline at end of file diff --git a/src/producer/src/Dockerfile b/src/producer/src/Dockerfile index b8cfda4..f91fd74 100644 --- a/src/producer/src/Dockerfile +++ b/src/producer/src/Dockerfile @@ -13,13 +13,19 @@ FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine3.19 WORKDIR /app ENV PYTHONUNBUFFERED=1 + RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python COPY --from=build /src/out . + RUN rm -rf /app/python && mkdir -p /app/python -RUN pip3 install --force-reinstall rank-torrent-name==0.1.6 -t /app/python + +RUN pip3 install -r /app/requirements.txt -t /app/python + RUN addgroup -S producer && adduser -S -G producer producer + USER producer + HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ CMD pgrep -f dotnet || exit 1 diff --git a/src/producer/src/Producer.csproj b/src/producer/src/Producer.csproj index 49f7ccf..739caeb 100644 --- a/src/producer/src/Producer.csproj +++ b/src/producer/src/Producer.csproj @@ -33,6 +33,9 @@ Always + + Always + diff --git a/src/producer/src/requirements.txt b/src/producer/src/requirements.txt new file mode 100644 index 0000000..7d700a4 --- /dev/null +++ b/src/producer/src/requirements.txt @@ -0,0 +1 @@ +rank-torrent-name==0.1.8 \ No newline at end of file diff --git a/src/shared/GlobalUsings.cs b/src/shared/GlobalUsings.cs index f57716e..a652e84 100644 --- a/src/shared/GlobalUsings.cs +++ b/src/shared/GlobalUsings.cs @@ -1,5 +1,8 @@ // Global using directives +global using System.Collections.Concurrent; +global using System.Globalization; +global using System.Text; global using System.Text.Json; global using Dapper; global using MassTransit; diff --git a/src/shared/Python/IPythonEngineService.cs b/src/shared/Python/IPythonEngineService.cs new file mode 100644 index 0000000..5502e59 --- /dev/null +++ b/src/shared/Python/IPythonEngineService.cs @@ -0,0 +1,13 @@ +namespace SharedContracts.Python; + +public interface IPythonEngineService +{ + ILogger Logger { get; } + + Task InitializePythonEngine(CancellationToken cancellationToken); + T ExecuteCommandOrScript(string command, PyModule module, bool throwOnErrors); + T ExecutePythonOperation(Func operation, string operationName, bool throwOnErrors); + T ExecutePythonOperationWithDefault(Func operation, T? defaultValue, string operationName, bool throwOnErrors); + Task StopPythonEngine(CancellationToken cancellationToken); + dynamic? Sys { get; } +} \ No newline at end of file diff --git a/src/shared/Python/PythonEngineManager.cs b/src/shared/Python/PythonEngineManager.cs new file mode 100644 index 0000000..8b0e6c7 --- /dev/null +++ b/src/shared/Python/PythonEngineManager.cs @@ -0,0 +1,8 @@ +namespace SharedContracts.Python; + +public class PythonEngineManager(IPythonEngineService pythonEngineService) : IHostedService +{ + public Task StartAsync(CancellationToken cancellationToken) => pythonEngineService.InitializePythonEngine(cancellationToken); + + public Task StopAsync(CancellationToken cancellationToken) => pythonEngineService.StopPythonEngine(cancellationToken); +} \ No newline at end of file diff --git a/src/shared/Python/PythonEngineService.cs b/src/shared/Python/PythonEngineService.cs index ab80689..9ae513c 100644 --- a/src/shared/Python/PythonEngineService.cs +++ b/src/shared/Python/PythonEngineService.cs @@ -1,24 +1,28 @@ namespace SharedContracts.Python; -public class PythonEngineService(ILogger logger) : IHostedService +public class PythonEngineService(ILogger logger) : IPythonEngineService { private IntPtr _mainThreadState; private bool _isInitialized; - - public Task StartAsync(CancellationToken cancellationToken) + + public ILogger Logger { get; } = logger; + + public dynamic? Sys { get; private set; } + + public Task InitializePythonEngine(CancellationToken cancellationToken) { if (_isInitialized) { return Task.CompletedTask; } - + try { var pythonDllEnv = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL"); - + if (string.IsNullOrWhiteSpace(pythonDllEnv)) { - logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application"); + Logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application"); Environment.Exit(1); return Task.CompletedTask; } @@ -26,24 +30,92 @@ public class PythonEngineService(ILogger logger) : IHostedS Runtime.PythonDLL = pythonDllEnv; PythonEngine.Initialize(); _mainThreadState = PythonEngine.BeginAllowThreads(); - + _isInitialized = true; - logger.LogInformation("Python engine initialized"); + Logger.LogInformation("Python engine initialized"); } catch (Exception e) { - logger.LogWarning(e, "Failed to initialize Python engine"); + Logger.LogError(e, $"Failed to initialize Python engine: {e.Message}"); Environment.Exit(1); } - + return Task.CompletedTask; } - public Task StopAsync(CancellationToken cancellationToken) + public T ExecuteCommandOrScript(string command, PyModule module, bool throwOnErrors) => + ExecutePythonOperation( + () => + { + var pyCompile = PythonEngine.Compile(command); + var nativeResult = module.Execute(pyCompile); + return nativeResult.As(); + }, nameof(ExecuteCommandOrScript), throwOnErrors); + + public T ExecutePythonOperation(Func operation, string operationName, bool throwOnErrors) => + ExecutePythonOperationWithDefault(operation, default, operationName, throwOnErrors); + + public T ExecutePythonOperationWithDefault(Func operation, T? defaultValue, string operationName, bool throwOnErrors) => + ExecutePythonOperationInternal(operation, defaultValue, operationName, throwOnErrors); + + public void ExecuteOnGIL(Action act, bool throwOnErrors) + { + Sys ??= LoadSys(); + + try + { + using var gil = Py.GIL(); + act(); + } + catch (Exception ex) + { + Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(ExecuteOnGIL)); + + if (throwOnErrors) + { + throw; + } + } + } + + public Task StopPythonEngine(CancellationToken cancellationToken) { PythonEngine.EndAllowThreads(_mainThreadState); PythonEngine.Shutdown(); - + return Task.CompletedTask; } + + private static dynamic LoadSys() + { + using var gil = Py.GIL(); + var sys = Py.Import("sys"); + + return sys; + } + + // ReSharper disable once EntityNameCapturedOnly.Local + private T ExecutePythonOperationInternal(Func operation, T? defaultValue, string operationName, bool throwOnErrors) + { + Sys ??= LoadSys(); + + var result = defaultValue; + + try + { + using var gil = Py.GIL(); + result = operation(); + } + catch (Exception ex) + { + Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(operationName)); + + if (throwOnErrors) + { + throw; + } + } + + return result; + } } \ No newline at end of file diff --git a/src/shared/Python/RTN/IRankTorrentName.cs b/src/shared/Python/RTN/IRankTorrentName.cs index 9029534..b529e15 100644 --- a/src/shared/Python/RTN/IRankTorrentName.cs +++ b/src/shared/Python/RTN/IRankTorrentName.cs @@ -3,6 +3,4 @@ namespace SharedContracts.Python.RTN; public interface IRankTorrentName { ParseTorrentTitleResponse Parse(string title); - bool IsTrash(string title); - bool TitleMatch(string title, string checkTitle); } \ No newline at end of file diff --git a/src/shared/Python/RTN/RankTorrentName.cs b/src/shared/Python/RTN/RankTorrentName.cs index fd7c492..83ec681 100644 --- a/src/shared/Python/RTN/RankTorrentName.cs +++ b/src/shared/Python/RTN/RankTorrentName.cs @@ -2,117 +2,53 @@ namespace SharedContracts.Python.RTN; public class RankTorrentName : IRankTorrentName { - private const string SysModuleName = "sys"; + private readonly IPythonEngineService _pythonEngineService; private const string RtnModuleName = "RTN"; - private readonly ILogger _logger; - private dynamic? _sys; private dynamic? _rtn; - public RankTorrentName(ILogger logger) + public RankTorrentName(IPythonEngineService pythonEngineService) { - _logger = logger; + _pythonEngineService = pythonEngineService; InitModules(); } - - public ParseTorrentTitleResponse Parse(string title) - { - try - { - using var py = Py.GIL(); - var result = _rtn?.parse(title); - - if (result == null) + public ParseTorrentTitleResponse Parse(string title) => + _pythonEngineService.ExecutePythonOperation( + () => { - return new(false, string.Empty, 0); - } - - return ParseResult(result); - } - catch (Exception e) - { - _logger.LogError(e, "Failed to parse title"); - return new(false, string.Empty, 0); - } - } - - public bool IsTrash(string title) - { - try - { - using var py = Py.GIL(); - var result = _rtn?.check_trash(title); - - if (result == null) - { - return false; - } - - var response = result.As() ?? false; - - return response; - } - catch (Exception e) - { - _logger.LogError(e, "Failed to parse title"); - return false; - } - } - - public bool TitleMatch(string title, string checkTitle) - { - try - { - using var py = Py.GIL(); - var result = _rtn?.title_match(title, checkTitle); - - if (result == null) - { - return false; - } - - var response = result.As() ?? false; - - return response; - } - catch (Exception e) - { - _logger.LogError(e, "Failed to parse title"); - return false; - } - } - + var result = _rtn?.parse(title); + return ParseResult(result); + }, nameof(Parse), throwOnErrors: false); private static ParseTorrentTitleResponse ParseResult(dynamic result) { + if (result == null) + { + return new(false, string.Empty, 0); + } + var parsedTitle = result.GetAttr("parsed_title")?.As() ?? string.Empty; var year = result.GetAttr("year")?.As() ?? 0; - var seasonList = result.GetAttr("season")?.As(); - var episodeList = result.GetAttr("episode")?.As(); - int[]? seasons = seasonList?.Length() > 0 ? seasonList.As() : null; - int[]? episodes = episodeList?.Length() > 0 ? episodeList.As() : null; + var seasons = GetIntArray(result, "season"); + var episodes = GetIntArray(result, "episode"); return new ParseTorrentTitleResponse(true, parsedTitle, year, seasons, episodes); } - private void InitModules() + private static int[]? GetIntArray(dynamic result, string field) { - using var py = Py.GIL(); - _sys = Py.Import(SysModuleName); - - if (_sys == null) - { - _logger.LogError($"Failed to import Python module: {SysModuleName}"); - return; - } - - _sys.path.append(Path.Combine(AppContext.BaseDirectory, "python")); - - _rtn = Py.Import(RtnModuleName); - if (_rtn == null) - { - _logger.LogError($"Failed to import Python module: {RtnModuleName}"); - } + var theList = result.GetAttr(field)?.As(); + int[]? results = theList?.Length() > 0 ? theList.As() : null; + + return results; } + + private void InitModules() => + _rtn = + _pythonEngineService.ExecutePythonOperation(() => + { + _pythonEngineService.Sys.path.append(Path.Combine(AppContext.BaseDirectory, "python")); + return Py.Import(RtnModuleName); + }, nameof(InitModules), throwOnErrors: false); } \ No newline at end of file diff --git a/src/shared/Python/ServiceCollectionExtensions.cs b/src/shared/Python/ServiceCollectionExtensions.cs index 0fe2503..362193a 100644 --- a/src/shared/Python/ServiceCollectionExtensions.cs +++ b/src/shared/Python/ServiceCollectionExtensions.cs @@ -4,9 +4,8 @@ public static class ServiceCollectionExtensions { public static IServiceCollection RegisterPythonEngine(this IServiceCollection services) { - services.AddSingleton(); - - services.AddHostedService(p => p.GetRequiredService()); + services.AddSingleton(); + services.AddHostedService(); return services; }