@ -98,15 +98,14 @@ func (sub *Subscriptions) DeleteSubscription(chatId int64, messageId int) (*SubD
break
break
}
}
}
}
var result * SubData
var result SubData
if deleteIndex != - 1 {
if deleteIndex != - 1 {
result = & SubData { }
result = datas [ deleteIndex ]
* result = datas [ deleteIndex ]
datas [ deleteIndex ] = datas [ len ( datas ) - 1 ]
datas [ deleteIndex ] = datas [ len ( datas ) - 1 ]
datas = datas [ : len ( datas ) - 1 ]
datas = datas [ : len ( datas ) - 1 ]
_ , err := database . WriteDB ( func ( db * gorm . DB ) ( * gorm . DB , error ) {
_ , err := database . WriteDB ( func ( db * gorm . DB ) ( * gorm . DB , error ) {
db . Delete ( result )
db . Delete ( & result )
return db , db . Error
return db , db . Error
} )
} )
if err != nil {
if err != nil {
@ -120,7 +119,7 @@ func (sub *Subscriptions) DeleteSubscription(chatId int64, messageId int) (*SubD
} else {
} else {
sub . data [ chatId ] = datas
sub . data [ chatId ] = datas
}
}
return result , nil
return & result , nil
}
}
func ( sub * Subscriptions ) CheckSubscriptions ( ctx context . Context ) {
func ( sub * Subscriptions ) CheckSubscriptions ( ctx context . Context ) {
@ -142,20 +141,29 @@ type workerData struct {
data SubData
data SubData
}
}
type unsubscribe struct {
chatId int64
messageId int
}
type workerResponseData struct {
unsubscribe * unsubscribe
}
func ( sub * Subscriptions ) executeChecks ( ctx context . Context ) {
func ( sub * Subscriptions ) executeChecks ( ctx context . Context ) {
sub . mutex . RLock ( )
sub . mutex . RLock ( )
defer sub . mutex . RUnlock ( )
// Only allow 8 concurrent requests
// Only allow 8 concurrent requests
// TODO: Make configurable instead of hardcoded
// TODO: Make configurable instead of hardcoded
workerCount := 8
workerCount := 8
workerChan := make ( chan workerData , workerCount )
workerChan := make ( chan workerData , workerCount )
wg := & sync . WaitGroup { }
responseChan := make ( chan * workerResponseData , workerCount )
defer close ( responseChan )
for i := 0 ; i < workerCount ; i ++ {
for i := 0 ; i < workerCount ; i ++ {
wg . Add ( 1 )
go checkWorker ( ctx , workerChan , responseChan )
go checkWorker ( ctx , workerChan , wg )
}
}
go func ( ) {
for _ , datas := range sub . data {
for _ , datas := range sub . data {
for i := range datas {
for i := range datas {
workerChan <- workerData {
workerChan <- workerData {
@ -165,12 +173,42 @@ func (sub *Subscriptions) executeChecks(ctx context.Context) {
}
}
}
}
close ( workerChan )
close ( workerChan )
wg . Wait ( )
} ( )
responses := make ( [ ] * workerResponseData , 0 , len ( sub . data ) )
for _ , datas := range sub . data {
for range datas {
if resp := <- responseChan ; resp != nil && resp . unsubscribe != nil {
responses = append ( responses , resp )
}
}
}
}
func checkWorker ( ctx context . Context , workerChan <- chan workerData , wg * sync . WaitGroup ) {
sub . mutex . RUnlock ( )
defer wg . Done ( )
for i := range responses {
if responses [ i ] . unsubscribe != nil {
// Ignore error since this is optional optimisation
deletedSub , err := sub . DeleteSubscription ( responses [ i ] . unsubscribe . chatId , responses [ i ] . unsubscribe . messageId )
if err == nil && deletedSub != nil {
_ , _ = sub . tgBot . EditMessageReplyMarkup ( ctx , & bot . EditMessageReplyMarkupParams {
ChatID : responses [ i ] . unsubscribe . chatId ,
MessageID : responses [ i ] . unsubscribe . messageId ,
ReplyMarkup : handlers . GetTrainNumberCommandResponseButtons ( deletedSub . TrainNumber , deletedSub . Date , deletedSub . GroupIndex , handlers . TrainInfoResponseButtonExcludeSub ) ,
} )
}
}
}
}
func checkWorker ( ctx context . Context , workerChan <- chan workerData , responseChan chan <- * workerResponseData ) {
for wData := range workerChan {
for wData := range workerChan {
func ( ) {
var response * workerResponseData
defer func ( ) {
responseChan <- response
} ( )
data := wData . data
data := wData . data
log . Printf ( "DEBUG: Timer tick, update for chat %d, train %s, date %s, group %d" , data . ChatId , data . TrainNumber , data . Date . Format ( "2006-01-02" ) , data . GroupIndex )
log . Printf ( "DEBUG: Timer tick, update for chat %d, train %s, date %s, group %d" , data . ChatId , data . TrainNumber , data . Date . Format ( "2006-01-02" ) , data . GroupIndex )
@ -179,6 +217,14 @@ func checkWorker(ctx context.Context, workerChan <-chan workerData, wg *sync.Wai
if ! ok || resp == nil || resp . Message == nil {
if ! ok || resp == nil || resp . Message == nil {
// Silently discard update errors
// Silently discard update errors
log . Printf ( "DEBUG: Error when updating chat %d, train %s, date %s, group %d" , data . ChatId , data . TrainNumber , data . Date . Format ( "2006-01-02" ) , data . GroupIndex )
log . Printf ( "DEBUG: Error when updating chat %d, train %s, date %s, group %d" , data . ChatId , data . TrainNumber , data . Date . Format ( "2006-01-02" ) , data . GroupIndex )
if resp != nil && resp . ShouldUnsubscribe {
response = & workerResponseData {
unsubscribe : & unsubscribe {
chatId : data . ChatId ,
messageId : data . MessageId ,
} ,
}
}
return
return
}
}
@ -191,5 +237,14 @@ func checkWorker(ctx context.Context, workerChan <-chan workerData, wg *sync.Wai
DisableWebPagePreview : resp . Message . DisableWebPagePreview ,
DisableWebPagePreview : resp . Message . DisableWebPagePreview ,
ReplyMarkup : resp . Message . ReplyMarkup ,
ReplyMarkup : resp . Message . ReplyMarkup ,
} )
} )
response = & workerResponseData { }
if resp . ShouldUnsubscribe {
response . unsubscribe = & unsubscribe {
chatId : data . ChatId ,
messageId : data . MessageId ,
}
}
} ( )
}
}
}
}