Create service to populate Imdb data collection within mongo

We can use this collection as an alternative source to lookup imdb ids, which would be executed before name_to_imdb is called in the consumer.
This commit is contained in:
iPromKnight
2024-02-27 22:38:10 +00:00
parent aad59c31e4
commit 79d0ef7f4d
26 changed files with 611 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
namespace Metadata.Features.Configuration;
public class JobConfiguration
{
private const string Prefix = "METADATA";
private const string DownloadImdbDataVariable = "DOWNLOAD_IMDB_DATA_SCHEDULE";
private const string DownloadImdbDataOnceVariable = "DOWNLOAD_IMDB_DATA_ONCE";
public string DownloadImdbCronSchedule { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(DownloadImdbDataVariable, CronExpressions.EveryHour);
public bool DownloadImdbOnce { get; init; } = Prefix.GetEnvironmentVariableAsBool(DownloadImdbDataOnceVariable);
}

View File

@@ -0,0 +1,19 @@
namespace Metadata.Features.Configuration;
public class MongoConfiguration
{
private const string Prefix = "MONGODB";
private const string HostVariable = "HOST";
private const string PortVariable = "PORT";
private const string DbVariable = "DB";
private const string UsernameVariable = "USER";
private const string PasswordVariable = "PASSWORD";
private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
private int Port { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 27017);
private string Username { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(UsernameVariable);
private string Password { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(PasswordVariable);
public string DbName { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(DbVariable);
public string ConnectionString => $"mongodb://{Username}:{Password}@{Host}:{Port}/{DbName}?tls=false&directConnection=true&authSource=admin";
}

View File

@@ -0,0 +1,3 @@
namespace Metadata.Features.DeleteDownloadedImdbData;
public record DeleteDownloadedImdbDataRequest(string FilePath);

View File

@@ -0,0 +1,15 @@
namespace Metadata.Features.DeleteDownloadedImdbData;
public class DeleteDownloadedImdbDataRequestHandler(ILogger<DeleteDownloadedImdbDataRequestHandler> logger)
{
public Task Handle(DeleteDownloadedImdbDataRequest request, CancellationToken _)
{
logger.LogInformation("Deleting file {FilePath}", request.FilePath);
File.Delete(request.FilePath);
logger.LogInformation("File Deleted");
return Task.CompletedTask;
}
}

View File

@@ -0,0 +1,8 @@
namespace Metadata.Features.DownloadImdbData;
public class DownloadImdbDataJob(IMessageBus messageBus, JobConfiguration configuration) : BaseJob
{
public override bool IsScheduelable => !configuration.DownloadImdbOnce && !string.IsNullOrEmpty(configuration.DownloadImdbCronSchedule);
public override string JobName => nameof(DownloadImdbDataJob);
public override async Task Invoke() => await messageBus.SendAsync(new GetImdbDataRequest());
}

View File

@@ -0,0 +1,3 @@
namespace Metadata.Features.DownloadImdbData;
public record GetImdbDataRequest;

View File

@@ -0,0 +1,30 @@
namespace Metadata.Features.DownloadImdbData;
public class GetImdbDataRequestHandler(IHttpClientFactory clientFactory, ILogger<GetImdbDataRequestHandler> logger)
{
private const string TitleBasicsFileName = "title.basics.tsv";
public async Task<ImportImdbDataRequest> Handle(GetImdbDataRequest _, CancellationToken cancellationToken)
{
logger.LogInformation("Downloading IMDB data");
var client = clientFactory.CreateClient("imdb-data");
var response = await client.GetAsync($"{TitleBasicsFileName}.gz", cancellationToken);
var tempFile = Path.Combine(Path.GetTempPath(), TitleBasicsFileName);
response.EnsureSuccessStatusCode();
await using var stream = await response.Content.ReadAsStreamAsync(cancellationToken);
await using var gzipStream = new GZipStream(stream, CompressionMode.Decompress);
await using var fileStream = File.Create(tempFile);
await gzipStream.CopyToAsync(fileStream, cancellationToken);
logger.LogInformation("Downloaded IMDB data to {TempFile}", tempFile);
fileStream.Close();
return new(tempFile);
}
}

View File

@@ -0,0 +1,15 @@
namespace Metadata.Features.ImportImdbData;
public class ImdbEntry
{
[BsonId]
public string ImdbId { get; set; } = default!;
public string? TitleType { get; set; }
public string? PrimaryTitle { get; set; }
public string? OriginalTitle { get; set; }
public string? IsAdult { get; set; }
public string? StartYear { get; set; }
public string? EndYear { get; set; }
public string? RuntimeMinutes { get; set; }
public string? Genres { get; set; }
}

View File

@@ -0,0 +1,59 @@
namespace Metadata.Features.ImportImdbData;
public class ImdbMongoDbService
{
private readonly ILogger<ImdbMongoDbService> _logger;
private readonly IMongoCollection<ImdbEntry> _imdbCollection;
public ImdbMongoDbService(MongoConfiguration configuration, ILogger<ImdbMongoDbService> logger)
{
_logger = logger;
var client = new MongoClient(configuration.ConnectionString);
var database = client.GetDatabase(configuration.DbName);
_imdbCollection = database.GetCollection<ImdbEntry>("imdb-entries");
}
public async Task InsertImdbEntries(IEnumerable<ImdbEntry> entries)
{
var operations = new List<WriteModel<ImdbEntry>>();
foreach (var entry in entries)
{
var filter = Builders<ImdbEntry>.Filter.Eq(e => e.ImdbId, entry.ImdbId);
var update = Builders<ImdbEntry>.Update
.SetOnInsert(e => e.TitleType, entry.TitleType)
.SetOnInsert(e => e.PrimaryTitle, entry.PrimaryTitle)
.SetOnInsert(e => e.OriginalTitle, entry.OriginalTitle)
.SetOnInsert(e => e.IsAdult, entry.IsAdult)
.SetOnInsert(e => e.StartYear, entry.StartYear)
.SetOnInsert(e => e.EndYear, entry.EndYear)
.SetOnInsert(e => e.RuntimeMinutes, entry.RuntimeMinutes)
.SetOnInsert(e => e.Genres, entry.Genres);
operations.Add(new UpdateOneModel<ImdbEntry>(filter, update) { IsUpsert = true });
}
await _imdbCollection.BulkWriteAsync(operations);
}
public bool IsDatabaseInitialized()
{
try
{
// Create index for PrimaryTitle
var indexPrimaryTitle = Builders<ImdbEntry>.IndexKeys.Ascending(e => e.PrimaryTitle);
var modelPrimaryTitle = new CreateIndexModel<ImdbEntry>(indexPrimaryTitle);
_imdbCollection.Indexes.CreateOne(modelPrimaryTitle);
return true;
}
catch (Exception e)
{
_logger.LogError(e, "Error initializing database");
return false;
}
}
}

View File

@@ -0,0 +1,3 @@
namespace Metadata.Features.ImportImdbData;
public record ImportImdbDataRequest(string FilePath);

View File

@@ -0,0 +1,94 @@
namespace Metadata.Features.ImportImdbData;
public class ImportImdbDataRequestHandler(ILogger<ImportImdbDataRequestHandler> logger, ImdbMongoDbService mongoDbService)
{
private const int BatchSize = 50_000;
public async Task<DeleteDownloadedImdbDataRequest> Handle(ImportImdbDataRequest request, CancellationToken cancellationToken)
{
logger.LogInformation("Importing Downloaded IMDB data from {FilePath}", request.FilePath);
var config = new CsvConfiguration(CultureInfo.InvariantCulture)
{
Delimiter = "\t",
BadDataFound = null, // Skip Bad Data from Imdb
MissingFieldFound = null, // Skip Missing Fields from Imdb
};
using var reader = new StreamReader(request.FilePath);
using var csv = new CsvReader(reader, config);
var channel = Channel.CreateBounded<ImdbEntry>(new BoundedChannelOptions(BatchSize)
{
FullMode = BoundedChannelFullMode.Wait,
});
// Skip the header row
await csv.ReadAsync();
var batchInsertTask = CreateBatchOfEntries(channel, cancellationToken);
await ReadEntries(csv, channel, cancellationToken);
channel.Writer.Complete();
await batchInsertTask;
return new(request.FilePath);
}
private Task CreateBatchOfEntries(Channel<ImdbEntry, ImdbEntry> channel, CancellationToken cancellationToken) =>
Task.Run(async () =>
{
await foreach (var movieData in channel.Reader.ReadAllAsync(cancellationToken))
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
var batch = new List<ImdbEntry>
{
movieData,
};
while (batch.Count < BatchSize && channel.Reader.TryRead(out var nextMovieData))
{
batch.Add(nextMovieData);
}
if (batch.Count > 0)
{
await mongoDbService.InsertImdbEntries(batch);
logger.LogInformation("Imported batch of {BatchSize} starting with ImdbId {FirstImdbId}", batch.Count, batch.First().ImdbId);
}
}
}, cancellationToken);
private static async Task ReadEntries(CsvReader csv, Channel<ImdbEntry, ImdbEntry> channel, CancellationToken cancellationToken)
{
while (await csv.ReadAsync())
{
var movieData = new ImdbEntry
{
ImdbId = csv.GetField(0),
TitleType = csv.GetField(1),
PrimaryTitle = csv.GetField(2),
OriginalTitle = csv.GetField(3),
IsAdult = csv.GetField(4),
StartYear = csv.GetField(5),
EndYear = csv.GetField(6),
RuntimeMinutes = csv.GetField(7),
Genres = csv.GetField(8),
};
if (cancellationToken.IsCancellationRequested)
{
return;
}
await channel.Writer.WriteAsync(movieData, cancellationToken);
}
}
}

View File

@@ -0,0 +1,10 @@
namespace Metadata.Features.Jobs;
public abstract class BaseJob : IMetadataJob
{
public abstract bool IsScheduelable { get; }
public abstract string JobName { get; }
public abstract Task Invoke();
}

View File

@@ -0,0 +1,7 @@
namespace Metadata.Features.Jobs;
public interface IMetadataJob : IInvocable
{
bool IsScheduelable { get; }
string JobName { get; }
}

View File

@@ -0,0 +1,34 @@
namespace Metadata.Features.Jobs;
public class JobScheduler(IServiceProvider serviceProvider) : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
using var scope = serviceProvider.CreateAsyncScope();
var mongoDbService = scope.ServiceProvider.GetRequiredService<ImdbMongoDbService>();
if (!mongoDbService.IsDatabaseInitialized())
{
throw new InvalidOperationException("MongoDb is not initialized");
}
var jobConfigurations = scope.ServiceProvider.GetRequiredService<JobConfiguration>();
var downloadJob = scope.ServiceProvider.GetRequiredService<DownloadImdbDataJob>();
if (!downloadJob.IsScheduelable)
{
return downloadJob.Invoke();
}
var scheduler = scope.ServiceProvider.GetRequiredService<IScheduler>();
scheduler.Schedule<DownloadImdbDataJob>()
.Cron(jobConfigurations.DownloadImdbCronSchedule)
.PreventOverlapping(nameof(downloadJob.JobName));
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

View File

@@ -0,0 +1,9 @@
namespace Metadata.Features.Literals;
public static class CronExpressions
{
public const string EveryHour = "0 0 * * *";
public const string EveryDay = "0 0 0 * *";
public const string EveryWeek = "0 0 * * 0";
public const string EveryMonth = "0 0 0 * *";
}

View File

@@ -0,0 +1,7 @@
namespace Metadata.Features.Literals;
public static class HttpClients
{
public const string ImdbDataClientName = "imdb-data";
public const string ImdbClientBaseAddress = "https://datasets.imdbws.com/";
}