This commit is contained in:
mr
2026-05-28 08:33:13 +02:00
parent 115d175d45
commit e25c95f7ac
8 changed files with 192 additions and 18 deletions
+4 -2
View File
@@ -8,6 +8,7 @@ import (
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog"
)
@@ -47,6 +48,7 @@ func (lm *LocalMonitor) PrepareMonitorExec() []string {
"-c", conf.GetConfig().KubeCA,
"-C", conf.GetConfig().KubeCert,
"-D", conf.GetConfig().KubeData,
"-N", config.GetConfig().NATSUrl,
"-s", fmt.Sprintf("%d", lm.ScheduledTime.Unix()),
}
if lm.Duration > 0 {
@@ -70,9 +72,9 @@ func (lm *LocalMonitor) LaunchMonitor(args []string, execID string, ns string, l
stdoutMonitord, err := cmd.StdoutPipe()
if err != nil {
l.Error().Err(err).Msg("Could not retrieve stdout pipe for oc-monitord")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
/*oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
}, execID)*/
return
}
+136 -6
View File
@@ -2,13 +2,18 @@ package daemons
import (
"fmt"
"strings"
"time"
"oc-schedulerd/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
wf "cloud.o-forge.io/core/oc-lib/models/workflow"
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/rs/zerolog"
)
var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
@@ -45,9 +50,7 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
execId, exec.ExecDate.Format(time.RFC3339), lead))
// Mark as STARTED immediately (before goroutine) so the next
// SchedulePolling cycle doesn't re-pick this execution from DB.
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.STARTED.EnumIndex(),
}, exec.GetID())
emitExecStateUpdate(exec.GetID(), enum.STARTED)
go em.executeExecution(&exec)
delete(executions, execId)
}
@@ -60,6 +63,130 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
}
}
// validateWorkflowIntegrity loads the workflow referenced by the execution and
// runs structural integrity checks before any resource is booked or any pod is
// started. This is the sovereign enforcement layer — oc-front may be bypassed
// via direct API calls, so oc-schedulerd re-validates independently.
//
// Two layers of validation are applied in order:
// 1. Structural integrity (cycles, missing compute links, variable refs, …).
// 2. Autorisation d'Exploitation (AE) — coupling and peer-usage constraints
// published by resource owners in oc-catalog. Violations are fraudulent and
// trigger a PEER_BEHAVIOR_EVENT(BehaviorFraud) against the consumer peer.
//
// Returns true when the execution is safe to proceed.
// On failure: emits FAILURE state, logs each violation, and returns false.
func (em *ExecutionManager) validateWorkflowIntegrity(execution *workflow_execution.WorkflowExecution, logger zerolog.Logger) bool {
res := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW), nil).LoadOne(execution.WorkflowID)
if res.Err != "" || res.Data == nil {
return true // can't load workflow — let the existing error path handle it downstream
}
workflow, ok := res.Data.(*wf.Workflow)
if !ok {
return true
}
// ── 1. Structural integrity ───────────────────────────────────────────────
violations := workflow.ValidateIntegrity()
var structErrors []wf.IntegrityViolation
for _, v := range violations {
if v.IsError() {
structErrors = append(structErrors, v)
}
}
if len(structErrors) > 0 {
msgs := make([]string, 0, len(structErrors))
for _, v := range structErrors {
msgs = append(msgs, fmt.Sprintf("[%s] %s", v.Type, v.Message))
}
logger.Error().Msg(fmt.Sprintf(
"workflow '%s' (exec %s) rejected — %d integrity violation(s):\n %s",
execution.WorkflowID, execution.GetID(), len(structErrors), strings.Join(msgs, "\n "),
))
emitExecStateUpdate(execution.GetID(), enum.FAILURE)
return false
}
// ── 2. Autorisation d'Exploitation (AE) ──────────────────────────────────
// Build a per-type map of resource IDs referenced in the workflow.
// The workflow's ResourceSet stores raw IDs in Datas/Processings/etc.
resourcesByType := map[tools.DataType][]string{
tools.DATA_RESOURCE: workflow.Datas,
tools.PROCESSING_RESOURCE: workflow.Processings,
tools.STORAGE_RESOURCE: workflow.Storages,
tools.COMPUTE_RESOURCE: workflow.Computes,
tools.WORKFLOW_RESOURCE: workflow.Workflows,
tools.SERVICE_RESOURCE: workflow.Services,
}
// Build a flat ID set for coupling membership checks.
idSet := map[string]struct{}{}
for _, ids := range resourcesByType {
for _, id := range ids {
idSet[id] = struct{}{}
}
}
// Determine the consumer peer (this peer is executing the workflow).
consumerPeerID := ""
if self, err := oclib.GetMySelf(); err == nil && self != nil {
consumerPeerID = self.GetID()
}
aeViolations := checkWorkflowAE(execution.WorkflowID, consumerPeerID, resourcesByType, idSet)
if len(aeViolations) > 0 {
msgs := make([]string, 0, len(aeViolations))
for _, v := range aeViolations {
msgs = append(msgs, fmt.Sprintf("[%s] %s", v.Type, v.Message))
}
logger.Error().Msg(fmt.Sprintf(
"workflow '%s' (exec %s) rejected — %d AE violation(s):\n %s",
execution.WorkflowID, execution.GetID(), len(aeViolations), strings.Join(msgs, "\n "),
))
resources.EmitAEBehaviorReport(consumerPeerID, aeViolations)
emitExecStateUpdate(execution.GetID(), enum.FAILURE)
return false
}
return true
}
// checkWorkflowAE loads each workflow resource from the DB and checks its
// embedded ExploitationAuthorizations against the execution context.
// Kept in oc-schedulerd (not oc-lib/models/resources) to avoid a circular
// import: resources → oclib → models → resources.
func checkWorkflowAE(
workflowID string,
consumerPeerID string,
resourcesByType map[tools.DataType][]string,
idSet map[string]struct{},
) []resources.AEViolation {
now := time.Now().UTC()
var violations []resources.AEViolation
type hasAE interface {
GetExploitationAuthorizations() []resources.ExploitationAuthorization
}
for dt, ids := range resourcesByType {
for _, id := range ids {
res := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).LoadOne(id)
if res.Err != "" || res.Data == nil {
continue
}
ra, ok := res.Data.(hasAE)
if !ok {
continue
}
for _, ae := range ra.GetExploitationAuthorizations() {
vs := ae.CheckAE(id, workflowID, consumerPeerID, idSet, now)
violations = append(violations, vs...)
}
}
}
return violations
}
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
// start execution
// create the yaml that describes the pod : filename, path/url to Loki
@@ -67,6 +194,11 @@ func (em *ExecutionManager) executeExecution(execution *workflow_execution.Workf
// exec_method := os.Getenv("MONITOR_METHOD")
logger := oclib.GetLogger()
// Sovereign integrity check — reject before touching any resource.
if !em.validateWorkflowIntegrity(execution, logger) {
return
}
duration := 0
if execution.EndDate != nil {
duration = int(execution.EndDate.Sub(execution.ExecDate).Seconds())
@@ -80,9 +212,7 @@ func (em *ExecutionManager) executeExecution(execution *workflow_execution.Workf
if executor == nil {
logger.Fatal().Msg("Could not create executor")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execution.GetID())
emitExecStateUpdate(execution.GetID(), enum.FAILURE)
return
}
+34
View File
@@ -0,0 +1,34 @@
package daemons
import (
"encoding/json"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
)
// emitExecStateUpdate loads the execution, sets its state and emits a
// CREATE_RESOURCE NATS event so oc-scheduler applies the change and fires
// NotifyChange for the WebSocket streams.
// Direct UpdateOne calls are replaced by this function so oc-scheduler remains
// the single writer for WorkflowExecution.
func emitExecStateUpdate(execID string, state enum.BookingStatus) {
adminReq := &tools.APIRequest{Admin: true}
res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(execID)
if err != nil || res == nil {
return
}
exec := res.(*workflow_execution.WorkflowExecution)
exec.State = state
payload, marshalErr := json.Marshal(exec)
if marshalErr != nil {
return
}
tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
FromApp: "oc-schedulerd",
Datatype: tools.WORKFLOW_EXECUTION,
Method: int(tools.CREATE_RESOURCE),
Payload: payload,
})
}
+2 -4
View File
@@ -41,9 +41,7 @@ func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.
fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec))
if !sb.execIsSet(exec) {
sb.Execs[exec.UUID] = *exec
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.SCHEDULED.EnumIndex(),
}, exec.GetID())
emitExecStateUpdate(exec.GetID(), enum.SCHEDULED)
}
}
}
@@ -82,7 +80,7 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
},
}
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false, 0, 100000)
if res.Code != 200 {
s.Logger.Error().Msg("Error loading " + res.Err)
return