package utils

import (
	"sync"

	"cloud.o-forge.io/core/oc-lib/tools"
)

// ChangeEvent is fired whenever a DB object is created, updated or deleted
// within this process. Deleted=true means the object was removed; Object is
// the last known snapshot before deletion.
type ChangeEvent struct {
	DataType tools.DataType
	ID       string
	Object   ShallowDBObject // nil only when the load after the write failed
	Deleted  bool
}

var (
	// changeBusMu guards changeBus. Writers (subscribe/unsubscribe) take the
	// write lock; NotifyChange holds the read lock for the whole broadcast.
	changeBusMu sync.RWMutex
	// changeBus maps a data type to the channels subscribed to its events.
	changeBus = map[tools.DataType][]chan ChangeEvent{}
)

// SubscribeChanges returns a channel that receives ChangeEvents for dt
// whenever an object of that type is written or deleted in this process.
//
// The channel is buffered (32 events). Call the returned cancel function to
// unsubscribe; once cancel has returned, no further events are sent on the
// channel. The channel is never closed — use a context (or similar) to stop
// reading. Calling cancel more than once is a no-op.
func SubscribeChanges(dt tools.DataType) (<-chan ChangeEvent, func()) {
	ch := make(chan ChangeEvent, 32)

	changeBusMu.Lock()
	changeBus[dt] = append(changeBus[dt], ch)
	changeBusMu.Unlock()

	cancel := func() {
		changeBusMu.Lock()
		defer changeBusMu.Unlock()

		subs := changeBus[dt]
		for i, c := range subs {
			if c != ch {
				continue
			}
			// Remove in place. This is safe because NotifyChange keeps the
			// read lock while it iterates, so nobody is reading the backing
			// array while we hold the write lock.
			last := len(subs) - 1
			copy(subs[i:], subs[i+1:])
			subs[last] = nil // drop the stale reference left in the tail slot
			subs = subs[:last]

			if len(subs) == 0 {
				// Do not keep empty entries around for types nobody listens to.
				delete(changeBus, dt)
			} else {
				changeBus[dt] = subs
			}
			return
		}
	}
	return ch, cancel
}

// NotifyChange broadcasts a ChangeEvent to all current subscribers for dt.
// Non-blocking: events are dropped for subscribers whose buffer is full.
//
// The read lock is held for the duration of the broadcast. Every send is
// non-blocking, so the lock is only held briefly, and holding it guarantees
// that a concurrent unsubscribe cannot modify the subscriber slice while it is
// being iterated and that no event is delivered after cancel has returned.
func NotifyChange(dt tools.DataType, id string, obj ShallowDBObject, deleted bool) {
	evt := ChangeEvent{DataType: dt, ID: id, Object: obj, Deleted: deleted}

	changeBusMu.RLock()
	defer changeBusMu.RUnlock()

	for _, ch := range changeBus[dt] {
		select {
		case ch <- evt:
		default:
			// Subscriber is not keeping up; drop the event rather than block.
		}
	}
}