Woke up to see a discussion about torrentio scraping: powered by community

Was a little inspired. Now that we have a self-populating database of IMDb IDs, why shouldn't we have the ability to scrape any other instance of torrentio, or knightcrawler?

Also restructured the producer to be vertically sliced to make it easier to work with
Too much flicking back and forth between Jobs and Crawlers when configuring
This commit is contained in:
iPromKnight
2024-03-02 18:41:57 +00:00
parent 98115e0cf7
commit 95fa48c851
59 changed files with 733 additions and 261 deletions

View File

@@ -0,0 +1,133 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// PostgreSQL-backed <see cref="IDataStorage"/> using Dapper over Npgsql.
/// A new connection is opened per call; Npgsql's built-in pooling makes that cheap.
/// </summary>
public class DapperDataStorage(PostgresConfiguration configuration, RabbitMqConfiguration rabbitConfig, ILogger<DapperDataStorage> logger) : IDataStorage
{
    // Duplicates (same source + info_hash) are silently skipped by the conflict clause.
    private const string InsertTorrentSql =
        """
        INSERT INTO ingested_torrents (name, source, category, info_hash, size, seeders, leechers, imdb, processed, "createdAt", "updatedAt")
        VALUES (@Name, @Source, @Category, @InfoHash, @Size, @Seeders, @Leechers, @Imdb, @Processed, @CreatedAt, @UpdatedAt)
        ON CONFLICT (source, info_hash) DO NOTHING
        """;

    private const string InsertIngestedPageSql =
        """
        INSERT INTO ingested_pages (url, "createdAt", "updatedAt")
        VALUES (@Url, @CreatedAt, @UpdatedAt)
        """;

    // Column aliases map snake_case/quoted columns onto the Torrent POCO's PascalCase properties.
    // NOTE(review): no LIMIT here — every unprocessed row is pulled and batching is done
    // client-side in GetPublishableTorrents; consider a SQL LIMIT if the backlog grows large.
    private const string GetMovieAndSeriesTorrentsNotProcessedSql =
        """
        SELECT
            id as "Id",
            name as "Name",
            source as "Source",
            category as "Category",
            info_hash as "InfoHash",
            size as "Size",
            seeders as "Seeders",
            leechers as "Leechers",
            imdb as "Imdb",
            processed as "Processed",
            "createdAt" as "CreatedAt",
            "updatedAt" as "UpdatedAt"
        FROM ingested_torrents
        WHERE processed = false AND category != 'xxx'
        """;

    private const string UpdateProcessedSql =
        """
        UPDATE ingested_torrents
        Set
            processed = true,
            "updatedAt" = @UpdatedAt
        WHERE id = @Id
        """;

    /// <summary>
    /// Bulk-inserts the given torrents. Rows that conflict on (source, info_hash) are
    /// skipped, so the returned count may be lower than <paramref name="torrents"/>.Count.
    /// </summary>
    /// <returns>A success flag plus the number of rows actually inserted; on failure the exception message.</returns>
    public async Task<InsertTorrentResult> InsertTorrents(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default)
    {
        try
        {
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);
            // CommandDefinition flows the cancellation token into Dapper;
            // passing a collection makes Dapper execute the statement once per element.
            var command = new CommandDefinition(InsertTorrentSql, torrents, cancellationToken: cancellationToken);
            var inserted = await connection.ExecuteAsync(command);
            return new(true, inserted);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Error while inserting torrents into database");
            return new(false, 0, e.Message);
        }
    }

    /// <summary>
    /// Returns the next batch of unprocessed, non-adult torrents, capped at
    /// <see cref="RabbitMqConfiguration.MaxPublishBatchSize"/>. Returns an empty list on error.
    /// </summary>
    public async Task<IReadOnlyCollection<Torrent>> GetPublishableTorrents(CancellationToken cancellationToken = default)
    {
        try
        {
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);
            var command = new CommandDefinition(GetMovieAndSeriesTorrentsNotProcessedSql, cancellationToken: cancellationToken);
            var torrents = await connection.QueryAsync<Torrent>(command);
            return torrents.Take(rabbitConfig.MaxPublishBatchSize).ToList();
        }
        catch (Exception e)
        {
            logger.LogError(e, "Error while getting publishable torrents from database");
            return new List<Torrent>();
        }
    }

    /// <summary>
    /// Marks the given torrents as processed and stamps a single shared UpdatedAt time.
    /// </summary>
    /// <returns>A success flag plus the number of rows updated; on failure the exception message.</returns>
    public async Task<UpdatedTorrentResult> SetTorrentsProcessed(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default)
    {
        try
        {
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);
            // One timestamp for the whole batch keeps the rows consistent and
            // avoids calling DateTime.UtcNow per element.
            var now = DateTime.UtcNow;
            foreach (var torrent in torrents)
            {
                torrent.UpdatedAt = now;
            }
            var command = new CommandDefinition(UpdateProcessedSql, torrents, cancellationToken: cancellationToken);
            var updated = await connection.ExecuteAsync(command);
            return new(true, updated);
        }
        catch (Exception e)
        {
            logger.LogError(e, "Error while marking torrents as processed");
            return new(false, 0, e.Message);
        }
    }

    /// <summary>
    /// Returns true when the page URL has already been recorded in ingested_pages.
    /// Database errors propagate to the caller.
    /// </summary>
    public async Task<bool> PageIngested(string pageId, CancellationToken cancellationToken = default)
    {
        await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
        await connection.OpenAsync(cancellationToken);
        const string query = "SELECT EXISTS (SELECT 1 FROM ingested_pages WHERE url = @Url)";
        var command = new CommandDefinition(query, new { Url = pageId }, cancellationToken: cancellationToken);
        return await connection.ExecuteScalarAsync<bool>(command);
    }

    /// <summary>
    /// Records the page URL in ingested_pages with matching created/updated timestamps.
    /// Never throws: all failures (including connection open) are returned as a failed result.
    /// </summary>
    public async Task<PageIngestedResult> MarkPageAsIngested(string pageId, CancellationToken cancellationToken = default)
    {
        try
        {
            // Open inside the try so connection failures also surface as a
            // failed result rather than an exception, matching the return contract.
            await using var connection = new NpgsqlConnection(configuration.StorageConnectionString);
            await connection.OpenAsync(cancellationToken);
            var date = DateTime.UtcNow;
            var command = new CommandDefinition(InsertIngestedPageSql, new
            {
                Url = pageId,
                CreatedAt = date,
                UpdatedAt = date,
            }, cancellationToken: cancellationToken);
            await connection.ExecuteAsync(command);
            return new(true, "Page successfully marked as ingested");
        }
        catch (Exception e)
        {
            logger.LogError(e, "Failed to mark page {PageId} as ingested", pageId);
            return new(false, $"Failed to mark page as ingested: {e.Message}");
        }
    }
}

View File

@@ -0,0 +1,10 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// Persistence abstraction for ingested torrents and crawled pages.
/// Implemented by <see cref="DapperDataStorage"/> (PostgreSQL via Dapper).
/// </summary>
public interface IDataStorage
{
/// <summary>Bulk-inserts torrents; implementations may skip duplicates, so the reported insert count can be lower than the input count.</summary>
Task<InsertTorrentResult> InsertTorrents(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default);
/// <summary>Returns the next batch of unprocessed torrents eligible for publishing.</summary>
Task<IReadOnlyCollection<Torrent>> GetPublishableTorrents(CancellationToken cancellationToken = default);
/// <summary>Flags the given torrents as processed so they are not published again.</summary>
Task<UpdatedTorrentResult> SetTorrentsProcessed(IReadOnlyCollection<Torrent> torrents, CancellationToken cancellationToken = default);
/// <summary>True when the page identified by <paramref name="pageId"/> (a URL in the Dapper implementation) was already ingested.</summary>
Task<bool> PageIngested(string pageId, CancellationToken cancellationToken = default);
/// <summary>Records the page as ingested; returns a success flag and a human-readable message.</summary>
Task<PageIngestedResult> MarkPageAsIngested(string pageId, CancellationToken cancellationToken = default);
}

View File

@@ -0,0 +1,15 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// MongoDB document for one IMDb title (stored in the "imdb-entries" collection).
/// All fields are strings — presumably loaded verbatim from the IMDb TSV datasets,
/// which use "\N" for missing values; TODO confirm against the importer.
/// </summary>
public class ImdbEntry
{
// Serves as the Mongo _id, e.g. "tt0111161".
[BsonId]
public string ImdbId { get; set; } = default!;
// e.g. "movie", "tvSeries" — queried with Eq(e => e.TitleType, "movie") elsewhere in this feature.
public string? TitleType { get; set; }
public string? PrimaryTitle { get; set; }
public string? OriginalTitle { get; set; }
public string? IsAdult { get; set; }
// Kept as a string; range queries on it are therefore lexicographic.
public string? StartYear { get; set; }
public string? EndYear { get; set; }
public string? RuntimeMinutes { get; set; }
public string? Genres { get; set; }
}

View File

@@ -0,0 +1,78 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// Query access to the locally mirrored IMDb metadata held in MongoDB
/// (collection "imdb-entries").
/// </summary>
public class ImdbMongoDbService
{
    private readonly ILogger<ImdbMongoDbService> _logger;
    private readonly IMongoCollection<ImdbEntry> _imdbCollection;

    public ImdbMongoDbService(MongoConfiguration configuration, ILogger<ImdbMongoDbService> logger)
    {
        _logger = logger;
        var client = new MongoClient(configuration.ConnectionString);
        var database = client.GetDatabase(configuration.DbName);
        _imdbCollection = database.GetCollection<ImdbEntry>("imdb-entries");
    }

    /// <summary>
    /// Returns up to <paramref name="requestLimit"/> movie entries with
    /// StartYear &lt;= <paramref name="startYear"/>, ordered by StartYear then ImdbId
    /// descending. Pass the last ImdbId of the previous page as
    /// <paramref name="startingId"/> to resume (keyset pagination).
    /// </summary>
    /// <remarks>
    /// StartYear is stored as a string, so the Lte comparison is lexicographic;
    /// four-digit years compare correctly.
    /// </remarks>
    public async Task<IReadOnlyList<ImdbEntry>> GetImdbEntriesForRequests(string startYear, int requestLimit, string? startingId = null)
    {
        var filter = Builders<ImdbEntry>.Filter.And(
            Builders<ImdbEntry>.Filter.Eq(e => e.TitleType, "movie"),
            Builders<ImdbEntry>.Filter.Lte(e => e.StartYear, startYear));

        // Keyset pagination: continue strictly below the last id already returned.
        if (!string.IsNullOrWhiteSpace(startingId))
        {
            filter = Builders<ImdbEntry>.Filter.And(filter, Builders<ImdbEntry>.Filter.Lt(e => e.ImdbId, startingId));
        }

        var sort = Builders<ImdbEntry>.Sort
            .Descending(e => e.StartYear)
            .Descending(e => e.ImdbId);

        // Sort is chained before Limit so the code reads in the order the server
        // applies the stages (the driver produces the same query either way).
        return await _imdbCollection.Find(filter).Sort(sort).Limit(requestLimit).ToListAsync();
    }

    /// <summary>Total number of documents with TitleType == "movie".</summary>
    public async Task<long> GetTotalCountAsync()
    {
        var filter = Builders<ImdbEntry>.Filter.Eq(x => x.TitleType, "movie");
        return await _imdbCollection.CountDocumentsAsync(filter);
    }

    /// <summary>
    /// Ensures the indexes this service depends on exist. Despite the name this is
    /// not a pure check — it creates the indexes (a no-op when they already exist)
    /// and returns false only if index creation throws.
    /// </summary>
    public bool IsDatabaseInitialized()
    {
        try
        {
            // Text search on PrimaryTitle plus TitleType/StartYear filtering.
            var index1KeysDefinition = Builders<ImdbEntry>.IndexKeys
                .Text(e => e.PrimaryTitle)
                .Ascending(e => e.TitleType)
                .Ascending(e => e.StartYear);
            CreateIndex(index1KeysDefinition);

            // Backs the descending StartYear/ImdbId sort used by GetImdbEntriesForRequests.
            var index2KeysDefinition = Builders<ImdbEntry>.IndexKeys
                .Descending(e => e.StartYear)
                .Descending(e => e.ImdbId);
            CreateIndex(index2KeysDefinition);

            return true;
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Error initializing database");
            return false;
        }
    }

    // Creates a single index in the background so writes are not blocked during the build.
    private void CreateIndex(IndexKeysDefinition<ImdbEntry> keysDefinition)
    {
        var createIndexOptions = new CreateIndexOptions { Background = true };
        var indexModel = new CreateIndexModel<ImdbEntry>(keysDefinition, createIndexOptions);
        _imdbCollection.Indexes.CreateOne(indexModel);
    }
}

View File

@@ -0,0 +1,20 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// MongoDB connection settings sourced from MONGODB_* environment variables
/// (HOST, PORT, DB, USER, PASSWORD; PORT defaults to 27017).
/// </summary>
public class MongoConfiguration
{
    private const string Prefix = "MONGODB";
    private const string HostVariable = "HOST";
    private const string PortVariable = "PORT";
    private const string DbVariable = "DB";
    private const string UsernameVariable = "USER";
    private const string PasswordVariable = "PASSWORD";

    private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
    private int Port { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 27017);
    private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
    private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);

    public string DbName { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DbVariable);

    // Credentials are percent-encoded: characters such as '@', ':', '/' or '%'
    // in a password would otherwise corrupt the URI.
    public string ConnectionString =>
        $"mongodb://{Uri.EscapeDataString(Username)}:{Uri.EscapeDataString(Password)}@{Host}:{Port}/{DbName}?tls=false&directConnection=true&authSource=admin";
}

View File

@@ -0,0 +1,19 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// PostgreSQL connection settings sourced from POSTGRES_* environment variables
/// (HOST, USER, PASSWORD, DB, PORT; PORT defaults to 5432).
/// </summary>
public class PostgresConfiguration
{
    private const string Prefix = "POSTGRES";
    private const string HostVariable = "HOST";
    private const string UsernameVariable = "USER";
    private const string PasswordVariable = "PASSWORD";
    private const string DatabaseVariable = "DB";
    private const string PortVariable = "PORT";

    private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
    private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
    private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
    private string Database { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DatabaseVariable);
    // Renamed from PORT to Port for PascalCase consistency with the other properties (private, so no callers break).
    private int Port { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 5432);

    // NOTE(review): values containing ';' or '\'' would need quoting per the
    // Npgsql keyword/value syntax — assumed not to occur in these env vars; confirm.
    public string StorageConnectionString => $"Host={Host};Port={Port};Username={Username};Password={Password};Database={Database};";
}

View File

@@ -0,0 +1,14 @@
namespace Producer.Features.DataProcessing;
/// <summary>
/// DI registration for the DataProcessing feature slice.
/// </summary>
internal static class ServiceCollectionExtensions
{
/// <summary>
/// Binds Postgres/Mongo configuration from environment variables and registers
/// the storage, publisher, and IMDb lookup services. Returns
/// <paramref name="services"/> for chaining.
/// </summary>
internal static IServiceCollection AddDataStorage(this IServiceCollection services)
{
services.LoadConfigurationFromEnv<PostgresConfiguration>();
services.LoadConfigurationFromEnv<MongoConfiguration>();
services.AddTransient<IDataStorage, DapperDataStorage>();
services.AddTransient<IMessagePublisher, TorrentPublisher>();
// Singleton: holds a MongoClient, which is designed to be shared and reused.
services.AddSingleton<ImdbMongoDbService>();
return services;
}
}