Browse Source

Fix Mongo crashes, perf, fire&forget db population

master
Kenneth Bruen 2 years ago
parent
commit
44a1be5659
Signed by: kbruen
GPG Key ID: C1980A470C3EE5B1
  1. 4
      server/Services/Implementations/DataManager.cs
  2. 95
      server/Services/Implementations/Database.cs
  3. 38
      server/Utils/AsyncThrottle.cs
  4. 4
      server/server.csproj

4
server/Services/Implementations/DataManager.cs

@ -24,7 +24,7 @@ namespace Server.Services.Implementations {
var station = await InfoferScraper.Scrapers.StationScraper.Scrape(stationName, zonedDate.ToDateTimeOffset()); var station = await InfoferScraper.Scrapers.StationScraper.Scrape(stationName, zonedDate.ToDateTimeOffset());
if (station != null) { if (station != null) {
await Database.OnStationData(station); _ = Task.Run(async () => await Database.OnStationData(station));
} }
return station; return station;
}, TimeSpan.FromMinutes(1)); }, TimeSpan.FromMinutes(1));
@ -34,7 +34,7 @@ namespace Server.Services.Implementations {
var train = await InfoferScraper.Scrapers.TrainScraper.Scrape(trainNumber, zonedDate.ToDateTimeOffset()); var train = await InfoferScraper.Scrapers.TrainScraper.Scrape(trainNumber, zonedDate.ToDateTimeOffset());
if (train != null) { if (train != null) {
await Database.OnTrainData(train); _ = Task.Run(async () => await Database.OnTrainData(train));
} }
return train; return train;
}, TimeSpan.FromSeconds(30)); }, TimeSpan.FromSeconds(30));

95
server/Services/Implementations/Database.cs

@ -5,6 +5,7 @@ using System.Linq;
using System.Text.Json; using System.Text.Json;
using System.Text.Json.Nodes; using System.Text.Json.Nodes;
using System.Text.Json.Serialization; using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using InfoferScraper.Models.Station; using InfoferScraper.Models.Station;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -13,6 +14,7 @@ using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes; using MongoDB.Bson.Serialization.Attributes;
using MongoDB.Driver; using MongoDB.Driver;
using Server.Models.Database; using Server.Models.Database;
using Server.Utils;
namespace Server.Services.Implementations; namespace Server.Services.Implementations;
@ -43,13 +45,15 @@ public class Database : Server.Services.Interfaces.IDatabase {
private readonly IMongoCollection<DbRecord> dbRecordCollection; private readonly IMongoCollection<DbRecord> dbRecordCollection;
private readonly IMongoCollection<TrainListing> trainListingsCollection; private readonly IMongoCollection<TrainListing> trainListingsCollection;
private readonly IMongoCollection<StationListing> stationListingsCollection; private readonly IMongoCollection<StationListing> stationListingsCollection;
private readonly AsyncThrottle throttle;
public Database(ILogger<Database> logger, IOptions<MongoSettings> mongoSettings) { public Database(ILogger<Database> logger, IOptions<MongoSettings> mongoSettings) {
Logger = logger; Logger = logger;
var settings = MongoClientSettings.FromConnectionString(mongoSettings.Value.ConnectionString); var settings = MongoClientSettings.FromConnectionString(mongoSettings.Value.ConnectionString);
settings.MaxConnectionPoolSize = 100000; settings.MaxConnectionPoolSize = 10000;
MongoClient mongoClient = new(settings); MongoClient mongoClient = new(settings);
throttle = new(mongoClient.Settings.MaxConnectionPoolSize / 2);
db = mongoClient.GetDatabase(mongoSettings.Value.DatabaseName) ?? throw new NullReferenceException("Unable to get Mongo database"); db = mongoClient.GetDatabase(mongoSettings.Value.DatabaseName) ?? throw new NullReferenceException("Unable to get Mongo database");
dbRecordCollection = db.GetCollection<DbRecord>("db"); dbRecordCollection = db.GetCollection<DbRecord>("db");
trainListingsCollection = db.GetCollection<TrainListing>("trainListings"); trainListingsCollection = db.GetCollection<TrainListing>("trainListings");
@ -162,42 +166,104 @@ public class Database : Server.Services.Interfaces.IDatabase {
} }
} }
private readonly SemaphoreSlim insertTrainLock = new (1, 1);
public async Task<string> FoundTrain(string rank, string number, string company) { public async Task<string> FoundTrain(string rank, string number, string company) {
number = string.Join("", number.TakeWhile(c => c is >= '0' and <= '9')); number = string.Join("", number.TakeWhile(c => c is >= '0' and <= '9'));
if (!await (await trainListingsCollection.FindAsync(Builders<TrainListing>.Filter.Eq("number", number))).AnyAsync()) { await insertTrainLock.WaitAsync();
try {
if (!await (await throttle.MakeRequest(() =>
trainListingsCollection.FindAsync(Builders<TrainListing>.Filter.Eq("number", number))))
.AnyAsync()) {
Logger.LogDebug("Found train {Rank} {Number} from {Company}", rank, number, company); Logger.LogDebug("Found train {Rank} {Number} from {Company}", rank, number, company);
await trainListingsCollection.InsertOneAsync(new(number: number, rank: rank, company: company)); await throttle.MakeRequest(() =>
trainListingsCollection.InsertOneAsync(new(number: number, rank: rank, company: company)));
}
}
finally {
insertTrainLock.Release();
} }
return number; return number;
} }
private readonly SemaphoreSlim insertStationLock = new (1, 1);
public async Task FoundStation(string name) { public async Task FoundStation(string name) {
if (!await (stationListingsCollection.Find(Builders<StationListing>.Filter.Eq("name", name))).AnyAsync()) { // if (!await throttle.MakeRequest(() => stationListingsCollection.Find(Builders<StationListing>.Filter.Eq("name", name)).AnyAsync())) {
// Logger.LogDebug("Found station {StationName}", name);
// await throttle.MakeRequest(() => stationListingsCollection.InsertOneAsync(new(name, new())));
// }
await insertStationLock.WaitAsync();
UpdateResult update;
try {
update = await stationListingsCollection.UpdateOneAsync(
Builders<StationListing>.Filter.Eq("name", name),
Builders<StationListing>.Update.Combine(
Builders<StationListing>.Update.SetOnInsert("name", name),
Builders<StationListing>.Update.SetOnInsert("stoppedAtBy", new List<string>())
),
new UpdateOptions {
IsUpsert = true,
}
);
}
finally {
insertStationLock.Release();
}
if (update.IsAcknowledged && update.MatchedCount == 0) {
Logger.LogDebug("Found station {StationName}", name); Logger.LogDebug("Found station {StationName}", name);
await stationListingsCollection.InsertOneAsync(new(name, new()));
} }
} }
public async Task FoundStations(IEnumerable<string> names) {
var enumerable = names as string[] ?? names.ToArray();
var existingStations = await (await stationListingsCollection.FindAsync(
Builders<StationListing>.Filter.StringIn("name", enumerable.Select((n) => new StringOrRegularExpression(n)))
)).ToListAsync();
var notExistingStations = enumerable.Where((n) => !existingStations.Select((s) => s.Name).Contains(n)).ToList();
if (notExistingStations.Count == 0) return;
await stationListingsCollection.InsertManyAsync(
notExistingStations.Select(
(s) => new StationListing(s, new())
)
);
Logger.LogDebug("Found stations {StationNames}", notExistingStations);
}
public async Task FoundTrainAtStation(string stationName, string trainNumber) { public async Task FoundTrainAtStation(string stationName, string trainNumber) {
trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9')); trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9'));
await FoundStation(stationName); await FoundStation(stationName);
var updateResult = await stationListingsCollection.UpdateOneAsync( var updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateOneAsync(
Builders<StationListing>.Filter.Eq("name", stationName), Builders<StationListing>.Filter.Eq("name", stationName),
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber) Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber)
); ));
if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) { if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) {
Logger.LogDebug("Found train {TrainNumber} at station {StationName}", trainNumber, stationName); Logger.LogDebug("Found train {TrainNumber} at station {StationName}", trainNumber, stationName);
} }
} }
public async Task FoundTrainAtStations(IEnumerable<string> stationNames, string trainNumber) {
trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9'));
var enumerable = stationNames as string[] ?? stationNames.ToArray();
await FoundStations(enumerable);
var updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateManyAsync(
Builders<StationListing>.Filter.StringIn("name", enumerable.Select(sn => new StringOrRegularExpression(sn))),
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber)
));
if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) {
Logger.LogDebug("Found train {TrainNumber} at stations {StationNames}", trainNumber, stationNames);
}
}
public async Task OnTrainData(InfoferScraper.Models.Train.ITrainScrapeResult trainData) { public async Task OnTrainData(InfoferScraper.Models.Train.ITrainScrapeResult trainData) {
var trainNumber = await FoundTrain(trainData.Rank, trainData.Number, trainData.Operator); var trainNumber = await FoundTrain(trainData.Rank, trainData.Number, trainData.Operator);
await Task.WhenAll( await FoundTrainAtStations(
trainData.Groups trainData.Groups
.SelectMany(g => g.Stations) .SelectMany(g => g.Stations)
.Select(trainStop => trainStop.Name) .Select(trainStop => trainStop.Name)
.Distinct() .Distinct(),
.Select(station => FoundTrainAtStation(station, trainNumber)) trainNumber
); );
} }
@ -207,10 +273,7 @@ public class Database : Server.Services.Interfaces.IDatabase {
async Task ProcessTrain(InfoferScraper.Models.Station.IStationArrDep train) { async Task ProcessTrain(InfoferScraper.Models.Station.IStationArrDep train) {
var trainNumber = train.Train.Number; var trainNumber = train.Train.Number;
trainNumber = await FoundTrain(train.Train.Rank, trainNumber, train.Train.Operator); trainNumber = await FoundTrain(train.Train.Rank, trainNumber, train.Train.Operator);
await FoundTrainAtStation(stationName, trainNumber); await FoundTrainAtStations(Enumerable.Repeat(stationName, 1).Concat(train.Train.Route).Distinct(), trainNumber);
if (train.Train.Route.Count != 0) {
await Task.WhenAll(train.Train.Route.Select(station => FoundTrainAtStation(station, trainNumber)));
}
} }
List<IStationArrDep> arrdep = new(); List<IStationArrDep> arrdep = new();
@ -221,7 +284,9 @@ public class Database : Server.Services.Interfaces.IDatabase {
arrdep.AddRange(stationData.Departures); arrdep.AddRange(stationData.Departures);
} }
await Task.WhenAll(arrdep.Select(ProcessTrain)); foreach (var train in arrdep.DistinctBy((t) => t.Train.Number)) {
await ProcessTrain(train);
}
} }
} }

38
server/Utils/AsyncThrottle.cs

@ -0,0 +1,38 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Server.Utils;
// Inspired from: https://stackoverflow.com/a/57517920
public class AsyncThrottle {
private readonly SemaphoreSlim openConnectionSemaphore;
public AsyncThrottle(int limit) {
openConnectionSemaphore = new(limit, limit);
}
public async Task<T> MakeRequest<T>(Task<T> task) => await MakeRequest(() => task);
public async Task<T> MakeRequest<T>(Func<Task<T>> taskCreator) {
await openConnectionSemaphore.WaitAsync();
try {
var result = await taskCreator();
return result;
}
finally {
openConnectionSemaphore.Release();
}
}
public async Task MakeRequest(Task task) => await MakeRequest(() => task);
public async Task MakeRequest(Func<Task> taskCreator) {
await openConnectionSemaphore.WaitAsync();
try {
await taskCreator();
}
finally {
openConnectionSemaphore.Release();
}
}
}

4
server/server.csproj

@ -20,8 +20,4 @@
<ProjectReference Include="..\scraper\scraper.csproj" /> <ProjectReference Include="..\scraper\scraper.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Folder Include="Utils" />
</ItemGroup>
</Project> </Project>

Loading…
Cancel
Save