From dc0041999d22ff731245bb12dfcf4b4b2f64714f Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 14 Apr 2026 12:46:22 +0200 Subject: [PATCH] Change bus --- models/utils/change_bus.go | 60 ++++++++++++++++++++++++++++++++++++++ models/utils/common.go | 19 ++++++++++-- 2 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 models/utils/change_bus.go diff --git a/models/utils/change_bus.go b/models/utils/change_bus.go new file mode 100644 index 0000000..cd9528f --- /dev/null +++ b/models/utils/change_bus.go @@ -0,0 +1,60 @@ +package utils + +import ( + "sync" + + "cloud.o-forge.io/core/oc-lib/tools" +) + +// ChangeEvent is fired whenever a DB object is created, updated or deleted +// within this process. Deleted=true means the object was removed; Object is +// the last known snapshot before deletion. +type ChangeEvent struct { + DataType tools.DataType + ID string + Object ShallowDBObject // nil only when the load after the write failed + Deleted bool +} + +var ( + changeBusMu sync.RWMutex + changeBus = map[tools.DataType][]chan ChangeEvent{} +) + +// SubscribeChanges returns a channel that receives ChangeEvents for dt +// whenever an object of that type is written or deleted in this process. +// Call the returned cancel function to unsubscribe; after that the channel +// will no longer receive events (it is not closed — use a context to stop +// reading). +func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) { + ch := make(chan ChangeEvent, 32) + changeBusMu.Lock() + changeBus[dt] = append(changeBus[dt], ch) + changeBusMu.Unlock() + return ch, func() { + changeBusMu.Lock() + subs := changeBus[dt] + for i, c := range subs { + if c == ch { + changeBus[dt] = append(subs[:i], subs[i+1:]...) + break + } + } + changeBusMu.Unlock() + } +} + +// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt. +// Non-blocking: events are dropped for subscribers whose buffer is full. +func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) { + changeBusMu.RLock() + subs := changeBus[dt] + changeBusMu.RUnlock() + evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted} + for _, ch := range subs { + select { + case ch <- evt: + default: + } + } +} diff --git a/models/utils/common.go b/models/utils/common.go index 7ac6acd..753abe6 100755 --- a/models/utils/common.go +++ b/models/utils/common.go @@ -63,7 +63,11 @@ func GenericStoreOne(data DBObject, a Accessor) (DBObject, int, error) { a.GetLogger().Error().Msg("Could not store " + data.GetName() + " to db. Error: " + err.Error()) return nil, code, err } - return a.LoadOne(id) + result, rcode, rerr := a.LoadOne(id) + if rerr == nil && result != nil { + go NotifyChange(a.GetType(), result.GetID(), result, false) + } + return result, rcode, rerr } // GenericLoadOne loads one object from the database (generic) @@ -86,6 +90,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) { a.GetLogger().Error().Msg("Could not delete " + id + " to db. Error: " + err.Error()) return nil, code, err } + go NotifyChange(a.GetType(), id, res, true) return res, 200, nil } @@ -142,7 +147,11 @@ func GenericUpdateOne(change map[string]interface{}, id string, a Accessor) (DBO a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error()) return nil, code, err } - return a.LoadOne(id) + result, rcode, rerr := a.LoadOne(id) + if rerr == nil && result != nil { + go NotifyChange(a.GetType(), result.GetID(), result, false) + } + return result, rcode, rerr } func GenericLoadOne[T DBObject](id string, data T, f func(DBObject) (DBObject, int, error), a Accessor) (DBObject, int, error) { @@ -210,7 +219,11 @@ func GenericRawUpdateOne(set DBObject, id string, a Accessor) (DBObject, int, er a.GetLogger().Error().Msg("Could not update " + id + " to db. Error: " + err.Error()) return nil, code, err } - return a.LoadOne(id) + result, rcode, rerr := a.LoadOne(id) + if rerr == nil && result != nil { + go NotifyChange(a.GetType(), result.GetID(), result, false) + } + return result, rcode, rerr } func GetMySelf(wfa Accessor) (ShallowDBObject, error) {