Build complete event
This commit is contained in:
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module oc-monitord
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62
|
||||
github.com/akamensky/argparse v1.4.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/goraz/onion v0.1.3
|
||||
|
||||
6
go.sum
6
go.sum
@@ -36,6 +36,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a h1:gdr886O31Ai5p
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 h1:p9uJjMY+QkE4neA+xRmIRtAm9us94EKZqgajDdLOd0Y=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312082544-d7a8f2adaa5c h1:kBb0dpxyInd4Gs1rriz8mkeKwSCOyomn2ZzWY7YWZqc=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312082544-d7a8f2adaa5c/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312083310-f5e199132416 h1:QHR5pzCI/HUawu8pst5Ggio6WPCUUf8XYjNMVk8kSqo=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312083310-f5e199132416/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62 h1:sHzacZxPIKHyjL4EkgG/c7MI8gM1xmLdhaoUx2ZsH+M=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
|
||||
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
||||
|
||||
2
main.go
2
main.go
@@ -254,7 +254,7 @@ func updateStatus(status string, log string) {
|
||||
wf_exec.ArgoStatusToState(status)
|
||||
exec, _, err := workflow_execution.NewAccessor(&tools.APIRequest{
|
||||
PeerID: conf.GetConfig().PeerID,
|
||||
}).UpdateOne(wf_exec, exec_id)
|
||||
}).UpdateOne(wf_exec.Serialize(wf_exec), exec_id)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not update status for workflow execution " + exec_id + err.Error())
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-monitord/conf"
|
||||
"os/exec"
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/models"
|
||||
@@ -106,7 +107,7 @@ type Template struct {
|
||||
Resource ServiceResource `yaml:"resource,omitempty"`
|
||||
}
|
||||
|
||||
func (template *Template) CreateEventContainer(exec *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) {
|
||||
func (template *Template) CreateEventContainer(execution *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) {
|
||||
container := Container{Image: "natsio/nats-box"}
|
||||
container.Command = []string{"sh", "-c"} // all is bash
|
||||
|
||||
@@ -122,9 +123,29 @@ func (template *Template) CreateEventContainer(exec *workflow_execution.Workflow
|
||||
return
|
||||
}
|
||||
if event.WorkflowResourceID != "" {
|
||||
container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey()+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
|
||||
container.Args = []string{strings.Join(container.Args, " ")}
|
||||
template.Container = container
|
||||
event.Payload = event.Input
|
||||
event.Input = ""
|
||||
if b, err := json.Marshal(event); err == nil {
|
||||
payload, err := json.Marshal(&tools.NATSResponse{
|
||||
FromApp: "oc-monitord",
|
||||
Datatype: tools.NATIVE_TOOL,
|
||||
Method: int(tools.WORKFLOW_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
if err == nil {
|
||||
cmd := exec.Command(
|
||||
"nats",
|
||||
"pub",
|
||||
"--server", conf.GetConfig().NatsURL+":4222",
|
||||
tools.WORKFLOW_EVENT.GenerateKey(),
|
||||
string(payload),
|
||||
)
|
||||
for _, args := range cmd.Args {
|
||||
container.Args = append(container.Args, args)
|
||||
}
|
||||
template.Container = container
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -257,7 +257,6 @@ func (b *ArgoBuilder) createArgoTemplates(
|
||||
_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, obj, firstItems, lastItems)
|
||||
template := &Template{Name: getArgoName(obj.GetName(), id)}
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
|
||||
|
||||
// Vérifie si le processing est sur un peer distant (Admiralty).
|
||||
isReparted, peer := b.isReparted(obj, id)
|
||||
if obj.GetType() == tools.PROCESSING_RESOURCE.String() {
|
||||
|
||||
Reference in New Issue
Block a user