18 Commits

Author SHA1 Message Date
iPromKnight
e6a63fd72e Allow configuration of producer urls (#203)
* Allow configuration of urls in scrapers by mounting the scrapers.json file over the one in the container

* version bump
2024-04-11 18:23:42 +01:00
iPromKnight
02101ac50a Allow qbit concurrency to be configurable (#200) 2024-04-11 18:02:29 +01:00
iPromKnight
3c8ffd5082 Fix Duplicates (#199)
* Fix Duplicates

* Version
2024-04-02 20:31:22 +01:00
iPromKnight
79e0a0f102 DMM Offline (#198)
* Process DMM all locally

single call to github to download the repo archive.
remove need for PAT
update RTN to 0.2.13
change to batch_parse for title parsing from RTN

* introduce concurrent dictionary, and parallelism
2024-04-02 17:01:22 +01:00
purple_emily
6181207513 Fix incorrect file index stored (#197)
* Fix incorrect file index stored

* Update `rank-torrent-name` to latest version

* Knight Crawler version update
2024-04-01 23:08:32 +01:00
iPromKnight
684dbba2f0 RTN-025 and title category parsing (#195)
* update rtn to 025

* Implement movie / show type parsing

* switch to RTN in collectors

* ensure env for pythonnet is loaded, and that requirements copy for qbit

* version bump
2024-03-31 22:01:09 +01:00
iPromKnight
c75ecd2707 add qbit housekeeping service to remove stale torrents (#193)
* Add housekeeping service to clean stale torrents

* version bump
2024-03-30 11:52:23 +00:00
iPromKnight
c493ef3376 Hotfix category, and roll back RTN to 0.1.8 (#192)
* Hotfix categories

Also roll back RTN to 0.1.8 as regression introduced in 0.2

* bump version
2024-03-30 04:47:36 +00:00
iPromKnight
655a39e35c patch the query with execute (#191) 2024-03-30 01:54:06 +00:00
iPromKnight
cfeee62f6b patch ratio (#190)
* add configurable threshold, default 0.95

* version bump
2024-03-30 01:43:21 +00:00
iPromKnight
c6d4c06d70 hotfix categories from imdb result instead (#189)
* category mapping from imdb

* version bump
2024-03-30 01:26:02 +00:00
iPromKnight
08639a3254 Patch isMovie (#188)
* fix is movie

* version bump
2024-03-30 00:28:35 +00:00
iPromKnight
d430850749 Patch message contract names (#187)
* ensure unique message contract names per collector type

* version bump
2024-03-30 00:09:13 +00:00
iPromKnight
82c0ea459b change qbittorrent settings (#186) 2024-03-29 23:35:27 +00:00
iPromKnight
1e83b4c5d8 Patch the addon (#185) 2024-03-29 19:08:17 +00:00
iPromKnight
66609c2a46 trigram performance increased and housekeeping (#184)
* add new indexes, and change year column to int

* Change gist to gin, and change year to int

* Producer changes for new gin query

* Fully map the rtn response using json dump from Pydantic

Also updates Rtn to 0.1.9

* Add housekeeping script to reconcile imdb ids.

* Join Torrent onto the ingested torrent table

Ensure that a torrent can always find the details of where it came from, and how it was parsed.

* Version bump for release

* missing quote on table name
2024-03-29 19:01:48 +00:00
iPromKnight
2d78dc2735 version bump for release (#183) 2024-03-28 23:37:35 +00:00
iPromKnight
527d6cdf15 Upgrade RTN to 0.1.8, replace rabbitmq with drop in replacement lavinmq - better performance, lower resource usage. (#182) 2024-03-28 23:35:41 +00:00
91 changed files with 1210 additions and 466 deletions

4
.gitignore vendored
View File

@@ -612,3 +612,7 @@ fabric.properties
# Mac directory indexes # Mac directory indexes
.DS_Store .DS_Store
deployment/docker/stack.env deployment/docker/stack.env
src/producer/src/python/
src/debrid-collector/python/
src/qbit-collector/python/

View File

@@ -12,11 +12,16 @@ enabled=false
program= program=
[BitTorrent] [BitTorrent]
Session\AnonymousModeEnabled=true
Session\BTProtocol=TCP
Session\ConnectionSpeed=150
Session\DefaultSavePath=/downloads/ Session\DefaultSavePath=/downloads/
Session\ExcludedFileNames= Session\ExcludedFileNames=
Session\MaxActiveDownloads=10 Session\MaxActiveCheckingTorrents=20
Session\MaxActiveDownloads=20
Session\MaxActiveTorrents=50 Session\MaxActiveTorrents=50
Session\MaxActiveUploads=50 Session\MaxActiveUploads=50
Session\MaxConcurrentHTTPAnnounces=1000
Session\MaxConnections=2000 Session\MaxConnections=2000
Session\Port=6881 Session\Port=6881
Session\QueueingSystemEnabled=true Session\QueueingSystemEnabled=true
@@ -50,9 +55,10 @@ MailNotification\req_auth=true
WebUI\Address=* WebUI\Address=*
WebUI\AuthSubnetWhitelist=0.0.0.0/0 WebUI\AuthSubnetWhitelist=0.0.0.0/0
WebUI\AuthSubnetWhitelistEnabled=true WebUI\AuthSubnetWhitelistEnabled=true
WebUI\HostHeaderValidation=false
WebUI\LocalHostAuth=false WebUI\LocalHostAuth=false
WebUI\ServerDomains=* WebUI\ServerDomains=*
[RSS] [RSS]
AutoDownloader\DownloadRepacks=true AutoDownloader\DownloadRepacks=true
AutoDownloader\SmartEpisodeFilter=s(\\d+)e(\\d+), (\\d+)x(\\d+), "(\\d{4}[.\\-]\\d{1,2}[.\\-]\\d{1,2})", "(\\d{1,2}[.\\-]\\d{1,2}[.\\-]\\d{4})" AutoDownloader\SmartEpisodeFilter=s(\\d+)e(\\d+), (\\d+)x(\\d+), "(\\d{4}[.\\-]\\d{1,2}[.\\-]\\d{1,2})", "(\\d{1,2}[.\\-]\\d{1,2}[.\\-]\\d{4})"

View File

@@ -9,7 +9,7 @@ networks:
volumes: volumes:
postgres: postgres:
rabbitmq: lavinmq:
redis: redis:
services: services:
@@ -55,28 +55,29 @@ services:
volumes: volumes:
- redis:/data - redis:/data
## RabbitMQ is used as a message broker for the services. ## LavinMQ is used as a message broker for the services.
## It is a high performance drop in replacement for RabbitMQ.
## It is used to communicate between the services. ## It is used to communicate between the services.
rabbitmq: lavinmq:
env_file: stack.env env_file: stack.env
healthcheck:
test: ["CMD-SHELL", "rabbitmq-diagnostics -q ping"]
timeout: 10s
interval: 10s
retries: 3
start_period: 10s
# # If you need the database to be accessible from outside, please open the below port. # # If you need the database to be accessible from outside, please open the below port.
# # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service.
# ports: # ports:
# - "5672:5672" # - "5672:5672"
# - "15672:15672" # - "15672:15672"
# - "15692:15692" # - "15692:15692"
image: rabbitmq:3-management image: cloudamqp/lavinmq:latest
healthcheck:
test: ["CMD-SHELL", "lavinmqctl status"]
timeout: 10s
interval: 10s
retries: 3
start_period: 10s
restart: unless-stopped
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: unless-stopped
volumes: volumes:
- rabbitmq:/var/lib/rabbitmq - lavinmq:/var/lib/lavinmq/
## The addon. This is what is used in stremio ## The addon. This is what is used in stremio
addon: addon:
@@ -87,13 +88,13 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
hostname: knightcrawler-addon hostname: knightcrawler-addon
image: gabisonfire/knightcrawler-addon:2.0.8 image: gabisonfire/knightcrawler-addon:2.0.23
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -111,12 +112,12 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-consumer:2.0.8 image: gabisonfire/knightcrawler-consumer:2.0.23
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -132,12 +133,12 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-debrid-collector:2.0.8 image: gabisonfire/knightcrawler-debrid-collector:2.0.23
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -151,7 +152,7 @@ services:
migrator: migrator:
condition: service_completed_successfully condition: service_completed_successfully
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-metadata:2.0.8 image: gabisonfire/knightcrawler-metadata:2.0.23
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: "no" restart: "no"
@@ -162,7 +163,7 @@ services:
postgres: postgres:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-migrator:2.0.8 image: gabisonfire/knightcrawler-migrator:2.0.23
networks: networks:
- knightcrawler-network - knightcrawler-network
restart: "no" restart: "no"
@@ -176,12 +177,12 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
redis: redis:
condition: service_healthy condition: service_healthy
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-producer:2.0.8 image: gabisonfire/knightcrawler-producer:2.0.23
labels: labels:
logging: promtail logging: promtail
networks: networks:
@@ -191,12 +192,22 @@ services:
## QBit collector utilizes QBitTorrent to download metadata. ## QBit collector utilizes QBitTorrent to download metadata.
qbitcollector: qbitcollector:
depends_on: depends_on:
metadata:
condition: service_completed_successfully
migrator:
condition: service_completed_successfully
postgres:
condition: service_healthy
lavinmq:
condition: service_healthy
redis:
condition: service_healthy
qbittorrent: qbittorrent:
condition: service_healthy condition: service_healthy
deploy: deploy:
replicas: ${QBIT_REPLICAS:-0} replicas: ${QBIT_REPLICAS:-0}
env_file: stack.env env_file: stack.env
image: gabisonfire/knightcrawler-qbit-collector:2.0.8 image: gabisonfire/knightcrawler-qbit-collector:2.0.23
labels: labels:
logging: promtail logging: promtail
networks: networks:

View File

@@ -16,7 +16,7 @@ rule_files:
scrape_configs: scrape_configs:
- job_name: "rabbitmq" - job_name: "rabbitmq"
static_configs: static_configs:
- targets: ["rabbitmq:15692"] - targets: ["lavinmq:15692"]
- job_name: "postgres-exporter" - job_name: "postgres-exporter"
static_configs: static_configs:
- targets: ["postgres-exporter:9187"] - targets: ["postgres-exporter:9187"]

View File

@@ -4,8 +4,8 @@ x-basehealth: &base-health
retries: 3 retries: 3
start_period: 10s start_period: 10s
x-rabbithealth: &rabbitmq-health x-lavinhealth: &lavinmq-health
test: rabbitmq-diagnostics -q ping test: [ "CMD-SHELL", "lavinmqctl status" ]
<<: *base-health <<: *base-health
x-redishealth: &redis-health x-redishealth: &redis-health
@@ -52,21 +52,19 @@ services:
networks: networks:
- knightcrawler-network - knightcrawler-network
rabbitmq: lavinmq:
image: rabbitmq:3-management env_file: stack.env
# # If you need the database to be accessible from outside, please open the below port. # # If you need the database to be accessible from outside, please open the below port.
# # Furthermore, please, please, please, look at the documentation for rabbit on how to secure the service. # # Furthermore, please, please, please, look at the documentation for lavinmq / rabbitmq on how to secure the service.
# ports: # ports:
# - "5672:5672" # - "5672:5672"
# - "15672:15672" # - "15672:15672"
# - "15692:15692" # - "15692:15692"
volumes: image: cloudamqp/lavinmq:latest
- rabbitmq:/var/lib/rabbitmq healthcheck: *lavinmq-health
restart: unless-stopped restart: unless-stopped
healthcheck: *rabbitmq-health volumes:
env_file: ../../.env - lavinmq:/var/lib/lavinmq/
networks:
- knightcrawler-network
## QBitTorrent is a torrent client that can be used to download torrents. In this case its used to download metadata. ## QBitTorrent is a torrent client that can be used to download torrents. In this case its used to download metadata.
## The QBit collector requires this. ## The QBit collector requires this.

View File

@@ -11,7 +11,7 @@ x-depends: &knightcrawler-app-depends
condition: service_healthy condition: service_healthy
postgres: postgres:
condition: service_healthy condition: service_healthy
rabbitmq: lavinmq:
condition: service_healthy condition: service_healthy
migrator: migrator:
condition: service_completed_successfully condition: service_completed_successfully
@@ -20,7 +20,7 @@ x-depends: &knightcrawler-app-depends
services: services:
metadata: metadata:
image: gabisonfire/knightcrawler-metadata:2.0.8 image: gabisonfire/knightcrawler-metadata:2.0.23
env_file: ../../.env env_file: ../../.env
networks: networks:
- knightcrawler-network - knightcrawler-network
@@ -30,7 +30,7 @@ services:
condition: service_completed_successfully condition: service_completed_successfully
migrator: migrator:
image: gabisonfire/knightcrawler-migrator:2.0.8 image: gabisonfire/knightcrawler-migrator:2.0.23
env_file: ../../.env env_file: ../../.env
networks: networks:
- knightcrawler-network - knightcrawler-network
@@ -40,7 +40,7 @@ services:
condition: service_healthy condition: service_healthy
addon: addon:
image: gabisonfire/knightcrawler-addon:2.0.8 image: gabisonfire/knightcrawler-addon:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
hostname: knightcrawler-addon hostname: knightcrawler-addon
@@ -48,22 +48,22 @@ services:
- "7000:7000" - "7000:7000"
consumer: consumer:
image: gabisonfire/knightcrawler-consumer:2.0.8 image: gabisonfire/knightcrawler-consumer:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
debridcollector: debridcollector:
image: gabisonfire/knightcrawler-debrid-collector:2.0.8 image: gabisonfire/knightcrawler-debrid-collector:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
producer: producer:
image: gabisonfire/knightcrawler-producer:2.0.8 image: gabisonfire/knightcrawler-producer:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
qbitcollector: qbitcollector:
image: gabisonfire/knightcrawler-qbit-collector:2.0.8 image: gabisonfire/knightcrawler-qbit-collector:2.0.23
<<: [*knightcrawler-app, *knightcrawler-app-depends] <<: [*knightcrawler-app, *knightcrawler-app-depends]
restart: unless-stopped restart: unless-stopped
depends_on: depends_on:

View File

@@ -1,4 +1,4 @@
volumes: volumes:
postgres: postgres:
redis: redis:
rabbitmq: lavinmq:

View File

@@ -13,8 +13,8 @@ REDIS_HOST=redis
REDIS_PORT=6379 REDIS_PORT=6379
REDIS_EXTRA=abortConnect=false,allowAdmin=true REDIS_EXTRA=abortConnect=false,allowAdmin=true
# RabbitMQ # AMQP
RABBITMQ_HOST=rabbitmq RABBITMQ_HOST=lavinmq
RABBITMQ_USER=guest RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest RABBITMQ_PASSWORD=guest
RABBITMQ_CONSUMER_QUEUE_NAME=ingested RABBITMQ_CONSUMER_QUEUE_NAME=ingested
@@ -32,12 +32,10 @@ COLLECTOR_DEBRID_ENABLED=true
COLLECTOR_REAL_DEBRID_API_KEY= COLLECTOR_REAL_DEBRID_API_KEY=
QBIT_HOST=http://qbittorrent:8080 QBIT_HOST=http://qbittorrent:8080
QBIT_TRACKERS_URL=https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_all_http.txt QBIT_TRACKERS_URL=https://raw.githubusercontent.com/ngosang/trackerslist/master/trackers_all_http.txt
QBIT_CONCURRENCY=8
# Number of replicas for the qBittorrent collector and qBitTorrent client. Should be 0 or 1. # Number of replicas for the qBittorrent collector and qBitTorrent client. Should be 0 or 1.
QBIT_REPLICAS=0 QBIT_REPLICAS=0
# Addon # Addon
DEBUG_MODE=false DEBUG_MODE=false
# Producer
GITHUB_PAT=

View File

@@ -14,13 +14,12 @@ const Torrent = database.define('torrent',
{ {
infoHash: { type: Sequelize.STRING(64), primaryKey: true }, infoHash: { type: Sequelize.STRING(64), primaryKey: true },
provider: { type: Sequelize.STRING(32), allowNull: false }, provider: { type: Sequelize.STRING(32), allowNull: false },
torrentId: { type: Sequelize.STRING(128) }, ingestedTorrentId: { type: Sequelize.BIGINT, allowNull: false },
title: { type: Sequelize.STRING(256), allowNull: false }, title: { type: Sequelize.STRING(256), allowNull: false },
size: { type: Sequelize.BIGINT }, size: { type: Sequelize.BIGINT },
type: { type: Sequelize.STRING(16), allowNull: false }, type: { type: Sequelize.STRING(16), allowNull: false },
uploadDate: { type: Sequelize.DATE, allowNull: false }, uploadDate: { type: Sequelize.DATE, allowNull: false },
seeders: { type: Sequelize.SMALLINT }, seeders: { type: Sequelize.SMALLINT },
trackers: { type: Sequelize.STRING(4096) },
languages: { type: Sequelize.STRING(4096) }, languages: { type: Sequelize.STRING(4096) },
resolution: { type: Sequelize.STRING(16) } resolution: { type: Sequelize.STRING(16) }
} }

View File

@@ -17,7 +17,6 @@
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" /> <PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" />
<PackageReference Include="Polly" Version="8.3.1" /> <PackageReference Include="Polly" Version="8.3.1" />
<PackageReference Include="PromKnight.ParseTorrentTitle" Version="1.0.4" />
<PackageReference Include="Serilog" Version="3.1.1" /> <PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" /> <PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />
@@ -29,10 +28,30 @@
<None Include="Configuration\logging.json"> <None Include="Configuration\logging.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
<None Update="requirements.txt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<Content Remove="eng\**" />
<None Remove="eng\**" />
</ItemGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'">
<Content Remove="python\**" />
<None Include="python\**">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\shared\SharedContracts.csproj" /> <ProjectReference Include="..\shared\SharedContracts.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Compile Remove="eng\**" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Remove="eng\**" />
</ItemGroup>
</Project> </Project>

View File

@@ -6,6 +6,12 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SharedContracts", "..\share
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{2C0A0F53-28E6-404F-9EFE-DADFBEF8338B}" Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{2C0A0F53-28E6-404F-9EFE-DADFBEF8338B}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eng", "eng", "{72A042C3-B4F3-45C5-AC20-041FE8F41EFC}"
ProjectSection(SolutionItems) = preProject
eng\install-python-reqs.ps1 = eng\install-python-reqs.ps1
eng\install-python-reqs.sh = eng\install-python-reqs.sh
EndProjectSection
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU

View File

@@ -9,12 +9,23 @@ RUN dotnet restore -a $TARGETARCH
RUN dotnet publish -c Release --no-restore -o /src/out -a $TARGETARCH RUN dotnet publish -c Release --no-restore -o /src/out -a $TARGETARCH
FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine3.19
WORKDIR /app WORKDIR /app
ENV PYTHONUNBUFFERED=1
RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python
COPY --from=build /src/out . COPY --from=build /src/out .
RUN rm -rf /app/python && mkdir -p /app/python
RUN pip3 install -r /app/requirements.txt -t /app/python
RUN addgroup -S debrid && adduser -S -G debrid debrid RUN addgroup -S debrid && adduser -S -G debrid debrid
USER debrid USER debrid
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD pgrep -f dotnet || exit 1 CMD pgrep -f dotnet || exit 1
ENV PYTHONNET_PYDLL=/usr/lib/libpython3.11.so.1.0
ENTRYPOINT ["dotnet", "DebridCollector.dll"] ENTRYPOINT ["dotnet", "DebridCollector.dll"]

View File

@@ -1,5 +1,3 @@
using DebridCollector.Features.Configuration;
namespace DebridCollector.Extensions; namespace DebridCollector.Extensions;
public static class ServiceCollectionExtensions public static class ServiceCollectionExtensions
@@ -17,7 +15,8 @@ public static class ServiceCollectionExtensions
var serviceConfiguration = services.LoadConfigurationFromEnv<DebridCollectorConfiguration>(); var serviceConfiguration = services.LoadConfigurationFromEnv<DebridCollectorConfiguration>();
services.AddRealDebridClient(serviceConfiguration); services.AddRealDebridClient(serviceConfiguration);
services.AddSingleton<IParseTorrentTitle, ParseTorrentTitle>(); services.RegisterPythonEngine();
services.AddSingleton<IRankTorrentName, RankTorrentName>();
services.AddHostedService<DebridRequestProcessor>(); services.AddHostedService<DebridRequestProcessor>();
return services; return services;

View File

@@ -1,6 +1,4 @@
using DebridCollector.Features.Configuration; namespace DebridCollector.Features.Debrid;
namespace DebridCollector.Features.Debrid;
public static class ServiceCollectionExtensions public static class ServiceCollectionExtensions
{ {

View File

@@ -3,10 +3,11 @@ namespace DebridCollector.Features.Worker;
public static class DebridMetaToTorrentMeta public static class DebridMetaToTorrentMeta
{ {
public static IReadOnlyList<TorrentFile> MapMetadataToFilesCollection( public static IReadOnlyList<TorrentFile> MapMetadataToFilesCollection(
IParseTorrentTitle torrentTitle, IRankTorrentName rankTorrentName,
Torrent torrent, Torrent torrent,
string ImdbId, string ImdbId,
FileDataDictionary Metadata) FileDataDictionary Metadata,
ILogger<WriteMetadataConsumer> logger)
{ {
try try
{ {
@@ -15,34 +16,42 @@ public static class DebridMetaToTorrentMeta
foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext)))) foreach (var metadataEntry in Metadata.Where(m => Filetypes.VideoFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{ {
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex); var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
var file = new TorrentFile var file = new TorrentFile
{ {
ImdbId = ImdbId, ImdbId = ImdbId,
KitsuId = 0, KitsuId = 0,
InfoHash = torrent.InfoHash, InfoHash = torrent.InfoHash,
FileIndex = validFileIndex ? fileIndex : 0, FileIndex = validFileIndex ? fileIndexMinusOne : 0,
Title = metadataEntry.Value.Filename, Title = metadataEntry.Value.Filename,
Size = metadataEntry.Value.Filesize.GetValueOrDefault(), Size = metadataEntry.Value.Filesize.GetValueOrDefault(),
}; };
var parsedTitle = torrentTitle.Parse(file.Title); var parsedTitle = rankTorrentName.Parse(file.Title, false);
file.ImdbSeason = parsedTitle.Seasons.FirstOrDefault(); if (!parsedTitle.Success)
file.ImdbEpisode = parsedTitle.Episodes.FirstOrDefault(); {
logger.LogWarning("Failed to parse title {Title} for metadata mapping", file.Title);
continue;
}
file.ImdbSeason = parsedTitle.Response?.Season?.FirstOrDefault() ?? 0;
file.ImdbEpisode = parsedTitle.Response?.Episode?.FirstOrDefault() ?? 0;
files.Add(file); files.Add(file);
} }
return files; return files;
} }
catch (Exception) catch (Exception ex)
{ {
logger.LogWarning("Failed to map metadata to files collection: {Exception}", ex.Message);
return []; return [];
} }
} }
public static async Task<IReadOnlyList<SubtitleFile>> MapMetadataToSubtitlesCollection(IDataStorage storage, string InfoHash, FileDataDictionary Metadata) public static async Task<IReadOnlyList<SubtitleFile>> MapMetadataToSubtitlesCollection(IDataStorage storage, string InfoHash, FileDataDictionary Metadata, ILogger<WriteMetadataConsumer> logger)
{ {
try try
{ {
@@ -58,13 +67,14 @@ public static class DebridMetaToTorrentMeta
foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext)))) foreach (var metadataEntry in Metadata.Where(m => Filetypes.SubtitleFileExtensions.Any(ext => m.Value.Filename.EndsWith(ext))))
{ {
var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex); var validFileIndex = int.TryParse(metadataEntry.Key, out var fileIndex);
var fileIndexMinusOne = Math.Max(0, fileIndex - 1);
var fileId = torrentFiles.FirstOrDefault( var fileId = torrentFiles.FirstOrDefault(
t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0; t => Path.GetFileNameWithoutExtension(t.Title) == Path.GetFileNameWithoutExtension(metadataEntry.Value.Filename))?.Id ?? 0;
var file = new SubtitleFile var file = new SubtitleFile
{ {
InfoHash = InfoHash, InfoHash = InfoHash,
FileIndex = validFileIndex ? fileIndex : 0, FileIndex = validFileIndex ? fileIndexMinusOne : 0,
FileId = fileId, FileId = fileId,
Title = metadataEntry.Value.Filename, Title = metadataEntry.Value.Filename,
}; };
@@ -74,8 +84,9 @@ public static class DebridMetaToTorrentMeta
return files; return files;
} }
catch (Exception) catch (Exception ex)
{ {
logger.LogWarning("Failed to map metadata to subtitles collection: {Exception}", ex.Message);
return []; return [];
} }
} }

View File

@@ -53,6 +53,12 @@ public class InfohashMetadataSagaStateMachine : MassTransitStateMachine<Infohash
.Then( .Then(
context => context =>
{ {
if (!context.Message.WithFiles)
{
logger.LogInformation("No files written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
return;
}
logger.LogInformation("Metadata Written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId); logger.LogInformation("Metadata Written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
}) })
.TransitionTo(Completed) .TransitionTo(Completed)

View File

@@ -1,22 +1,22 @@
namespace DebridCollector.Features.Worker; namespace DebridCollector.Features.Worker;
[EntityName("perform-metadata-request")] [EntityName("perform-metadata-request-debrid-collector")]
public record PerformMetadataRequest(Guid CorrelationId, string InfoHash) : CorrelatedBy<Guid>; public record PerformMetadataRequest(Guid CorrelationId, string InfoHash) : CorrelatedBy<Guid>;
[EntityName("torrent-metadata-response")] [EntityName("torrent-metadata-response-debrid-collector")]
public record GotMetadata(TorrentMetadataResponse Metadata) : CorrelatedBy<Guid> public record GotMetadata(TorrentMetadataResponse Metadata) : CorrelatedBy<Guid>
{ {
public Guid CorrelationId { get; init; } = Metadata.CorrelationId; public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
} }
[EntityName("write-metadata")] [EntityName("write-metadata-debrid-collector")]
public record WriteMetadata(Torrent Torrent, TorrentMetadataResponse Metadata, string ImdbId) : CorrelatedBy<Guid> public record WriteMetadata(Torrent Torrent, TorrentMetadataResponse Metadata, string ImdbId) : CorrelatedBy<Guid>
{ {
public Guid CorrelationId { get; init; } = Metadata.CorrelationId; public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
} }
[EntityName("metadata-written")] [EntityName("metadata-written-debrid-colloctor")]
public record MetadataWritten(TorrentMetadataResponse Metadata) : CorrelatedBy<Guid> public record MetadataWritten(TorrentMetadataResponse Metadata, bool WithFiles) : CorrelatedBy<Guid>
{ {
public Guid CorrelationId { get; init; } = Metadata.CorrelationId; public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
} }

View File

@@ -1,25 +1,28 @@
namespace DebridCollector.Features.Worker; namespace DebridCollector.Features.Worker;
public class WriteMetadataConsumer(IParseTorrentTitle parseTorrentTitle, IDataStorage dataStorage) : IConsumer<WriteMetadata> public class WriteMetadataConsumer(IRankTorrentName rankTorrentName, IDataStorage dataStorage, ILogger<WriteMetadataConsumer> logger) : IConsumer<WriteMetadata>
{ {
public async Task Consume(ConsumeContext<WriteMetadata> context) public async Task Consume(ConsumeContext<WriteMetadata> context)
{ {
var request = context.Message; var request = context.Message;
var torrentFiles = DebridMetaToTorrentMeta.MapMetadataToFilesCollection(parseTorrentTitle, request.Torrent, request.ImdbId, request.Metadata.Metadata); var torrentFiles = DebridMetaToTorrentMeta.MapMetadataToFilesCollection(rankTorrentName, request.Torrent, request.ImdbId, request.Metadata.Metadata, logger);
if (torrentFiles.Any()) if (!torrentFiles.Any())
{ {
await dataStorage.InsertFiles(torrentFiles); await context.Publish(new MetadataWritten(request.Metadata, false));
return;
var subtitles = await DebridMetaToTorrentMeta.MapMetadataToSubtitlesCollection(dataStorage, request.Torrent.InfoHash, request.Metadata.Metadata);
if (subtitles.Any())
{
await dataStorage.InsertSubtitles(subtitles);
}
} }
await context.Publish(new MetadataWritten(request.Metadata)); await dataStorage.InsertFiles(torrentFiles);
var subtitles = await DebridMetaToTorrentMeta.MapMetadataToSubtitlesCollection(dataStorage, request.Torrent.InfoHash, request.Metadata.Metadata, logger);
if (subtitles.Any())
{
await dataStorage.InsertSubtitles(subtitles);
}
await context.Publish(new MetadataWritten(request.Metadata, true));
} }
} }

View File

@@ -4,17 +4,18 @@ global using System.Text.Json;
global using System.Text.Json.Serialization; global using System.Text.Json.Serialization;
global using System.Threading.Channels; global using System.Threading.Channels;
global using DebridCollector.Extensions; global using DebridCollector.Extensions;
global using DebridCollector.Features.Configuration;
global using DebridCollector.Features.Debrid; global using DebridCollector.Features.Debrid;
global using DebridCollector.Features.Worker; global using DebridCollector.Features.Worker;
global using MassTransit; global using MassTransit;
global using MassTransit.Mediator;
global using Microsoft.AspNetCore.Builder; global using Microsoft.AspNetCore.Builder;
global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.DependencyInjection;
global using Polly; global using Polly;
global using Polly.Extensions.Http; global using Polly.Extensions.Http;
global using PromKnight.ParseTorrentTitle;
global using SharedContracts.Configuration; global using SharedContracts.Configuration;
global using SharedContracts.Dapper; global using SharedContracts.Dapper;
global using SharedContracts.Extensions; global using SharedContracts.Extensions;
global using SharedContracts.Models; global using SharedContracts.Models;
global using SharedContracts.Python;
global using SharedContracts.Python.RTN;
global using SharedContracts.Requests; global using SharedContracts.Requests;

View File

@@ -0,0 +1,2 @@
mkdir -p ../python
python -m pip install -r ../requirements.txt -t ../python/

View File

@@ -0,0 +1,5 @@
#!/bin/bash
rm -rf ../python
mkdir -p ../python
python3 -m pip install -r ../requirements.txt -t ../python/

View File

@@ -0,0 +1 @@
rank-torrent-name==0.2.13

View File

@@ -72,7 +72,7 @@ public class BasicsFile(ILogger<BasicsFile> logger, ImdbDbService dbService): IF
Category = csv.GetField(1), Category = csv.GetField(1),
Title = csv.GetField(2), Title = csv.GetField(2),
Adult = isAdultSet && adult == 1, Adult = isAdultSet && adult == 1,
Year = csv.GetField(5), Year = csv.GetField(5) == @"\N" ? 0 : int.Parse(csv.GetField(5)),
}; };
if (cancellationToken.IsCancellationRequested) if (cancellationToken.IsCancellationRequested)

View File

@@ -6,5 +6,5 @@ public class ImdbBasicEntry
public string? Category { get; set; } public string? Category { get; set; }
public string? Title { get; set; } public string? Title { get; set; }
public bool Adult { get; set; } public bool Adult { get; set; }
public string? Year { get; set; } public int Year { get; set; }
} }

View File

@@ -17,7 +17,7 @@ public class ImdbDbService(PostgresConfiguration configuration, ILogger<ImdbDbSe
await writer.WriteAsync(entry.ImdbId, NpgsqlDbType.Text); await writer.WriteAsync(entry.ImdbId, NpgsqlDbType.Text);
await writer.WriteAsync(entry.Category, NpgsqlDbType.Text); await writer.WriteAsync(entry.Category, NpgsqlDbType.Text);
await writer.WriteAsync(entry.Title, NpgsqlDbType.Text); await writer.WriteAsync(entry.Title, NpgsqlDbType.Text);
await writer.WriteAsync(entry.Year, NpgsqlDbType.Text); await writer.WriteAsync(entry.Year, NpgsqlDbType.Integer);
await writer.WriteAsync(entry.Adult, NpgsqlDbType.Boolean); await writer.WriteAsync(entry.Adult, NpgsqlDbType.Boolean);
} }
catch (Npgsql.PostgresException e) catch (Npgsql.PostgresException e)
@@ -116,7 +116,7 @@ public class ImdbDbService(PostgresConfiguration configuration, ILogger<ImdbDbSe
ExecuteCommandAsync( ExecuteCommandAsync(
async connection => async connection =>
{ {
await using var command = new NpgsqlCommand($"CREATE INDEX title_gist ON {TableNames.MetadataTable} USING gist(title gist_trgm_ops)", connection); await using var command = new NpgsqlCommand($"CREATE INDEX title_gin ON {TableNames.MetadataTable} USING gin(title gin_trgm_ops)", connection);
await command.ExecuteNonQueryAsync(); await command.ExecuteNonQueryAsync();
}, "Error while creating index on imdb_metadata table"); }, "Error while creating index on imdb_metadata table");
@@ -125,7 +125,7 @@ public class ImdbDbService(PostgresConfiguration configuration, ILogger<ImdbDbSe
async connection => async connection =>
{ {
logger.LogInformation("Dropping Trigrams index if it exists already"); logger.LogInformation("Dropping Trigrams index if it exists already");
await using var dropCommand = new NpgsqlCommand("DROP INDEX if exists title_gist", connection); await using var dropCommand = new NpgsqlCommand("DROP INDEX if exists title_gin", connection);
await dropCommand.ExecuteNonQueryAsync(); await dropCommand.ExecuteNonQueryAsync();
}, $"Error while dropping index on {TableNames.MetadataTable} table"); }, $"Error while dropping index on {TableNames.MetadataTable} table");

View File

@@ -0,0 +1,35 @@
-- Purpose: Change the year column to integer and add a search function that allows for searching by year.
ALTER TABLE imdb_metadata
ALTER COLUMN year TYPE integer USING (CASE WHEN year = '\N' THEN 0 ELSE year::integer END);
-- Remove the old search function
DROP FUNCTION IF EXISTS search_imdb_meta(TEXT, TEXT, TEXT, INT);
-- Add the new search function that allows for searching by year with a plus/minus one year range
CREATE OR REPLACE FUNCTION search_imdb_meta(search_term TEXT, category_param TEXT DEFAULT NULL, year_param INT DEFAULT NULL, limit_param INT DEFAULT 10)
RETURNS TABLE(imdb_id character varying(16), title character varying(1000),category character varying(50),year INT, score REAL) AS $$
BEGIN
SET pg_trgm.similarity_threshold = 0.9;
RETURN QUERY
SELECT imdb_metadata.imdb_id, imdb_metadata.title, imdb_metadata.category, imdb_metadata.year, similarity(imdb_metadata.title, search_term) as score
FROM imdb_metadata
WHERE (imdb_metadata.title % search_term)
AND (imdb_metadata.adult = FALSE)
AND (category_param IS NULL OR imdb_metadata.category = category_param)
AND (year_param IS NULL OR imdb_metadata.year BETWEEN year_param - 1 AND year_param + 1)
ORDER BY score DESC
LIMIT limit_param;
END; $$
LANGUAGE plpgsql;
-- Drop the old indexes
DROP INDEX IF EXISTS idx_imdb_metadata_adult;
DROP INDEX IF EXISTS idx_imdb_metadata_category;
DROP INDEX IF EXISTS idx_imdb_metadata_year;
DROP INDEX IF EXISTS title_gist;
-- Add indexes for the new columns
CREATE INDEX idx_imdb_metadata_adult ON imdb_metadata(adult);
CREATE INDEX idx_imdb_metadata_category ON imdb_metadata(category);
CREATE INDEX idx_imdb_metadata_year ON imdb_metadata(year);
CREATE INDEX title_gin ON imdb_metadata USING gin(title gin_trgm_ops);

View File

@@ -0,0 +1,40 @@
-- Purpose: Add the jsonb column to the ingested_torrents table to store the response from RTN
ALTER TABLE ingested_torrents
ADD COLUMN IF NOT EXISTS rtn_response jsonb;
-- Purpose: Drop torrentId column from torrents table
ALTER TABLE torrents
DROP COLUMN IF EXISTS "torrentId";
-- Purpose: Drop Trackers column from torrents table
ALTER TABLE torrents
DROP COLUMN IF EXISTS "trackers";
-- Purpose: Create a foreign key relationsship if it does not already exist between torrents and the source table ingested_torrents, but do not cascade on delete.
ALTER TABLE torrents
ADD COLUMN IF NOT EXISTS "ingestedTorrentId" bigint;
DO $$
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.table_constraints
WHERE constraint_name = 'fk_torrents_info_hash'
)
THEN
ALTER TABLE torrents
DROP CONSTRAINT fk_torrents_info_hash;
END IF;
END $$;
ALTER TABLE torrents
ADD CONSTRAINT fk_torrents_info_hash
FOREIGN KEY ("ingestedTorrentId")
REFERENCES ingested_torrents("id")
ON DELETE NO ACTION;
UPDATE torrents
SET "ingestedTorrentId" = ingested_torrents."id"
FROM ingested_torrents
WHERE torrents."infoHash" = ingested_torrents."info_hash"
AND torrents."provider" = ingested_torrents."source";

View File

@@ -0,0 +1,55 @@
DROP FUNCTION IF EXISTS kc_maintenance_reconcile_dmm_imdb_ids();
CREATE OR REPLACE FUNCTION kc_maintenance_reconcile_dmm_imdb_ids()
RETURNS INTEGER AS $$
DECLARE
rec RECORD;
imdb_rec RECORD;
rows_affected INTEGER := 0;
BEGIN
RAISE NOTICE 'Starting Reconciliation of DMM IMDB Ids...';
FOR rec IN
SELECT
it."id" as "ingestion_id",
t."infoHash",
it."category" as "ingestion_category",
f."id" as "file_Id",
f."title" as "file_Title",
(rtn_response->>'raw_title')::text as "raw_title",
(rtn_response->>'parsed_title')::text as "parsed_title",
(rtn_response->>'year')::int as "year"
FROM torrents t
JOIN ingested_torrents it ON t."ingestedTorrentId" = it."id"
JOIN files f ON t."infoHash" = f."infoHash"
WHERE t."provider" = 'DMM'
LOOP
RAISE NOTICE 'Processing record with file_Id: %', rec."file_Id";
FOR imdb_rec IN
SELECT * FROM search_imdb_meta(
rec."parsed_title",
CASE
WHEN rec."ingestion_category" = 'tv' THEN 'tvSeries'
WHEN rec."ingestion_category" = 'movies' THEN 'movie'
END,
CASE
WHEN rec."year" = 0 THEN NULL
ELSE rec."year" END,
1)
LOOP
IF imdb_rec IS NOT NULL THEN
RAISE NOTICE 'Updating file_Id: % with imdbId: %, parsed title: %, imdb title: %', rec."file_Id", imdb_rec."imdb_id", rec."parsed_title", imdb_rec."title";
UPDATE "files"
SET "imdbId" = imdb_rec."imdb_id"
WHERE "id" = rec."file_Id";
rows_affected := rows_affected + 1;
ELSE
RAISE NOTICE 'No IMDB ID found for file_Id: %, parsed title: %, imdb title: %, setting imdbId to NULL', rec."file_Id", rec."parsed_title", imdb_rec."title";
UPDATE "files"
SET "imdbId" = NULL
WHERE "id" = rec."file_Id";
END IF;
END LOOP;
END LOOP;
RAISE NOTICE 'Finished reconciliation. Total rows affected: %', rows_affected;
RETURN rows_affected;
END;
$$ LANGUAGE plpgsql;

View File

@@ -0,0 +1,19 @@
-- Remove the old search function
DROP FUNCTION IF EXISTS search_imdb_meta(TEXT, TEXT, INT, INT);
-- Add the new search function that allows for searching by year with a plus/minus one year range
CREATE OR REPLACE FUNCTION search_imdb_meta(search_term TEXT, category_param TEXT DEFAULT NULL, year_param INT DEFAULT NULL, limit_param INT DEFAULT 10, similarity_threshold REAL DEFAULT 0.95)
RETURNS TABLE(imdb_id character varying(16), title character varying(1000),category character varying(50),year INT, score REAL) AS $$
BEGIN
SET pg_trgm.similarity_threshold = similarity_threshold;
RETURN QUERY
SELECT imdb_metadata.imdb_id, imdb_metadata.title, imdb_metadata.category, imdb_metadata.year, similarity(imdb_metadata.title, search_term) as score
FROM imdb_metadata
WHERE (imdb_metadata.title % search_term)
AND (imdb_metadata.adult = FALSE)
AND (category_param IS NULL OR imdb_metadata.category = category_param)
AND (year_param IS NULL OR imdb_metadata.year BETWEEN year_param - 1 AND year_param + 1)
ORDER BY score DESC
LIMIT limit_param;
END; $$
LANGUAGE plpgsql;

View File

@@ -0,0 +1,19 @@
-- Remove the old search function
DROP FUNCTION IF EXISTS search_imdb_meta(TEXT, TEXT, INT, INT);
-- Add the new search function that allows for searching by year with a plus/minus one year range
CREATE OR REPLACE FUNCTION search_imdb_meta(search_term TEXT, category_param TEXT DEFAULT NULL, year_param INT DEFAULT NULL, limit_param INT DEFAULT 10, similarity_threshold REAL DEFAULT 0.95)
RETURNS TABLE(imdb_id character varying(16), title character varying(1000),category character varying(50),year INT, score REAL) AS $$
BEGIN
EXECUTE format('SET pg_trgm.similarity_threshold = %L', similarity_threshold);
RETURN QUERY
SELECT imdb_metadata.imdb_id, imdb_metadata.title, imdb_metadata.category, imdb_metadata.year, similarity(imdb_metadata.title, search_term) as score
FROM imdb_metadata
WHERE (imdb_metadata.title % search_term)
AND (imdb_metadata.adult = FALSE)
AND (category_param IS NULL OR imdb_metadata.category = category_param)
AND (year_param IS NULL OR imdb_metadata.year BETWEEN year_param - 1 AND year_param + 1)
ORDER BY score DESC
LIMIT limit_param;
END; $$
LANGUAGE plpgsql;

View File

@@ -0,0 +1,43 @@
-- Drop Duplicate Files in Files Table
DELETE FROM public.files
WHERE id NOT IN (
SELECT MAX(id)
FROM public.files
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to files table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'files_unique_infohash_fileindex'
) THEN
ALTER TABLE public.files
ADD CONSTRAINT files_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;
-- Drop Duplicate subtitles in Subtitles Table
DELETE FROM public.subtitles
WHERE id NOT IN (
SELECT MAX(id)
FROM public.subtitles
GROUP BY "infoHash", "fileIndex"
);
-- Add Index to subtitles table
DO $$
BEGIN
IF NOT EXISTS (
SELECT 1
FROM pg_constraint
WHERE conname = 'subtitles_unique_infohash_fileindex'
) THEN
ALTER TABLE public.subtitles
ADD CONSTRAINT subtitles_unique_infohash_fileindex UNIQUE ("infoHash", "fileIndex");
END IF;
END $$;

View File

@@ -1,2 +1,3 @@
remove-item -recurse -force ../src/python
mkdir -p ../src/python mkdir -p ../src/python
pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ pip install -r ../src/requirements.txt -t ../src/python/

View File

@@ -1,4 +1,5 @@
#!/bin/bash #!/bin/bash
rm -rf ../src/python
mkdir -p ../src/python mkdir -p ../src/python
pip install --force-reinstall rank-torrent-name==0.1.6 -t ../src/python/ python3 -m pip install -r ../src/requirements.txt -t ../src/python/

View File

@@ -4,31 +4,38 @@
{ {
"Name": "SyncEzTvJob", "Name": "SyncEzTvJob",
"IntervalSeconds": 60, "IntervalSeconds": 60,
"Enabled": true "Enabled": true,
"Url": "https://eztv1.xyz/ezrss.xml",
"XmlNamespace": "http://xmlns.ezrss.it/0.1/"
}, },
{ {
"Name": "SyncNyaaJob", "Name": "SyncNyaaJob",
"IntervalSeconds": 60, "IntervalSeconds": 60,
"Enabled": true "Enabled": true,
"Url": "https://nyaa.si/?page=rss&c=1_2&f=0",
"XmlNamespace": "https://nyaa.si/xmlns/nyaa"
}, },
{ {
"Name": "SyncTpbJob", "Name": "SyncTpbJob",
"IntervalSeconds": 60, "IntervalSeconds": 60,
"Enabled": true "Enabled": true,
"Url": "https://apibay.org/precompiled/data_top100_recent.json"
}, },
{ {
"Name": "SyncYtsJob", "Name": "SyncYtsJob",
"IntervalSeconds": 60, "IntervalSeconds": 60,
"Enabled": true "Enabled": true,
"Url": "https://yts.am/rss"
}, },
{ {
"Name": "SyncTgxJob", "Name": "SyncTgxJob",
"IntervalSeconds": 60, "IntervalSeconds": 60,
"Enabled": true "Enabled": true,
"Url": "https://tgx.rs/rss"
}, },
{ {
"Name": "SyncDmmJob", "Name": "SyncDmmJob",
"IntervalSeconds": 1800, "IntervalSeconds": 10800,
"Enabled": true "Enabled": true
}, },
{ {

View File

@@ -13,13 +13,19 @@ FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine3.19
WORKDIR /app WORKDIR /app
ENV PYTHONUNBUFFERED=1 ENV PYTHONUNBUFFERED=1
RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python
COPY --from=build /src/out . COPY --from=build /src/out .
RUN rm -rf /app/python && mkdir -p /app/python RUN rm -rf /app/python && mkdir -p /app/python
RUN pip3 install --force-reinstall rank-torrent-name==0.1.6 -t /app/python
RUN pip3 install -r /app/requirements.txt -t /app/python
RUN addgroup -S producer && adduser -S -G producer producer RUN addgroup -S producer && adduser -S -G producer producer
USER producer USER producer
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD pgrep -f dotnet || exit 1 CMD pgrep -f dotnet || exit 1

View File

@@ -6,6 +6,12 @@ public abstract class BaseJsonCrawler(IHttpClientFactory httpClientFactory, ILog
protected virtual async Task Execute(string collectionName) protected virtual async Task Execute(string collectionName)
{ {
if (string.IsNullOrWhiteSpace(Url))
{
logger.LogWarning("No URL provided for {Source} crawl", Source);
return;
}
logger.LogInformation("Starting {Source} crawl", Source); logger.LogInformation("Starting {Source} crawl", Source);
using var client = httpClientFactory.CreateClient("Scraper"); using var client = httpClientFactory.CreateClient("Scraper");

View File

@@ -4,6 +4,12 @@ public abstract class BaseXmlCrawler(IHttpClientFactory httpClientFactory, ILogg
{ {
public override async Task Execute() public override async Task Execute()
{ {
if (string.IsNullOrWhiteSpace(Url))
{
logger.LogWarning("No URL provided for {Source} crawl", Source);
return;
}
logger.LogInformation("Starting {Source} crawl", Source); logger.LogInformation("Starting {Source} crawl", Source);
using var client = httpClientFactory.CreateClient(Literals.CrawlerClient); using var client = httpClientFactory.CreateClient(Literals.CrawlerClient);

View File

@@ -7,4 +7,8 @@ public class Scraper
public int IntervalSeconds { get; set; } = 60; public int IntervalSeconds { get; set; } = 60;
public bool Enabled { get; set; } = true; public bool Enabled { get; set; } = true;
public string? Url { get; set; }
public string? XmlNamespace { get; set; }
} }

View File

@@ -0,0 +1,70 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMFileDownloader(HttpClient client, ILogger<DMMFileDownloader> logger) : IDMMFileDownloader
{
private const string Filename = "main.zip";
private readonly IReadOnlyCollection<string> _filesToIgnore = [
"index.html",
"404.html",
"dedupe.sh",
"CNAME",
];
public const string ClientName = "DmmFileDownloader";
public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
{
logger.LogInformation("Downloading DMM Hashlists");
var response = await client.GetAsync(Filename, cancellationToken);
var tempDirectory = Path.Combine(Path.GetTempPath(), "DMMHashlists");
EnsureDirectoryIsClean(tempDirectory);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
using var archive = new ZipArchive(stream);
logger.LogInformation("Extracting DMM Hashlists to {TempDirectory}", tempDirectory);
foreach (var entry in archive.Entries)
{
var entryPath = Path.Combine(tempDirectory, Path.GetFileName(entry.FullName));
if (!entry.FullName.EndsWith('/')) // It's a file
{
entry.ExtractToFile(entryPath, true);
}
}
foreach (var file in _filesToIgnore)
{
CleanRepoExtras(tempDirectory, file);
}
logger.LogInformation("Downloaded and extracted Repository to {TempDirectory}", tempDirectory);
return tempDirectory;
}
private static void CleanRepoExtras(string tempDirectory, string fileName)
{
var repoIndex = Path.Combine(tempDirectory, fileName);
if (File.Exists(repoIndex))
{
File.Delete(repoIndex);
}
}
private static void EnsureDirectoryIsClean(string tempDirectory)
{
if (Directory.Exists(tempDirectory))
{
Directory.Delete(tempDirectory, true);
}
Directory.CreateDirectory(tempDirectory);
}
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public class DMMHttpClient
{
}

View File

@@ -1,67 +1,99 @@
using Microsoft.VisualBasic;
namespace Producer.Features.Crawlers.Dmm; namespace Producer.Features.Crawlers.Dmm;
public partial class DebridMediaManagerCrawler( public partial class DebridMediaManagerCrawler(
IHttpClientFactory httpClientFactory, IDMMFileDownloader dmmFileDownloader,
ILogger<DebridMediaManagerCrawler> logger, ILogger<DebridMediaManagerCrawler> logger,
IDataStorage storage, IDataStorage storage,
GithubConfiguration githubConfiguration,
IRankTorrentName rankTorrentName, IRankTorrentName rankTorrentName,
IDistributedCache cache) : BaseCrawler(logger, storage) IDistributedCache cache) : BaseCrawler(logger, storage)
{ {
[GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")] [GeneratedRegex("""<iframe src="https:\/\/debridmediamanager.com\/hashlist#(.*)"></iframe>""")]
private static partial Regex HashCollectionMatcher(); private static partial Regex HashCollectionMatcher();
private LengthAwareRatioScorer _lengthAwareRatioScorer = new(); protected override string Url => "";
private const string DownloadBaseUrl = "https://raw.githubusercontent.com/debridmediamanager/hashlists/main";
protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>(); protected override IReadOnlyDictionary<string, string> Mappings => new Dictionary<string, string>();
protected override string Url => "https://api.github.com/repos/debridmediamanager/hashlists/git/trees/main?recursive=1";
protected override string Source => "DMM"; protected override string Source => "DMM";
private const int ParallelismCount = 4;
public override async Task Execute() public override async Task Execute()
{ {
var client = httpClientFactory.CreateClient("Scraper"); var tempDirectory = await dmmFileDownloader.DownloadFileToTempPath(CancellationToken.None);
client.DefaultRequestHeaders.Authorization = new("Bearer", githubConfiguration.PAT);
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
var jsonBody = await client.GetStringAsync(Url); var files = Directory.GetFiles(tempDirectory, "*.html", SearchOption.AllDirectories);
var json = JsonDocument.Parse(jsonBody); logger.LogInformation("Found {Files} files to parse", files.Length);
var entriesArray = json.RootElement.GetProperty("tree"); var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
logger.LogInformation("Found {Entries} total DMM pages", entriesArray.GetArrayLength()); await Parallel.ForEachAsync(files, options, async (file, token) =>
foreach (var entry in entriesArray.EnumerateArray())
{ {
await ParsePage(entry, client); var fileName = Path.GetFileName(file);
} var torrentDictionary = await ExtractPageContents(file, fileName);
if (torrentDictionary == null)
{
return;
}
await ParseTitlesWithRtn(fileName, torrentDictionary);
var results = await ParseTorrents(torrentDictionary);
if (results.Count <= 0)
{
return;
}
await InsertTorrents(results);
await Storage.MarkPageAsIngested(fileName, token);
});
} }
private async Task ParsePage(JsonElement entry, HttpClient client) private async Task ParseTitlesWithRtn(string fileName, IDictionary<string, DmmContent> page)
{ {
var (pageIngested, name) = await IsAlreadyIngested(entry); logger.LogInformation("Parsing titles for {Page}", fileName);
if (string.IsNullOrEmpty(name) || pageIngested) var batchProcessables = page.Select(value => new RtnBatchProcessable(value.Key, value.Value.Filename)).ToList();
var parsedResponses = rankTorrentName.BatchParse(
batchProcessables.Select<RtnBatchProcessable, string>(bp => bp.Filename).ToList(), trashGarbage: false);
// Filter out unsuccessful responses and match RawTitle to requesting title
var successfulResponses = parsedResponses
.Where(response => response != null && response.Success)
.GroupBy(response => response.Response.RawTitle!)
.ToDictionary(group => group.Key, group => group.First());
var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
await Parallel.ForEachAsync(batchProcessables.Select(t => t.InfoHash), options, (infoHash, _) =>
{ {
return; if (page.TryGetValue(infoHash, out var dmmContent) &&
} successfulResponses.TryGetValue(dmmContent.Filename, out var parsedResponse))
{
var pageSource = await client.GetStringAsync($"{DownloadBaseUrl}/{name}"); page[infoHash] = dmmContent with {ParseResponse = parsedResponse};
}
await ExtractPageContents(pageSource, name);
return ValueTask.CompletedTask;
});
} }
private async Task ExtractPageContents(string pageSource, string name) private async Task<ConcurrentDictionary<string, DmmContent>?> ExtractPageContents(string filePath, string filenameOnly)
{ {
var (pageIngested, name) = await IsAlreadyIngested(filenameOnly);
if (pageIngested)
{
return [];
}
var pageSource = await File.ReadAllTextAsync(filePath);
var match = HashCollectionMatcher().Match(pageSource); var match = HashCollectionMatcher().Match(pageSource);
if (!match.Success) if (!match.Success)
{ {
logger.LogWarning("Failed to match hash collection for {Name}", name); logger.LogWarning("Failed to match hash collection for {Name}", name);
await Storage.MarkPageAsIngested(name); await Storage.MarkPageAsIngested(filenameOnly);
return; return [];
} }
var encodedJson = match.Groups.Values.ElementAtOrDefault(1); var encodedJson = match.Groups.Values.ElementAtOrDefault(1);
@@ -69,132 +101,112 @@ public partial class DebridMediaManagerCrawler(
if (string.IsNullOrEmpty(encodedJson?.Value)) if (string.IsNullOrEmpty(encodedJson?.Value))
{ {
logger.LogWarning("Failed to extract encoded json for {Name}", name); logger.LogWarning("Failed to extract encoded json for {Name}", name);
return; return [];
} }
await ProcessExtractedContentsAsTorrentCollection(encodedJson.Value, name); var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson.Value);
}
private async Task ProcessExtractedContentsAsTorrentCollection(string encodedJson, string name)
{
var decodedJson = LZString.DecompressFromEncodedURIComponent(encodedJson);
var json = JsonDocument.Parse(decodedJson); var json = JsonDocument.Parse(decodedJson);
var torrents = await json.RootElement.EnumerateArray()
.ToAsyncEnumerable()
.Select(ParsePageContent)
.Where(t => t is not null)
.ToListAsync();
await InsertTorrentsForPage(json); if (torrents.Count == 0)
var result = await Storage.MarkPageAsIngested(name);
if (!result.IsSuccess)
{ {
logger.LogWarning("Failed to mark page as ingested: [{Error}]", result.Failure.ErrorMessage); logger.LogWarning("No torrents found in {Name}", name);
return; await Storage.MarkPageAsIngested(filenameOnly);
return [];
} }
var torrentDictionary = torrents
.Where(x => x is not null)
.GroupBy(x => x.InfoHash)
.ToConcurrentDictionary(g => g.Key, g => new DmmContent(g.First().Filename, g.First().Bytes, null));
logger.LogInformation("Successfully marked page as ingested"); logger.LogInformation("Parsed {Torrents} torrents for {Name}", torrentDictionary.Count, name);
return torrentDictionary;
} }
-    private async Task<IngestedTorrent?> ParseTorrent(JsonElement item)
+    private async Task<List<IngestedTorrent>> ParseTorrents(IDictionary<string, DmmContent> page)
     {
-        if (!item.TryGetProperty("filename", out var filenameElement) ||
-            !item.TryGetProperty("bytes", out var bytesElement) ||
-            !item.TryGetProperty("hash", out var hashElement))
-        {
-            return null;
-        }
-
-        var torrentTitle = filenameElement.GetString();
-
-        if (torrentTitle.IsNullOrEmpty())
-        {
-            return null;
-        }
-
-        var parsedTorrent = rankTorrentName.Parse(torrentTitle.CleanTorrentTitleForImdb());
-
-        if (!parsedTorrent.Success)
-        {
-            return null;
-        }
-
-        var (cached, cachedResult) = await CheckIfInCacheAndReturn(parsedTorrent.ParsedTitle);
-
-        if (cached)
-        {
-            logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.ParsedTitle);
-            return MapToTorrent(cachedResult, bytesElement, hashElement, parsedTorrent);
-        }
-
-        var year = parsedTorrent.Year != 0 ? parsedTorrent.Year.ToString() : null;
-        var imdbEntries = await Storage.FindImdbMetadata(parsedTorrent.ParsedTitle, parsedTorrent.IsMovie ? "movies" : "tv", year);
-
-        if (imdbEntries.Count == 0)
-        {
-            return null;
-        }
-
-        var scoredTitles = await ScoreTitles(parsedTorrent, imdbEntries);
-
-        if (!scoredTitles.Success)
-        {
-            return null;
-        }
-
-        logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", scoredTitles.BestMatch.Value.ImdbId, parsedTorrent.ParsedTitle, scoredTitles.BestMatch.Value.Title, scoredTitles.BestMatch.Score);
-
-        return MapToTorrent(scoredTitles.BestMatch.Value, bytesElement, hashElement, parsedTorrent);
+        var ingestedTorrents = new List<IngestedTorrent>();
+        var options = new ParallelOptions { MaxDegreeOfParallelism = ParallelismCount };
+
+        await Parallel.ForEachAsync(page, options, async (kvp, ct) =>
+        {
+            var (infoHash, dmmContent) = kvp;
+            var parsedTorrent = dmmContent.ParseResponse;
+
+            if (parsedTorrent is not {Success: true})
+            {
+                return;
+            }
+
+            var torrentType = parsedTorrent.Response.IsMovie ? "movie" : "tvSeries";
+            var cacheKey = GetCacheKey(torrentType, parsedTorrent.Response.ParsedTitle, parsedTorrent.Response.Year);
+            var (cached, cachedResult) = await CheckIfInCacheAndReturn(cacheKey);
+
+            if (cached)
+            {
+                logger.LogInformation("[{ImdbId}] Found cached imdb result for {Title}", cachedResult.ImdbId, parsedTorrent.Response.ParsedTitle);
+                lock (ingestedTorrents)
+                {
+                    ingestedTorrents.Add(MapToTorrent(cachedResult, dmmContent.Bytes, infoHash, parsedTorrent));
+                }
+                return;
+            }
+
+            int? year = parsedTorrent.Response.Year != 0 ? parsedTorrent.Response.Year : null;
+            var imdbEntry = await Storage.FindImdbMetadata(parsedTorrent.Response.ParsedTitle, torrentType, year, ct);
+
+            if (imdbEntry is null)
+            {
+                return;
+            }
+
+            await AddToCache(cacheKey, imdbEntry);
+
+            logger.LogInformation("[{ImdbId}] Found best match for {Title}: {BestMatch} with score {Score}", imdbEntry.ImdbId, parsedTorrent.Response.ParsedTitle, imdbEntry.Title, imdbEntry.Score);
+
+            lock (ingestedTorrents)
+            {
+                ingestedTorrents.Add(MapToTorrent(imdbEntry, dmmContent.Bytes, infoHash, parsedTorrent));
+            }
+        });
+
+        return ingestedTorrents;
     }
-    private IngestedTorrent MapToTorrent(ImdbEntry result, JsonElement bytesElement, JsonElement hashElement, ParseTorrentTitleResponse parsedTorrent) =>
+    private IngestedTorrent MapToTorrent(ImdbEntry result, long size, string infoHash, ParseTorrentTitleResponse parsedTorrent) =>
         new()
         {
             Source = Source,
             Name = result.Title,
             Imdb = result.ImdbId,
-            Size = bytesElement.GetInt64().ToString(),
-            InfoHash = hashElement.ToString(),
+            Size = size.ToString(),
+            InfoHash = infoHash,
             Seeders = 0,
             Leechers = 0,
-            Category = parsedTorrent.IsMovie switch
-            {
-                true => "movies",
-                false => "tv",
-            },
+            Category = AssignCategory(result),
+            RtnResponse = parsedTorrent.Response.ToJson(),
         };
-    private async Task<(bool Success, ExtractedResult<ImdbEntry>? BestMatch)> ScoreTitles(ParseTorrentTitleResponse parsedTorrent, List<ImdbEntry> imdbEntries)
-    {
-        var lowerCaseTitle = parsedTorrent.ParsedTitle.ToLowerInvariant();
-
-        // Scoring directly operates on the List<ImdbEntry>, no need for lookup table.
-        var scoredResults = Process.ExtractAll(new(){Title = lowerCaseTitle}, imdbEntries, x => x.Title?.ToLowerInvariant(), scorer: _lengthAwareRatioScorer, cutoff: 90);
-
-        var best = scoredResults.MaxBy(x => x.Score);
-
-        if (best is null)
-        {
-            return (false, null);
-        }
-
-        await AddToCache(lowerCaseTitle, best);
-
-        return (true, best);
-    }
-
-    private Task AddToCache(string lowerCaseTitle, ExtractedResult<ImdbEntry> best)
+    private Task AddToCache(string cacheKey, ImdbEntry best)
     {
         var cacheOptions = new DistributedCacheEntryOptions
         {
             AbsoluteExpirationRelativeToNow = TimeSpan.FromDays(1),
         };

-        return cache.SetStringAsync(lowerCaseTitle, JsonSerializer.Serialize(best.Value), cacheOptions);
+        return cache.SetStringAsync(cacheKey, JsonSerializer.Serialize(best), cacheOptions);
     }
-    private async Task<(bool Success, ImdbEntry? Entry)> CheckIfInCacheAndReturn(string title)
+    private async Task<(bool Success, ImdbEntry? Entry)> CheckIfInCacheAndReturn(string cacheKey)
     {
-        var cachedImdbId = await cache.GetStringAsync(title.ToLowerInvariant());
+        var cachedImdbId = await cache.GetStringAsync(cacheKey);

         if (!string.IsNullOrEmpty(cachedImdbId))
         {
@@ -204,34 +216,36 @@ public partial class DebridMediaManagerCrawler(
         return (false, null);
     }
-    private async Task InsertTorrentsForPage(JsonDocument json)
-    {
-        var torrents = await json.RootElement.EnumerateArray()
-            .ToAsyncEnumerable()
-            .SelectAwait(async x => await ParseTorrent(x))
-            .Where(t => t is not null)
-            .ToListAsync();
-
-        if (torrents.Count == 0)
-        {
-            logger.LogWarning("No torrents found in {Source} response", Source);
-            return;
-        }
-
-        await InsertTorrents(torrents!);
-    }
-
-    private async Task<(bool Success, string? Name)> IsAlreadyIngested(JsonElement entry)
+    private async Task<(bool Success, string? Name)> IsAlreadyIngested(string filename)
     {
-        var name = entry.GetProperty("path").GetString();
-
-        if (string.IsNullOrEmpty(name))
-        {
-            return (false, null);
-        }
-
-        var pageIngested = await Storage.PageIngested(name);
+        var pageIngested = await Storage.PageIngested(filename);

-        return (pageIngested, name);
+        return (pageIngested, filename);
     }
+
+    private static string AssignCategory(ImdbEntry entry) =>
+        entry.Category.ToLower() switch
+        {
+            var category when string.Equals(category, "movie", StringComparison.OrdinalIgnoreCase) => "movies",
+            var category when string.Equals(category, "tvSeries", StringComparison.OrdinalIgnoreCase) => "tv",
+            _ => "unknown",
+        };
+
+    private static string GetCacheKey(string category, string title, int year) => $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";
+
+    private static ExtractedDMMContent? ParsePageContent(JsonElement item)
+    {
+        if (!item.TryGetProperty("filename", out var filenameElement) ||
+            !item.TryGetProperty("bytes", out var bytesElement) ||
+            !item.TryGetProperty("hash", out var hashElement))
+        {
+            return null;
+        }
+
+        return new(filenameElement.GetString(), bytesElement.GetInt64(), hashElement.GetString());
+    }
+
+    private record DmmContent(string Filename, long Bytes, ParseTorrentTitleResponse? ParseResponse);
+    private record ExtractedDMMContent(string Filename, long Bytes, string InfoHash);
+    private record RtnBatchProcessable(string InfoHash, string Filename);
 }
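
Taken together, these hunks change the DMM flow from "parse JSON, score titles with FuzzySharp, insert per page" to "extract once into a hash-keyed dictionary, batch-parse with RTN, and memoise IMDb matches in Redis under a category:year:title key". A minimal sketch of that key scheme, mirroring GetCacheKey above (the example values are illustrative only):

    // Cache key used to memoise IMDb lookups per parsed torrent.
    static string GetCacheKey(string category, string title, int year) =>
        $"{category.ToLowerInvariant()}:{year}:{title.ToLowerInvariant()}";

    Console.WriteLine(GetCacheKey("tvSeries", "Severance", 2022)); // tvseries:2022:severance
    Console.WriteLine(GetCacheKey("movie", "Dune", 2021));         // movie:2021:dune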

View File

@@ -1,9 +0,0 @@
namespace Producer.Features.Crawlers.Dmm;
public class GithubConfiguration
{
private const string Prefix = "GITHUB";
private const string PatVariable = "PAT";
public string? PAT { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(PatVariable);
}

View File

@@ -0,0 +1,6 @@
namespace Producer.Features.Crawlers.Dmm;
public interface IDMMFileDownloader
{
Task<string> DownloadFileToTempPath(CancellationToken cancellationToken);
}

View File

@@ -0,0 +1,16 @@
namespace Producer.Features.Crawlers.Dmm;
public static class ServiceCollectionExtensions
{
public static IServiceCollection AddDmmSupport(this IServiceCollection services)
{
services.AddHttpClient<IDMMFileDownloader, DMMFileDownloader>(DMMFileDownloader.ClientName, client =>
{
client.BaseAddress = new("https://github.com/debridmediamanager/hashlists/zipball/main/");
client.DefaultRequestHeaders.Add("Accept-Encoding", "gzip");
client.DefaultRequestHeaders.UserAgent.ParseAdd("curl");
});
return services;
}
}
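
The DMM downloader is registered as a named, typed HttpClient pointing at the hashlists zipball. A hedged sketch of the consuming side (the real DMMFileDownloader ships with the producer; this hypothetical version only illustrates the typed-client pattern the registration above relies on):

    public class DMMFileDownloader(HttpClient client) : IDMMFileDownloader
    {
        public const string ClientName = "DmmFileDownloader"; // assumed value, referenced by AddDmmSupport above

        public async Task<string> DownloadFileToTempPath(CancellationToken cancellationToken)
        {
            // BaseAddress is set in AddDmmSupport, so a relative GET fetches the archive.
            var tempFile = Path.Combine(Path.GetTempPath(), "dmm-hashlists.zip");
            await using var stream = await client.GetStreamAsync("", cancellationToken);
            await using var file = File.Create(tempFile);
            await stream.CopyToAsync(file, cancellationToken);
            return tempFile;
        }
    }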

View File

@@ -1,11 +1,10 @@
 namespace Producer.Features.Crawlers.EzTv;

-public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class EzTvCrawler(IHttpClientFactory httpClientFactory, ILogger<EzTvCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
 {
-    protected override string Url => "https://eztv1.xyz/ezrss.xml";
+    protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncEzTvJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override string Source => "EZTV";

-    private static readonly XNamespace XmlNamespace = "http://xmlns.ezrss.it/0.1/";
+    private XNamespace XmlNamespace => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncEzTvJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override IReadOnlyDictionary<string, string> Mappings =>
         new Dictionary<string, string>

View File

@@ -1,11 +1,10 @@
 namespace Producer.Features.Crawlers.Nyaa;

-public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class NyaaCrawler(IHttpClientFactory httpClientFactory, ILogger<NyaaCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
 {
-    protected override string Url => "https://nyaa.si/?page=rss&c=1_2&f=0";
+    protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncNyaaJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override string Source => "Nyaa";

-    private static readonly XNamespace XmlNamespace = "https://nyaa.si/xmlns/nyaa";
+    private XNamespace XmlNamespace => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncNyaaJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override IReadOnlyDictionary<string, string> Mappings =>
         new Dictionary<string, string>

View File

@@ -1,13 +1,13 @@
 namespace Producer.Features.Crawlers.Tgx;

-public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<TgxCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public partial class TgxCrawler(IHttpClientFactory httpClientFactory, ILogger<TgxCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
 {
     [GeneratedRegex(@"Size:\s+(.+?)\s+Added")]
     private static partial Regex SizeStringExtractor();

     [GeneratedRegex(@"(?i)\b(\d+(\.\d+)?)\s*([KMGT]?B)\b", RegexOptions.None, "en-GB")]
     private static partial Regex SizeStringParser();

-    protected override string Url => "https://tgx.rs/rss";
+    protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncTgxJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override string Source => "TorrentGalaxy";

     protected override IReadOnlyDictionary<string, string> Mappings

View File

@@ -1,8 +1,8 @@
 namespace Producer.Features.Crawlers.Tpb;

-public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler> logger, IDataStorage storage) : BaseJsonCrawler(httpClientFactory, logger, storage)
+public class TpbCrawler(IHttpClientFactory httpClientFactory, ILogger<TpbCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseJsonCrawler(httpClientFactory, logger, storage)
 {
-    protected override string Url => "https://apibay.org/precompiled/data_top100_recent.json";
+    protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncTpbJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override string Source => "TPB";

View File

@@ -1,9 +1,8 @@
 namespace Producer.Features.Crawlers.Yts;

-public class YtsCrawler(IHttpClientFactory httpClientFactory, ILogger<YtsCrawler> logger, IDataStorage storage) : BaseXmlCrawler(httpClientFactory, logger, storage)
+public class YtsCrawler(IHttpClientFactory httpClientFactory, ILogger<YtsCrawler> logger, IDataStorage storage, ScrapeConfiguration scrapeConfiguration) : BaseXmlCrawler(httpClientFactory, logger, storage)
 {
-    protected override string Url => "https://yts.am/rss";
+    protected override string Url => scrapeConfiguration.Scrapers.FirstOrDefault(x => x.Name.Equals("SyncYtsJob", StringComparison.OrdinalIgnoreCase))?.Url ?? string.Empty;
     protected override string Source => "YTS";
     protected override IReadOnlyDictionary<string, string> Mappings
         => new Dictionary<string, string>

View File

@@ -5,7 +5,6 @@ internal static class ServiceCollectionExtensions
     internal static IServiceCollection AddQuartz(this IServiceCollection services, IConfiguration configuration)
     {
         var scrapeConfiguration = services.LoadConfigurationFromConfig<ScrapeConfiguration>(configuration, ScrapeConfiguration.SectionName);
-        var githubConfiguration = services.LoadConfigurationFromEnv<GithubConfiguration>();
         var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();

         var jobTypes = Assembly.GetAssembly(typeof(BaseJob))
@@ -19,18 +18,13 @@ internal static class ServiceCollectionExtensions
             services.AddTransient(type);
         }

-        if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-        {
-            services.AddTransient<SyncDmmJob>();
-        }
-
         var openMethod = typeof(ServiceCollectionExtensions).GetMethod(nameof(AddJobWithTrigger), BindingFlags.NonPublic | BindingFlags.Static | BindingFlags.Instance);

         services.AddQuartz(
             quartz =>
             {
                 RegisterAutomaticRegistrationJobs(jobTypes, openMethod, quartz, scrapeConfiguration);
-                RegisterDmmJob(githubConfiguration, quartz, scrapeConfiguration);
+                RegisterDmmJob(quartz, scrapeConfiguration);
                 RegisterTorrentioJob(services, quartz, configuration, scrapeConfiguration);
                 RegisterPublisher(quartz, rabbitConfiguration);
             });
@@ -64,13 +58,8 @@ internal static class ServiceCollectionExtensions
         }
     }

-    private static void RegisterDmmJob(GithubConfiguration githubConfiguration, IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration)
-    {
-        if (!string.IsNullOrEmpty(githubConfiguration.PAT))
-        {
-            AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);
-        }
-    }
+    private static void RegisterDmmJob(IServiceCollectionQuartzConfigurator quartz, ScrapeConfiguration scrapeConfiguration) =>
+        AddJobWithTrigger<SyncDmmJob>(quartz, SyncDmmJob.Key, SyncDmmJob.Trigger, scrapeConfiguration);

     private static void RegisterTorrentioJob(
         IServiceCollection services,

View File

@@ -1,12 +1,12 @@
 // Global using directives

+global using System.Collections.Concurrent;
+global using System.IO.Compression;
 global using System.Reflection;
 global using System.Text;
 global using System.Text.Json;
 global using System.Text.RegularExpressions;
 global using System.Xml.Linq;
-global using FuzzySharp;
-global using FuzzySharp.Extractor;
 global using FuzzySharp.PreProcess;
 global using FuzzySharp.SimilarityRatio.Scorer;
 global using FuzzySharp.SimilarityRatio.Scorer.StrategySensitive;

View File

@@ -33,6 +33,9 @@
<None Include="Configuration\*.json"> <None Include="Configuration\*.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
<None Update="requirements.txt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup> </ItemGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'"> <ItemGroup Condition="'$(Configuration)' == 'Debug'">

View File

@@ -12,7 +12,8 @@ builder.Services
     .RegisterMassTransit()
     .AddDataStorage()
     .AddCrawlers()
+    .AddDmmSupport()
     .AddQuartz(builder.Configuration);

 var app = builder.Build();
 app.Run();

View File

@@ -0,0 +1 @@
rank-torrent-name==0.2.13

View File

@@ -9,12 +9,23 @@ RUN dotnet restore -a $TARGETARCH
 RUN dotnet publish -c Release --no-restore -o /src/out -a $TARGETARCH

-FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine
+FROM mcr.microsoft.com/dotnet/aspnet:8.0-alpine3.19
 WORKDIR /app
+ENV PYTHONUNBUFFERED=1
+RUN apk add --update --no-cache python3=~3.11.8-r0 py3-pip && ln -sf python3 /usr/bin/python
 COPY --from=build /src/out .
+RUN rm -rf /app/python && mkdir -p /app/python
+RUN pip3 install -r /app/requirements.txt -t /app/python
 RUN addgroup -S qbit && adduser -S -G qbit qbit
 USER qbit
 HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
     CMD pgrep -f dotnet || exit 1
+ENV PYTHONNET_PYDLL=/usr/lib/libpython3.11.so.1.0
 ENTRYPOINT ["dotnet", "QBitCollector.dll"]
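
The two ENV lines matter because pythonnet resolves the interpreter at runtime: PYTHONNET_PYDLL must point at the exact libpython shipped by the apk package, and the pip packages land in /app/python, which the collector appends to sys.path. A sketch of the initialisation this enables (it mirrors PythonEngineService further down in this diff):

    using Python.Runtime;

    // pythonnet needs the concrete libpython path before Initialize().
    Runtime.PythonDLL = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL")
        ?? "/usr/lib/libpython3.11.so.1.0"; // the value baked into the image above
    PythonEngine.Initialize();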

View File

@@ -13,11 +13,13 @@ public static class ServiceCollectionExtensions
     internal static IServiceCollection AddServiceConfiguration(this IServiceCollection services)
     {
         services.AddQBitTorrentClient();
-        services.AddSingleton<IParseTorrentTitle, ParseTorrentTitle>();
+        services.RegisterPythonEngine();
+        services.AddSingleton<IRankTorrentName, RankTorrentName>();
         services.AddSingleton<QbitRequestProcessor>();
         services.AddHttpClient();
         services.AddSingleton<ITrackersService, TrackersService>();
         services.AddHostedService<TrackersBackgroundService>();
+        services.AddHostedService<HousekeepingBackgroundService>();
         return services;
     }
@@ -42,6 +44,7 @@ public static class ServiceCollectionExtensions
     {
         var rabbitConfiguration = services.LoadConfigurationFromEnv<RabbitMqConfiguration>();
         var redisConfiguration = services.LoadConfigurationFromEnv<RedisConfiguration>();
+        var qbitConfiguration = services.LoadConfigurationFromEnv<QbitConfiguration>();

         services.AddStackExchangeRedisCache(
             option =>
@@ -78,8 +81,8 @@ public static class ServiceCollectionExtensions
                     e.ConfigureConsumer<WriteQbitMetadataConsumer>(context);
                     e.ConfigureConsumer<PerformQbitMetadataRequestConsumer>(context);
                     e.ConfigureSaga<QbitMetadataSagaState>(context);
-                    e.ConcurrentMessageLimit = 5;
-                    e.PrefetchCount = 5;
+                    e.ConcurrentMessageLimit = qbitConfiguration.Concurrency;
+                    e.PrefetchCount = qbitConfiguration.Concurrency;
                 });
             });
         });
@@ -96,7 +99,7 @@ public static class ServiceCollectionExtensions
                 cfg.UseTimeout(
                     timeout =>
                     {
-                        timeout.Timeout = TimeSpan.FromMinutes(1);
+                        timeout.Timeout = TimeSpan.FromMinutes(3);
                     });
             })
             .RedisRepository(redisConfiguration.ConnectionString, options =>
@@ -108,7 +111,7 @@ public static class ServiceCollectionExtensions
     {
         var qbitConfiguration = services.LoadConfigurationFromEnv<QbitConfiguration>();
         var client = new QBittorrentClient(new(qbitConfiguration.Host));
-        client.Timeout = TimeSpan.FromSeconds(10);
+        client.Timeout = TimeSpan.FromSeconds(20);
         services.AddSingleton<IQBittorrentClient>(client);
     }

View File

@@ -0,0 +1,52 @@
namespace QBitCollector.Features.Qbit;
public class HousekeepingBackgroundService(IQBittorrentClient client, ILogger<HousekeepingBackgroundService> logger) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Service is Running.");
await DoWork();
using PeriodicTimer timer = new(TimeSpan.FromMinutes(2));
try
{
while (await timer.WaitForNextTickAsync(stoppingToken))
{
await DoWork();
}
}
catch (OperationCanceledException)
{
logger.LogInformation("Service stopping.");
}
}
private async Task DoWork()
{
try
{
logger.LogInformation("Cleaning Stale Entries in Qbit...");
var torrents = await client.GetTorrentListAsync();
foreach (var torrentInfo in torrents)
{
if (!(torrentInfo.AddedOn < DateTimeOffset.UtcNow.AddMinutes(-1)))
{
continue;
}
logger.LogInformation("Torrent [{InfoHash}] Identified as stale because was added at {AddedOn}", torrentInfo.Hash, torrentInfo.AddedOn);
await client.DeleteAsync(new[] {torrentInfo.Hash}, deleteDownloadedData: true);
logger.LogInformation("Cleaned up stale torrent: [{InfoHash}]", torrentInfo.Hash);
}
}
catch (Exception e)
{
logger.LogError(e, "Error cleaning up stale torrents this interval.");
}
}
}
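
The double negative in the loop reads awkwardly, but the intent is simple: anything qBittorrent has held for more than a minute is considered stale, since a successful metadata grab completes well inside that window. Restated positively (a sketch with a locally defined predicate rather than QBittorrent.Client's TorrentInfo):

    // A torrent is "stale" once it has sat in qBittorrent for over a minute.
    static bool IsStale(DateTimeOffset addedOn, DateTimeOffset now) =>
        addedOn < now.AddMinutes(-1);

    Console.WriteLine(IsStale(DateTimeOffset.UtcNow.AddSeconds(-30), DateTimeOffset.UtcNow)); // False: keep
    Console.WriteLine(IsStale(DateTimeOffset.UtcNow.AddMinutes(-5), DateTimeOffset.UtcNow));  // True: delete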

View File

@@ -1,6 +1,6 @@
 namespace QBitCollector.Features.Qbit;

-public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService trackersService, ILogger<QbitRequestProcessor> logger)
+public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService trackersService, ILogger<QbitRequestProcessor> logger, QbitConfiguration configuration)
 {
     public async Task<IReadOnlyList<TorrentContent>?> ProcessAsync(string infoHash, CancellationToken cancellationToken = default)
     {
@@ -14,7 +14,7 @@ public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService tr
         using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
-        timeoutCts.CancelAfter(TimeSpan.FromSeconds(30));
+        timeoutCts.CancelAfter(TimeSpan.FromSeconds(60));

         try
         {
@@ -30,7 +30,7 @@ public class QbitRequestProcessor(IQBittorrentClient client, ITrackersService tr
                     break;
                 }

-                await Task.Delay(TimeSpan.FromSeconds(1), timeoutCts.Token);
+                await Task.Delay(TimeSpan.FromMilliseconds(200), timeoutCts.Token);
             }
         }
         catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)

View File

@@ -5,7 +5,10 @@ public class QbitConfiguration
private const string Prefix = "QBIT"; private const string Prefix = "QBIT";
private const string HOST_VARIABLE = "HOST"; private const string HOST_VARIABLE = "HOST";
private const string TRACKERS_URL_VARIABLE = "TRACKERS_URL"; private const string TRACKERS_URL_VARIABLE = "TRACKERS_URL";
private const string CONCURRENCY_VARIABLE = "CONCURRENCY";
public string? Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HOST_VARIABLE); public string? Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HOST_VARIABLE);
public string? TrackersUrl { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(TRACKERS_URL_VARIABLE); public string? TrackersUrl { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(TRACKERS_URL_VARIABLE);
public int Concurrency { get; init; } = Prefix.GetEnvironmentVariableAsInt(CONCURRENCY_VARIABLE, 8);
} }
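
Concurrency now comes from the QBIT_CONCURRENCY environment variable, defaulting to 8. A sketch of the assumed helper semantics (GetEnvironmentVariableAsInt is the project's own extension; this inlines what it presumably does):

    Environment.SetEnvironmentVariable("QBIT_CONCURRENCY", "16");

    var raw = Environment.GetEnvironmentVariable("QBIT_CONCURRENCY");
    var concurrency = int.TryParse(raw, out var parsed) ? parsed : 8; // default mirrors the property above

    Console.WriteLine(concurrency); // 16 - fed into ConcurrentMessageLimit and PrefetchCount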

View File

@@ -3,10 +3,11 @@ namespace QBitCollector.Features.Worker;
 public static class QbitMetaToTorrentMeta
 {
     public static IReadOnlyList<TorrentFile> MapMetadataToFilesCollection(
-        IParseTorrentTitle torrentTitle,
+        IRankTorrentName rankTorrentName,
         Torrent torrent,
         string ImdbId,
-        IReadOnlyList<TorrentContent> Metadata)
+        IReadOnlyList<TorrentContent> Metadata,
+        ILogger<WriteQbitMetadataConsumer> logger)
     {
         try
         {
@@ -24,23 +25,31 @@ public static class QbitMetaToTorrentMeta
                     Size = metadataEntry.Size,
                 };

-                var parsedTitle = torrentTitle.Parse(file.Title);
+                var parsedTitle = rankTorrentName.Parse(file.Title, false);
+
+                if (!parsedTitle.Success)
+                {
+                    logger.LogWarning("Failed to parse title {Title} for metadata mapping", file.Title);
+                    continue;
+                }

-                file.ImdbSeason = parsedTitle.Seasons.FirstOrDefault();
-                file.ImdbEpisode = parsedTitle.Episodes.FirstOrDefault();
+                file.ImdbSeason = parsedTitle.Response?.Season?.FirstOrDefault() ?? 0;
+                file.ImdbEpisode = parsedTitle.Response?.Episode?.FirstOrDefault() ?? 0;
                 files.Add(file);
             }

             return files;
         }
-        catch (Exception)
+        catch (Exception ex)
         {
+            logger.LogWarning("Failed to map metadata to files collection: {Exception}", ex.Message);
             return [];
         }
     }

-    public static async Task<IReadOnlyList<SubtitleFile>> MapMetadataToSubtitlesCollection(IDataStorage storage, string InfoHash, IReadOnlyList<TorrentContent> Metadata)
+    public static async Task<IReadOnlyList<SubtitleFile>> MapMetadataToSubtitlesCollection(IDataStorage storage, string InfoHash, IReadOnlyList<TorrentContent> Metadata,
+        ILogger<WriteQbitMetadataConsumer> logger)
     {
         try
         {
@@ -70,8 +79,9 @@ public static class QbitMetaToTorrentMeta
             return files;
         }
-        catch (Exception)
+        catch (Exception ex)
         {
+            logger.LogWarning("Failed to map metadata to subtitles collection: {Exception}", ex.Message);
             return [];
         }
     }

View File

@@ -53,6 +53,12 @@ public class QbitMetadataSagaStateMachine : MassTransitStateMachine<QbitMetadata
                 .Then(
                     context =>
                     {
+                        if (!context.Message.WithFiles)
+                        {
+                            logger.LogInformation("No files written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
+                            return;
+                        }
+
                         logger.LogInformation("Metadata Written for torrent {InfoHash} in Saga {SagaId}", context.Saga.Torrent.InfoHash, context.Saga.CorrelationId);
                     })
                 .TransitionTo(Completed)

View File

@@ -1,22 +1,24 @@
 namespace QBitCollector.Features.Worker;

-[EntityName("perform-metadata-request")]
+[EntityName("perform-metadata-request-qbit-collector")]
 public record PerformQbitMetadataRequest(Guid CorrelationId, string InfoHash) : CorrelatedBy<Guid>;

-[EntityName("torrent-metadata-response")]
+[EntityName("torrent-metadata-response-qbit-collector")]
 public record GotQbitMetadata(QBitMetadataResponse Metadata) : CorrelatedBy<Guid>
 {
     public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
 }

-[EntityName("write-metadata")]
+[EntityName("write-metadata-qbit-collector")]
 public record WriteQbitMetadata(Torrent Torrent, QBitMetadataResponse Metadata, string ImdbId) : CorrelatedBy<Guid>
 {
     public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
 }

-[EntityName("metadata-written")]
-public record QbitMetadataWritten(QBitMetadataResponse Metadata) : CorrelatedBy<Guid>
+[EntityName("metadata-written-qbit-collector")]
+public record QbitMetadataWritten(QBitMetadataResponse Metadata, bool WithFiles) : CorrelatedBy<Guid>
 {
     public Guid CorrelationId { get; init; } = Metadata.CorrelationId;
+    public QBitMetadataResponse Metadata { get; init; } = Metadata;
 }
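
Suffixing every EntityName with the collector's name is what "Patch message contract names (#187)" was about: MassTransit derives exchange and queue names from EntityName, so two collectors sharing "perform-metadata-request" would consume each other's messages. An illustration (not project code; the debrid-side contract name is assumed):

    [EntityName("perform-metadata-request-debrid-collector")]
    public record PerformDebridMetadataRequest(Guid CorrelationId, string InfoHash) : CorrelatedBy<Guid>;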

View File

@@ -1,25 +1,36 @@
 namespace QBitCollector.Features.Worker;

-public class WriteQbitMetadataConsumer(IParseTorrentTitle parseTorrentTitle, IDataStorage dataStorage) : IConsumer<WriteQbitMetadata>
+public class WriteQbitMetadataConsumer(IRankTorrentName rankTorrentName, IDataStorage dataStorage, ILogger<WriteQbitMetadataConsumer> logger) : IConsumer<WriteQbitMetadata>
 {
     public async Task Consume(ConsumeContext<WriteQbitMetadata> context)
     {
         var request = context.Message;
-        var torrentFiles = QbitMetaToTorrentMeta.MapMetadataToFilesCollection(parseTorrentTitle, request.Torrent, request.ImdbId, request.Metadata.Metadata);

-        if (torrentFiles.Any())
+        if (request.Metadata.Metadata.Count == 0)
         {
-            await dataStorage.InsertFiles(torrentFiles);
-
-            var subtitles = await QbitMetaToTorrentMeta.MapMetadataToSubtitlesCollection(dataStorage, request.Torrent.InfoHash, request.Metadata.Metadata);
-
-            if (subtitles.Any())
-            {
-                await dataStorage.InsertSubtitles(subtitles);
-            }
+            await context.Publish(new QbitMetadataWritten(request.Metadata, false));
+            return;
         }

-        await context.Publish(new QbitMetadataWritten(request.Metadata));
+        var torrentFiles = QbitMetaToTorrentMeta.MapMetadataToFilesCollection(
+            rankTorrentName, request.Torrent, request.ImdbId, request.Metadata.Metadata, logger);
+
+        if (!torrentFiles.Any())
+        {
+            await context.Publish(new QbitMetadataWritten(request.Metadata, false));
+            return;
+        }
+
+        await dataStorage.InsertFiles(torrentFiles);
+
+        var subtitles = await QbitMetaToTorrentMeta.MapMetadataToSubtitlesCollection(
+            dataStorage, request.Torrent.InfoHash, request.Metadata.Metadata, logger);
+
+        if (subtitles.Any())
+        {
+            await dataStorage.InsertSubtitles(subtitles);
+        }
+
+        await context.Publish(new QbitMetadataWritten(request.Metadata, true));
     }
 }

View File

@@ -1,17 +1,11 @@
 // Global using directives

 global using System.Text.Json;
-global using System.Text.Json.Serialization;
-global using System.Threading.Channels;
 global using MassTransit;
-global using MassTransit.Mediator;
 global using Microsoft.AspNetCore.Builder;
 global using Microsoft.Extensions.Caching.Distributed;
 global using Microsoft.Extensions.Caching.Memory;
 global using Microsoft.Extensions.DependencyInjection;
-global using Polly;
-global using Polly.Extensions.Http;
-global using PromKnight.ParseTorrentTitle;
 global using QBitCollector.Extensions;
 global using QBitCollector.Features.Qbit;
 global using QBitCollector.Features.Trackers;
@@ -21,4 +15,6 @@ global using SharedContracts.Configuration;
 global using SharedContracts.Dapper;
 global using SharedContracts.Extensions;
 global using SharedContracts.Models;
+global using SharedContracts.Python;
+global using SharedContracts.Python.RTN;
 global using SharedContracts.Requests;

View File

@@ -18,7 +18,6 @@
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" /> <PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" />
<PackageReference Include="Polly" Version="8.3.1" /> <PackageReference Include="Polly" Version="8.3.1" />
<PackageReference Include="PromKnight.ParseTorrentTitle" Version="1.0.4" />
<PackageReference Include="QBittorrent.Client" Version="1.9.23349.1" /> <PackageReference Include="QBittorrent.Client" Version="1.9.23349.1" />
<PackageReference Include="Serilog" Version="3.1.1" /> <PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" /> <PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
@@ -31,10 +30,30 @@
<None Include="Configuration\logging.json"> <None Include="Configuration\logging.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
<Content Remove="eng\**" />
<None Remove="eng\**" />
<None Update="requirements.txt">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\shared\SharedContracts.csproj" /> <ProjectReference Include="..\shared\SharedContracts.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup Condition="'$(Configuration)' == 'Debug'">
<Content Remove="python\**" />
<None Include="python\**">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Compile Remove="eng\**" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Remove="eng\**" />
</ItemGroup>
</Project> </Project>

View File

@@ -6,6 +6,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "shared", "shared", "{2C0A0F
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QBitCollector", "QBitCollector.csproj", "{1EF124BE-6EBE-4D9E-846C-FFF814999F3B}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "eng", "eng", "{2F2EA33A-1303-405D-939B-E9394D262BC9}"
+	ProjectSection(SolutionItems) = preProject
+		eng\install-python-reqs.ps1 = eng\install-python-reqs.ps1
+		eng\install-python-reqs.sh = eng\install-python-reqs.sh
+	EndProjectSection
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU

View File

@@ -0,0 +1,3 @@
Remove-Item -Recurse -Force ../python
mkdir -p ../python
python -m pip install -r ../requirements.txt -t ../python/

View File

@@ -0,0 +1,5 @@
#!/bin/bash
rm -rf ../python
mkdir -p ../python
python3 -m pip install -r ../requirements.txt -t ../python/

View File

@@ -0,0 +1 @@
rank-torrent-name==0.2.13

View File

@@ -9,9 +9,9 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
         const string query =
             """
             INSERT INTO ingested_torrents
-                ("name", "source", "category", "info_hash", "size", "seeders", "leechers", "imdb", "processed", "createdAt", "updatedAt")
+                ("name", "source", "category", "info_hash", "size", "seeders", "leechers", "imdb", "processed", "createdAt", "updatedAt", "rtn_response")
             VALUES
-                (@Name, @Source, @Category, @InfoHash, @Size, @Seeders, @Leechers, @Imdb, @Processed, @CreatedAt, @UpdatedAt)
+                (@Name, @Source, @Category, @InfoHash, @Size, @Seeders, @Leechers, @Imdb, @Processed, @CreatedAt, @UpdatedAt, @RtnResponse::jsonb)
             ON CONFLICT (source, info_hash) DO NOTHING
             """;
@@ -110,21 +110,21 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
     public async Task<List<ImdbEntry>> GetImdbEntriesForRequests(int year, int batchSize, string? stateLastProcessedImdbId, CancellationToken cancellationToken = default) =>
         await ExecuteCommandAsync(async connection =>
         {
-            const string query = @"SELECT imdb_id AS ImdbId, title as Title, category as Category, year as Year, adult as Adult FROM imdb_metadata WHERE CAST(NULLIF(Year, '\N') AS INTEGER) <= @Year AND imdb_id > @LastProcessedImdbId ORDER BY ImdbId LIMIT @BatchSize";
+            const string query = @"SELECT imdb_id AS ImdbId, title as Title, category as Category, year as Year, adult as Adult FROM imdb_metadata WHERE Year <= @Year AND imdb_id > @LastProcessedImdbId ORDER BY ImdbId LIMIT @BatchSize";
             var result = await connection.QueryAsync<ImdbEntry>(query, new { Year = year, LastProcessedImdbId = stateLastProcessedImdbId, BatchSize = batchSize });
             return result.ToList();
         }, "Error getting imdb metadata.", cancellationToken);

-    public async Task<List<ImdbEntry>> FindImdbMetadata(string? parsedTorrentTitle, string torrentType, string? year, CancellationToken cancellationToken = default) =>
+    public async Task<ImdbEntry?> FindImdbMetadata(string? parsedTorrentTitle, string torrentType, int? year, CancellationToken cancellationToken = default) =>
         await ExecuteCommandAsync(async connection =>
         {
-            var query = $"select \"imdb_id\" as \"ImdbId\", \"title\" as \"Title\", \"year\" as \"Year\" from search_imdb_meta('{parsedTorrentTitle.Replace("'", "").Replace("\"", "")}', '{(torrentType.Equals("movie", StringComparison.OrdinalIgnoreCase) ? "movie" : "tvSeries")}'";
-            query += year is not null ? $", '{year}'" : ", NULL";
-            query += ", 15)";
+            var query = $"select \"imdb_id\" as \"ImdbId\", \"title\" as \"Title\", \"year\" as \"Year\", \"score\" as Score, \"category\" as Category from search_imdb_meta('{parsedTorrentTitle.Replace("'", "").Replace("\"", "")}', '{torrentType}'";
+            query += year is not null ? $", {year}" : ", NULL";
+            query += ", 1)";
             var result = await connection.QueryAsync<ImdbEntry>(query);
+            var results = result.ToList();

-            return result.ToList();
+            return results.FirstOrDefault();
         }, "Error finding imdb metadata.", cancellationToken);

     public Task InsertTorrent(Torrent torrent, CancellationToken cancellationToken = default) =>
@@ -134,9 +134,9 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
         const string query =
             """
             INSERT INTO "torrents"
-                ("infoHash", "provider", "torrentId", "title", "size", "type", "uploadDate", "seeders", "trackers", "languages", "resolution", "reviewed", "opened", "createdAt", "updatedAt")
+                ("infoHash", "ingestedTorrentId", "provider", "title", "size", "type", "uploadDate", "seeders", "languages", "resolution", "reviewed", "opened", "createdAt", "updatedAt")
             VALUES
-                (@InfoHash, @Provider, @TorrentId, @Title, 0, @Type, NOW(), @Seeders, NULL, NULL, NULL, false, false, NOW(), NOW())
+                (@InfoHash, @IngestedTorrentId, @Provider, @Title, 0, @Type, NOW(), @Seeders, NULL, NULL, false, false, NOW(), NOW())
             ON CONFLICT ("infoHash") DO NOTHING
             """;
@@ -152,7 +152,8 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
             INSERT INTO files
                 ("infoHash", "fileIndex", title, "size", "imdbId", "imdbSeason", "imdbEpisode", "kitsuId", "kitsuEpisode", "createdAt", "updatedAt")
             VALUES
-                (@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now());
+                (@InfoHash, @FileIndex, @Title, @Size, @ImdbId, @ImdbSeason, @ImdbEpisode, @KitsuId, @KitsuEpisode, Now(), Now())
+            ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
             """;

         await connection.ExecuteAsync(query, files);
@@ -168,11 +169,7 @@ public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConf
("infoHash", "fileIndex", "fileId", "title") ("infoHash", "fileIndex", "fileId", "title")
VALUES VALUES
(@InfoHash, @FileIndex, @FileId, @Title) (@InfoHash, @FileIndex, @FileId, @Title)
ON CONFLICT ON CONFLICT ("infoHash", "fileIndex") DO NOTHING;
("infoHash", "fileIndex")
DO UPDATE SET
"fileId" = COALESCE(subtitles."fileId", EXCLUDED."fileId"),
"title" = COALESCE(subtitles."title", EXCLUDED."title");
"""; """;
await connection.ExecuteAsync(query, subtitles); await connection.ExecuteAsync(query, subtitles);

View File

@@ -9,7 +9,7 @@ public interface IDataStorage
     Task<DapperResult<PageIngestedResult, PageIngestedResult>> MarkPageAsIngested(string pageId, CancellationToken cancellationToken = default);
     Task<DapperResult<int, int>> GetRowCountImdbMetadata(CancellationToken cancellationToken = default);
     Task<List<ImdbEntry>> GetImdbEntriesForRequests(int year, int batchSize, string? stateLastProcessedImdbId, CancellationToken cancellationToken = default);
-    Task<List<ImdbEntry>> FindImdbMetadata(string? parsedTorrentTitle, string parsedTorrentTorrentType, string? parsedTorrentYear, CancellationToken cancellationToken = default);
+    Task<ImdbEntry?> FindImdbMetadata(string? parsedTorrentTitle, string parsedTorrentTorrentType, int? parsedTorrentYear, CancellationToken cancellationToken = default);
     Task InsertTorrent(Torrent torrent, CancellationToken cancellationToken = default);
     Task InsertFiles(IEnumerable<TorrentFile> files, CancellationToken cancellationToken = default);
     Task InsertSubtitles(IEnumerable<SubtitleFile> subtitles, CancellationToken cancellationToken = default);
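
Callers of the narrowed contract now get at most one best-scored entry back, or null when nothing clears the score threshold. A hedged usage sketch:

    public static async Task<ImdbEntry?> LookupAsync(IDataStorage storage, CancellationToken ct)
    {
        // search_imdb_meta(..., 1) returns the single best match, so no client-side scoring remains.
        var entry = await storage.FindImdbMetadata("Severance", "tvSeries", 2022, ct);
        return entry; // null means "no confident match" and the torrent is skipped
    }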

View File

@@ -0,0 +1,19 @@
namespace SharedContracts.Extensions;
public static class DictionaryExtensions
{
public static ConcurrentDictionary<TKey, TValue> ToConcurrentDictionary<TSource, TKey, TValue>(
this IEnumerable<TSource> source,
Func<TSource, TKey> keySelector,
Func<TSource, TValue> valueSelector) where TKey : notnull
{
var concurrentDictionary = new ConcurrentDictionary<TKey, TValue>();
foreach (var element in source)
{
concurrentDictionary.TryAdd(keySelector(element), valueSelector(element));
}
return concurrentDictionary;
}
}
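
Usage sketch for the extension above (in SharedContracts.Extensions): TryAdd means first-write-wins, which is how the DMM crawler drops duplicate info hashes that appear on multiple pages:

    var torrents = new[]
    {
        (Hash: "abc123", Name: "Show.S01E01.1080p"),
        (Hash: "abc123", Name: "Show.S01E01.1080p.PROPER"), // duplicate hash: silently ignored
        (Hash: "def456", Name: "Movie.2024.2160p"),
    };

    var byHash = torrents.ToConcurrentDictionary(t => t.Hash, t => t.Name);

    Console.WriteLine(byHash.Count);     // 2
    Console.WriteLine(byHash["abc123"]); // Show.S01E01.1080p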

View File

@@ -0,0 +1,14 @@
namespace SharedContracts.Extensions;
public static class JsonExtensions
{
private static readonly JsonSerializerOptions JsonSerializerOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false,
ReferenceHandler = ReferenceHandler.IgnoreCycles,
NumberHandling = JsonNumberHandling.Strict,
};
public static string AsJson<T>(this T obj) => JsonSerializer.Serialize(obj, JsonSerializerOptions);
}
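
The options give compact camelCase output, which is what ends up in the ingested_torrents.rtn_response jsonb column. A small sketch:

    var rtn = new { ParsedTitle = "Severance", Year = 2022, IsMovie = false };
    Console.WriteLine(rtn.AsJson()); // {"parsedTitle":"Severance","year":2022,"isMovie":false}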

View File

@@ -1,5 +1,3 @@
-using System.Text.RegularExpressions;
-
 namespace SharedContracts.Extensions;

 public static partial class StringExtensions

View File

@@ -1,6 +1,9 @@
 // Global using directives

+global using System.Collections.Concurrent;
 global using System.Text.Json;
+global using System.Text.Json.Serialization;
+global using System.Text.RegularExpressions;
 global using Dapper;
 global using MassTransit;
 global using Microsoft.AspNetCore.Builder;
@@ -14,4 +17,4 @@ global using Python.Runtime;
 global using Serilog;
 global using SharedContracts.Configuration;
 global using SharedContracts.Extensions;
 global using SharedContracts.Models;

View File

@@ -7,4 +7,5 @@ public class ImdbEntry
     public string? Category { get; set; }
     public string? Year { get; set; }
     public bool? Adult { get; set; }
+    public decimal? Score { get; set; }
 }

View File

@@ -12,7 +12,9 @@ public class IngestedTorrent
     public int Leechers { get; set; }
     public string? Imdb { get; set; }
-    public bool Processed { get; set; } = false;
+    public bool Processed { get; set; }
     public DateTime CreatedAt { get; set; } = DateTime.UtcNow;
     public DateTime UpdatedAt { get; set; } = DateTime.UtcNow;
+    public string? RtnResponse { get; set; }
 }

View File

@@ -3,6 +3,7 @@ namespace SharedContracts.Models;
 public class Torrent
 {
     public string? InfoHash { get; set; }
+    public long? IngestedTorrentId { get; set; }
     public string? Provider { get; set; }
     public string? TorrentId { get; set; }
     public string? Title { get; set; }

View File

@@ -0,0 +1,13 @@
namespace SharedContracts.Python;
public interface IPythonEngineService
{
ILogger<PythonEngineService> Logger { get; }
Task InitializePythonEngine(CancellationToken cancellationToken);
T ExecuteCommandOrScript<T>(string command, PyModule module, bool throwOnErrors);
T ExecutePythonOperation<T>(Func<T> operation, string operationName, bool throwOnErrors);
T ExecutePythonOperationWithDefault<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors, bool logErrors);
Task StopPythonEngine(CancellationToken cancellationToken);
dynamic? Sys { get; }
}

View File

@@ -0,0 +1,8 @@
namespace SharedContracts.Python;
public class PythonEngineManager(IPythonEngineService pythonEngineService) : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken) => pythonEngineService.InitializePythonEngine(cancellationToken);
public Task StopAsync(CancellationToken cancellationToken) => pythonEngineService.StopPythonEngine(cancellationToken);
}

View File

@@ -1,24 +1,28 @@
 namespace SharedContracts.Python;

-public class PythonEngineService(ILogger<PythonEngineService> logger) : IHostedService
+public class PythonEngineService(ILogger<PythonEngineService> logger) : IPythonEngineService
 {
     private IntPtr _mainThreadState;
     private bool _isInitialized;

-    public Task StartAsync(CancellationToken cancellationToken)
+    public ILogger<PythonEngineService> Logger { get; } = logger;
+    public dynamic? Sys { get; private set; }
+
+    public Task InitializePythonEngine(CancellationToken cancellationToken)
     {
         if (_isInitialized)
         {
             return Task.CompletedTask;
         }

         try
         {
             var pythonDllEnv = Environment.GetEnvironmentVariable("PYTHONNET_PYDLL");

             if (string.IsNullOrWhiteSpace(pythonDllEnv))
             {
-                logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application");
+                Logger.LogWarning("PYTHONNET_PYDLL env is not set. Exiting Application");
                 Environment.Exit(1);
                 return Task.CompletedTask;
             }
@@ -26,24 +30,95 @@ public class PythonEngineService(ILogger<PythonEngineService> logger) : IHostedS
             Runtime.PythonDLL = pythonDllEnv;
             PythonEngine.Initialize();
             _mainThreadState = PythonEngine.BeginAllowThreads();
             _isInitialized = true;
-            logger.LogInformation("Python engine initialized");
+            Logger.LogInformation("Python engine initialized");
         }
         catch (Exception e)
         {
-            logger.LogWarning(e, "Failed to initialize Python engine");
+            Logger.LogError(e, $"Failed to initialize Python engine: {e.Message}");
             Environment.Exit(1);
         }

         return Task.CompletedTask;
     }

-    public Task StopAsync(CancellationToken cancellationToken)
+    public T ExecuteCommandOrScript<T>(string command, PyModule module, bool throwOnErrors) =>
+        ExecutePythonOperation(
+            () =>
+            {
+                var pyCompile = PythonEngine.Compile(command);
+                var nativeResult = module.Execute(pyCompile);
+                return nativeResult.As<T>();
+            }, nameof(ExecuteCommandOrScript), throwOnErrors);
+
+    public T ExecutePythonOperation<T>(Func<T> operation, string operationName, bool throwOnErrors) =>
+        ExecutePythonOperationWithDefault(operation, default, operationName, throwOnErrors, true);
+
+    public T ExecutePythonOperationWithDefault<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors, bool logErrors) =>
+        ExecutePythonOperationInternal(operation, defaultValue, operationName, throwOnErrors, logErrors);
+
+    public void ExecuteOnGIL(Action act, bool throwOnErrors)
+    {
+        Sys ??= LoadSys();
+
+        try
+        {
+            using var gil = Py.GIL();
+            act();
+        }
+        catch (Exception ex)
+        {
+            Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(ExecuteOnGIL));
+
+            if (throwOnErrors)
+            {
+                throw;
+            }
+        }
+    }
+
+    public Task StopPythonEngine(CancellationToken cancellationToken)
     {
         PythonEngine.EndAllowThreads(_mainThreadState);
         PythonEngine.Shutdown();
         return Task.CompletedTask;
     }
+
+    private static dynamic LoadSys()
+    {
+        using var gil = Py.GIL();
+        var sys = Py.Import("sys");
+        return sys;
+    }
+
+    // ReSharper disable once EntityNameCapturedOnly.Local
+    private T ExecutePythonOperationInternal<T>(Func<T> operation, T? defaultValue, string operationName, bool throwOnErrors, bool logErrors)
+    {
+        Sys ??= LoadSys();
+        var result = defaultValue;
+
+        try
+        {
+            using var gil = Py.GIL();
+            result = operation();
+        }
+        catch (Exception ex)
+        {
+            if (logErrors)
+            {
+                Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(operationName));
+            }
+
+            if (throwOnErrors)
+            {
+                throw;
+            }
+        }
+
+        return result;
+    }
 }
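
All Python calls funnel through ExecutePythonOperation, which lazily imports sys, takes the GIL around the delegate, and logs, swallows, or rethrows per the flags. A hedged consumer sketch (PythonVersionProbe is hypothetical):

    public class PythonVersionProbe(IPythonEngineService python)
    {
        public string? GetVersion() =>
            python.ExecutePythonOperation(
                () => (string?)python.Sys?.version, // any pythonnet access happens inside the GIL
                operationName: nameof(GetVersion),
                throwOnErrors: false);
    }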

View File

@@ -2,7 +2,6 @@ namespace SharedContracts.Python.RTN;
 public interface IRankTorrentName
 {
-    ParseTorrentTitleResponse Parse(string title);
-    bool IsTrash(string title);
-    bool TitleMatch(string title, string checkTitle);
+    ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
+    List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false);
 }

View File

@@ -1,6 +1,3 @@
 namespace SharedContracts.Python.RTN;

-public record ParseTorrentTitleResponse(bool Success, string ParsedTitle, int Year, int[]? Season = null, int[]? Episode = null)
-{
-    public bool IsMovie => Season == null && Episode == null;
-}
+public record ParseTorrentTitleResponse(bool Success, RtnResponse? Response);

View File

@@ -2,117 +2,119 @@ namespace SharedContracts.Python.RTN;
 public class RankTorrentName : IRankTorrentName
 {
-    private const string SysModuleName = "sys";
     private const string RtnModuleName = "RTN";
-    private readonly ILogger<RankTorrentName> _logger;
-    private dynamic? _sys;
+    private readonly IPythonEngineService _pythonEngineService;
     private dynamic? _rtn;

-    public RankTorrentName(ILogger<RankTorrentName> logger)
+    public RankTorrentName(IPythonEngineService pythonEngineService)
     {
-        _logger = logger;
+        _pythonEngineService = pythonEngineService;
         InitModules();
     }

-    public ParseTorrentTitleResponse Parse(string title)
+    public ParseTorrentTitleResponse Parse(string title, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
     {
         try
         {
-            using var py = Py.GIL();
-            var result = _rtn?.parse(title);
-            if (result == null)
-            {
-                return new(false, string.Empty, 0);
-            }
+            using var gil = Py.GIL();
+            var result = _rtn?.parse(title, trashGarbage);
             return ParseResult(result);
         }
-        catch (Exception e)
+        catch (Exception ex)
         {
-            _logger.LogError(e, "Failed to parse title");
-            return new(false, string.Empty, 0);
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
+            }
+            if (throwOnErrors)
+            {
+                throw;
+            }
+            return new(false, null);
         }
     }

-    public bool IsTrash(string title)
-    {
-        try
-        {
-            using var py = Py.GIL();
-            var result = _rtn?.check_trash(title);
-            if (result == null)
-            {
-                return false;
-            }
-            var response = result.As<bool>() ?? false;
-            return response;
-        }
-        catch (Exception e)
-        {
-            _logger.LogError(e, "Failed to parse title");
-            return false;
-        }
-    }
-
-    public bool TitleMatch(string title, string checkTitle)
-    {
-        try
-        {
-            using var py = Py.GIL();
-            var result = _rtn?.title_match(title, checkTitle);
-            if (result == null)
-            {
-                return false;
-            }
-            var response = result.As<bool>() ?? false;
-            return response;
-        }
-        catch (Exception e)
-        {
-            _logger.LogError(e, "Failed to parse title");
-            return false;
-        }
-    }
-
-    private static ParseTorrentTitleResponse ParseResult(dynamic result)
-    {
-        var parsedTitle = result.GetAttr("parsed_title")?.As<string>() ?? string.Empty;
-        var year = result.GetAttr("year")?.As<int>() ?? 0;
-        var seasonList = result.GetAttr("season")?.As<PyList>();
-        var episodeList = result.GetAttr("episode")?.As<PyList>();
-        int[]? seasons = seasonList?.Length() > 0 ? seasonList.As<int[]>() : null;
-        int[]? episodes = episodeList?.Length() > 0 ? episodeList.As<int[]>() : null;
-        return new ParseTorrentTitleResponse(true, parsedTitle, year, seasons, episodes);
-    }
-
-    private void InitModules()
-    {
-        using var py = Py.GIL();
-        _sys = Py.Import(SysModuleName);
-        if (_sys == null)
-        {
-            _logger.LogError($"Failed to import Python module: {SysModuleName}");
-            return;
-        }
-        _sys.path.append(Path.Combine(AppContext.BaseDirectory, "python"));
-        _rtn = Py.Import(RtnModuleName);
-        if (_rtn == null)
-        {
-            _logger.LogError($"Failed to import Python module: {RtnModuleName}");
-        }
-    }
+    public List<ParseTorrentTitleResponse?> BatchParse(IReadOnlyCollection<string> titles, int chunkSize = 500, int workers = 20, bool trashGarbage = true, bool logErrors = false, bool throwOnErrors = false)
+    {
+        var responses = new List<ParseTorrentTitleResponse?>();
+        try
+        {
+            if (titles.Count == 0)
+            {
+                return responses;
+            }
+            using var gil = Py.GIL();
+            var pythonList = new PyList(titles.Select(x => new PyString(x).As<PyObject>()).ToArray());
+            PyList results = _rtn?.batch_parse(pythonList, trashGarbage, chunkSize, workers);
+            if (results == null)
+            {
+                return responses;
+            }
+            responses.AddRange(results.Select(ParseResult));
+        }
+        catch (Exception ex)
+        {
+            if (logErrors)
+            {
+                _pythonEngineService.Logger.LogError(ex, "Python Error: {Message} ({OperationName})", ex.Message, nameof(Parse));
+            }
+            if (throwOnErrors)
+            {
+                throw;
+            }
+        }
+        return responses;
+    }
+
+    private static ParseTorrentTitleResponse? ParseResult(dynamic result)
+    {
+        try
+        {
+            if (result == null)
+            {
+                return new(false, null);
+            }
+            var json = result.model_dump_json()?.As<string?>();
+            if (json is null || string.IsNullOrEmpty(json))
+            {
+                return new(false, null);
+            }
+            var mediaType = result.GetAttr("type")?.As<string>();
+            if (string.IsNullOrEmpty(mediaType))
+            {
+                return new(false, null);
+            }
+            var response = JsonSerializer.Deserialize<RtnResponse>(json);
+            response.IsMovie = mediaType.Equals("movie", StringComparison.OrdinalIgnoreCase);
+            return new(true, response);
+        }
+        catch
+        {
+            return new(false, null);
+        }
+    }
+
+    private void InitModules() =>
+        _rtn = _pythonEngineService.ExecutePythonOperation(() =>
+        {
+            _pythonEngineService.Sys.path.append(Path.Combine(AppContext.BaseDirectory, "python"));
+            return Py.Import(RtnModuleName);
+        }, nameof(InitModules), throwOnErrors: false);
 }
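BatchParse acquires the GIL once for the whole list and lets RTN's batch_parse fan the work out across its own chunks and workers, which is where the bulk-parsing win comes from. A hedged usage sketch (the titles are illustrative):

    // assumes implicit usings (System.Linq) and a resolved IRankTorrentName
    var titles = new[]
    {
        "Movie.Name.2023.2160p.BluRay.REMUX",
        "Another.Show.S02.COMPLETE.720p.WEB-DL",
    };
    var results = rankTorrentName.BatchParse(titles, chunkSize: 500, workers: 20);
    var parsed = results
        .Where(r => r is { Success: true, Response: not null })
        .Select(r => r!.Response!)
        .ToList();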

View File

@@ -0,0 +1,83 @@
namespace SharedContracts.Python.RTN;
public class RtnResponse
{
[JsonPropertyName("raw_title")]
public string? RawTitle { get; set; }
[JsonPropertyName("parsed_title")]
public string? ParsedTitle { get; set; }
[JsonPropertyName("fetch")]
public bool Fetch { get; set; }
[JsonPropertyName("is_4k")]
public bool Is4K { get; set; }
[JsonPropertyName("is_multi_audio")]
public bool IsMultiAudio { get; set; }
[JsonPropertyName("is_multi_subtitle")]
public bool IsMultiSubtitle { get; set; }
[JsonPropertyName("is_complete")]
public bool IsComplete { get; set; }
[JsonPropertyName("year")]
public int Year { get; set; }
[JsonPropertyName("resolution")]
public List<string>? Resolution { get; set; }
[JsonPropertyName("quality")]
public List<string>? Quality { get; set; }
[JsonPropertyName("season")]
public List<int>? Season { get; set; }
[JsonPropertyName("episode")]
public List<int>? Episode { get; set; }
[JsonPropertyName("codec")]
public List<string>? Codec { get; set; }
[JsonPropertyName("audio")]
public List<string>? Audio { get; set; }
[JsonPropertyName("subtitles")]
public List<string>? Subtitles { get; set; }
[JsonPropertyName("language")]
public List<string>? Language { get; set; }
[JsonPropertyName("bit_depth")]
public List<int>? BitDepth { get; set; }
[JsonPropertyName("hdr")]
public string? Hdr { get; set; }
[JsonPropertyName("proper")]
public bool Proper { get; set; }
[JsonPropertyName("repack")]
public bool Repack { get; set; }
[JsonPropertyName("remux")]
public bool Remux { get; set; }
[JsonPropertyName("upscaled")]
public bool Upscaled { get; set; }
[JsonPropertyName("remastered")]
public bool Remastered { get; set; }
[JsonPropertyName("directors_cut")]
public bool DirectorsCut { get; set; }
[JsonPropertyName("extended")]
public bool Extended { get; set; }
public bool IsMovie { get; set; }
public string ToJson() => this.AsJson();
}
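The JsonPropertyName attributes mirror RTN's Pydantic field names, so the model_dump_json() payload deserializes straight into RtnResponse. A minimal sketch with a hand-written payload (field values are illustrative, not actual RTN output):

    using System.Text.Json;

    var json = """{"raw_title":"Movie.2023.1080p.BluRay","parsed_title":"Movie","fetch":true,"year":2023,"resolution":["1080p"],"quality":["BluRay"]}""";
    var response = JsonSerializer.Deserialize<RtnResponse>(json);
    // Booleans absent from the payload default to false; absent lists stay null.
    // IsMovie has no JsonPropertyName mapping: ParseResult sets it from the Python "type" attribute.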

View File

@@ -4,9 +4,8 @@ public static class ServiceCollectionExtensions
 {
     public static IServiceCollection RegisterPythonEngine(this IServiceCollection services)
     {
-        services.AddSingleton<PythonEngineService>();
-        services.AddHostedService(p => p.GetRequiredService<PythonEngineService>());
+        services.AddSingleton<IPythonEngineService, PythonEngineService>();
+        services.AddHostedService<PythonEngineManager>();
         return services;
     }
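Registering the concrete engine behind IPythonEngineService, with PythonEngineManager as the hosted service, means the engine no longer has to double as its own IHostedService and consumers can depend on the interface. A hedged wiring sketch (the RankTorrentName registration is illustrative, not part of this diff):

    using Microsoft.Extensions.DependencyInjection;

    var services = new ServiceCollection();
    services.RegisterPythonEngine();
    services.AddSingleton<IRankTorrentName, RankTorrentName>();

    await using var provider = services.BuildServiceProvider();
    var rankTorrentName = provider.GetRequiredService<IRankTorrentName>();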

View File

@@ -82,11 +82,4 @@ public static class ServiceCollectionExtensions
         x.AddConsumer<PerformIngestionConsumer>();
     }
-
-    internal static IServiceCollection AddServiceConfiguration(this IServiceCollection services)
-    {
-        services.AddSingleton<IParseTorrentTitle, ParseTorrentTitle>();
-        return services;
-    }
 }

View File

@@ -11,6 +11,7 @@ public class PerformIngestionConsumer(IDataStorage dataStorage, ILogger<PerformI
         var torrent = new Torrent
         {
             InfoHash = request.IngestedTorrent.InfoHash.ToLowerInvariant(),
+            IngestedTorrentId = request.IngestedTorrent.Id,
             Provider = request.IngestedTorrent.Source,
             Title = request.IngestedTorrent.Name,
             Type = request.IngestedTorrent.Category,
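Persisting IngestedTorrentId gives each parsed Torrent a key back to the raw ingested row, so a torrent can always be traced to where it came from. A hedged Dapper-style traceability sketch (table and column names are assumptions, not taken from this diff; connection is an open IDbConnection):

    using Dapper;

    const string sql = """
        SELECT it.*
        FROM ingested_torrents it
        JOIN torrents t ON t.ingested_torrent_id = it.id
        WHERE t.info_hash = @InfoHash
        """;
    var source = await connection.QueryFirstOrDefaultAsync<IngestedTorrent>(sql, new { InfoHash = infoHash });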

View File

@@ -5,7 +5,6 @@ global using MassTransit;
 global using MassTransit.Mediator;
 global using Microsoft.AspNetCore.Builder;
 global using Microsoft.Extensions.DependencyInjection;
-global using PromKnight.ParseTorrentTitle;
 global using SharedContracts.Configuration;
 global using SharedContracts.Dapper;
 global using SharedContracts.Extensions;

View File

@@ -10,7 +10,6 @@ builder.Host
 builder.Services
     .RegisterMassTransit()
-    .AddServiceConfiguration()
     .AddDatabase();

 var app = builder.Build();

View File

@@ -16,7 +16,6 @@
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" /> <PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" /> <PackageReference Include="Microsoft.Extensions.Http.Polly" Version="8.0.3" />
<PackageReference Include="Polly" Version="8.3.1" /> <PackageReference Include="Polly" Version="8.3.1" />
<PackageReference Include="PromKnight.ParseTorrentTitle" Version="1.0.4" />
<PackageReference Include="Serilog" Version="3.1.1" /> <PackageReference Include="Serilog" Version="3.1.1" />
<PackageReference Include="Serilog.AspNetCore" Version="8.0.1" /> <PackageReference Include="Serilog.AspNetCore" Version="8.0.1" />
<PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" /> <PackageReference Include="Serilog.Sinks.Console" Version="5.0.1" />