Compare commits
9 Commits
feature/cr
...
feature/ev
| Author | SHA1 | Date | |
|---|---|---|---|
| 5302ed48b3 | |||
| 60492ed8f5 | |||
| de2f486ebb | |||
| 637bdcaeab | |||
|
|
6897fd192e | ||
| 97b51efd50 | |||
| 07ce87bb49 | |||
| 5d1acac9ae | |||
| e74f3adcaa |
3
.gitattributes
vendored
Normal file
3
.gitattributes
vendored
Normal file
@@ -0,0 +1,3 @@
|
||||
# Force Go as the main language
|
||||
*.go linguist-detectable=true
|
||||
* linguist-language=Go
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -8,7 +8,7 @@
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
env.env
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
|
||||
@@ -30,7 +30,6 @@ COPY . .
|
||||
RUN sed -i '/replace/d' go.mod
|
||||
RUN bee pack
|
||||
RUN mkdir -p /app/extracted && tar -zxvf oc-scheduler.tar.gz -C /app/extracted
|
||||
RUN sed -i 's/http:\/\/127.0.0.1:8080\/swagger\/swagger.json/swagger.json/g' /app/extracted/swagger/index.html
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
|
||||
|
||||
11
Makefile
11
Makefile
@@ -21,15 +21,20 @@ clean:
|
||||
rm -rf oc-peer.tar.gz
|
||||
|
||||
docker:
|
||||
DOCKER_BUILDKIT=1 docker build -t oc/oc-scheduler:0.0.1 -f Dockerfile .
|
||||
docker tag oc/oc-scheduler:0.0.1 oc/oc-scheduler:latest
|
||||
DOCKER_BUILDKIT=1 docker build -t oc-scheduler -f Dockerfile . --build-arg=HOST=$(HOST)
|
||||
docker tag oc-scheduler:latest oc/oc-scheduler:0.0.1
|
||||
|
||||
publish-kind:
|
||||
kind load docker-image oc/oc-scheduler:0.0.1 --name opencloud
|
||||
kind load docker-image oc/oc-scheduler:0.0.1 --name opencloud | true
|
||||
|
||||
publish-registry:
|
||||
@echo "TODO"
|
||||
|
||||
docker-deploy:
|
||||
docker compose up -d
|
||||
|
||||
run-docker: docker publish-kind publish-registry docker-deploy
|
||||
|
||||
all: docker publish-kind publish-registry
|
||||
|
||||
.PHONY: build run clean docker publish-kind publish-registry
|
||||
|
||||
@@ -58,6 +58,7 @@ func (o *LokiController) GetLogs() {
|
||||
path += "?query={" + strings.Join(query, ", ") + "}&start=" + start + "&end=" + end
|
||||
|
||||
resp, err := http.Get(config.GetConfig().LokiUrl + path) // CALL
|
||||
fmt.Println(resp, path)
|
||||
if err != nil {
|
||||
o.Ctx.ResponseWriter.WriteHeader(422)
|
||||
o.Data["json"] = map[string]string{"error": err.Error()}
|
||||
@@ -66,15 +67,16 @@ func (o *LokiController) GetLogs() {
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
var data map[string]interface{}
|
||||
err = json.Unmarshal(body,&data)
|
||||
var result map[string]interface{}
|
||||
// Unmarshal: string → []byte → object
|
||||
err = json.Unmarshal(body, &result)
|
||||
if err != nil {
|
||||
o.Ctx.ResponseWriter.WriteHeader(500)
|
||||
o.Data["json"] = err.Error()
|
||||
o.Ctx.ResponseWriter.WriteHeader(403)
|
||||
o.Data["json"] = map[string]string{"error": err.Error()}
|
||||
o.ServeJSON()
|
||||
return
|
||||
}
|
||||
|
||||
o.Data["json"] = data
|
||||
o.Data["json"] = result
|
||||
o.ServeJSON()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-scheduler/infrastructure"
|
||||
"slices"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
@@ -11,7 +12,6 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
beego "github.com/beego/beego/v2/server/web"
|
||||
"github.com/google/uuid"
|
||||
@@ -38,7 +38,7 @@ func (o *WorkflowSchedulerController) Schedule() {
|
||||
e := ""
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
wfId := o.Ctx.Input.Param(":id")
|
||||
var resp *workflow_execution.WorkflowSchedule
|
||||
var resp *infrastructure.WorkflowSchedule
|
||||
json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp)
|
||||
|
||||
caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services
|
||||
@@ -49,14 +49,18 @@ func (o *WorkflowSchedulerController) Schedule() {
|
||||
tools.GET: "/booking/check/:id/:start_date/:end_date",
|
||||
tools.POST: "/booking/",
|
||||
},
|
||||
|
||||
})
|
||||
|
||||
logger.Info().Msg("Booking for " + wfId)
|
||||
req := oclib.NewRequest(collection, user, peerID, groups, caller)
|
||||
resp.UUID = uuid.New().String()
|
||||
|
||||
sch, err := req.Schedule(wfId, resp)
|
||||
sch, _, execs, err := resp.Schedules(wfId, &tools.APIRequest{
|
||||
Username: user,
|
||||
PeerID: peerID,
|
||||
Groups: groups,
|
||||
Caller: caller,
|
||||
})
|
||||
if err != nil {
|
||||
if sch != nil {
|
||||
for _, w := range sch.WorkflowExecution {
|
||||
@@ -73,12 +77,11 @@ func (o *WorkflowSchedulerController) Schedule() {
|
||||
}
|
||||
|
||||
logger.Info().Msg("Creating S3 service account if necessary")
|
||||
execs := sch.WorkflowExecution
|
||||
for _, exec := range execs {
|
||||
execId := exec.ExecutionsID
|
||||
logger.Info().Msg("S3 ServiceAccount for " + execId)
|
||||
// execId = "6cdaf6e4-5727-480e-ab97-f78853c4e553"
|
||||
err = createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups)
|
||||
err = createStorageServiceAccount(execId, peerID, wfId, sch, caller, user, groups)
|
||||
if err != nil {
|
||||
// if sch != nil {
|
||||
// for _, w := range sch.WorkflowExecution {
|
||||
@@ -140,10 +143,9 @@ func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
|
||||
o.ServeJSON()
|
||||
}
|
||||
|
||||
|
||||
func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
|
||||
func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *infrastructure.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
|
||||
// Retrieve the Workflow in the WorkflowSchedule
|
||||
wf := loadWorkflow(wfId,peerID)
|
||||
wf := loadWorkflow(wfId, peerID)
|
||||
// storageItems := wf.GetGraphItems(wf.Graph.IsStorage)
|
||||
itemMap := wf.GetItemsByResources()
|
||||
// mapStorageRessources, err := getItemByRessourceId(wf, storageItems)
|
||||
@@ -160,7 +162,7 @@ func createStorageServiceAccount(execId string, peerID string, wfId string, wfs
|
||||
|
||||
// retrieve all the processing linked to a compute using the storage : processing -- compute -- storage
|
||||
// In this case we need to retrieve the Item ID(s) for each storage to be able to evaluate links with other items
|
||||
associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id])
|
||||
associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id])
|
||||
for _, computeId := range associatedComputingResources {
|
||||
|
||||
c, err := oclib.LoadOneComputing(computeId, user, peerID, groups)
|
||||
@@ -183,7 +185,7 @@ func createStorageServiceAccount(execId string, peerID string, wfId string, wfs
|
||||
return err
|
||||
}
|
||||
// post on computing datacenter /minio/createSAsecret
|
||||
err = postS3Secret(peerID, *s, caller, execId, wfId,*c, access, secret) // create the secret holding the retrieved access on c's peer
|
||||
err = postS3Secret(peerID, *s, caller, execId, wfId, *c, access, secret) // create the secret holding the retrieved access on c's peer
|
||||
if err != nil {
|
||||
// Add a logger.Info() here
|
||||
return err
|
||||
@@ -232,7 +234,7 @@ func postCreateServiceAccount(peerID string, s *resources.StorageResource, calle
|
||||
}
|
||||
|
||||
func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{},nil).LoadOne(workflowId)
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{}, nil).LoadOne(workflowId)
|
||||
if res.Code != 200 {
|
||||
l := oclib.GetLogger()
|
||||
l.Error().Msg("Error while loading a workflow for creation of the serviceAccount")
|
||||
@@ -248,13 +250,13 @@ func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
|
||||
// }
|
||||
|
||||
func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) []string {
|
||||
storageProcessingLinks := make([]string,0)
|
||||
for _, id := range storageNodes{
|
||||
processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node
|
||||
storageProcessingLinks := make([]string, 0)
|
||||
for _, id := range storageNodes {
|
||||
processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node
|
||||
for _, procId := range processings {
|
||||
computings := getComputeProcessing(wf, procId)
|
||||
if !slices.Contains(storageProcessingLinks,computings){
|
||||
storageProcessingLinks= append(storageProcessingLinks, computings)
|
||||
if !slices.Contains(storageProcessingLinks, computings) {
|
||||
storageProcessingLinks = append(storageProcessingLinks, computings)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -269,21 +271,27 @@ func getStorageRelatedProcessing(wf workflow.Workflow, storageId string) (relate
|
||||
// Only keep the links that are associated to the storage
|
||||
for _, link := range wf.Graph.Links {
|
||||
if link.Destination.ID == storageId || link.Source.ID == storageId {
|
||||
storageLinks = append(storageLinks,link)
|
||||
storageLinks = append(storageLinks, link)
|
||||
}
|
||||
}
|
||||
|
||||
for _, link := range storageLinks {
|
||||
var resourceId string
|
||||
if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID }
|
||||
if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]){ relatedProcessing = append(relatedProcessing, resourceId) }
|
||||
if link.Source.ID != storageId {
|
||||
resourceId = link.Source.ID
|
||||
} else {
|
||||
resourceId = link.Destination.ID
|
||||
}
|
||||
if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]) {
|
||||
relatedProcessing = append(relatedProcessing, resourceId)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func getComputeProcessing(wf workflow.Workflow, processingId string) (res string) {
|
||||
computeRel := wf.GetByRelatedProcessing(processingId,wf.Graph.IsCompute)
|
||||
computeRel := wf.GetByRelatedProcessing(processingId, wf.Graph.IsCompute)
|
||||
for _, rel := range computeRel {
|
||||
return rel.Node.GetID()
|
||||
}
|
||||
@@ -293,7 +301,7 @@ func getComputeProcessing(wf workflow.Workflow, processingId string) (res string
|
||||
|
||||
func getServiceAccountCredentials(peerID string, storageRes resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, computeRes resources.ComputeResource) (string, string, error) {
|
||||
l := oclib.GetLogger()
|
||||
fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID )
|
||||
fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID)
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(storageRes.CreatorID)
|
||||
if res.Code != 200 {
|
||||
l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
|
||||
@@ -315,7 +323,7 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes
|
||||
}
|
||||
|
||||
result_code := caller.LastResults["code"].(int)
|
||||
if !slices.Contains([]int{200,201}, result_code) {
|
||||
if !slices.Contains([]int{200, 201}, result_code) {
|
||||
l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + storageRes.Name + " on peer at " + p.Url))
|
||||
if _, ok := caller.LastResults["body"]; ok {
|
||||
l.Error().Msg(string(caller.LastResults["body"].([]byte)))
|
||||
@@ -336,7 +344,6 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes
|
||||
secret = s.(string)
|
||||
}
|
||||
|
||||
|
||||
return access, secret, nil
|
||||
}
|
||||
|
||||
@@ -363,11 +370,11 @@ func postS3Secret(peerID string, s resources.StorageResource, caller *tools.HTTP
|
||||
}
|
||||
|
||||
result_code := caller.LastResults["code"].(int)
|
||||
if !slices.Contains([]int{200,201}, result_code) {
|
||||
l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url))
|
||||
if !slices.Contains([]int{200, 201}, result_code) {
|
||||
l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url))
|
||||
if _, ok := caller.LastResults["body"]; ok {
|
||||
l.Error().Msg(string(caller.LastResults["body"].([]byte)))
|
||||
return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte)))
|
||||
return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
4
env.env
Normal file
4
env.env
Normal file
@@ -0,0 +1,4 @@
|
||||
KUBERNETES_SERVICE_HOST=192.168.47.20
|
||||
KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
|
||||
KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
|
||||
KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.23.0
|
||||
toolchain go1.24.0
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d
|
||||
github.com/beego/beego/v2 v2.3.8
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/smartystreets/goconvey v1.7.2
|
||||
|
||||
6
go.sum
6
go.sum
@@ -26,6 +26,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81 h1:539qIasa1Vz+F
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1 h1:53KzZ+1JqRY6J7EVzQpNBmLzNuxb8oHNW3UgqxkYABo=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216 h1:9ab37/TK1JhdOOvYbqq9J9hDbipofBkq0l2GQ6umARY=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06 h1:kDTCqxzV8dvLeXPzPWIn4LgFqwgVprrXwNnP+ftA9C0=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
|
||||
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=
|
||||
|
||||
362
infrastructure/scheduler.go
Normal file
362
infrastructure/scheduler.go
Normal file
@@ -0,0 +1,362 @@
|
||||
package infrastructure
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/bill"
|
||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
|
||||
"cloud.o-forge.io/core/oc-lib/models/order"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
"github.com/google/uuid"
|
||||
"github.com/robfig/cron"
|
||||
)
|
||||
|
||||
/*
|
||||
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
|
||||
* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression
|
||||
*/
|
||||
// it's a flying object only use in a session time. It's not stored in the database
|
||||
type WorkflowSchedule struct {
|
||||
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
|
||||
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
|
||||
WorkflowExecution []*workflow_execution.WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
|
||||
Message string `json:"message,omitempty"` // Message is the message of the schedule
|
||||
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
|
||||
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
|
||||
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
|
||||
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
|
||||
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
|
||||
|
||||
BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible
|
||||
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
|
||||
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
|
||||
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
|
||||
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
|
||||
|
||||
SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"`
|
||||
}
|
||||
|
||||
// TODO PREEMPTION !
|
||||
/*
|
||||
To schedule a preempted, omg.
|
||||
pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres, celon un niveau de priorité.
|
||||
Preemptible = 7, pour le moment il n'existera que 0 et 7.
|
||||
Dans le cas d'une préemption l'exécution est immédiable et bloquera tout le monde tant qu'il n'a pas été exécuté.
|
||||
Une ressource doit pouvoir être preemptible pour être exécutée de la sorte.
|
||||
Se qui implique si on est sur une ressource par ressource que si un élement n'est pas préemptible,
|
||||
alors il devra être effectué dés que possible
|
||||
|
||||
Dans le cas dés que possible, la start date est immédiate MAIS !
|
||||
ne pourra se lancé que SI il n'existe pas d'exécution se lançant durant la période indicative. ( Ultra complexe )
|
||||
*/
|
||||
|
||||
func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule {
|
||||
ws := &WorkflowSchedule{
|
||||
UUID: uuid.New().String(),
|
||||
Start: time.Now(),
|
||||
BookingMode: booking.BookingMode(mode),
|
||||
DurationS: durationInS,
|
||||
Cron: cron,
|
||||
}
|
||||
s, err := time.Parse("2006-01-02T15:04:05", start)
|
||||
if err == nil && ws.BookingMode == booking.PLANNED {
|
||||
ws.Start = s // can apply a defined start other than now, if planned
|
||||
}
|
||||
|
||||
e, err := time.Parse("2006-01-02T15:04:05", end)
|
||||
if err == nil {
|
||||
ws.End = &e
|
||||
}
|
||||
return ws
|
||||
}
|
||||
|
||||
func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*workflow_execution.WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) {
|
||||
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
|
||||
return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined")
|
||||
}
|
||||
access := workflow.NewAccessor(request)
|
||||
res, code, err := access.LoadOne(wfID)
|
||||
if code != 200 {
|
||||
return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
|
||||
}
|
||||
wf := res.(*workflow.Workflow)
|
||||
isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End,
|
||||
ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies,
|
||||
int(ws.BookingMode), request)
|
||||
if err != nil {
|
||||
return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
|
||||
}
|
||||
ws.DurationS = longest
|
||||
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
|
||||
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
|
||||
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
|
||||
}
|
||||
execs, err := ws.GetExecutions(wf, isPreemptible)
|
||||
if err != nil {
|
||||
return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
|
||||
}
|
||||
purchased := []*purchase_resource.PurchaseResource{}
|
||||
bookings := []*booking.Booking{}
|
||||
for _, exec := range execs {
|
||||
purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...)
|
||||
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
|
||||
}
|
||||
|
||||
errCh := make(chan error, len(bookings))
|
||||
var m sync.Mutex
|
||||
|
||||
for _, b := range bookings {
|
||||
go getBooking(b, request, errCh, &m)
|
||||
}
|
||||
|
||||
for i := 0; i < len(bookings); i++ {
|
||||
if err := <-errCh; err != nil {
|
||||
return false, wf, execs, purchased, bookings, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, wf, execs, purchased, bookings, nil
|
||||
}
|
||||
|
||||
func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error {
|
||||
newOrder := &order.Order{
|
||||
AbstractObject: utils.AbstractObject{
|
||||
Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
|
||||
IsDraft: true,
|
||||
},
|
||||
ExecutionsID: ws.UUID,
|
||||
Purchases: purchases,
|
||||
Bookings: bookings,
|
||||
Status: enum.PENDING,
|
||||
}
|
||||
if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil {
|
||||
if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
|
||||
m.Lock()
|
||||
c, err := getCallerCopy(request, errCh)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
m.Unlock()
|
||||
|
||||
meth := c.URLS[tools.BOOKING][tools.GET]
|
||||
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
||||
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
|
||||
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
|
||||
c.URLS[tools.BOOKING][tools.GET] = meth
|
||||
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
|
||||
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("error on " + b.DestPeerID + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
errCh <- nil
|
||||
}
|
||||
|
||||
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
|
||||
var c tools.HTTPCaller
|
||||
err := request.Caller.DeepCopy(c)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return tools.HTTPCaller{}, nil
|
||||
}
|
||||
c.URLS = request.Caller.URLS
|
||||
return c, err
|
||||
}
|
||||
|
||||
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) {
|
||||
if request == nil {
|
||||
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found")
|
||||
}
|
||||
c := request.Caller
|
||||
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
|
||||
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no caller defined")
|
||||
}
|
||||
methods := c.URLS[tools.BOOKING]
|
||||
if _, ok := methods[tools.GET]; !ok {
|
||||
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no path found")
|
||||
}
|
||||
ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
|
||||
ws.WorkflowExecution = executions
|
||||
if !ok || err != nil {
|
||||
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
|
||||
}
|
||||
ws.Workflow = wf
|
||||
|
||||
var errCh = make(chan error, len(bookings))
|
||||
var m sync.Mutex
|
||||
|
||||
for _, purchase := range purchases {
|
||||
go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m)
|
||||
}
|
||||
for i := 0; i < len(purchases); i++ {
|
||||
if err := <-errCh; err != nil {
|
||||
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
|
||||
}
|
||||
}
|
||||
|
||||
errCh = make(chan error, len(bookings))
|
||||
|
||||
for _, booking := range bookings {
|
||||
go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m)
|
||||
}
|
||||
|
||||
for i := 0; i < len(bookings); i++ {
|
||||
if err := <-errCh; err != nil {
|
||||
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
|
||||
}
|
||||
}
|
||||
|
||||
if err := ws.GenerateOrder(purchases, bookings, request); err != nil {
|
||||
return ws, wf, executions, err
|
||||
}
|
||||
|
||||
fmt.Println("Schedules")
|
||||
for _, exec := range executions {
|
||||
err := exec.PurgeDraft(request)
|
||||
if err != nil {
|
||||
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
|
||||
}
|
||||
exec.StoreDraftDefault()
|
||||
utils.GenericStoreOne(exec, workflow_execution.NewAccessor(request))
|
||||
}
|
||||
fmt.Println("Schedules")
|
||||
|
||||
wf.GetAccessor(&tools.APIRequest{Admin: true}).UpdateOne(wf, wf.GetID())
|
||||
|
||||
return ws, wf, executions, nil
|
||||
}
|
||||
|
||||
func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
|
||||
m.Lock()
|
||||
c, err := getCallerCopy(request, errCh)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
m.Unlock()
|
||||
if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
} else {
|
||||
data := res["data"].(map[string]interface{})
|
||||
purchase.SetID(fmt.Sprintf("%v", data["id"]))
|
||||
}
|
||||
errCh <- nil
|
||||
}
|
||||
|
||||
/*
|
||||
BOOKING IMPLIED TIME, not of subscription but of execution
|
||||
so is processing time execution time applied on computes
|
||||
data can improve the processing time
|
||||
time should implied a security time border (10sec) if not from the same executions
|
||||
VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
|
||||
*/
|
||||
|
||||
/*
|
||||
* getExecutions is a function that returns the executions of a workflow
|
||||
* it returns an array of workflow_execution.WorkflowExecution
|
||||
*/
|
||||
func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*workflow_execution.WorkflowExecution, error) {
|
||||
workflows_executions := []*workflow_execution.WorkflowExecution{}
|
||||
dates, err := ws.GetDates()
|
||||
if err != nil {
|
||||
return workflows_executions, err
|
||||
}
|
||||
for _, date := range dates {
|
||||
obj := &workflow_execution.WorkflowExecution{
|
||||
AbstractObject: utils.AbstractObject{
|
||||
UUID: uuid.New().String(), // set the uuid of the execution
|
||||
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
|
||||
},
|
||||
Priority: 1,
|
||||
ExecutionsID: ws.UUID,
|
||||
ExecDate: date.Start, // set the execution date
|
||||
EndDate: date.End, // set the end date
|
||||
State: enum.DRAFT, // set the state to 1 (scheduled)
|
||||
WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution
|
||||
}
|
||||
if ws.BookingMode != booking.PLANNED {
|
||||
obj.Priority = 0
|
||||
}
|
||||
if ws.BookingMode == booking.PREEMPTED && isPreemptible {
|
||||
obj.Priority = 7
|
||||
}
|
||||
|
||||
ws.SelectedStrategies = obj.SelectedStrategies
|
||||
ws.SelectedPartnerships = obj.SelectedPartnerships
|
||||
ws.SelectedBuyings = obj.SelectedBuyings
|
||||
ws.SelectedInstances = obj.SelectedInstances
|
||||
|
||||
workflows_executions = append(workflows_executions, obj)
|
||||
}
|
||||
return workflows_executions, nil
|
||||
}
|
||||
|
||||
func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) {
|
||||
schedule := []Schedule{}
|
||||
if len(ws.Cron) > 0 { // if cron is set then end date should be set
|
||||
if ws.End == nil {
|
||||
return schedule, errors.New("a cron task should have an end date")
|
||||
}
|
||||
if ws.DurationS <= 0 {
|
||||
ws.DurationS = ws.End.Sub(ws.Start).Seconds()
|
||||
}
|
||||
cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it
|
||||
if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
|
||||
return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw")
|
||||
}
|
||||
subCron := strings.Join(cronStr[:6], " ")
|
||||
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
|
||||
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser
|
||||
sched, err := specParser.Parse(subCron) // parse the cron string
|
||||
if err != nil {
|
||||
return schedule, errors.New("Bad cron message: " + err.Error())
|
||||
}
|
||||
// loop through the cron schedule to set the executions
|
||||
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
||||
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
||||
schedule = append(schedule, Schedule{
|
||||
Start: s,
|
||||
End: &e,
|
||||
})
|
||||
}
|
||||
} else { // if no cron, set the execution to the start date
|
||||
schedule = append(schedule, Schedule{
|
||||
Start: ws.Start,
|
||||
End: ws.End,
|
||||
})
|
||||
}
|
||||
return schedule, nil
|
||||
}
|
||||
|
||||
type Schedule struct {
|
||||
Start time.Time
|
||||
End *time.Time
|
||||
}
|
||||
|
||||
/*
|
||||
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
|
||||
* SET PROTECTION BORDER TIME
|
||||
*/
|
||||
Reference in New Issue
Block a user