From 44a1be565901ac1394c3451156f53e846cd3cbcc Mon Sep 17 00:00:00 2001 From: Dan Cojocaru Date: Thu, 10 Nov 2022 01:21:37 +0100 Subject: [PATCH] Fix Mongo crashes, perf, fire&forget db population --- .../Services/Implementations/DataManager.cs | 4 +- server/Services/Implementations/Database.cs | 101 ++++++++++++++---- server/Utils/AsyncThrottle.cs | 38 +++++++ server/server.csproj | 4 - 4 files changed, 123 insertions(+), 24 deletions(-) create mode 100644 server/Utils/AsyncThrottle.cs diff --git a/server/Services/Implementations/DataManager.cs b/server/Services/Implementations/DataManager.cs index 23506ea..2ede489 100644 --- a/server/Services/Implementations/DataManager.cs +++ b/server/Services/Implementations/DataManager.cs @@ -24,7 +24,7 @@ namespace Server.Services.Implementations { var station = await InfoferScraper.Scrapers.StationScraper.Scrape(stationName, zonedDate.ToDateTimeOffset()); if (station != null) { - await Database.OnStationData(station); + _ = Task.Run(async () => await Database.OnStationData(station)); } return station; }, TimeSpan.FromMinutes(1)); @@ -34,7 +34,7 @@ namespace Server.Services.Implementations { var train = await InfoferScraper.Scrapers.TrainScraper.Scrape(trainNumber, zonedDate.ToDateTimeOffset()); if (train != null) { - await Database.OnTrainData(train); + _ = Task.Run(async () => await Database.OnTrainData(train)); } return train; }, TimeSpan.FromSeconds(30)); diff --git a/server/Services/Implementations/Database.cs b/server/Services/Implementations/Database.cs index dbf1995..5c1368a 100644 --- a/server/Services/Implementations/Database.cs +++ b/server/Services/Implementations/Database.cs @@ -5,6 +5,7 @@ using System.Linq; using System.Text.Json; using System.Text.Json.Nodes; using System.Text.Json.Serialization; +using System.Threading; using System.Threading.Tasks; using InfoferScraper.Models.Station; using Microsoft.Extensions.Logging; @@ -13,6 +14,7 @@ using MongoDB.Bson; using MongoDB.Bson.Serialization.Attributes; using MongoDB.Driver; using Server.Models.Database; +using Server.Utils; namespace Server.Services.Implementations; @@ -43,13 +45,15 @@ public class Database : Server.Services.Interfaces.IDatabase { private readonly IMongoCollection dbRecordCollection; private readonly IMongoCollection trainListingsCollection; private readonly IMongoCollection stationListingsCollection; + private readonly AsyncThrottle throttle; public Database(ILogger logger, IOptions mongoSettings) { Logger = logger; var settings = MongoClientSettings.FromConnectionString(mongoSettings.Value.ConnectionString); - settings.MaxConnectionPoolSize = 100000; + settings.MaxConnectionPoolSize = 10000; MongoClient mongoClient = new(settings); + throttle = new(mongoClient.Settings.MaxConnectionPoolSize / 2); db = mongoClient.GetDatabase(mongoSettings.Value.DatabaseName) ?? throw new NullReferenceException("Unable to get Mongo database"); dbRecordCollection = db.GetCollection("db"); trainListingsCollection = db.GetCollection("trainListings"); @@ -162,42 +166,104 @@ public class Database : Server.Services.Interfaces.IDatabase { } } + private readonly SemaphoreSlim insertTrainLock = new (1, 1); public async Task FoundTrain(string rank, string number, string company) { number = string.Join("", number.TakeWhile(c => c is >= '0' and <= '9')); - if (!await (await trainListingsCollection.FindAsync(Builders.Filter.Eq("number", number))).AnyAsync()) { - Logger.LogDebug("Found train {Rank} {Number} from {Company}", rank, number, company); - await trainListingsCollection.InsertOneAsync(new(number: number, rank: rank, company: company)); + await insertTrainLock.WaitAsync(); + try { + if (!await (await throttle.MakeRequest(() => + trainListingsCollection.FindAsync(Builders.Filter.Eq("number", number)))) + .AnyAsync()) { + Logger.LogDebug("Found train {Rank} {Number} from {Company}", rank, number, company); + await throttle.MakeRequest(() => + trainListingsCollection.InsertOneAsync(new(number: number, rank: rank, company: company))); + } + } + finally { + insertTrainLock.Release(); } + return number; } + private readonly SemaphoreSlim insertStationLock = new (1, 1); + public async Task FoundStation(string name) { - if (!await (stationListingsCollection.Find(Builders.Filter.Eq("name", name))).AnyAsync()) { + // if (!await throttle.MakeRequest(() => stationListingsCollection.Find(Builders.Filter.Eq("name", name)).AnyAsync())) { + // Logger.LogDebug("Found station {StationName}", name); + // await throttle.MakeRequest(() => stationListingsCollection.InsertOneAsync(new(name, new()))); + // } + await insertStationLock.WaitAsync(); + UpdateResult update; + try { + update = await stationListingsCollection.UpdateOneAsync( + Builders.Filter.Eq("name", name), + Builders.Update.Combine( + Builders.Update.SetOnInsert("name", name), + Builders.Update.SetOnInsert("stoppedAtBy", new List()) + ), + new UpdateOptions { + IsUpsert = true, + } + ); + } + finally { + insertStationLock.Release(); + } + + if (update.IsAcknowledged && update.MatchedCount == 0) { Logger.LogDebug("Found station {StationName}", name); - await stationListingsCollection.InsertOneAsync(new(name, new())); } } + public async Task FoundStations(IEnumerable names) { + var enumerable = names as string[] ?? names.ToArray(); + var existingStations = await (await stationListingsCollection.FindAsync( + Builders.Filter.StringIn("name", enumerable.Select((n) => new StringOrRegularExpression(n))) + )).ToListAsync(); + var notExistingStations = enumerable.Where((n) => !existingStations.Select((s) => s.Name).Contains(n)).ToList(); + if (notExistingStations.Count == 0) return; + await stationListingsCollection.InsertManyAsync( + notExistingStations.Select( + (s) => new StationListing(s, new()) + ) + ); + Logger.LogDebug("Found stations {StationNames}", notExistingStations); + } + public async Task FoundTrainAtStation(string stationName, string trainNumber) { trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9')); await FoundStation(stationName); - var updateResult = await stationListingsCollection.UpdateOneAsync( + var updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateOneAsync( Builders.Filter.Eq("name", stationName), Builders.Update.AddToSet("stoppedAtBy", trainNumber) - ); + )); if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) { Logger.LogDebug("Found train {TrainNumber} at station {StationName}", trainNumber, stationName); } } + public async Task FoundTrainAtStations(IEnumerable stationNames, string trainNumber) { + trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9')); + var enumerable = stationNames as string[] ?? stationNames.ToArray(); + await FoundStations(enumerable); + var updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateManyAsync( + Builders.Filter.StringIn("name", enumerable.Select(sn => new StringOrRegularExpression(sn))), + Builders.Update.AddToSet("stoppedAtBy", trainNumber) + )); + if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) { + Logger.LogDebug("Found train {TrainNumber} at stations {StationNames}", trainNumber, stationNames); + } + } + public async Task OnTrainData(InfoferScraper.Models.Train.ITrainScrapeResult trainData) { var trainNumber = await FoundTrain(trainData.Rank, trainData.Number, trainData.Operator); - await Task.WhenAll( + await FoundTrainAtStations( trainData.Groups - .SelectMany(g => g.Stations) - .Select(trainStop => trainStop.Name) - .Distinct() - .Select(station => FoundTrainAtStation(station, trainNumber)) + .SelectMany(g => g.Stations) + .Select(trainStop => trainStop.Name) + .Distinct(), + trainNumber ); } @@ -207,10 +273,7 @@ public class Database : Server.Services.Interfaces.IDatabase { async Task ProcessTrain(InfoferScraper.Models.Station.IStationArrDep train) { var trainNumber = train.Train.Number; trainNumber = await FoundTrain(train.Train.Rank, trainNumber, train.Train.Operator); - await FoundTrainAtStation(stationName, trainNumber); - if (train.Train.Route.Count != 0) { - await Task.WhenAll(train.Train.Route.Select(station => FoundTrainAtStation(station, trainNumber))); - } + await FoundTrainAtStations(Enumerable.Repeat(stationName, 1).Concat(train.Train.Route).Distinct(), trainNumber); } List arrdep = new(); @@ -221,7 +284,9 @@ public class Database : Server.Services.Interfaces.IDatabase { arrdep.AddRange(stationData.Departures); } - await Task.WhenAll(arrdep.Select(ProcessTrain)); + foreach (var train in arrdep.DistinctBy((t) => t.Train.Number)) { + await ProcessTrain(train); + } } } diff --git a/server/Utils/AsyncThrottle.cs b/server/Utils/AsyncThrottle.cs new file mode 100644 index 0000000..85eb5ef --- /dev/null +++ b/server/Utils/AsyncThrottle.cs @@ -0,0 +1,38 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Server.Utils; + +// Inspired from: https://stackoverflow.com/a/57517920 +public class AsyncThrottle { + private readonly SemaphoreSlim openConnectionSemaphore; + + public AsyncThrottle(int limit) { + openConnectionSemaphore = new(limit, limit); + } + + public async Task MakeRequest(Task task) => await MakeRequest(() => task); + public async Task MakeRequest(Func> taskCreator) { + await openConnectionSemaphore.WaitAsync(); + try { + var result = await taskCreator(); + return result; + } + finally { + openConnectionSemaphore.Release(); + } + } + + + public async Task MakeRequest(Task task) => await MakeRequest(() => task); + public async Task MakeRequest(Func taskCreator) { + await openConnectionSemaphore.WaitAsync(); + try { + await taskCreator(); + } + finally { + openConnectionSemaphore.Release(); + } + } +} \ No newline at end of file diff --git a/server/server.csproj b/server/server.csproj index 35aefe3..550d7a0 100644 --- a/server/server.csproj +++ b/server/server.csproj @@ -20,8 +20,4 @@ - - - -