Change bus
This commit is contained in:
60
models/utils/change_bus.go
Normal file
60
models/utils/change_bus.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// ChangeEvent is fired whenever a DB object is created, updated or deleted
|
||||
// within this process. Deleted=true means the object was removed; Object is
|
||||
// the last known snapshot before deletion.
|
||||
type ChangeEvent struct {
|
||||
DataType tools.DataType
|
||||
ID string
|
||||
Object ShallowDBObject // nil only when the load after the write failed
|
||||
Deleted bool
|
||||
}
|
||||
|
||||
var (
|
||||
changeBusMu sync.RWMutex
|
||||
changeBus = map[tools.DataType][]chan ChangeEvent{}
|
||||
)
|
||||
|
||||
// SubscribeChanges returns a channel that receives ChangeEvents for dt
|
||||
// whenever an object of that type is written or deleted in this process.
|
||||
// Call the returned cancel function to unsubscribe; after that the channel
|
||||
// will no longer receive events (it is not closed — use a context to stop
|
||||
// reading).
|
||||
func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) {
|
||||
ch := make(chan ChangeEvent, 32)
|
||||
changeBusMu.Lock()
|
||||
changeBus[dt] = append(changeBus[dt], ch)
|
||||
changeBusMu.Unlock()
|
||||
return ch, func() {
|
||||
changeBusMu.Lock()
|
||||
subs := changeBus[dt]
|
||||
for i, c := range subs {
|
||||
if c == ch {
|
||||
changeBus[dt] = append(subs[:i], subs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
changeBusMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt.
|
||||
// Non-blocking: events are dropped for subscribers whose buffer is full.
|
||||
func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) {
|
||||
changeBusMu.RLock()
|
||||
subs := changeBus[dt]
|
||||
changeBusMu.RUnlock()
|
||||
evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted}
|
||||
for _, ch := range subs {
|
||||
select {
|
||||
case ch <- evt:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -63,7 +63,11 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) {
|
||||
a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error())
|
||||
return nil, code, err
|
||||
}
|
||||
return a.LoadOne(id)
|
||||
result, rcode, rerr := a.LoadOne(id)
|
||||
if rerr == nil && result != nil {
|
||||
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||
}
|
||||
return result, rcode, rerr
|
||||
}
|
||||
|
||||
// GenericLoadOne loads one object from the database (generic)
|
||||
@@ -86,6 +90,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
|
||||
a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error())
|
||||
return nil, code, err
|
||||
}
|
||||
go NotifyChange(a.GetType(), id, res, true)
|
||||
return res, 200, nil
|
||||
}
|
||||
|
||||
@@ -142,7 +147,11 @@ func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBO
|
||||
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
||||
return nil, code, err
|
||||
}
|
||||
return a.LoadOne(id)
|
||||
result, rcode, rerr := a.LoadOne(id)
|
||||
if rerr == nil && result != nil {
|
||||
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||
}
|
||||
return result, rcode, rerr
|
||||
}
|
||||
|
||||
func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) {
|
||||
@@ -210,7 +219,11 @@ func GenericRawUpdateOne(set DBObject, id string, a Accessor) (DBObject, int, er
|
||||
a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
|
||||
return nil, code, err
|
||||
}
|
||||
return a.LoadOne(id)
|
||||
result, rcode, rerr := a.LoadOne(id)
|
||||
if rerr == nil && result != nil {
|
||||
go NotifyChange(a.GetType(), result.GetID(), result, false)
|
||||
}
|
||||
return result, rcode, rerr
|
||||
}
|
||||
|
||||
func GetMySelf(wfa Accessor) (ShallowDBObject, error) {
|
||||
|
||||
Reference in New Issue
Block a user