package controllers import ( "encoding/json" "fmt" "slices" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" beego "github.com/beego/beego/v2/server/web" "github.com/google/uuid" ) var orderCollection = oclib.LibDataEnum(oclib.ORDER) var logger = oclib.GetLogger() // Operations about workflow type WorkflowSchedulerController struct { beego.Controller } // @Title Schedule // @Description schedule workflow // @Param id path string true "id execution" // @Param body body models.compute true "The compute content" // @Success 200 {workspace} models.workspace // @router /:id [post] func (o *WorkflowSchedulerController) Schedule() { logger := oclib.GetLogger() code := 200 e := "" user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) wfId := o.Ctx.Input.Param(":id") var resp *workflow_execution.WorkflowSchedule json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp) caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services tools.PEER: { tools.POST: "/status/", }, tools.BOOKING: { tools.GET: "/booking/check/:id/:start_date/:end_date", tools.POST: "/booking/", }, }) logger.Info().Msg("Booking for " + wfId) req := oclib.NewRequest(collection, user, peerID, groups, caller) resp.UUID = uuid.New().String() sch, err := req.Schedule(wfId, resp) if err != nil { if sch != nil { for _, w := range sch.WorkflowExecution { req.DeleteOne(w.GetID()) } } o.Data["json"] = map[string]interface{}{ "data": nil, "code": 409, "error": "Error when scheduling your execution(s): " + err.Error(), } o.ServeJSON() return } logger.Info().Msg("Creating S3 service account if necessary") execs := sch.WorkflowExecution for _, exec := range execs { execId := exec.ExecutionsID logger.Info().Msg("S3 ServiceAccount for " + execId) // execId = "6cdaf6e4-5727-480e-ab97-f78853c4e553" err = createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups) if err != nil { // if sch != nil { // for _, w := range sch.WorkflowExecution { // req.DeleteOne(w.GetID()) // } // } o.Data["json"] = map[string]interface{}{ "data": nil, "code": 409, "error": err.Error(), } o.ServeJSON() return } } o.Data["json"] = map[string]interface{}{ "data": sch.WorkflowExecution, "code": code, "error": e, } o.ServeJSON() } // @Title UnSchedule // @Description schedule workflow // @Param id path string true "id execution" // @Param body body models.compute true "The compute content" // @Success 200 {workspace} models.workspace // @router /:id [delete] func (o *WorkflowSchedulerController) UnSchedule() { user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) id := o.Ctx.Input.Param(":id") // TODO UNSCHEDULER filter := &dbs.Filters{ And: map[string][]dbs.Filter{ "workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}}, }, } o.Data["json"] = oclib.NewRequest(collection, user, peerID, groups, nil).Search(filter, "", true) o.ServeJSON() } // @Title SearchScheduledDraftOrder // @Description schedule workflow // @Param id path string true "id execution" // @Success 200 {workspace} models.workspace // @router /:id/order [get] func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() { user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) id := o.Ctx.Input.Param(":id") filter := &dbs.Filters{ And: map[string][]dbs.Filter{ "workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}}, "order_by": {{Operator: dbs.EQUAL.String(), Value: peerID}}, }, } o.Data["json"] = oclib.NewRequest(orderCollection, user, peerID, groups, nil).Search(filter, "", true) o.ServeJSON() } func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error { // Retrieve the Workflow in the WorkflowSchedule wf := loadWorkflow(wfId,peerID) // storageItems := wf.GetGraphItems(wf.Graph.IsStorage) itemMap := wf.GetItemsByResources() // mapStorageRessources, err := getItemByRessourceId(wf, storageItems) for id, items := range itemMap[tools.STORAGE_RESOURCE] { _ = items // Load the storage s, err := oclib.LoadOneStorage(id, user, peerID, groups) if err != nil { return err } if s.StorageType == enum.S3 { // DEV MULTI PEER MINIO CREDENTIAL CREATION // retrieve all the processing linked to a compute using the storage : processing -- compute -- storage // In this case we need to retrieve the Item ID(s) for each storage to be able to evaluate links with other items associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id]) for _, computeId := range associatedComputingResources { c, err := oclib.LoadOneComputing(computeId, user, peerID, groups) if err != nil { return err } if c.CreatorID == s.CreatorID { // post on datacenter /minio/createServiceAccount err := postCreateServiceAccount(peerID, s, caller, execId, wfId) if err != nil { // Add a logger.Info() here return err } } else { // get on storage datacenter /minio/createServiceAccount access, secret, err := getServiceAccountCredentials(peerID, *s, caller, execId, wfId, *c) if err != nil { // Add a logger.Info() here return err } // post on computing datacenter /minio/createSAsecret err = postS3Secret(peerID, *s, caller, execId, wfId,*c, access, secret) // create the secret holding the retrieved access on c's peer if err != nil { // Add a logger.Info() here return err } } } } } return nil } func postCreateServiceAccount(peerID string, s *resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string) error { l := oclib.GetLogger() fmt.Println("Creating a service account on " + peerID + " for " + s.Name) res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(s.CreatorID) if res.Code != 200 { l.Error().Msg("Error while loading a peer for creation of the serviceAccount") return fmt.Errorf(res.Err) } p := res.ToPeer() caller.URLS[tools.MINIO_SVCACC] = map[tools.METHOD]string{ tools.POST: "/serviceaccount/" + s.UUID + "/" + execId, } l.Debug().Msg("Lauching execution on" + p.UUID) _, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC, tools.POST, nil, caller) if err != nil { l.Error().Msg("Error when executing on peer at " + p.Url + " when creating a S3 service account") l.Error().Msg(err.Error()) return err } if caller.LastResults["code"].(int) != 200 { l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + s.Name + " on peer at " + p.Url)) if _, ok := caller.LastResults["body"]; ok { l.Error().Msg(string(caller.LastResults["body"].([]byte))) return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte))) } } return nil } func loadWorkflow(workflowId string, peerId string) *workflow.Workflow { res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{},nil).LoadOne(workflowId) if res.Code != 200 { l := oclib.GetLogger() l.Error().Msg("Error while loading a workflow for creation of the serviceAccount") return nil } return res.ToWorkflow() } // func getItemByRessourceId(storages string) (map[string][]string, error) { // var storagesMap map[string][]string // } func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) []string { storageProcessingLinks := make([]string,0) for _, id := range storageNodes{ processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node for _, procId := range processings { computings := getComputeProcessing(wf, procId) if !slices.Contains(storageProcessingLinks,computings){ storageProcessingLinks= append(storageProcessingLinks, computings) } } } return storageProcessingLinks } // returns a list of processing item's Id that use the Storage // theses item Id can be used to instantiate the resource func getStorageRelatedProcessing(wf workflow.Workflow, storageId string) (relatedProcessing []string) { var storageLinks []graph.GraphLink // Only keep the links that are associated to the storage for _, link := range wf.Graph.Links { if link.Destination.ID == storageId || link.Source.ID == storageId { storageLinks = append(storageLinks,link) } } for _, link := range storageLinks { var resourceId string if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID } if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]){ relatedProcessing = append(relatedProcessing, resourceId) } } return } func getComputeProcessing(wf workflow.Workflow, processingId string) (res string) { computeRel := wf.GetByRelatedProcessing(processingId,wf.Graph.IsCompute) for _, rel := range computeRel { return rel.Node.GetID() } return "" } func getServiceAccountCredentials(peerID string, storageRes resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, computeRes resources.ComputeResource) (string, string, error) { l := oclib.GetLogger() fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID ) res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(storageRes.CreatorID) if res.Code != 200 { l.Error().Msg("Error while loading a peer for creation of the serviceAccount") return "", "", fmt.Errorf(res.Err) } p := res.ToPeer() caller.URLS[tools.MINIO_SVCACC] = map[tools.METHOD]string{ tools.POST: "/serviceaccount/" + storageRes.UUID + "/" + execId, } body := map[string]bool{"retrieve": true} l.Debug().Msg("Lauching execution on" + p.UUID) resp, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC, tools.POST, body, caller) if err != nil { l.Error().Msg("Error when executing on peer at " + p.Url + " when retrieving S3 credentials") l.Error().Msg(err.Error()) return "", "", err } result_code := caller.LastResults["code"].(int) if !slices.Contains([]int{200,201}, result_code) { l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + storageRes.Name + " on peer at " + p.Url)) if _, ok := caller.LastResults["body"]; ok { l.Error().Msg(string(caller.LastResults["body"].([]byte))) return "", "", fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte))) } } var access, secret string if a, ok := resp["access"]; !ok { return "", "", fmt.Errorf("Error in the response returned when creating a S3 serviceAccount on " + storageRes.Name + " on peer " + p.UUID) } else { access = a.(string) } if s, ok := resp["secret"]; !ok { return "", "", fmt.Errorf("Error in the response returned when creating a S3 serviceAccount on " + storageRes.Name + " on peer " + p.UUID) } else { secret = s.(string) } return access, secret, nil } func postS3Secret(peerID string, s resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, c resources.ComputeResource, access string, secret string) error { l := oclib.GetLogger() res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(c.CreatorID) if res.Code != 200 { l.Error().Msg("Error while loading a peer for creation of the serviceAccount") return fmt.Errorf(res.Err) } p := res.ToPeer() caller.URLS[tools.MINIO_SVCACC_SECRET] = map[tools.METHOD]string{ tools.POST: "/secret/" + s.UUID + "/" + execId, } body := map[string]string{"access": access, "secret": secret} _, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC_SECRET, tools.POST, body, caller) if err != nil { l.Error().Msg("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials in namespace " + execId) l.Error().Msg(err.Error()) return fmt.Errorf("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials" + " : " + err.Error()) } result_code := caller.LastResults["code"].(int) if !slices.Contains([]int{200,201}, result_code) { l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url)) if _, ok := caller.LastResults["body"]; ok { l.Error().Msg(string(caller.LastResults["body"].([]byte))) return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte))) } } return nil }