From e25c95f7ace8f00d049605b3d83101c19722cb0e Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 28 May 2026 08:33:13 +0200 Subject: [PATCH] neooclib --- .gitignore | 4 +- daemons/execute_monitor_local.go | 6 +- daemons/execution_manager.go | 142 +++++++++++++++++++++++++++++-- daemons/nats.go | 34 ++++++++ daemons/schedule_manager.go | 6 +- go.mod | 2 +- go.sum | 6 ++ main.go | 10 ++- 8 files changed, 192 insertions(+), 18 deletions(-) create mode 100644 daemons/nats.go diff --git a/.gitignore b/.gitignore index b29658b..fa5c937 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,6 @@ __debug_bin argo_workflows/* *.xml -*.txt \ No newline at end of file +*.txt + +oc-schedulerd \ No newline at end of file diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index 8e1a916..54a690e 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -8,6 +8,7 @@ import ( "time" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/models/common/enum" "github.com/rs/zerolog" ) @@ -47,6 +48,7 @@ func (lm *LocalMonitor) PrepareMonitorExec() []string { "-c", conf.GetConfig().KubeCA, "-C", conf.GetConfig().KubeCert, "-D", conf.GetConfig().KubeData, + "-N", config.GetConfig().NATSUrl, "-s", fmt.Sprintf("%d", lm.ScheduledTime.Unix()), } if lm.Duration > 0 { @@ -70,9 +72,9 @@ func (lm *LocalMonitor) LaunchMonitor(args []string, execID string, ns string, l stdoutMonitord, err := cmd.StdoutPipe() if err != nil { l.Error().Err(err).Msg("Could not retrieve stdout pipe for oc-monitord") - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + /*oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ "state": enum.FAILURE.EnumIndex(), - }, execID) + }, execID)*/ return } diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index 812b705..045966a 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -2,13 +2,18 @@ package daemons import ( "fmt" + "strings" "time" "oc-schedulerd/conf" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/resources" + wf "cloud.o-forge.io/core/oc-lib/models/workflow" workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/rs/zerolog" ) var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}} @@ -45,9 +50,7 @@ func (em *ExecutionManager) RetrieveNextExecutions() { execId, exec.ExecDate.Format(time.RFC3339), lead)) // Mark as STARTED immediately (before goroutine) so the next // SchedulePolling cycle doesn't re-pick this execution from DB. - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.STARTED.EnumIndex(), - }, exec.GetID()) + emitExecStateUpdate(exec.GetID(), enum.STARTED) go em.executeExecution(&exec) delete(executions, execId) } @@ -60,6 +63,130 @@ func (em *ExecutionManager) RetrieveNextExecutions() { } } +// validateWorkflowIntegrity loads the workflow referenced by the execution and +// runs structural integrity checks before any resource is booked or any pod is +// started. This is the sovereign enforcement layer — oc-front may be bypassed +// via direct API calls, so oc-schedulerd re-validates independently. +// +// Two layers of validation are applied in order: +// 1. Structural integrity (cycles, missing compute links, variable refs, …). +// 2. Autorisation d'Exploitation (AE) — coupling and peer-usage constraints +// published by resource owners in oc-catalog. Violations are fraudulent and +// trigger a PEER_BEHAVIOR_EVENT(BehaviorFraud) against the consumer peer. +// +// Returns true when the execution is safe to proceed. +// On failure: emits FAILURE state, logs each violation, and returns false. +func (em *ExecutionManager) validateWorkflowIntegrity(execution *workflow_execution.WorkflowExecution, logger zerolog.Logger) bool { + res := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW), nil).LoadOne(execution.WorkflowID) + if res.Err != "" || res.Data == nil { + return true // can't load workflow — let the existing error path handle it downstream + } + workflow, ok := res.Data.(*wf.Workflow) + if !ok { + return true + } + + // ── 1. Structural integrity ─────────────────────────────────────────────── + violations := workflow.ValidateIntegrity() + var structErrors []wf.IntegrityViolation + for _, v := range violations { + if v.IsError() { + structErrors = append(structErrors, v) + } + } + if len(structErrors) > 0 { + msgs := make([]string, 0, len(structErrors)) + for _, v := range structErrors { + msgs = append(msgs, fmt.Sprintf("[%s] %s", v.Type, v.Message)) + } + logger.Error().Msg(fmt.Sprintf( + "workflow '%s' (exec %s) rejected — %d integrity violation(s):\n %s", + execution.WorkflowID, execution.GetID(), len(structErrors), strings.Join(msgs, "\n "), + )) + emitExecStateUpdate(execution.GetID(), enum.FAILURE) + return false + } + + // ── 2. Autorisation d'Exploitation (AE) ────────────────────────────────── + // Build a per-type map of resource IDs referenced in the workflow. + // The workflow's ResourceSet stores raw IDs in Datas/Processings/etc. + resourcesByType := map[tools.DataType][]string{ + tools.DATA_RESOURCE: workflow.Datas, + tools.PROCESSING_RESOURCE: workflow.Processings, + tools.STORAGE_RESOURCE: workflow.Storages, + tools.COMPUTE_RESOURCE: workflow.Computes, + tools.WORKFLOW_RESOURCE: workflow.Workflows, + tools.SERVICE_RESOURCE: workflow.Services, + } + + // Build a flat ID set for coupling membership checks. + idSet := map[string]struct{}{} + for _, ids := range resourcesByType { + for _, id := range ids { + idSet[id] = struct{}{} + } + } + + // Determine the consumer peer (this peer is executing the workflow). + consumerPeerID := "" + if self, err := oclib.GetMySelf(); err == nil && self != nil { + consumerPeerID = self.GetID() + } + + aeViolations := checkWorkflowAE(execution.WorkflowID, consumerPeerID, resourcesByType, idSet) + if len(aeViolations) > 0 { + msgs := make([]string, 0, len(aeViolations)) + for _, v := range aeViolations { + msgs = append(msgs, fmt.Sprintf("[%s] %s", v.Type, v.Message)) + } + logger.Error().Msg(fmt.Sprintf( + "workflow '%s' (exec %s) rejected — %d AE violation(s):\n %s", + execution.WorkflowID, execution.GetID(), len(aeViolations), strings.Join(msgs, "\n "), + )) + resources.EmitAEBehaviorReport(consumerPeerID, aeViolations) + emitExecStateUpdate(execution.GetID(), enum.FAILURE) + return false + } + + return true +} + +// checkWorkflowAE loads each workflow resource from the DB and checks its +// embedded ExploitationAuthorizations against the execution context. +// Kept in oc-schedulerd (not oc-lib/models/resources) to avoid a circular +// import: resources → oclib → models → resources. +func checkWorkflowAE( + workflowID string, + consumerPeerID string, + resourcesByType map[tools.DataType][]string, + idSet map[string]struct{}, +) []resources.AEViolation { + now := time.Now().UTC() + var violations []resources.AEViolation + + type hasAE interface { + GetExploitationAuthorizations() []resources.ExploitationAuthorization + } + + for dt, ids := range resourcesByType { + for _, id := range ids { + res := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).LoadOne(id) + if res.Err != "" || res.Data == nil { + continue + } + ra, ok := res.Data.(hasAE) + if !ok { + continue + } + for _, ae := range ra.GetExploitationAuthorizations() { + vs := ae.CheckAE(id, workflowID, consumerPeerID, idSet, now) + violations = append(violations, vs...) + } + } + } + return violations +} + func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) { // start execution // create the yaml that describes the pod : filename, path/url to Loki @@ -67,6 +194,11 @@ func (em *ExecutionManager) executeExecution(execution *workflow_execution.Workf // exec_method := os.Getenv("MONITOR_METHOD") logger := oclib.GetLogger() + // Sovereign integrity check — reject before touching any resource. + if !em.validateWorkflowIntegrity(execution, logger) { + return + } + duration := 0 if execution.EndDate != nil { duration = int(execution.EndDate.Sub(execution.ExecDate).Seconds()) @@ -80,9 +212,7 @@ func (em *ExecutionManager) executeExecution(execution *workflow_execution.Workf if executor == nil { logger.Fatal().Msg("Could not create executor") - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.FAILURE.EnumIndex(), - }, execution.GetID()) + emitExecStateUpdate(execution.GetID(), enum.FAILURE) return } diff --git a/daemons/nats.go b/daemons/nats.go new file mode 100644 index 0000000..0a359c5 --- /dev/null +++ b/daemons/nats.go @@ -0,0 +1,34 @@ +package daemons + +import ( + "encoding/json" + + "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" +) + +// emitExecStateUpdate loads the execution, sets its state and emits a +// CREATE_RESOURCE NATS event so oc-scheduler applies the change and fires +// NotifyChange for the WebSocket streams. +// Direct UpdateOne calls are replaced by this function so oc-scheduler remains +// the single writer for WorkflowExecution. +func emitExecStateUpdate(execID string, state enum.BookingStatus) { + adminReq := &tools.APIRequest{Admin: true} + res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(execID) + if err != nil || res == nil { + return + } + exec := res.(*workflow_execution.WorkflowExecution) + exec.State = state + payload, marshalErr := json.Marshal(exec) + if marshalErr != nil { + return + } + tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-schedulerd", + Datatype: tools.WORKFLOW_EXECUTION, + Method: int(tools.CREATE_RESOURCE), + Payload: payload, + }) +} diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index 4ea62af..c50d799 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -41,9 +41,7 @@ func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution. fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec)) if !sb.execIsSet(exec) { sb.Execs[exec.UUID] = *exec - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.SCHEDULED.EnumIndex(), - }, exec.GetID()) + emitExecStateUpdate(exec.GetID(), enum.SCHEDULED) } } } @@ -82,7 +80,7 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list "state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}}, }, } - res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false) + res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false, 0, 100000) if res.Code != 200 { s.Logger.Error().Msg("Error loading " + res.Err) return diff --git a/go.mod b/go.mod index 91b26b9..9582c27 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-schedulerd go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 + cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b github.com/google/uuid v1.6.0 github.com/rs/zerolog v1.34.0 go.mongodb.org/mongo-driver v1.17.4 diff --git a/go.sum b/go.sum index a315176..80293db 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIy cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 h1:pQf9k+GSzNGEmrUa00jn9Zcqfp9X4N1Z5ie7InvUf3g= cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260414104622-dc0041999d22 h1:lum7G12vCKYKQWXTOYtl2Qh9hLRlzrcOPO3pozUBL40= +cloud.o-forge.io/core/oc-lib v0.0.0-20260414104622-dc0041999d22/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423074839-e70e89b630b7 h1:WdExXiecLnST8a7gAh6Z9Xd1Q+0/EjTy1P+b9ABoga8= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423074839-e70e89b630b7/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b h1:TWhmHeurbBmdyevREh4+mHWOBehO2AK587RCIjCfvOc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/main.go b/main.go index 43307dc..fb56902 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "oc-schedulerd/conf" "oc-schedulerd/daemons" "os" @@ -24,9 +25,11 @@ func main() { conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc.cluster.local") conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443") - conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkakNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3SGhjTk1qWXdNekV3TURjeE9ERTJXaGNOTXpZd016QTNNRGN4T0RFMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpNeE1qY3dPVFl3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFReG81cXQ0MGxEekczRHJKTE1wRVBrd0ZBY1FmbC8vVE1iWjZzemMreHAKbmVzVzRTSTdXK1lWdFpRYklmV2xBMTRaazQvRFlDMHc1YlgxZU94RVVuL0pvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVXBLM2pGK25IRlZSbDcwb3ZRVGZnCmZabGNQZE13Q2dZSUtvWkl6ajBFQXdJRFJ3QXdSQUlnVnkyaUx0Y0xaYm1vTnVoVHdKbU5sWlo3RVlBYjJKNW0KSjJYbG1UbVF5a2tDSUhLbzczaDBkdEtUZTlSa0NXYTJNdStkS1FzOXRFU0tBV0x1emlnYXBHYysKLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=") - conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrakNDQVRlZ0F3SUJBZ0lJQUkvSUg2R2Rodm93Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemN6TVRJM01EazJNQjRYRFRJMk1ETXhNREEzTVRneE5sb1hEVEkzTURNeApNREEzTVRneE5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJQTTdBVEZQSmFMMjUrdzAKUU1vZUIxV2hBRW4vWnViM0tSRERrYnowOFhwQWJ2akVpdmdnTkdpdG4wVmVsaEZHamRmNHpBT29Nd1J3M21kbgpYSGtHVDB5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUVZLOThaMEMxcFFyVFJSMGVLZHhIa2o0ejFJREFLQmdncWhrak9QUVFEQWdOSkFEQkcKQWlFQXZYWll6Zk9iSUtlWTRtclNsRmt4ZS80a0E4K01ieDc1UDFKRmNlRS8xdGNDSVFDNnM0ZXlZclhQYmNWSgpxZm5EamkrZ1RacGttN0tWSTZTYTlZN2FSRGFabUE9PQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCi0tLS0tQkVHSU4gQ0VSVElGSUNBVEUtLS0tLQpNSUlCZURDQ0FSMmdBd0lCQWdJQkFEQUtCZ2dxaGtqT1BRUURBakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwClpXNTBMV05oUURFM056TXhNamN3T1RZd0hoY05Nall3TXpFd01EY3hPREUyV2hjTk16WXdNekEzTURjeE9ERTIKV2pBak1TRXdId1lEVlFRRERCaHJNM010WTJ4cFpXNTBMV05oUURFM056TXhNamN3T1RZd1dUQVRCZ2NxaGtqTwpQUUlCQmdncWhrak9QUU1CQndOQ0FBUzV1NGVJbStvVnV1SFI0aTZIOU1kVzlyUHdJbFVPNFhIMEJWaDRUTGNlCkNkMnRBbFVXUW5FakxMdlpDWlVaYTlzTlhKOUVtWWt5S0dtQWR2TE9FbUVrbzBJd1FEQU9CZ05WSFE4QkFmOEUKQkFNQ0FxUXdEd1lEVlIwVEFRSC9CQVV3QXdFQi96QWRCZ05WSFE0RUZnUVVGU3ZmR2RBdGFVSzAwVWRIaW5jUgo1SStNOVNBd0NnWUlLb1pJemowRUF3SURTUUF3UmdJaEFMY2xtQnR4TnpSVlBvV2hoVEVKSkM1Z3VNSGsvcFZpCjFvYXJ2UVJxTWRKcUFpRUEyR1dNTzlhZFFYTEQwbFZKdHZMVkc1M3I0M0lxMHpEUUQwbTExMVZyL1MwPQotLS0tLUVORCBDRVJUSUZJQ0FURS0tLS0tCg==") - conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVkSTRZN3lRU1ZwRGNrblhsQmJEaXBWZHRMWEVsYVBkN3VBZHdBWFFya2xvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFOHpzQk1VOGxvdmJuN0RSQXloNEhWYUVBU2Y5bTV2Y3BFTU9SdlBUeGVrQnUrTVNLK0NBMAphSzJmUlY2V0VVYU4xL2pNQTZnekJIRGVaMmRjZVFaUFRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=") + conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "") + conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "") + conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "") + + fmt.Println(conf.GetConfig().KubeCA) // Test if oc-monitor binary is reachable // For local executions @@ -54,5 +57,4 @@ func main() { }) go sch_mngr.SchedulePolling() exe_mngr.RetrieveNextExecutions() - }