375 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			375 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package controllers
 | 
						|
 | 
						|
import (
 | 
						|
	"encoding/json"
 | 
						|
	"fmt"
 | 
						|
	"slices"
 | 
						|
 | 
						|
	oclib "cloud.o-forge.io/core/oc-lib"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/dbs"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/models/resources"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/models/workflow"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
 | 
						|
	"cloud.o-forge.io/core/oc-lib/tools"
 | 
						|
	beego "github.com/beego/beego/v2/server/web"
 | 
						|
	"github.com/google/uuid"
 | 
						|
)
 | 
						|
 | 
						|
var orderCollection = oclib.LibDataEnum(oclib.ORDER)
 | 
						|
var logger = oclib.GetLogger()
 | 
						|
 | 
						|
// Operations about workflow
 | 
						|
type WorkflowSchedulerController struct {
 | 
						|
	beego.Controller
 | 
						|
}
 | 
						|
 | 
						|
// @Title Schedule
 | 
						|
// @Description schedule workflow
 | 
						|
// @Param	id		path 	string	true		"id execution"
 | 
						|
// @Param	body		body 	models.compute	true		"The compute content"
 | 
						|
// @Success 200 {workspace} models.workspace
 | 
						|
// @router /:id [post]
 | 
						|
func (o *WorkflowSchedulerController) Schedule() {
 | 
						|
	logger := oclib.GetLogger()
 | 
						|
 | 
						|
	code := 200
 | 
						|
	e := ""
 | 
						|
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
 | 
						|
	wfId := o.Ctx.Input.Param(":id")
 | 
						|
	var resp *workflow_execution.WorkflowSchedule
 | 
						|
	json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp)
 | 
						|
	
 | 
						|
	caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services
 | 
						|
		tools.PEER: {
 | 
						|
			tools.POST: "/status/",
 | 
						|
		},
 | 
						|
		tools.BOOKING: {
 | 
						|
			tools.GET:  "/booking/check/:id/:start_date/:end_date",
 | 
						|
			tools.POST: "/booking/",
 | 
						|
		},
 | 
						|
		
 | 
						|
	})
 | 
						|
 | 
						|
	logger.Info().Msg("Booking for " + wfId)
 | 
						|
	req := oclib.NewRequest(collection, user, peerID, groups, caller)
 | 
						|
	resp.UUID = uuid.New().String()
 | 
						|
 | 
						|
	sch, err := req.Schedule(wfId, resp)
 | 
						|
	if err != nil {
 | 
						|
		if sch != nil {
 | 
						|
			for _, w := range sch.WorkflowExecution {
 | 
						|
				req.DeleteOne(w.GetID())
 | 
						|
			}
 | 
						|
		}
 | 
						|
		o.Data["json"] = map[string]interface{}{
 | 
						|
			"data":  nil,
 | 
						|
			"code":  409,
 | 
						|
			"error": "Error when scheduling your execution(s): " + err.Error(),
 | 
						|
		}
 | 
						|
		o.ServeJSON()
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	logger.Info().Msg("Creating S3 service account if necessary")
 | 
						|
	execs := sch.WorkflowExecution
 | 
						|
	for _, exec := range execs {
 | 
						|
		execId := exec.ExecutionsID
 | 
						|
		logger.Info().Msg("S3 ServiceAccount for " + execId)
 | 
						|
		// execId = "6cdaf6e4-5727-480e-ab97-f78853c4e553"
 | 
						|
		err = createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups)
 | 
						|
		if err != nil {
 | 
						|
			// if sch != nil {
 | 
						|
			// 	for _, w := range sch.WorkflowExecution {
 | 
						|
			// 		req.DeleteOne(w.GetID())
 | 
						|
			// 	}
 | 
						|
			// }
 | 
						|
			o.Data["json"] = map[string]interface{}{
 | 
						|
				"data":  nil,
 | 
						|
				"code":  409,
 | 
						|
				"error": err.Error(),
 | 
						|
			}
 | 
						|
			o.ServeJSON()
 | 
						|
			return
 | 
						|
		}	
 | 
						|
	}
 | 
						|
	
 | 
						|
	o.Data["json"] = map[string]interface{}{
 | 
						|
		"data":  sch.WorkflowExecution,
 | 
						|
		"code":  code,
 | 
						|
		"error": e,
 | 
						|
	}
 | 
						|
	o.ServeJSON()
 | 
						|
}
 | 
						|
 | 
						|
// @Title UnSchedule
 | 
						|
// @Description schedule workflow
 | 
						|
// @Param	id		path 	string	true		"id execution"
 | 
						|
// @Param	body		body 	models.compute	true		"The compute content"
 | 
						|
// @Success 200 {workspace} models.workspace
 | 
						|
// @router /:id [delete]
 | 
						|
func (o *WorkflowSchedulerController) UnSchedule() {
 | 
						|
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
 | 
						|
	id := o.Ctx.Input.Param(":id")
 | 
						|
	// TODO UNSCHEDULER
 | 
						|
	filter := &dbs.Filters{
 | 
						|
		And: map[string][]dbs.Filter{
 | 
						|
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	o.Data["json"] = oclib.NewRequest(collection, user, peerID, groups, nil).Search(filter, "", true)
 | 
						|
	o.ServeJSON()
 | 
						|
}
 | 
						|
 | 
						|
// @Title SearchScheduledDraftOrder
 | 
						|
// @Description schedule workflow
 | 
						|
// @Param	id		path 	string	true		"id execution"
 | 
						|
// @Success 200 {workspace} models.workspace
 | 
						|
// @router /:id/order [get]
 | 
						|
func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
 | 
						|
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
 | 
						|
	id := o.Ctx.Input.Param(":id")
 | 
						|
	filter := &dbs.Filters{
 | 
						|
		And: map[string][]dbs.Filter{
 | 
						|
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
 | 
						|
			"order_by":    {{Operator: dbs.EQUAL.String(), Value: peerID}},
 | 
						|
		},
 | 
						|
	}
 | 
						|
	o.Data["json"] = oclib.NewRequest(orderCollection, user, peerID, groups, nil).Search(filter, "", true)
 | 
						|
	o.ServeJSON()
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
func  createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
 | 
						|
	// Retrieve the Workflow in the WorkflowSchedule
 | 
						|
	wf := loadWorkflow(wfId,peerID)
 | 
						|
	// storageItems := wf.GetGraphItems(wf.Graph.IsStorage)
 | 
						|
	itemMap := wf.GetItemsByResources()
 | 
						|
	// mapStorageRessources, err := getItemByRessourceId(wf, storageItems)
 | 
						|
	for id, items := range itemMap[tools.STORAGE_RESOURCE] {
 | 
						|
		_ = items
 | 
						|
		// Load the storage
 | 
						|
		s, err := oclib.LoadOneStorage(id, user, peerID, groups)
 | 
						|
		if err != nil {
 | 
						|
			return err
 | 
						|
		}
 | 
						|
 | 
						|
		if s.StorageType == enum.S3 {
 | 
						|
			// DEV MULTI PEER MINIO CREDENTIAL CREATION
 | 
						|
	
 | 
						|
			// retrieve all the processing linked to a compute using the storage : processing -- compute -- storage
 | 
						|
			// In this case we need to retrieve the Item ID(s) for each storage to be able to evaluate links with other items 
 | 
						|
 				associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id])
 | 
						|
			for _, computeId := range associatedComputingResources {
 | 
						|
 | 
						|
				c, err := oclib.LoadOneComputing(computeId, user, peerID, groups)
 | 
						|
				if err != nil {
 | 
						|
					return err
 | 
						|
				}
 | 
						|
				
 | 
						|
				if c.CreatorID == s.CreatorID {
 | 
						|
					// post on datacenter /minio/createServiceAccount
 | 
						|
					err := postCreateServiceAccount(peerID, s, caller, execId, wfId)
 | 
						|
					if err != nil {
 | 
						|
						// Add a logger.Info() here
 | 
						|
						return err
 | 
						|
					}
 | 
						|
				} else {
 | 
						|
					// get on storage datacenter /minio/createServiceAccount
 | 
						|
					access, secret, err := getServiceAccountCredentials(peerID, *s, caller, execId, wfId, *c)
 | 
						|
					if err != nil {
 | 
						|
						// Add a logger.Info() here
 | 
						|
						return err
 | 
						|
					}
 | 
						|
					// post on computing datacenter /minio/createSAsecret
 | 
						|
					err = postS3Secret(peerID, *s, caller, execId, wfId,*c, access, secret)	 // create the secret holding the retrieved access on c's peer
 | 
						|
					if err != nil {
 | 
						|
						// Add a logger.Info() here
 | 
						|
						return err
 | 
						|
					}
 | 
						|
				}
 | 
						|
 | 
						|
			}
 | 
						|
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func postCreateServiceAccount(peerID string, s *resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string) error {
 | 
						|
	l := oclib.GetLogger()
 | 
						|
	fmt.Println("Creating a service account on " + peerID + " for " + s.Name)
 | 
						|
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(s.CreatorID)
 | 
						|
	if res.Code != 200 {
 | 
						|
		l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
 | 
						|
		return fmt.Errorf(res.Err)
 | 
						|
	}
 | 
						|
	p := res.ToPeer()
 | 
						|
 | 
						|
	caller.URLS[tools.MINIO_SVCACC] = map[tools.METHOD]string{
 | 
						|
		tools.POST: "/serviceaccount/" + s.UUID + "/" + execId,
 | 
						|
	}
 | 
						|
 | 
						|
	l.Debug().Msg("Lauching execution on" + p.UUID)
 | 
						|
	_, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC, tools.POST, nil, caller)
 | 
						|
	if err != nil {
 | 
						|
		l.Error().Msg("Error when executing on peer at " + p.Url + " when creating a S3 service account")
 | 
						|
		l.Error().Msg(err.Error())
 | 
						|
		return err
 | 
						|
	}
 | 
						|
	if caller.LastResults["code"].(int) != 200 {
 | 
						|
		l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + s.Name + " on peer at " + p.Url))
 | 
						|
		if _, ok := caller.LastResults["body"]; ok {
 | 
						|
			l.Error().Msg(string(caller.LastResults["body"].([]byte)))
 | 
						|
			return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte)))
 | 
						|
		}
 | 
						|
 | 
						|
	}
 | 
						|
	return nil
 | 
						|
}
 | 
						|
 | 
						|
func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
 | 
						|
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{},nil).LoadOne(workflowId)
 | 
						|
	if res.Code != 200 {
 | 
						|
		l := oclib.GetLogger()
 | 
						|
		l.Error().Msg("Error while loading a workflow for creation of the serviceAccount")
 | 
						|
		return nil 
 | 
						|
	}
 | 
						|
 | 
						|
	return res.ToWorkflow()
 | 
						|
 | 
						|
} 
 | 
						|
 | 
						|
// func getItemByRessourceId(storages string) (map[string][]string, error) {
 | 
						|
// 	var storagesMap map[string][]string
 | 
						|
// }
 | 
						|
 | 
						|
func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) []string {
 | 
						|
	storageProcessingLinks := make([]string,0)
 | 
						|
	for _, id := range storageNodes{
 | 
						|
		processings := getStorageRelatedProcessing(wf, id)	// Retrieve all the Processing item linked to one storage node
 | 
						|
		for _, procId := range processings {
 | 
						|
			computings := getComputeProcessing(wf, procId)
 | 
						|
			if !slices.Contains(storageProcessingLinks,computings){
 | 
						|
				storageProcessingLinks= append(storageProcessingLinks, computings) 
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return storageProcessingLinks
 | 
						|
}
 | 
						|
 | 
						|
// returns a list of processing item's Id  that use the Storage
 | 
						|
// theses item Id can be used to instantiate the resource 
 | 
						|
func getStorageRelatedProcessing(wf workflow.Workflow, storageId string) (relatedProcessing []string) {
 | 
						|
	var storageLinks []graph.GraphLink
 | 
						|
	// Only keep the links that are associated to the storage
 | 
						|
	for _, link := range wf.Graph.Links {
 | 
						|
		if link.Destination.ID == storageId || link.Source.ID == storageId {
 | 
						|
			storageLinks = append(storageLinks,link)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	for _, link := range storageLinks {
 | 
						|
		var resourceId string
 | 
						|
		if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID }
 | 
						|
		if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]){ relatedProcessing = append(relatedProcessing, resourceId) }
 | 
						|
	}
 | 
						|
 | 
						|
	return
 | 
						|
}
 | 
						|
 | 
						|
func getComputeProcessing(wf workflow.Workflow, processingId string) (res string) {
 | 
						|
	computeRel := wf.GetByRelatedProcessing(processingId,wf.Graph.IsCompute)
 | 
						|
	for _, rel := range computeRel {
 | 
						|
		return rel.Node.GetID()
 | 
						|
	}
 | 
						|
	
 | 
						|
	return ""
 | 
						|
}
 | 
						|
 | 
						|
func getServiceAccountCredentials(peerID string, storageRes resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, computeRes resources.ComputeResource) (string, string, error) {
 | 
						|
	l := oclib.GetLogger()
 | 
						|
	fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID )
 | 
						|
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(storageRes.CreatorID)
 | 
						|
	if res.Code != 200 {
 | 
						|
		l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
 | 
						|
		return "", "", fmt.Errorf(res.Err)
 | 
						|
	}
 | 
						|
	p := res.ToPeer()
 | 
						|
 | 
						|
	caller.URLS[tools.MINIO_SVCACC] = map[tools.METHOD]string{
 | 
						|
		tools.POST: "/serviceaccount/" + storageRes.UUID + "/" + execId,
 | 
						|
	}
 | 
						|
	body := map[string]bool{"retrieve": true}
 | 
						|
 | 
						|
	l.Debug().Msg("Lauching execution on" + p.UUID)
 | 
						|
	resp, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC, tools.POST, body, caller)
 | 
						|
	if err != nil {
 | 
						|
		l.Error().Msg("Error when executing on peer at " + p.Url + " when retrieving S3 credentials")
 | 
						|
		l.Error().Msg(err.Error())
 | 
						|
		return "", "", err
 | 
						|
	}
 | 
						|
	
 | 
						|
	result_code := caller.LastResults["code"].(int)
 | 
						|
	if !slices.Contains([]int{200,201}, result_code)  {
 | 
						|
		l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + storageRes.Name + " on peer at " + p.Url))
 | 
						|
		if _, ok := caller.LastResults["body"]; ok {
 | 
						|
			l.Error().Msg(string(caller.LastResults["body"].([]byte)))
 | 
						|
			return "", "", fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte)))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	var access, secret string
 | 
						|
	if a, ok := resp["access"]; !ok {
 | 
						|
		return "", "", fmt.Errorf("Error in the response returned when creating a S3 serviceAccount on " + storageRes.Name + " on peer " + p.UUID)
 | 
						|
	} else {
 | 
						|
		access = a.(string)		
 | 
						|
	}
 | 
						|
 | 
						|
	if s, ok := resp["secret"]; !ok {
 | 
						|
		return "", "", fmt.Errorf("Error in the response returned when creating a S3 serviceAccount on " + storageRes.Name + " on peer " + p.UUID)
 | 
						|
	} else {
 | 
						|
		secret = s.(string)
 | 
						|
	}
 | 
						|
 | 
						|
	
 | 
						|
	return access, secret, nil
 | 
						|
}
 | 
						|
 | 
						|
func postS3Secret(peerID string, s resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, c resources.ComputeResource, access string, secret string) error {
 | 
						|
	l := oclib.GetLogger()
 | 
						|
 | 
						|
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(c.CreatorID)
 | 
						|
	if res.Code != 200 {
 | 
						|
		l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
 | 
						|
		return fmt.Errorf(res.Err)
 | 
						|
	}
 | 
						|
	p := res.ToPeer()
 | 
						|
 | 
						|
	caller.URLS[tools.MINIO_SVCACC_SECRET] = map[tools.METHOD]string{
 | 
						|
		tools.POST: "/secret/" + s.UUID + "/" + execId,
 | 
						|
	}
 | 
						|
	body := map[string]string{"access": access, "secret": secret}
 | 
						|
 | 
						|
	_, err := p.LaunchPeerExecution(p.UUID, wfId, tools.MINIO_SVCACC_SECRET, tools.POST, body, caller)
 | 
						|
	if err != nil {
 | 
						|
		l.Error().Msg("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials in namespace " + execId)
 | 
						|
		l.Error().Msg(err.Error())
 | 
						|
		return fmt.Errorf("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials" + " : " + err.Error()) 
 | 
						|
	}
 | 
						|
	
 | 
						|
	result_code := caller.LastResults["code"].(int)
 | 
						|
	if !slices.Contains([]int{200,201}, result_code)  {
 | 
						|
		l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name +  "to a secret on peer at " + p.Url))
 | 
						|
		if _, ok := caller.LastResults["body"]; ok {
 | 
						|
			l.Error().Msg(string(caller.LastResults["body"].([]byte)))
 | 
						|
			return  fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte)))
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	return nil
 | 
						|
} |