You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
390 lines
15 KiB
390 lines
15 KiB
using System; |
|
using System.Collections.Generic; |
|
using System.IO; |
|
using System.Linq; |
|
using System.Threading; |
|
using System.Threading.Tasks; |
|
using InfoferScraper.Models.Station; |
|
using Microsoft.Extensions.Logging; |
|
using Microsoft.Extensions.Options; |
|
using MongoDB.Bson; |
|
using MongoDB.Bson.Serialization.Attributes; |
|
using MongoDB.Driver; |
|
using Newtonsoft.Json; |
|
using Newtonsoft.Json.Linq; |
|
using Newtonsoft.Json.Serialization; |
|
using scraper.Models.Itinerary; |
|
using Server.Models.Database; |
|
using Server.Utils; |
|
|
|
namespace Server.Services.Implementations; |
|
|
|
public class Database : Server.Services.Interfaces.IDatabase { |
|
private static readonly JsonSerializerSettings jsonSerializerSettings = new() { |
|
ContractResolver = new DefaultContractResolver { |
|
NamingStrategy = new CamelCaseNamingStrategy(), |
|
}, |
|
}; |
|
|
|
private ILogger<Database> Logger { get; } |
|
|
|
public DbRecord DbData { get; private set; } = new(3); |
|
|
|
public IReadOnlyList<StationListing> Stations => stationListingsCollection |
|
.Aggregate(PipelineDefinition<StationListing, StationListing>.Create( |
|
"{ $addFields: { stoppedAtCount: { $size: \"$stoppedAtBy\" } } }", |
|
"{ $sort: { stoppedAtCount: -1 } }", |
|
"{ $unset: \"stoppedAtCount\" }" |
|
)) |
|
.ToList(); |
|
public IReadOnlyList<TrainListing> Trains => trainListingsCollection.FindSync(_ => true).ToList(); |
|
public IReadOnlyList<StationAlias> StationAliases => stationAliasCollection.FindSync(_ => true).ToList(); |
|
|
|
private static readonly string DbDir = Environment.GetEnvironmentVariable("DB_DIR") ?? Path.Join(Environment.CurrentDirectory, "db"); |
|
private static readonly string DbFile = Path.Join(DbDir, "db.json"); |
|
private static readonly string StationsFile = Path.Join(DbDir, "stations.json"); |
|
private static readonly string TrainsFile = Path.Join(DbDir, "trains.json"); |
|
|
|
private readonly IMongoDatabase db; |
|
private readonly IMongoCollection<DbRecord> dbRecordCollection; |
|
private readonly IMongoCollection<TrainListing> trainListingsCollection; |
|
private readonly IMongoCollection<StationListing> stationListingsCollection; |
|
private readonly IMongoCollection<StationAlias> stationAliasCollection; |
|
private readonly AsyncThrottle throttle; |
|
|
|
private readonly Dictionary<string, string> trainObjectIds = new(); |
|
private readonly Dictionary<string, string> stationObjectIds = new(); |
|
|
|
public Database(ILogger<Database> logger, IOptions<MongoSettings> mongoSettings) { |
|
Logger = logger; |
|
|
|
var settings = MongoClientSettings.FromConnectionString(mongoSettings.Value.ConnectionString); |
|
settings.ServerApi = new(ServerApiVersion.V1); |
|
settings.MaxConnectionPoolSize = 10000; |
|
MongoClient mongoClient = new(settings); |
|
Logger.LogDebug("Created monogClient"); |
|
throttle = new(mongoClient.Settings.MaxConnectionPoolSize / 2); |
|
db = mongoClient.GetDatabase(mongoSettings.Value.DatabaseName) ?? throw new NullReferenceException("Unable to get Mongo database"); |
|
Logger.LogDebug("Created db"); |
|
dbRecordCollection = db.GetCollection<DbRecord>("db"); |
|
trainListingsCollection = db.GetCollection<TrainListing>("trainListings"); |
|
stationListingsCollection = db.GetCollection<StationListing>("stationListings"); |
|
stationAliasCollection = db.GetCollection<StationAlias>("stationAliases"); |
|
|
|
Migration(); |
|
|
|
Task.Run(async () => await Initialize()); |
|
} |
|
|
|
private void Migration() { |
|
if (!File.Exists(DbFile) && File.Exists(TrainsFile)) { |
|
Logger.LogInformation("Migrating DB version 1 -> 2"); |
|
if (File.Exists(StationsFile)) { |
|
Logger.LogDebug("Converting StationsFile"); |
|
var oldStations = JToken.Parse(File.ReadAllText(StationsFile)); |
|
List<StationListing> stations = new(); |
|
if (oldStations != null) { |
|
Logger.LogDebug("Found {StationsCount} stations", oldStations.Children().Count()); |
|
foreach (var station in oldStations.Children()) { |
|
if (station == null) continue; |
|
station["stoppedAtBy"] = new JArray(station["stoppedAtBy"]!.Children().Select(num => (JToken)(num!).ToString()!).ToArray()); |
|
} |
|
stations = oldStations.ToObject<List<StationListing>>(JsonSerializer.Create(jsonSerializerSettings))!; |
|
} |
|
Logger.LogDebug("Rewriting StationsFile"); |
|
File.WriteAllText(StationsFile, JsonConvert.SerializeObject(stations, jsonSerializerSettings)); |
|
} |
|
if (File.Exists(TrainsFile)) { |
|
Logger.LogDebug("Converting TrainsFile"); |
|
var oldTrains = JToken.Parse(File.ReadAllText(TrainsFile)); |
|
List<TrainListing> trains = new(); |
|
if (oldTrains != null) { |
|
Logger.LogDebug("Found {TrainsCount} trains", oldTrains.Children().Count()); |
|
foreach (var train in oldTrains.Children()) { |
|
if (train == null) continue; |
|
train["number"] = train["numberString"]; |
|
train["numberString"]?.Remove(); |
|
} |
|
trains = oldTrains.ToObject<List<TrainListing>>(JsonSerializer.Create(jsonSerializerSettings))!; |
|
} |
|
Logger.LogDebug("Rewriting TrainsFile"); |
|
File.WriteAllText(TrainsFile, JsonConvert.SerializeObject(trains, jsonSerializerSettings)); |
|
} |
|
DbData = new(2); |
|
File.WriteAllText(DbFile, JsonConvert.SerializeObject(DbData, jsonSerializerSettings)); |
|
Migration(); |
|
} |
|
else if (File.Exists(DbFile)) { |
|
var oldDbData = JToken.Parse(File.ReadAllText(DbFile)); |
|
if (((int?)oldDbData?["version"]) == 2) { |
|
Logger.LogInformation("Migrating DB version 2 -> 3 (transition from fs+JSON to MongoDB)"); |
|
|
|
if (File.Exists(StationsFile)) { |
|
Logger.LogDebug("Converting StationsFile"); |
|
var stations = JsonConvert.DeserializeObject<List<StationListing>>(File.ReadAllText(StationsFile)); |
|
stationListingsCollection.InsertMany(stations); |
|
File.Delete(StationsFile); |
|
} |
|
|
|
if (File.Exists(TrainsFile)) { |
|
Logger.LogDebug("Converting TrainsFile"); |
|
var trains = JsonConvert.DeserializeObject<List<TrainListing>>(File.ReadAllText(TrainsFile)); |
|
trainListingsCollection.InsertMany(trains); |
|
File.Delete(TrainsFile); |
|
} |
|
|
|
File.Delete(DbFile); |
|
try { |
|
Directory.Delete(DbDir); |
|
} |
|
catch (Exception) { |
|
// Deleting of the directory is optional; may not be allowed in Docker or similar |
|
} |
|
|
|
var x = dbRecordCollection.FindSync(_ => true).ToList()!; |
|
if (x.Count != 0) { |
|
Logger.LogWarning("db collection contained data when migrating to V3"); |
|
using (var _ = Logger.BeginScope("Already existing data:")) { |
|
foreach (var dbRecord in x) { |
|
Logger.LogInformation("Id: {Id}, Version: {Version}", dbRecord.Id, dbRecord.Version); |
|
} |
|
} |
|
Logger.LogInformation("Backing up existing data"); |
|
var backupDbRecordCollection = db.GetCollection<DbRecord>("db-backup"); |
|
backupDbRecordCollection.InsertMany(x); |
|
Logger.LogDebug("Removing existing data"); |
|
dbRecordCollection.DeleteMany(_ => true); |
|
} |
|
dbRecordCollection.InsertOne(new(3)); |
|
Migration(); |
|
} |
|
else { |
|
throw new("Unexpected Database version, only DB Version 2 uses DbFile"); |
|
} |
|
} |
|
else { |
|
var datas = dbRecordCollection.FindSync(_ => true).ToList(); |
|
if (datas.Count == 0) { |
|
Logger.LogInformation("No db record found, new database"); |
|
dbRecordCollection.InsertOne(DbData); |
|
} |
|
else { |
|
DbData = datas[0]; |
|
} |
|
if (DbData.Version == 3) { |
|
Logger.LogInformation("Using MongoDB Database Version 3; noop"); |
|
} |
|
else { |
|
throw new($"Unexpected Database version: {DbData.Version}"); |
|
} |
|
} |
|
} |
|
|
|
private async Task Initialize() { |
|
await foreach (var entry in await stationAliasCollection.FindAsync(_ => true)) { |
|
if (entry?.ListingId is null) continue; |
|
stationObjectIds.Add(entry.Name, entry.ListingId); |
|
} |
|
} |
|
|
|
private readonly SemaphoreSlim insertTrainLock = new (1, 1); |
|
public async Task<string> FoundTrain(string rank, string number, string company) { |
|
number = string.Join("", number.TakeWhile(c => c is >= '0' and <= '9')); |
|
// If there is a matching ObjectId, then it's already in the database |
|
if (trainObjectIds.ContainsKey(number)) return number; |
|
await insertTrainLock.WaitAsync(); |
|
try { |
|
var possibleTrains = await (await throttle.MakeRequest(() => trainListingsCollection.FindAsync( |
|
Builders<TrainListing>.Filter.Eq("number", number) |
|
))).ToListAsync(); |
|
if (possibleTrains.Count == 0) { |
|
Logger.LogDebug("Found train {Rank} {Number} from {Company}", rank, number, company); |
|
TrainListing listing = new(number: number, rank: rank, company: company); |
|
await throttle.MakeRequest(() => trainListingsCollection.InsertOneAsync(listing)); |
|
if (listing.Id != null) { |
|
trainObjectIds[number] = listing.Id; |
|
} |
|
} |
|
else { |
|
foreach (var possibleTrain in possibleTrains) { |
|
trainObjectIds[possibleTrain.Number] = possibleTrain.Id!; |
|
} |
|
} |
|
} |
|
finally { |
|
insertTrainLock.Release(); |
|
} |
|
|
|
return number; |
|
} |
|
|
|
private readonly SemaphoreSlim insertStationLock = new (1, 1); |
|
|
|
public async Task FoundStation(string name) { |
|
// if (!await throttle.MakeRequest(() => stationListingsCollection.Find(Builders<StationListing>.Filter.Eq("name", name)).AnyAsync())) { |
|
// Logger.LogDebug("Found station {StationName}", name); |
|
// await throttle.MakeRequest(() => stationListingsCollection.InsertOneAsync(new(name, new()))); |
|
|
|
// } |
|
// If there is a matching ObjectId, then it's already in the database |
|
if (stationObjectIds.ContainsKey(name)) return; |
|
|
|
await insertStationLock.WaitAsync(); |
|
UpdateResult update; |
|
try { |
|
update = await stationListingsCollection.UpdateOneAsync( |
|
Builders<StationListing>.Filter.Eq("name", name), |
|
Builders<StationListing>.Update.Combine( |
|
Builders<StationListing>.Update.SetOnInsert("name", name), |
|
Builders<StationListing>.Update.SetOnInsert("stoppedAtBy", new List<string>()) |
|
), |
|
new UpdateOptions { |
|
IsUpsert = true, |
|
} |
|
); |
|
if (update.IsAcknowledged && update.ModifiedCount > 0) { |
|
var listingId = update.UpsertedId.AsObjectId.ToString(); |
|
stationObjectIds[name] = listingId; |
|
await stationAliasCollection.UpdateOneAsync( |
|
Builders<StationAlias>.Filter.Eq("name", name), |
|
Builders<StationAlias>.Update.Combine( |
|
Builders<StationAlias>.Update.SetOnInsert("name", name), |
|
Builders<StationAlias>.Update.SetOnInsert("listingId", listingId) |
|
), |
|
new UpdateOptions { IsUpsert = true } |
|
); |
|
} |
|
} |
|
finally { |
|
insertStationLock.Release(); |
|
} |
|
|
|
if (update.IsAcknowledged && update.MatchedCount == 0) { |
|
Logger.LogDebug("Found station {StationName}", name); |
|
} |
|
} |
|
|
|
public async Task FoundStations(IEnumerable<string> names) { |
|
var unknownStations = names.ToList(); |
|
if (unknownStations.All(s => stationObjectIds.ContainsKey(s))) { |
|
return; |
|
} |
|
|
|
unknownStations.RemoveAll(s => stationObjectIds.ContainsKey(s)); |
|
var existingStations = await (await stationListingsCollection.FindAsync( |
|
Builders<StationListing>.Filter.StringIn("name", unknownStations.Select((n) => new StringOrRegularExpression(n))) |
|
)).ToListAsync(); |
|
foreach (var existingStation in existingStations) { |
|
stationObjectIds[existingStation.Name] = existingStation.Id!; |
|
} |
|
|
|
unknownStations.RemoveAll(s => existingStations.Select(st => st.Name).Contains(s)); |
|
if (unknownStations.Count == 0) return; |
|
var unknownStationListings = unknownStations.Select((s) => new StationListing(s, new())).ToList(); |
|
await stationListingsCollection.InsertManyAsync(unknownStationListings); |
|
foreach (var listing in unknownStationListings) { |
|
stationObjectIds[listing.Name] = listing.Id!; |
|
} |
|
Logger.LogDebug("Found stations {StationNames}", unknownStations); |
|
} |
|
|
|
public async Task FoundTrainAtStation(string stationName, string trainNumber) { |
|
trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9')); |
|
await FoundStation(stationName); |
|
UpdateResult updateResult; |
|
if (stationObjectIds.ContainsKey(stationName)) { |
|
updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateOneAsync( |
|
Builders<StationListing>.Filter.Eq("_id", ObjectId.Parse(stationObjectIds[stationName])), |
|
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber) |
|
)); |
|
} |
|
else { |
|
updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateOneAsync( |
|
Builders<StationListing>.Filter.Eq("name", stationName), |
|
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber) |
|
)); |
|
} |
|
if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) { |
|
Logger.LogDebug("Found train {TrainNumber} at station {StationName}", trainNumber, stationName); |
|
} |
|
} |
|
|
|
public async Task FoundTrainAtStations(IEnumerable<string> stationNames, string trainNumber) { |
|
trainNumber = string.Join("", trainNumber.TakeWhile(c => c is >= '0' and <= '9')); |
|
var enumerable = stationNames as string[] ?? stationNames.ToArray(); |
|
await FoundStations(enumerable); |
|
var objectIds = enumerable |
|
.Select<string, ObjectId?>((stationName) => stationObjectIds.ContainsKey(stationName) ? ObjectId.Parse(stationObjectIds[stationName]) : null) |
|
.ToList(); |
|
UpdateResult updateResult; |
|
if (!objectIds.Any((id) => id is null)) { |
|
updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateManyAsync( |
|
Builders<StationListing>.Filter.In("_id", objectIds), |
|
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber) |
|
)); |
|
} |
|
else { |
|
updateResult = await throttle.MakeRequest(() => stationListingsCollection.UpdateManyAsync( |
|
Builders<StationListing>.Filter.StringIn("name", enumerable.Select(sn => new StringOrRegularExpression(sn))), |
|
Builders<StationListing>.Update.AddToSet("stoppedAtBy", trainNumber) |
|
)); |
|
} |
|
if (updateResult.IsAcknowledged && updateResult.ModifiedCount > 0) { |
|
Logger.LogDebug("Found train {TrainNumber} at stations {StationNames}", trainNumber, stationNames); |
|
} |
|
} |
|
|
|
public async Task OnTrainData(InfoferScraper.Models.Train.ITrainScrapeResult trainData) { |
|
var trainNumber = await FoundTrain(trainData.Rank, trainData.Number, trainData.Operator); |
|
await FoundTrainAtStations( |
|
trainData.Groups |
|
.SelectMany(g => g.Stations) |
|
.Select(trainStop => trainStop.Name) |
|
.Distinct(), |
|
trainNumber |
|
); |
|
} |
|
|
|
public async Task OnStationData(InfoferScraper.Models.Station.IStationScrapeResult stationData) { |
|
var stationName = stationData.StationName; |
|
|
|
async Task ProcessTrain(InfoferScraper.Models.Station.IStationArrDep train) { |
|
var trainNumber = train.Train.Number; |
|
trainNumber = await FoundTrain(train.Train.Rank, trainNumber, train.Train.Operator); |
|
await FoundTrainAtStations(Enumerable.Repeat(stationName, 1).Concat(train.Train.Route).Distinct(), trainNumber); |
|
} |
|
|
|
List<IStationArrDep> arrdep = new(); |
|
if (stationData.Arrivals != null) { |
|
arrdep.AddRange(stationData.Arrivals); |
|
} |
|
if (stationData.Departures != null) { |
|
arrdep.AddRange(stationData.Departures); |
|
} |
|
|
|
foreach (var train in arrdep.DistinctBy((t) => t.Train.Number)) { |
|
await ProcessTrain(train); |
|
} |
|
} |
|
|
|
public async Task OnItineraries(IReadOnlyList<IItinerary> itineraries) { |
|
foreach (var itinerary in itineraries) { |
|
foreach (var train in itinerary.Trains) { |
|
await FoundTrainAtStations( |
|
train.IntermediateStops.Concat(new[] { train.From, train.To }), |
|
train.TrainNumber |
|
); |
|
} |
|
} |
|
} |
|
} |
|
|
|
public record DbRecord( |
|
[property: BsonId] |
|
[property: BsonRepresentation(BsonType.ObjectId)] |
|
[property: JsonProperty(NullValueHandling = NullValueHandling.Ignore)] |
|
string? Id, |
|
int Version |
|
) { |
|
public DbRecord(int version) : this(null, version) { } |
|
}
|
|
|