package controllers

import (
	"encoding/json"
	"fmt"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"cloud.o-forge.io/core/oc-lib/models/workflow"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"
	beego "github.com/beego/beego/v2/server/web"
	"github.com/google/uuid"
)

// orderCollection is the oc-lib collection queried by SearchScheduledDraftOrder.
var orderCollection = oclib.LibDataEnum(oclib.ORDER)

// logger is the package-level oc-lib logger.
var logger = oclib.GetLogger()

// WorkflowSchedulerController groups the operations about workflow scheduling.
type WorkflowSchedulerController struct {
	beego.Controller
}

// respond writes the {data, code, error} JSON envelope used by this controller.
func (o *WorkflowSchedulerController) respond(data interface{}, code int, errMsg string) {
	o.Data["json"] = map[string]interface{}{
		"data":  data,
		"code":  code,
		"error": errMsg,
	}
	o.ServeJSON()
}

// Schedule decodes a WorkflowSchedule from the request body, schedules the
// workflow identified by the ":id" path parameter and, for every resulting
// execution, asks for the creation of the S3 service accounts the workflow
// needs. Failures are reported in the JSON envelope with code 400 (invalid
// body) or 409 (scheduling or service account error).
//
// @Title Schedule
// @Description schedule workflow
// @Param	id		path 	string	true		"id execution"
// @Param	body		body 	models.compute	true		"The compute content"
// @Success 200 {workspace} models.workspace
// @router /:id [post]
func (o *WorkflowSchedulerController) Schedule() {
	logger := oclib.GetLogger()
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
	wfId := o.Ctx.Input.Param(":id")

	// Reject an undecodable or empty body up front: resp would otherwise be a
	// nil pointer and the UUID assignment below would panic.
	var resp *workflow_execution.WorkflowSchedule
	if err := json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp); err != nil {
		o.respond(nil, 400, "Invalid scheduling payload: "+err.Error())
		return
	}
	if resp == nil {
		o.respond(nil, 400, "Invalid scheduling payload: empty body")
		return
	}

	caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services
		tools.PEER: {
			tools.POST: "/status/",
		},
		tools.BOOKING: {
			tools.GET:  "/booking/check/:id/:start_date/:end_date",
			tools.POST: "/booking/",
		},
	})

	logger.Info().Msg("Booking for " + wfId)
	req := oclib.NewRequest(collection, user, peerID, groups, caller)
	resp.UUID = uuid.New().String()

	sch, err := req.Schedule(wfId, resp)
	if err != nil {
		// Remove whatever executions were created before the failure.
		if sch != nil {
			for _, w := range sch.WorkflowExecution {
				req.DeleteOne(w.GetID())
			}
		}
		o.respond(nil, 409, "Error when scheduling your execution(s): "+err.Error())
		return
	}

	logger.Info().Msg("Creating S3 service account if necessary")
	for _, exec := range sch.WorkflowExecution {
		execId := exec.ExecutionsID
		logger.Info().Msg("S3 ServiceAccount for " + execId)
		if err := createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups); err != nil {
			// NOTE(review): the executions scheduled above are left in place
			// here (the rollback was disabled in the original code) — confirm
			// whether they should be deleted on this path too.
			o.respond(nil, 409, err.Error())
			return
		}
	}

	o.respond(sch.WorkflowExecution, 200, "")
}

// UnSchedule currently only searches the collection for entries whose
// "workflow_id" equals the ":id" path parameter and returns the search result;
// the actual unscheduling is not implemented yet.
//
// @Title UnSchedule
// @Description schedule workflow
// @Param	id		path 	string	true		"id execution"
// @Param	body		body 	models.compute	true		"The compute content"
// @Success 200 {workspace} models.workspace
// @router /:id [delete]
func (o *WorkflowSchedulerController) UnSchedule() {
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
	id := o.Ctx.Input.Param(":id")
	// TODO UNSCHEDULER
	filter := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
		},
	}
	o.Data["json"] = oclib.NewRequest(collection, user, peerID, groups, nil).Search(filter, "", true)
	o.ServeJSON()
}

// SearchScheduledDraftOrder searches the order collection for entries whose
// "workflow_id" equals the ":id" path parameter and whose "order_by" equals
// the peer ID extracted from the request token.
//
// @Title SearchScheduledDraftOrder
// @Description schedule workflow
// @Param	id		path 	string	true		"id execution"
// @Success 200 {workspace} models.workspace
// @router /:id/order [get]
func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
	id := o.Ctx.Input.Param(":id")
	filter := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
			"order_by":    {{Operator: dbs.EQUAL.String(), Value: peerID}},
		},
	}
	o.Data["json"] = oclib.NewRequest(orderCollection, user, peerID, groups, nil).Search(filter, "", true)
	o.ServeJSON()
}

// createStorageServiceAccount loads the workflow wfId and, for each of its
// storages of type S3, posts a service account creation request for execId to
// the peer that created the storage.
//
// It returns an error when the workflow, a storage or a peer cannot be loaded,
// when the call to the peer fails, or when the peer does not answer with
// code 200. wfs is currently unused.
func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
	l := oclib.GetLogger()

	// loadWorkflow returns nil (after logging) when the workflow cannot be loaded.
	wf := loadWorkflow(wfId, peerID)
	if wf == nil {
		return fmt.Errorf("could not load workflow %s for creation of the serviceAccount", wfId)
	}

	for _, id := range wf.Storages {
		res := oclib.NewRequest(oclib.LibDataEnum(oclib.STORAGE_RESOURCE), user, peerID, groups, nil).LoadOne(id)
		if res.Code != 200 {
			l.Error().Msg("Error while loading a storage for creation of the serviceAccount")
			return fmt.Errorf("%s", res.Err)
		}
		s := res.ToStorageResource()
		if s.StorageType != enum.S3 {
			continue
		}

		l.Info().Msg("Creating a service account on " + peerID + " for " + s.Name)
		res = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(s.CreatorID)
		if res.Code != 200 {
			l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
			return fmt.Errorf("%s", res.Err)
		}
		p := res.ToPeer()

		// Register the route for this storage/execution pair on the shared caller.
		caller.URLS[tools.MINIO_SVCACC] = map[tools.METHOD]string{
			tools.POST: "/serviceaccount/" + s.UUID + "/" + execId,
		}

		l.Debug().Msg("Launching execution on " + p.UUID)
		if _, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC, tools.POST, nil, caller); err != nil {
			l.Error().Msg("Error when executing on peer at " + p.Url)
			l.Error().Msg(err.Error())
			return err
		}

		if err := serviceAccountCallError(caller.LastResults, s.Name, p.Url); err != nil {
			l.Error().Msg("Error when trying to create a serviceAccount on storage " + s.Name + " on peer at " + p.Url)
			l.Error().Msg(err.Error())
			return err
		}
	}
	return nil
}

// serviceAccountCallError inspects the results of the last HTTP call and
// returns nil only when they carry an integer "code" equal to 200. Otherwise it
// returns an error built from the "body" entry when one is usable, or a generic
// message naming the storage and the peer URL.
func serviceAccountCallError(results map[string]interface{}, storageName string, peerURL string) error {
	code, ok := results["code"].(int)
	if ok && code == 200 {
		return nil
	}
	if msg := responseErrorMessage(results["body"]); msg != "" {
		return fmt.Errorf("%s", msg)
	}
	return fmt.Errorf("creating a serviceAccount on storage %s on peer at %s failed with code %v", storageName, peerURL, results["code"])
}

// responseErrorMessage extracts a human readable message from a response body.
// A []byte body is decoded as a JSON object and its "error" field is returned
// when it is a non-empty string; if that fails the raw bytes are returned as a
// string. An already decoded map is handled the same way. Any other value
// yields "".
func responseErrorMessage(body interface{}) string {
	switch b := body.(type) {
	case []byte:
		var decoded map[string]interface{}
		if err := json.Unmarshal(b, &decoded); err == nil {
			if msg, ok := decoded["error"].(string); ok && msg != "" {
				return msg
			}
		}
		return string(b)
	case map[string]interface{}:
		if msg, ok := b["error"].(string); ok {
			return msg
		}
	}
	return ""
}

// loadWorkflow loads the workflow workflowId on behalf of peerId. It logs an
// error and returns nil when the load does not answer with code 200.
func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{}, nil).LoadOne(workflowId)
	if res.Code != 200 {
		l := oclib.GetLogger()
		l.Error().Msg("Error while loading a workflow for creation of the serviceAccount")
		return nil
	}
	return res.ToWorkflow()
}