From 56bc342d24226304d119e54c3881acbeb1a6a9ba Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 12 Mar 2026 11:49:52 +0100 Subject: [PATCH] Build complete event --- go.mod | 2 +- go.sum | 6 ++++++ main.go | 2 +- models/template.go | 29 +++++++++++++++++++++++++---- workflow_builder/argo_builder.go | 1 - 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/go.mod b/go.mod index 1e88349..a93430c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-monitord go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 + cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62 github.com/akamensky/argparse v1.4.0 github.com/google/uuid v1.6.0 github.com/goraz/onion v0.1.3 diff --git a/go.sum b/go.sum index 0809ab5..2dc5ff5 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a h1:gdr886O31Ai5p cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 h1:p9uJjMY+QkE4neA+xRmIRtAm9us94EKZqgajDdLOd0Y= cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312082544-d7a8f2adaa5c h1:kBb0dpxyInd4Gs1rriz8mkeKwSCOyomn2ZzWY7YWZqc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312082544-d7a8f2adaa5c/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312083310-f5e199132416 h1:QHR5pzCI/HUawu8pst5Ggio6WPCUUf8XYjNMVk8kSqo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312083310-f5e199132416/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62 h1:sHzacZxPIKHyjL4EkgG/c7MI8gM1xmLdhaoUx2ZsH+M= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= diff --git a/main.go b/main.go index 8a25828..ccc3819 100644 --- a/main.go +++ b/main.go @@ -254,7 +254,7 @@ func updateStatus(status string, log string) { wf_exec.ArgoStatusToState(status) exec, _, err := workflow_execution.NewAccessor(&tools.APIRequest{ PeerID: conf.GetConfig().PeerID, - }).UpdateOne(wf_exec, exec_id) + }).UpdateOne(wf_exec.Serialize(wf_exec), exec_id) if err != nil { logger.Error().Msg("Could not update status for workflow execution " + exec_id + err.Error()) } diff --git a/models/template.go b/models/template.go index c2268e1..e2804f3 100644 --- a/models/template.go +++ b/models/template.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "oc-monitord/conf" + "os/exec" "strings" "cloud.o-forge.io/core/oc-lib/models/common/models" @@ -106,7 +107,7 @@ type Template struct { Resource ServiceResource `yaml:"resource,omitempty"` } -func (template *Template) CreateEventContainer(exec *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) { +func (template *Template) CreateEventContainer(execution *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) { container := Container{Image: "natsio/nats-box"} container.Command = []string{"sh", "-c"} // all is bash @@ -122,9 +123,29 @@ func (template *Template) CreateEventContainer(exec *workflow_execution.Workflow return } if event.WorkflowResourceID != "" { - container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey()+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'") - container.Args = []string{strings.Join(container.Args, " ")} - template.Container = container + event.Payload = event.Input + event.Input = "" + if b, err := json.Marshal(event); err == nil { + payload, err := json.Marshal(&tools.NATSResponse{ + FromApp: "oc-monitord", + Datatype: tools.NATIVE_TOOL, + Method: int(tools.WORKFLOW_EVENT), + Payload: b, + }) + if err == nil { + cmd := exec.Command( + "nats", + "pub", + "--server", conf.GetConfig().NatsURL+":4222", + tools.WORKFLOW_EVENT.GenerateKey(), + string(payload), + ) + for _, args := range cmd.Args { + container.Args = append(container.Args, args) + } + template.Container = container + } + } } } diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 044173f..1a2a1c1 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -257,7 +257,6 @@ func (b *ArgoBuilder) createArgoTemplates( _, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, obj, firstItems, lastItems) template := &Template{Name: getArgoName(obj.GetName(), id)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) - // Vérifie si le processing est sur un peer distant (Admiralty). isReparted, peer := b.isReparted(obj, id) if obj.GetType() == tools.PROCESSING_RESOURCE.String() {