NATS is an OCLIB tool
This commit is contained in:
parent
e31815f576
commit
649a1098c3
@ -75,7 +75,7 @@ func AddPath(collection LibDataEnum, path string) {
|
|||||||
func Init(appName string) {
|
func Init(appName string) {
|
||||||
logs.SetAppName(appName)
|
logs.SetAppName(appName)
|
||||||
logs.SetLogger(logs.CreateLogger("main", ""))
|
logs.SetLogger(logs.CreateLogger("main", ""))
|
||||||
mongo.MONGOService.Init(models.GetModelsNames(), GetConfig())
|
mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig())
|
||||||
accessor := (&resource_model.ResourceModel{}).GetAccessor(nil)
|
accessor := (&resource_model.ResourceModel{}).GetAccessor(nil)
|
||||||
for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} {
|
for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} {
|
||||||
data, code, _ := accessor.Search(nil, model)
|
data, code, _ := accessor.Search(nil, model)
|
||||||
|
7
go.mod
7
go.mod
@ -9,7 +9,12 @@ require (
|
|||||||
github.com/stretchr/testify v1.9.0
|
github.com/stretchr/testify v1.9.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require github.com/robfig/cron/v3 v3.0.1 // indirect
|
require (
|
||||||
|
github.com/nats-io/nats.go v1.37.0 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||||
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||||
|
6
go.sum
6
go.sum
@ -32,6 +32,12 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
|
|||||||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||||
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
|
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
|
||||||
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
|
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
|
||||||
|
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
||||||
|
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||||
|
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||||
|
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
@ -138,28 +138,22 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
|
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
|
||||||
if realData.Schedule == nil && realData.ScheduleActive {
|
var err error
|
||||||
res, code, err := wfa.LoadOne(id)
|
nats := tools.NATSCaller{}
|
||||||
if code == 200 {
|
res, code, _ := wfa.LoadOne(id)
|
||||||
r := res.(*Workflow)
|
if code != 200 {
|
||||||
if r.Schedule != nil {
|
return 404, errors.New("could not load workflow")
|
||||||
err = wfa.book(id, r, []*workflow_execution.WorkflowExecution{})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 200, err
|
|
||||||
}
|
}
|
||||||
if realData.Schedule == nil && !realData.ScheduleActive {
|
if (realData.Schedule == nil && !realData.ScheduleActive) || (realData.Schedule == nil && realData.ScheduleActive) {
|
||||||
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
|
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
|
||||||
"state": 1,
|
"state": 1,
|
||||||
"workflow_id": id,
|
"workflow_id": id,
|
||||||
}, utils.WORKFLOW_EXECUTION.String())
|
}, utils.WORKFLOW_EXECUTION.String())
|
||||||
err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{})
|
err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{})
|
||||||
|
nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData)
|
||||||
return 200, err
|
return 200, err
|
||||||
}
|
}
|
||||||
res, code, _ := wfa.LoadOne(id)
|
|
||||||
if code != 200 {
|
|
||||||
return 404, errors.New("could not load workflow")
|
|
||||||
}
|
|
||||||
r := res.(*Workflow)
|
r := res.(*Workflow)
|
||||||
if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron {
|
if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron {
|
||||||
return 200, nil
|
return 200, nil
|
||||||
@ -178,6 +172,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet
|
|||||||
"workflow_id": id,
|
"workflow_id": id,
|
||||||
"state": 1,
|
"state": 1,
|
||||||
}, utils.WORKFLOW_EXECUTION.String())
|
}, utils.WORKFLOW_EXECUTION.String())
|
||||||
|
nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData)
|
||||||
}
|
}
|
||||||
if len(execs) > 0 {
|
if len(execs) > 0 {
|
||||||
for _, obj := range execs {
|
for _, obj := range execs {
|
||||||
@ -186,6 +181,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet
|
|||||||
return code, err
|
return code, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
nats.SetNATSPub(utils.WORKFLOW.String(), tools.CREATE, realData)
|
||||||
} else {
|
} else {
|
||||||
return 422, err
|
return 422, err
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package oclib
|
package tools
|
||||||
|
|
||||||
import "sync"
|
import "sync"
|
||||||
|
|
||||||
@ -9,6 +9,7 @@ import "sync"
|
|||||||
// ===================================================
|
// ===================================================
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
|
NATSUrl string
|
||||||
MongoUrl string
|
MongoUrl string
|
||||||
MongoDatabase string
|
MongoDatabase string
|
||||||
}
|
}
|
||||||
@ -31,11 +32,12 @@ func GetConfig() *Config {
|
|||||||
return instance
|
return instance
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetConfig(url string, database string) *Config {
|
func SetConfig(url string, database string, natsUrl string) *Config {
|
||||||
once.Do(func() {
|
once.Do(func() {
|
||||||
instance = &Config{
|
instance = &Config{
|
||||||
MongoUrl: url,
|
MongoUrl: url,
|
||||||
MongoDatabase: database,
|
MongoDatabase: database,
|
||||||
|
NATSUrl: natsUrl,
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return instance
|
return instance
|
62
tools/nats_caller.go
Normal file
62
tools/nats_caller.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
package tools
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type NATSMethod int
|
||||||
|
|
||||||
|
const (
|
||||||
|
REMOVE NATSMethod = iota
|
||||||
|
CREATE
|
||||||
|
)
|
||||||
|
|
||||||
|
func NameToMethod(name string) NATSMethod {
|
||||||
|
for _, v := range [...]NATSMethod{REMOVE, CREATE} {
|
||||||
|
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d NATSMethod) GenerateKey(name string) string {
|
||||||
|
return name + "_" + d.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d NATSMethod) String() string {
|
||||||
|
return [...]string{"remove", "create"}[d]
|
||||||
|
}
|
||||||
|
|
||||||
|
type NATSCaller struct {
|
||||||
|
natsUrl string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewNATSCaller(url string) *NATSCaller {
|
||||||
|
return &NATSCaller{
|
||||||
|
natsUrl: url,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *NATSCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string {
|
||||||
|
if o.natsUrl == "" {
|
||||||
|
return " -> NATS_SERVER is not set"
|
||||||
|
}
|
||||||
|
nc, err := nats.Connect(o.natsUrl)
|
||||||
|
if err != nil {
|
||||||
|
return " -> Could not reach NATS server : " + err.Error()
|
||||||
|
}
|
||||||
|
defer nc.Close()
|
||||||
|
js, err := json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
return " -> " + err.Error()
|
||||||
|
}
|
||||||
|
err = nc.Publish(method.GenerateKey(dataName), js)
|
||||||
|
if err != nil {
|
||||||
|
return " -> " + err.Error()
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user