2024-07-04 09:14:25 +02:00
package main
import (
2024-08-06 11:40:30 +02:00
"bufio"
2025-02-17 16:54:25 +01:00
"encoding/base64"
2024-08-06 11:40:30 +02:00
"encoding/json"
"fmt"
"io"
2024-07-25 18:48:25 +02:00
"os"
2024-08-06 11:40:30 +02:00
"os/exec"
"regexp"
2025-02-17 16:54:25 +01:00
"slices"
2024-08-06 11:40:30 +02:00
"strings"
2024-08-07 17:24:09 +02:00
"sync"
2025-04-09 18:59:37 +02:00
"time"
2024-07-25 18:48:25 +02:00
2024-08-19 11:43:40 +02:00
"oc-monitord/conf"
"oc-monitord/models"
2025-02-17 16:54:25 +01:00
u "oc-monitord/utils"
2024-08-19 11:43:40 +02:00
"oc-monitord/workflow_builder"
2024-07-25 18:48:25 +02:00
2024-08-06 11:40:30 +02:00
oclib "cloud.o-forge.io/core/oc-lib"
2024-08-07 17:24:09 +02:00
2024-07-25 18:48:25 +02:00
"cloud.o-forge.io/core/oc-lib/logs"
2025-02-14 12:00:29 +01:00
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/peer"
2024-09-24 11:43:42 +02:00
"cloud.o-forge.io/core/oc-lib/models/utils"
2024-08-07 17:24:09 +02:00
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
2025-02-05 08:36:26 +01:00
"cloud.o-forge.io/core/oc-lib/tools"
2024-08-07 17:24:09 +02:00
2025-02-14 12:00:29 +01:00
tools2 "oc-monitord/tools"
2024-07-25 18:48:25 +02:00
"github.com/akamensky/argparse"
2024-08-06 11:40:30 +02:00
"github.com/google/uuid"
2024-07-25 18:48:25 +02:00
"github.com/goraz/onion"
"github.com/rs/zerolog"
2024-07-04 09:14:25 +02:00
)
2024-08-29 10:17:31 +02:00
// Command-line args:
// - url: Loki URL (default: "http://127.0.0.1:3100")
// - execution: Workflow Execution ID (required) to identify the current execution, allows to retrieve Workflow
// - mongo: MongoDB URL (default: "mongodb://127.0.0.1:27017")
// - db: MongoDB database name (default: "DC_myDC")
// - timeout: Execution timeout (default: -1)

// logger is the daemon-wide logger; wf_logger is the same logger enriched with
// the argo name / workflow id / execution id fields (set up in main).
var logger zerolog.Logger
var wf_logger zerolog.Logger

// pods_logger is declared for pod-level logging; its use is not visible in this file.
var pods_logger zerolog.Logger

// parser is the CLI argument parser shared between main and loadConfig/setConf.
var parser argparse.Parser

// workflowName is the Argo workflow/container name derived from the generated
// YAML file name (see getContainerName).
var workflowName string

// Configuration files probed by initOnion; the local file overrides the default one.
const defaultConfigFile = "/etc/oc/ocmonitord_conf.json"
const localConfigFile = "./conf/local_ocmonitord_conf.json"
2024-07-04 09:14:25 +02:00
func main ( ) {
2024-08-29 10:17:31 +02:00
2025-02-17 16:54:25 +01:00
os . Setenv ( "test_service" , "true" ) // Only for service demo, delete before merging on main
parser = * argparse . NewParser ( "oc-monitord" , "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database" )
loadConfig ( false , & parser )
2024-11-07 13:36:28 +01:00
oclib . InitDaemon ( "oc-monitord" )
2024-08-13 17:19:25 +02:00
2024-09-05 11:23:43 +02:00
oclib . SetConfig (
conf . GetConfig ( ) . MongoURL ,
conf . GetConfig ( ) . Database ,
conf . GetConfig ( ) . NatsURL ,
conf . GetConfig ( ) . LokiURL ,
conf . GetConfig ( ) . Logs ,
)
2024-09-25 15:46:39 +02:00
logger = logs . CreateLogger ( "oc-monitord" )
logger . Debug ( ) . Msg ( "Loki URL : " + conf . GetConfig ( ) . LokiURL )
logger . Debug ( ) . Msg ( "Workflow executed : " + conf . GetConfig ( ) . ExecutionID )
2025-02-17 16:54:25 +01:00
exec := u . GetExecution ( conf . GetConfig ( ) . ExecutionID )
2025-02-14 12:00:29 +01:00
conf . GetConfig ( ) . WorkflowID = exec . WorkflowID
2024-08-13 17:19:25 +02:00
2025-02-14 12:00:29 +01:00
logger . Debug ( ) . Msg ( "Starting construction of yaml argo for workflow :" + exec . WorkflowID )
2024-08-13 16:15:39 +02:00
2024-08-19 11:43:40 +02:00
if _ , err := os . Stat ( "./argo_workflows/" ) ; os . IsNotExist ( err ) {
os . Mkdir ( "./argo_workflows/" , 0755 )
2024-08-13 16:15:39 +02:00
logger . Info ( ) . Msg ( "Created argo_workflows/" )
}
2024-08-07 17:24:09 +02:00
// // create argo
2024-08-06 11:40:30 +02:00
new_wf := workflow_builder . WorflowDB { }
2025-02-05 08:36:26 +01:00
err := new_wf . LoadFrom ( conf . GetConfig ( ) . WorkflowID , conf . GetConfig ( ) . PeerID )
2024-08-06 11:40:30 +02:00
if err != nil {
2025-04-09 18:59:37 +02:00
2024-08-06 11:40:30 +02:00
logger . Error ( ) . Msg ( "Could not retrieve workflow " + conf . GetConfig ( ) . WorkflowID + " from oc-catalog API" )
}
2025-04-02 11:40:14 +02:00
builder , stepMax , err := new_wf . ExportToArgo ( exec . ExecutionsID , conf . GetConfig ( ) . Timeout )
2024-08-06 11:40:30 +02:00
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not create the Argo file for " + conf . GetConfig ( ) . WorkflowID )
2024-08-06 11:40:30 +02:00
logger . Error ( ) . Msg ( err . Error ( ) )
2024-07-04 09:14:25 +02:00
}
2025-04-02 11:40:14 +02:00
2025-04-14 18:20:49 +02:00
argoFilePath , err := builder . CompleteBuild ( exec . ExecutionsID )
2025-04-02 11:40:14 +02:00
if err != nil {
logger . Error ( ) . Msg ( err . Error ( ) )
}
2024-08-06 11:40:30 +02:00
2025-04-14 18:20:49 +02:00
workflowName = getContainerName ( argoFilePath )
2024-08-06 11:40:30 +02:00
2024-08-19 11:43:40 +02:00
wf_logger = logger . With ( ) . Str ( "argo_name" , workflowName ) . Str ( "workflow_id" , conf . GetConfig ( ) . WorkflowID ) . Str ( "workflow_execution_id" , conf . GetConfig ( ) . ExecutionID ) . Logger ( )
2024-08-06 11:40:30 +02:00
wf_logger . Debug ( ) . Msg ( "Testing argo name" )
2025-04-02 11:40:14 +02:00
_ = stepMax
2024-08-06 11:40:30 +02:00
2025-04-02 11:40:14 +02:00
if conf . GetConfig ( ) . KubeHost == "" {
// Not in a k8s environment, get conf from parameters
fmt . Println ( "Executes outside of k8s" )
2025-04-14 18:20:49 +02:00
executeOutside ( argoFilePath , stepMax , builder . Workflow )
2025-04-02 11:40:14 +02:00
} else {
// Executed in a k8s environment
fmt . Println ( "Executes inside a k8s" )
2025-04-14 18:20:49 +02:00
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
executeInside ( conf . GetConfig ( ) . ExecutionID , conf . GetConfig ( ) . ExecutionID , argoFilePath , stepMax )
2024-08-07 17:24:09 +02:00
}
2024-07-04 09:14:25 +02:00
}
2024-07-25 18:48:25 +02:00
2024-09-24 11:45:32 +02:00
// So far we only log the output from
2025-02-17 16:54:25 +01:00
func executeInside ( execID string , ns string , argo_file_path string , stepMax int ) {
2025-02-14 12:00:29 +01:00
t , err := tools2 . NewService ( conf . GetConfig ( ) . Mode )
if err != nil {
logger . Error ( ) . Msg ( "Could not create KubernetesTool" )
}
2025-02-17 16:54:25 +01:00
name , err := t . CreateArgoWorkflow ( argo_file_path , ns )
if err != nil {
logger . Error ( ) . Msg ( "Could not create argo workflow : " + err . Error ( ) )
} else {
split := strings . Split ( argo_file_path , "_" )
argoLogs := models . NewArgoLogs ( split [ 0 ] , "argo" , stepMax )
argoLogs . StartStepRecording ( argoLogs . NewWatch ( ) , wf_logger )
err := t . LogWorkflow ( execID , ns , name , argo_file_path , stepMax , argoLogs . NewWatch ( ) , argoLogs . NewWatch ( ) , argoLogs , [ ] string { } , logWorkflow )
if err != nil {
logger . Error ( ) . Msg ( "Could not log workflow : " + err . Error ( ) )
}
}
2025-02-14 12:00:29 +01:00
}
2024-09-03 15:57:35 +02:00
2025-04-09 18:59:37 +02:00
func executeOutside ( argo_file_path string , stepMax int , workflow workflow_builder . Workflow ) {
// var stdoutSubmit, stderrSubmit, stdout_logs, stderr_logs io.ReadCloser
var stdoutSubmit , stderrSubmit io . ReadCloser
var stdoutLogs , stderrLogs io . ReadCloser
2024-08-07 17:24:09 +02:00
// var stderr io.ReadCloser
2025-04-09 18:59:37 +02:00
var wg sync . WaitGroup
2024-08-06 11:40:30 +02:00
var err error
2025-04-09 18:59:37 +02:00
2025-04-10 11:10:16 +02:00
logger . Debug ( ) . Msg ( "executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf . GetConfig ( ) . ExecutionID + " -n " + conf . GetConfig ( ) . ExecutionID )
2025-04-09 18:59:37 +02:00
cmdSubmit := exec . Command ( "argo" , "submit" , "--watch" , argo_file_path , "--serviceaccount" , "sa-" + conf . GetConfig ( ) . ExecutionID , "-n" , conf . GetConfig ( ) . ExecutionID )
if stdoutSubmit , err = cmdSubmit . StdoutPipe ( ) ; err != nil {
2024-08-06 11:40:30 +02:00
wf_logger . Error ( ) . Msg ( "Could not retrieve stdoutpipe " + err . Error ( ) )
return
}
2025-04-09 18:59:37 +02:00
// //======== Code block that implemented a method that logs both locally and container executed wf
// // Need to be improved, did not log well for local executions
// split := strings.Split(argo_file_path, "_")
// argoLogs := models.NewArgoLogs(split[0], conf.GetConfig().ExecutionID, stepMax)
// argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
// argoLogs.IsStreaming = true // Used to determine wether or not the logs are read from a docker container or on localhost
// // go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg)
// // =======
var steps [ ] string
for _ , template := range workflow . Spec . Templates {
steps = append ( steps , template . Name )
}
2025-04-10 11:10:16 +02:00
cmdLogs := exec . Command ( "argo" , "logs" , "oc-monitor-" + workflowName , "-n" , conf . GetConfig ( ) . ExecutionID , "--follow" , "--no-color" )
2025-04-09 18:59:37 +02:00
if stdoutLogs , err = cmdLogs . StdoutPipe ( ) ; err != nil {
wf_logger . Error ( ) . Msg ( "Could not retrieve stdoutpipe for 'argo logs'" + err . Error ( ) )
return
2024-08-08 10:11:40 +02:00
}
2025-04-09 18:59:37 +02:00
go models . LogLocalWorkflow ( workflowName , stdoutSubmit , & wg )
2025-04-10 11:10:16 +02:00
go models . LogPods ( workflowName , stdoutLogs , steps , & wg )
2025-04-09 18:59:37 +02:00
fmt . Println ( "Starting argo submit" )
if err := cmdSubmit . Start ( ) ; err != nil {
wf_logger . Error ( ) . Msg ( "Could not start argo submit" )
wf_logger . Error ( ) . Msg ( err . Error ( ) + bufio . NewScanner ( stderrSubmit ) . Text ( ) )
updateStatus ( "fatal" , "" )
}
time . Sleep ( 5 * time . Second )
fmt . Println ( "Running argo logs" )
if err := cmdLogs . Run ( ) ; err != nil {
2025-04-10 11:10:16 +02:00
wf_logger . Error ( ) . Msg ( "Could not run '" + strings . Join ( cmdLogs . Args , " " ) + "'" )
2025-04-09 18:59:37 +02:00
wf_logger . Fatal ( ) . Msg ( err . Error ( ) + bufio . NewScanner ( stderrLogs ) . Text ( ) )
}
fmt . Println ( "Waiting argo submit" )
if err := cmdSubmit . Wait ( ) ; err != nil {
2024-08-06 11:40:30 +02:00
wf_logger . Error ( ) . Msg ( "Could not execute argo submit" )
2025-04-09 18:59:37 +02:00
wf_logger . Error ( ) . Msg ( err . Error ( ) + bufio . NewScanner ( stderrSubmit ) . Text ( ) )
2025-02-14 12:00:29 +01:00
updateStatus ( "fatal" , "" )
2024-08-06 11:40:30 +02:00
}
2025-04-09 18:59:37 +02:00
2024-08-07 17:24:09 +02:00
wg . Wait ( )
2024-08-06 11:40:30 +02:00
}
2025-04-08 17:21:59 +02:00
// !!!! BUGGED !!!!
// Should be refactored to create a function dedicated to logging output from execution in a container
// LogLocalWorkflow() has been implemented to be used when oc-monitord is executed locally
2024-08-06 11:40:30 +02:00
// We could improve this function by creating an object with the same attribute as the output
// and only send a new log if the current object has different values than the previous
2025-02-17 16:54:25 +01:00
func logWorkflow ( argo_file_path string , stepMax int , pipe io . ReadCloser ,
current_watch * models . ArgoWatch , previous_watch * models . ArgoWatch ,
argoLogs * models . ArgoLogs , seen [ ] string , wg * sync . WaitGroup ) {
2024-08-06 11:40:30 +02:00
scanner := bufio . NewScanner ( pipe )
2025-02-17 16:54:25 +01:00
count := 0
see := ""
seeit := 0
2024-08-08 10:11:40 +02:00
for scanner . Scan ( ) {
log := scanner . Text ( )
2025-02-17 16:54:25 +01:00
if strings . Contains ( log , "capturing logs" ) && count == 0 {
if ! argoLogs . IsStreaming {
wg . Add ( 1 )
}
seeit ++
2025-04-09 18:59:37 +02:00
} else if count == 0 && ! argoLogs . IsStreaming {
2025-04-08 17:21:59 +02:00
break
2025-02-17 16:54:25 +01:00
}
if count == 1 {
see = log
if slices . Contains ( argoLogs . Seen , see ) && ! argoLogs . IsStreaming {
wg . Done ( )
seeit --
break
}
2024-10-11 13:44:16 +02:00
}
2025-02-17 16:54:25 +01:00
if ! slices . Contains ( current_watch . Logs , log ) {
2025-02-21 11:20:41 +01:00
current_watch . Logs = append ( current_watch . Logs , strings . ReplaceAll ( log , "\"" , "" ) )
2025-02-17 16:54:25 +01:00
}
count ++
2025-04-08 17:21:59 +02:00
if strings . Contains ( log , "sub-process exited" ) || argoLogs . IsStreaming {
2025-02-17 16:54:25 +01:00
current_watch = argoLogs . StopStepRecording ( current_watch )
argoLogs . Seen = append ( argoLogs . Seen , see )
if checkStatus ( current_watch , previous_watch , argoLogs ) {
count = 0
if ! argoLogs . IsStreaming {
wg . Done ( )
}
seeit --
}
2024-10-11 13:44:16 +02:00
jsonified , err := json . Marshal ( current_watch )
if err != nil {
logger . Error ( ) . Msg ( "Could not create watch log" )
}
if current_watch . Status == "Failed" {
wf_logger . Error ( ) . Msg ( string ( jsonified ) )
} else {
2024-08-08 10:11:40 +02:00
wf_logger . Info ( ) . Msg ( string ( jsonified ) )
2024-08-06 11:40:30 +02:00
}
2024-10-11 13:44:16 +02:00
previous_watch = current_watch
current_watch = & models . ArgoWatch { }
2025-04-08 17:21:59 +02:00
if argoLogs . IsStreaming {
current_watch . Logs = [ ] string { }
}
2024-08-06 11:40:30 +02:00
}
2024-08-08 10:11:40 +02:00
}
2024-08-06 11:40:30 +02:00
}
2024-08-19 11:43:40 +02:00
func loadConfig ( is_k8s bool , parser * argparse . Parser ) {
2024-07-25 18:48:25 +02:00
var o * onion . Onion
2024-08-19 11:43:40 +02:00
o = initOnion ( o )
2024-08-06 11:40:30 +02:00
setConf ( is_k8s , o , parser )
2025-03-07 16:19:26 +01:00
// if !IsValidUUID(conf.GetConfig().ExecutionID) {
// logger.Fatal().Msg("Provided ID is not an UUID")
// }
2024-08-06 11:40:30 +02:00
}
func setConf ( is_k8s bool , o * onion . Onion , parser * argparse . Parser ) {
2025-02-17 16:54:25 +01:00
url := parser . String ( "u" , "url" , & argparse . Options { Required : true , Default : "http://127.0.0.1:3100" , Help : "Url to the Loki database logs will be sent to" } )
mode := parser . String ( "M" , "mode" , & argparse . Options { Required : false , Default : "" , Help : "Mode of the execution" } )
execution := parser . String ( "e" , "execution" , & argparse . Options { Required : true , Help : "Execution ID of the workflow to request from oc-catalog API" } )
peer := parser . String ( "p" , "peer" , & argparse . Options { Required : false , Default : "" , Help : "Peer ID of the workflow to request from oc-catalog API" } )
mongo := parser . String ( "m" , "mongo" , & argparse . Options { Required : true , Default : "mongodb://127.0.0.1:27017" , Help : "URL to reach the MongoDB" } )
db := parser . String ( "d" , "database" , & argparse . Options { Required : true , Default : "DC_myDC" , Help : "Name of the database to query in MongoDB" } )
timeout := parser . Int ( "t" , "timeout" , & argparse . Options { Required : false , Default : - 1 , Help : "Timeout for the execution of the workflow" } )
ca := parser . String ( "c" , "ca" , & argparse . Options { Required : false , Default : "" , Help : "CA file for the Kubernetes cluster" } )
cert := parser . String ( "C" , "cert" , & argparse . Options { Required : false , Default : "" , Help : "Cert file for the Kubernetes cluster" } )
data := parser . String ( "D" , "data" , & argparse . Options { Required : false , Default : "" , Help : "Data file for the Kubernetes cluster" } )
host := parser . String ( "H" , "host" , & argparse . Options { Required : false , Default : "" , Help : "Host for the Kubernetes cluster" } )
port := parser . String ( "P" , "port" , & argparse . Options { Required : false , Default : "6443" , Help : "Port for the Kubernetes cluster" } )
2025-04-14 18:20:49 +02:00
// argoHost := parser.String("h", "argoHost", &argparse.Options{Required: false, Default: "", Help: "Host where Argo is running from"}) // can't use -h because its reserved to help
2025-02-17 16:54:25 +01:00
err := parser . Parse ( os . Args )
if err != nil {
fmt . Println ( parser . Usage ( err ) )
os . Exit ( 1 )
}
conf . GetConfig ( ) . Logs = "debug"
conf . GetConfig ( ) . LokiURL = * url
conf . GetConfig ( ) . MongoURL = * mongo
conf . GetConfig ( ) . Database = * db
conf . GetConfig ( ) . Timeout = * timeout
conf . GetConfig ( ) . Mode = * mode
conf . GetConfig ( ) . ExecutionID = * execution
conf . GetConfig ( ) . PeerID = * peer
conf . GetConfig ( ) . KubeHost = * host
conf . GetConfig ( ) . KubePort = * port
2025-04-14 18:20:49 +02:00
// conf.GetConfig().ArgoHost = *argoHost
2025-02-17 16:54:25 +01:00
decoded , err := base64 . StdEncoding . DecodeString ( * ca )
if err == nil {
conf . GetConfig ( ) . KubeCA = string ( decoded )
}
decoded , err = base64 . StdEncoding . DecodeString ( * cert )
if err == nil {
conf . GetConfig ( ) . KubeCert = string ( decoded )
}
decoded , err = base64 . StdEncoding . DecodeString ( * data )
if err == nil {
conf . GetConfig ( ) . KubeData = string ( decoded )
2024-08-06 11:40:30 +02:00
}
}
func initOnion ( o * onion . Onion ) * onion . Onion {
2024-09-05 11:23:43 +02:00
logger = logs . CreateLogger ( "oc-monitord" )
2024-07-25 18:48:25 +02:00
configFile := ""
2024-08-19 11:43:40 +02:00
l3 := onion . NewEnvLayerPrefix ( "_" , "OCMONITORD" )
2024-07-25 18:48:25 +02:00
l2 , err := onion . NewFileLayer ( defaultConfigFile , nil )
if err == nil {
logger . Info ( ) . Msg ( "Config file found : " + defaultConfigFile )
configFile = defaultConfigFile
}
l1 , err := onion . NewFileLayer ( localConfigFile , nil )
if err == nil {
logger . Info ( ) . Msg ( "Local config file found " + localConfigFile + ", overriding default file" )
configFile = localConfigFile
}
if configFile == "" {
logger . Info ( ) . Msg ( "No config file found, using env" )
o = onion . New ( l3 )
} else if l1 == nil && l2 == nil {
o = onion . New ( l1 , l2 , l3 )
} else if l1 == nil {
o = onion . New ( l2 , l3 )
} else if l2 == nil {
o = onion . New ( l1 , l3 )
}
2024-08-06 11:40:30 +02:00
return o
}
2024-07-25 18:48:25 +02:00
2024-08-06 11:40:30 +02:00
func IsValidUUID ( u string ) bool {
2024-08-19 11:43:40 +02:00
_ , err := uuid . Parse ( u )
return err == nil
}
2024-07-25 18:48:25 +02:00
2024-08-19 11:43:40 +02:00
// getContainerName extracts the container name from the Argo file path: the
// first "word-word" (letters separated by a single hyphen) found in the name.
// Returns "" when the path contains no such pattern.
func getContainerName(argo_file string) string {
	return regexp.MustCompile(`[a-zA-Z]+-[a-zA-Z]+`).FindString(argo_file)
}
2024-08-07 17:24:09 +02:00
// Uses the ArgoWatch object to update status of the workflow execution object
2025-02-17 16:54:25 +01:00
func checkStatus ( current * models . ArgoWatch , previous * models . ArgoWatch , argoLogs * models . ArgoLogs ) bool {
if previous == nil || current . Status != previous . Status || argoLogs . IsStreaming {
argoLogs . StepCount += 1
2025-02-14 12:00:29 +01:00
if len ( current . Logs ) > 0 {
2025-02-17 16:54:25 +01:00
newLogs := [ ] string { }
for _ , log := range current . Logs {
if ! slices . Contains ( argoLogs . Logs , log ) {
newLogs = append ( newLogs , log )
}
}
updateStatus ( current . Status , strings . Join ( newLogs , "\n" ) )
current . Logs = newLogs
argoLogs . Logs = append ( argoLogs . Logs , newLogs ... )
2025-02-14 12:00:29 +01:00
} else {
updateStatus ( current . Status , "" )
}
2024-08-07 17:24:09 +02:00
}
2025-02-17 16:54:25 +01:00
return previous == nil || current . Status != previous . Status || argoLogs . IsStreaming
2024-08-07 17:24:09 +02:00
}
2025-02-14 12:00:29 +01:00
func updateStatus ( status string , log string ) {
2024-08-07 17:24:09 +02:00
exec_id := conf . GetConfig ( ) . ExecutionID
2025-02-14 12:00:29 +01:00
wf_exec := & workflow_execution . WorkflowExecution { AbstractObject : utils . AbstractObject { UUID : conf . GetConfig ( ) . ExecutionID } }
2024-08-07 17:24:09 +02:00
wf_exec . ArgoStatusToState ( status )
2025-02-14 12:00:29 +01:00
exec , _ , err := workflow_execution . NewAccessor ( & tools . APIRequest {
2025-02-05 08:36:26 +01:00
PeerID : conf . GetConfig ( ) . PeerID ,
} ) . UpdateOne ( wf_exec , exec_id )
if err != nil {
logger . Error ( ) . Msg ( "Could not update status for workflow execution " + exec_id + err . Error ( ) )
2024-08-07 17:24:09 +02:00
}
2025-02-14 12:00:29 +01:00
splitted := strings . Split ( log , "-" )
if len ( splitted ) > 1 {
we := exec . ( * workflow_execution . WorkflowExecution )
itemID := splitted [ len ( splitted ) - 1 ] // TODO: in logs found item ID
caller := & tools . HTTPCaller {
URLS : map [ tools . DataType ] map [ tools . METHOD ] string {
tools . PEER : {
tools . POST : "/status/" ,
} ,
tools . BOOKING : {
tools . PUT : "http://localhost:8080/booking/:id" ,
} ,
} ,
}
if we . PeerBookByGraph != nil {
for peerID , val := range we . PeerBookByGraph {
if val [ itemID ] == nil {
continue
}
for _ , log := range val [ itemID ] {
( & peer . Peer { } ) . LaunchPeerExecution ( peerID , log , tools . BOOKING , tools . PUT , & booking . Booking {
State : we . State ,
} , caller )
}
}
}
}
2024-08-19 11:43:40 +02:00
}