From d0346f29bfb58956e7d9df03cd6e2ed4e3df3c6c Mon Sep 17 00:00:00 2001
From: iPromKnight
Date: Wed, 28 Feb 2024 14:57:26 +0000
Subject: [PATCH] Clean up MongoDB initialization, ensure indexes are created
 for compound searching

---
 deployment/docker/.env.example               |  4 ++-
 .../Configuration/JobConfiguration.cs        |  4 ++-
 .../Configuration/MongoConfiguration.cs      |  1 +
 .../ImportImdbData/ImdbMongoDbService.cs     | 15 +++++++----
 .../ImportImdbDataRequestHandler.cs          |  8 +++---
 .../lib/mongo/models/imdb_entries_model.ts   |  2 ++
 .../src/lib/mongo/mongo_repository.ts        | 26 +++++++++++++++----
 .../test/services/mongo_repository.test.ts   | 17 ++++++++++--
 8 files changed, 58 insertions(+), 19 deletions(-)

diff --git a/deployment/docker/.env.example b/deployment/docker/.env.example
index fba1e4d..c8d14d9 100644
--- a/deployment/docker/.env.example
+++ b/deployment/docker/.env.example
@@ -27,9 +27,11 @@ RABBITMQ_PUBLISH_INTERVAL_IN_SECONDS=10
 
 # Metadata
 ## Only used if DATA_ONCE is set to false. If true, the schedule is ignored
-METADATA_DOWNLOAD_IMDB_DATA_SCHEDULE=0 0 1 * * *
+METADATA_DOWNLOAD_IMDB_DATA_SCHEDULE="0 0 1 * * *"
 ## If true, the metadata will be downloaded once and then the schedule will be ignored
 METADATA_DOWNLOAD_IMDB_DATA_ONCE=true
+## Controls the number of records processed in memory at any given time during import; higher values consume more memory
+METADATA_INSERT_BATCH_SIZE=25000
 
 # Addon
 DEBUG_MODE=false

diff --git a/src/metadata/Features/Configuration/JobConfiguration.cs b/src/metadata/Features/Configuration/JobConfiguration.cs
index 9565dfa..090ba56 100644
--- a/src/metadata/Features/Configuration/JobConfiguration.cs
+++ b/src/metadata/Features/Configuration/JobConfiguration.cs
@@ -5,7 +5,9 @@ public class JobConfiguration
     private const string Prefix = "METADATA";
     private const string DownloadImdbDataVariable = "DOWNLOAD_IMDB_DATA_SCHEDULE";
    private const string DownloadImdbDataOnceVariable = "DOWNLOAD_IMDB_DATA_ONCE";
-
+    private const string InsertBatchSizeVariable = "INSERT_BATCH_SIZE";
+
+    public int InsertBatchSize { get; init; } = Prefix.GetEnvironmentVariableAsInt(InsertBatchSizeVariable, 25_000);
     public string DownloadImdbCronSchedule { get; init; } = Prefix.GetOptionalEnvironmentVariableAsString(DownloadImdbDataVariable, CronExpressions.EveryHour);
     public bool DownloadImdbOnce { get; init; } = Prefix.GetEnvironmentVariableAsBool(DownloadImdbDataOnceVariable);
 }
\ No newline at end of file

diff --git a/src/metadata/Features/Configuration/MongoConfiguration.cs b/src/metadata/Features/Configuration/MongoConfiguration.cs
index e7bff7c..278a78c 100644
--- a/src/metadata/Features/Configuration/MongoConfiguration.cs
+++ b/src/metadata/Features/Configuration/MongoConfiguration.cs
@@ -8,6 +8,7 @@ public class MongoConfiguration
     private const string DbVariable = "DB";
     private const string UsernameVariable = "USER";
     private const string PasswordVariable = "PASSWORD";
+
     private string Host { get; init; } = Prefix.GetRequiredEnvironmentVariableAsString(HostVariable);
     private int Port { get; init; } = Prefix.GetEnvironmentVariableAsInt(PortVariable, 27017);
 

diff --git a/src/metadata/Features/ImportImdbData/ImdbMongoDbService.cs b/src/metadata/Features/ImportImdbData/ImdbMongoDbService.cs
index ae11d9c..e1ed1d3 100644
--- a/src/metadata/Features/ImportImdbData/ImdbMongoDbService.cs
+++ b/src/metadata/Features/ImportImdbData/ImdbMongoDbService.cs
@@ -42,11 +42,16 @@ public class ImdbMongoDbService
     {
         try
         {
-
-            // Create index for PrimaryTitle
-            var indexPrimaryTitle = Builders<ImdbEntry>.IndexKeys.Ascending(e => e.PrimaryTitle);
-            var modelPrimaryTitle = new CreateIndexModel<ImdbEntry>(indexPrimaryTitle);
-            _imdbCollection.Indexes.CreateOne(modelPrimaryTitle);
+            // Create compound index for PrimaryTitle, TitleType, and StartYear
+            var indexKeysDefinition = Builders<ImdbEntry>.IndexKeys
+                .Text(e => e.PrimaryTitle)
+                .Ascending(e => e.TitleType)
+                .Ascending(e => e.StartYear);
+
+            var createIndexOptions = new CreateIndexOptions { Background = true };
+            var indexModel = new CreateIndexModel<ImdbEntry>(indexKeysDefinition, createIndexOptions);
+
+            _imdbCollection.Indexes.CreateOne(indexModel);
 
             return true;
         }

diff --git a/src/metadata/Features/ImportImdbData/ImportImdbDataRequestHandler.cs b/src/metadata/Features/ImportImdbData/ImportImdbDataRequestHandler.cs
index e5d3568..f429b72 100644
--- a/src/metadata/Features/ImportImdbData/ImportImdbDataRequestHandler.cs
+++ b/src/metadata/Features/ImportImdbData/ImportImdbDataRequestHandler.cs
@@ -1,9 +1,7 @@
 namespace Metadata.Features.ImportImdbData;
 
-public class ImportImdbDataRequestHandler(ILogger<ImportImdbDataRequestHandler> logger, ImdbMongoDbService mongoDbService)
+public class ImportImdbDataRequestHandler(ILogger<ImportImdbDataRequestHandler> logger, ImdbMongoDbService mongoDbService, JobConfiguration configuration)
 {
-    private const int BatchSize = 50_000;
-
     public async Task Handle(ImportImdbDataRequest request, CancellationToken cancellationToken)
     {
         logger.LogInformation("Importing Downloaded IMDB data from {FilePath}", request.FilePath);
@@ -18,7 +16,7 @@ public class ImportImdbDataRequestHandler(ILogger
         using var reader = new StreamReader(request.FilePath);
         using var csv = new CsvReader(reader, config);
 
-        var channel = Channel.CreateBounded<ImdbEntry>(new BoundedChannelOptions(BatchSize)
+        var channel = Channel.CreateBounded<ImdbEntry>(new BoundedChannelOptions(configuration.InsertBatchSize)
         {
             FullMode = BoundedChannelFullMode.Wait,
         });
@@ -53,7 +51,7 @@ public class ImportImdbDataRequestHandler(ILogger
                 movieData,
             };
 
-            while (batch.Count < BatchSize && channel.Reader.TryRead(out var nextMovieData))
+            while (batch.Count < configuration.InsertBatchSize && channel.Reader.TryRead(out var nextMovieData))
             {
                 batch.Add(nextMovieData);
             }

diff --git a/src/node/consumer/src/lib/mongo/models/imdb_entries_model.ts b/src/node/consumer/src/lib/mongo/models/imdb_entries_model.ts
index 9d1877a..4eb8e44 100644
--- a/src/node/consumer/src/lib/mongo/models/imdb_entries_model.ts
+++ b/src/node/consumer/src/lib/mongo/models/imdb_entries_model.ts
@@ -13,4 +13,6 @@ const ImdbEntriesSchema: Schema = new Schema({
     TitleType: { type: String, default: "" },
 });
 
+ImdbEntriesSchema.index({ PrimaryTitle: 'text', TitleType: 1, StartYear: 1 }, { background: true });
+
 export const ImdbEntryModel = mongoose.model('ImdbEntry', ImdbEntriesSchema, 'imdb-entries');
\ No newline at end of file

diff --git a/src/node/consumer/src/lib/mongo/mongo_repository.ts b/src/node/consumer/src/lib/mongo/mongo_repository.ts
index d697d46..66177a9 100644
--- a/src/node/consumer/src/lib/mongo/mongo_repository.ts
+++ b/src/node/consumer/src/lib/mongo/mongo_repository.ts
@@ -1,17 +1,28 @@
 import {TorrentType} from "@enums/torrent_types";
+import {ILoggingService} from "@interfaces/logging_service";
 import {IMongoMetadataQuery} from "@mongo/interfaces/mongo_metadata_query";
 import {IMongoRepository} from "@mongo/interfaces/mongo_repository";
 import {ImdbEntryModel} from "@mongo/models/imdb_entries_model";
 import {configurationService} from '@services/configuration_service';
-import {injectable} from "inversify";
+import {IocTypes} from "@setup/ioc_types";
+import {inject, injectable} from "inversify";
 import mongoose from 'mongoose';
 
 @injectable()
 export class MongoRepository implements IMongoRepository {
+    @inject(IocTypes.ILoggingService) private logger: ILoggingService;
     private db: typeof mongoose = mongoose;
-
+
     async connect() : Promise<void> {
-        await this.db.connect(configurationService.cacheConfig.MONGO_URI, {directConnection: true});
+        try {
+            await this.db.connect(configurationService.cacheConfig.MONGO_URI, {directConnection: true});
+            this.logger.info('Successfully connected to mongo db');
+        }
+        catch (error) {
+            this.logger.debug('Failed to connect to mongo db', error);
+            this.logger.error('Failed to connect to mongo db');
+            process.exit(1);
+        }
     }
 
     async getImdbId(title: string, category: string, year?: string | number) : Promise<string | null> {
@@ -35,7 +46,12 @@
             query.StartYear = year.toString();
         }
 
-        const result = await ImdbEntryModel.findOne(query);
-        return result ? result._id : null;
+        try {
+            const result = await ImdbEntryModel.findOne(query, '_id').maxTimeMS(30000);
+            return result ? result._id : null;
+        } catch (error) {
+            this.logger.error('Query exceeded the 30 second time limit', error);
+            return null;
+        }
     }
 }
\ No newline at end of file

diff --git a/src/node/consumer/test/services/mongo_repository.test.ts b/src/node/consumer/test/services/mongo_repository.test.ts
index 9af4b5f..e5076f0 100644
--- a/src/node/consumer/test/services/mongo_repository.test.ts
+++ b/src/node/consumer/test/services/mongo_repository.test.ts
@@ -1,7 +1,9 @@
 import "reflect-metadata"; // required
 import {TorrentType} from "@enums/torrent_types";
+import {ILoggingService} from "@interfaces/logging_service";
 import {MongoRepository} from "@mongo/mongo_repository";
-import {Container} from "inversify";
+import {IocTypes} from "@setup/ioc_types";
+import {Container, inject} from "inversify";
 
 jest.mock('@services/configuration_service', () => {
     return {
@@ -20,13 +22,24 @@
     }
 });
 
+jest.mock('@services/logging_service', () => {
+    return {
+        error: jest.fn(),
+        info: jest.fn(),
+        debug: jest.fn(),
+    }
+})
+
 xdescribe('MongoRepository Tests - Manual Tests against real cluster. Skipped by default.', () => {
-    let mongoRepository: MongoRepository;
+    let mongoRepository: MongoRepository,
+        mockLogger: ILoggingService;
 
     beforeEach(() => {
         jest.clearAllMocks();
         process.env.LOG_LEVEL = 'debug';
+        mockLogger = jest.requireMock('@services/logging_service');
         const container = new Container();
+        container.bind(IocTypes.ILoggingService).toConstantValue(mockLogger);
         container.bind(MongoRepository).toSelf();
         mongoRepository = container.get(MongoRepository);
     });
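
Note (illustrative, not part of the patch): the compound text index added in imdb_entries_model.ts ({ PrimaryTitle: 'text', TitleType: 1, StartYear: 1 }) only accelerates title matching when a query goes through MongoDB's $text operator; TitleType and StartYear then act as ascending suffix keys that can refine the match with equality filters. The hunks above do not show how getImdbId builds its PrimaryTitle filter, so the sketch below is one assumed query shape this index can serve. The helper name findImdbIdByTextSearch and the example values are hypothetical.

// Sketch only: one query shape the new compound text index can serve.
// Field names come from imdb_entries_model.ts; the $text usage and the
// helper itself are illustrative assumptions, not taken from this patch.
import { ImdbEntryModel } from '@mongo/models/imdb_entries_model';

export async function findImdbIdByTextSearch(
    title: string,
    titleType: string,
    startYear?: string,
): Promise<string | null> {
    const query: Record<string, unknown> = {
        $text: { $search: title }, // served by the 'text' component of the index
        TitleType: titleType,      // equality on the first ascending suffix key
    };
    if (startYear) {
        query.StartYear = startYear; // equality on the second suffix key
    }
    // The '_id' projection and maxTimeMS mirror the guarded findOne in mongo_repository.ts
    const result = await ImdbEntryModel.findOne(query, '_id').maxTimeMS(30000);
    return result ? result._id : null;
}

Example: findImdbIdByTextSearch('The Matrix', 'movie', '1999') would resolve to the matching _id (a 'tt...' identifier) or null.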