From 649a1098c3a44a3ed52da855d621b113a5a7830f Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 21 Aug 2024 10:04:09 +0200 Subject: [PATCH] NATS is an OCLIB tool --- entrypoint.go | 2 +- go.mod | 7 ++- go.sum | 6 +++ models/workflow/workflow_mongo_accessor.go | 24 ++++----- conf.go => tools/conf.go | 6 ++- tools/nats_caller.go | 62 ++++++++++++++++++++++ 6 files changed, 89 insertions(+), 18 deletions(-) rename conf.go => tools/conf.go (83%) create mode 100644 tools/nats_caller.go diff --git a/entrypoint.go b/entrypoint.go index 70427d5..397e8f9 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -75,7 +75,7 @@ func AddPath(collection LibDataEnum, path string) { func Init(appName string) { logs.SetAppName(appName) logs.SetLogger(logs.CreateLogger("main", "")) - mongo.MONGOService.Init(models.GetModelsNames(), GetConfig()) + mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig()) accessor := (&resource_model.ResourceModel{}).GetAccessor(nil) for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} { data, code, _ := accessor.Search(nil, model) diff --git a/go.mod b/go.mod index d6a875a..0512b6b 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,12 @@ require ( github.com/stretchr/testify v1.9.0 ) -require github.com/robfig/cron/v3 v3.0.1 // indirect +require ( + github.com/nats-io/nats.go v1.37.0 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect +) require ( github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index e48473d..3cb7eb9 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,12 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index 4a890e7..a85a7a3 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -138,28 +138,22 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w } func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { - if realData.Schedule == nil && realData.ScheduleActive { - res, code, err := wfa.LoadOne(id) - if code == 200 { - r := res.(*Workflow) - if r.Schedule != nil { - err = wfa.book(id, r, []*workflow_execution.WorkflowExecution{}) - } - } - return 200, err + var err error + nats := tools.NATSCaller{} + res, code, _ := wfa.LoadOne(id) + if code != 200 { + return 404, errors.New("could not load workflow") } - if realData.Schedule == nil && !realData.ScheduleActive { + if (realData.Schedule == nil && !realData.ScheduleActive) || (realData.Schedule == nil && realData.ScheduleActive) { mongo.MONGOService.DeleteMultiple(map[string]interface{}{ "state": 1, "workflow_id": id, }, utils.WORKFLOW_EXECUTION.String()) err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) + nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) return 200, err } - res, code, _ := wfa.LoadOne(id) - if code != 200 { - return 404, errors.New("could not load workflow") - } + r := res.(*Workflow) if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron { return 200, nil @@ -178,6 +172,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet "workflow_id": id, "state": 1, }, utils.WORKFLOW_EXECUTION.String()) + nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) } if len(execs) > 0 { for _, obj := range execs { @@ -186,6 +181,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet return code, err } } + nats.SetNATSPub(utils.WORKFLOW.String(), tools.CREATE, realData) } else { return 422, err } diff --git a/conf.go b/tools/conf.go similarity index 83% rename from conf.go rename to tools/conf.go index 2c96823..e857f87 100644 --- a/conf.go +++ b/tools/conf.go @@ -1,4 +1,4 @@ -package oclib +package tools import "sync" @@ -9,6 +9,7 @@ import "sync" // =================================================== type Config struct { + NATSUrl string MongoUrl string MongoDatabase string } @@ -31,11 +32,12 @@ func GetConfig() *Config { return instance } -func SetConfig(url string, database string) *Config { +func SetConfig(url string, database string, natsUrl string) *Config { once.Do(func() { instance = &Config{ MongoUrl: url, MongoDatabase: database, + NATSUrl: natsUrl, } }) return instance diff --git a/tools/nats_caller.go b/tools/nats_caller.go new file mode 100644 index 0000000..c7693bc --- /dev/null +++ b/tools/nats_caller.go @@ -0,0 +1,62 @@ +package tools + +import ( + "encoding/json" + "strings" + + "github.com/nats-io/nats.go" +) + +type NATSMethod int + +const ( + REMOVE NATSMethod = iota + CREATE +) + +func NameToMethod(name string) NATSMethod { + for _, v := range [...]NATSMethod{REMOVE, CREATE} { + if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { + return v + } + } + return -1 +} + +func (d NATSMethod) GenerateKey(name string) string { + return name + "_" + d.String() +} + +func (d NATSMethod) String() string { + return [...]string{"remove", "create"}[d] +} + +type NATSCaller struct { + natsUrl string +} + +func NewNATSCaller(url string) *NATSCaller { + return &NATSCaller{ + natsUrl: url, + } +} + +func (o *NATSCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string { + if o.natsUrl == "" { + return " -> NATS_SERVER is not set" + } + nc, err := nats.Connect(o.natsUrl) + if err != nil { + return " -> Could not reach NATS server : " + err.Error() + } + defer nc.Close() + js, err := json.Marshal(data) + if err != nil { + return " -> " + err.Error() + } + err = nc.Publish(method.GenerateKey(dataName), js) + if err != nil { + return " -> " + err.Error() + } + return "" +}