2026-01-14 15:15:26 +01:00
package infrastructure
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron"
)
/ *
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It contains the mode of the schedule ( Task or Service ) , the name of the schedule , the start and end time of the schedule and the cron expression
* /
// it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct {
UUID string ` json:"id" validate:"required" ` // ExecutionsID is the list of the executions id of the workflow
Workflow * workflow . Workflow ` json:"workflow,omitempty" ` // Workflow is the workflow dependancy of the schedule
WorkflowExecution [ ] * workflow_execution . WorkflowExecution ` json:"workflow_executions,omitempty" ` // WorkflowExecution is the list of executions of the workflow
Message string ` json:"message,omitempty" ` // Message is the message of the schedule
Warning string ` json:"warning,omitempty" ` // Warning is the warning message of the schedule
Start time . Time ` json:"start" validate:"required,ltfield=End" ` // Start is the start time of the schedule, is required and must be less than the End time
End * time . Time ` json:"end,omitempty" ` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 ` json:"duration_s" default:"-1" ` // End is the end time of the schedule
Cron string ` json:"cron,omitempty" ` // here the cron format : ss mm hh dd MM dw task
BookingMode booking . BookingMode ` json:"booking_mode,omitempty" ` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible
SelectedInstances workflow . ConfigItem ` json:"selected_instances" `
SelectedPartnerships workflow . ConfigItem ` json:"selected_partnerships" `
SelectedBuyings workflow . ConfigItem ` json:"selected_buyings" `
SelectedStrategies workflow . ConfigItem ` json:"selected_strategies" `
SelectedBillingStrategy pricing . BillingStrategy ` json:"selected_billing_strategy" `
}
// TODO PREEMPTION !
/ *
To schedule a preempted , omg .
pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres , celon un niveau de priorité .
Preemptible = 7 , pour le moment il n ' existera que 0 et 7.
Dans le cas d ' une préemption l ' exécution est immédiable et bloquera tout le monde tant qu ' il n ' a pas été exécuté .
Une ressource doit pouvoir être preemptible pour être exécutée de la sorte .
Se qui implique si on est sur une ressource par ressource que si un élement n ' est pas préemptible ,
alors il devra être effectué dés que possible
Dans le cas dés que possible , la start date est immédiate MAIS !
ne pourra se lancé que SI il n ' existe pas d ' exécution se lançant durant la période indicative . ( Ultra complexe )
* /
func NewScheduler ( mode int , start string , end string , durationInS float64 , cron string ) * WorkflowSchedule {
ws := & WorkflowSchedule {
UUID : uuid . New ( ) . String ( ) ,
Start : time . Now ( ) ,
BookingMode : booking . BookingMode ( mode ) ,
DurationS : durationInS ,
Cron : cron ,
}
s , err := time . Parse ( "2006-01-02T15:04:05" , start )
if err == nil && ws . BookingMode == booking . PLANNED {
ws . Start = s // can apply a defined start other than now, if planned
}
e , err := time . Parse ( "2006-01-02T15:04:05" , end )
if err == nil {
ws . End = & e
}
return ws
}
func ( ws * WorkflowSchedule ) GetBuyAndBook ( wfID string , request * tools . APIRequest ) ( bool , * workflow . Workflow , [ ] * workflow_execution . WorkflowExecution , [ ] * purchase_resource . PurchaseResource , [ ] * booking . Booking , error ) {
if request . Caller == nil && request . Caller . URLS == nil && request . Caller . URLS [ tools . BOOKING ] == nil || request . Caller . URLS [ tools . BOOKING ] [ tools . GET ] == "" {
return false , nil , [ ] * workflow_execution . WorkflowExecution { } , [ ] * purchase_resource . PurchaseResource { } , [ ] * booking . Booking { } , errors . New ( "no caller defined" )
}
access := workflow . NewAccessor ( request )
res , code , err := access . LoadOne ( wfID )
if code != 200 {
return false , nil , [ ] * workflow_execution . WorkflowExecution { } , [ ] * purchase_resource . PurchaseResource { } , [ ] * booking . Booking { } , errors . New ( "could not load the workflow with id: " + err . Error ( ) )
}
wf := res . ( * workflow . Workflow )
isPreemptible , longest , priceds , wf , err := wf . Planify ( ws . Start , ws . End ,
ws . SelectedInstances , ws . SelectedPartnerships , ws . SelectedBuyings , ws . SelectedStrategies ,
int ( ws . BookingMode ) , request )
if err != nil {
return false , wf , [ ] * workflow_execution . WorkflowExecution { } , [ ] * purchase_resource . PurchaseResource { } , [ ] * booking . Booking { } , err
}
ws . DurationS = longest
ws . Message = "We estimate that the workflow will start at " + ws . Start . String ( ) + " and last " + fmt . Sprintf ( "%v" , ws . DurationS ) + " seconds."
if ws . End != nil && ws . Start . Add ( time . Duration ( longest ) * time . Second ) . After ( * ws . End ) {
ws . Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs , err := ws . GetExecutions ( wf , isPreemptible )
if err != nil {
return false , wf , [ ] * workflow_execution . WorkflowExecution { } , [ ] * purchase_resource . PurchaseResource { } , [ ] * booking . Booking { } , err
}
purchased := [ ] * purchase_resource . PurchaseResource { }
bookings := [ ] * booking . Booking { }
for _ , exec := range execs {
purchased = append ( purchased , exec . Buy ( ws . SelectedBillingStrategy , ws . UUID , wfID , priceds ) ... )
bookings = append ( bookings , exec . Book ( ws . UUID , wfID , priceds ) ... )
}
errCh := make ( chan error , len ( bookings ) )
var m sync . Mutex
for _ , b := range bookings {
go getBooking ( b , request , errCh , & m )
}
for i := 0 ; i < len ( bookings ) ; i ++ {
if err := <- errCh ; err != nil {
return false , wf , execs , purchased , bookings , err
}
}
return true , wf , execs , purchased , bookings , nil
}
func ( ws * WorkflowSchedule ) GenerateOrder ( purchases [ ] * purchase_resource . PurchaseResource , bookings [ ] * booking . Booking , request * tools . APIRequest ) error {
newOrder := & order . Order {
AbstractObject : utils . AbstractObject {
Name : "order_" + request . PeerID + "_" + time . Now ( ) . UTC ( ) . Format ( "2006-01-02T15:04:05" ) ,
IsDraft : true ,
} ,
ExecutionsID : ws . UUID ,
Purchases : purchases ,
Bookings : bookings ,
Status : enum . PENDING ,
}
if res , _ , err := order . NewAccessor ( request ) . StoreOne ( newOrder ) ; err == nil {
if _ , err := bill . DraftFirstBill ( res . ( * order . Order ) , request ) ; err != nil {
return err
}
return nil
} else {
return err
}
}
func getBooking ( b * booking . Booking , request * tools . APIRequest , errCh chan error , m * sync . Mutex ) {
m . Lock ( )
c , err := getCallerCopy ( request , errCh )
if err != nil {
errCh <- err
return
}
m . Unlock ( )
meth := c . URLS [ tools . BOOKING ] [ tools . GET ]
meth = strings . ReplaceAll ( meth , ":id" , b . ResourceID )
meth = strings . ReplaceAll ( meth , ":start_date" , b . ExpectedStartDate . Format ( "2006-01-02T15:04:05" ) )
meth = strings . ReplaceAll ( meth , ":end_date" , b . ExpectedEndDate . Format ( "2006-01-02T15:04:05" ) )
c . URLS [ tools . BOOKING ] [ tools . GET ] = meth
_ , err = ( & peer . Peer { } ) . LaunchPeerExecution ( b . DestPeerID , b . ResourceID , tools . BOOKING , tools . GET , nil , & c )
if err != nil {
2026-02-20 10:35:02 +01:00
errCh <- fmt . Errorf ( "%s" , "error on " + b . DestPeerID + err . Error ( ) )
2026-01-14 15:15:26 +01:00
return
}
errCh <- nil
}
func getCallerCopy ( request * tools . APIRequest , errCh chan error ) ( tools . HTTPCaller , error ) {
var c tools . HTTPCaller
err := request . Caller . DeepCopy ( c )
if err != nil {
errCh <- err
return tools . HTTPCaller { } , nil
}
c . URLS = request . Caller . URLS
return c , err
}
func ( ws * WorkflowSchedule ) Schedules ( wfID string , request * tools . APIRequest ) ( * WorkflowSchedule , * workflow . Workflow , [ ] * workflow_execution . WorkflowExecution , error ) {
if request == nil {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "no request found" )
}
c := request . Caller
if c == nil || c . URLS == nil || c . URLS [ tools . BOOKING ] == nil {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "no caller defined" )
}
methods := c . URLS [ tools . BOOKING ]
if _ , ok := methods [ tools . GET ] ; ! ok {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "no path found" )
}
ok , wf , executions , purchases , bookings , err := ws . GetBuyAndBook ( wfID , request )
ws . WorkflowExecution = executions
if ! ok || err != nil {
return ws , nil , executions , errors . New ( "could not book the workflow : " + fmt . Sprintf ( "%v" , err ) )
}
ws . Workflow = wf
var errCh = make ( chan error , len ( bookings ) )
var m sync . Mutex
2026-02-20 10:35:02 +01:00
for _ , purchase := range purchases { // TODO on Decentralize Stream.
2026-01-14 15:15:26 +01:00
go ws . CallDatacenter ( purchase , purchase . DestPeerID , tools . PURCHASE_RESOURCE , request , errCh , & m )
}
for i := 0 ; i < len ( purchases ) ; i ++ {
if err := <- errCh ; err != nil {
return ws , wf , executions , errors . New ( "could not launch the peer execution : " + fmt . Sprintf ( "%v" , err ) )
}
}
errCh = make ( chan error , len ( bookings ) )
2026-02-20 10:35:02 +01:00
for _ , booking := range bookings { // TODO on Decentralize Stream.
2026-01-14 15:15:26 +01:00
go ws . CallDatacenter ( booking , booking . DestPeerID , tools . BOOKING , request , errCh , & m )
}
for i := 0 ; i < len ( bookings ) ; i ++ {
if err := <- errCh ; err != nil {
return ws , wf , executions , errors . New ( "could not launch the peer execution : " + fmt . Sprintf ( "%v" , err ) )
}
}
if err := ws . GenerateOrder ( purchases , bookings , request ) ; err != nil {
return ws , wf , executions , err
}
fmt . Println ( "Schedules" )
for _ , exec := range executions {
err := exec . PurgeDraft ( request )
if err != nil {
return ws , nil , [ ] * workflow_execution . WorkflowExecution { } , errors . New ( "purge draft" + fmt . Sprintf ( "%v" , err ) )
}
exec . StoreDraftDefault ( )
utils . GenericStoreOne ( exec , workflow_execution . NewAccessor ( request ) )
}
fmt . Println ( "Schedules" )
wf . GetAccessor ( & tools . APIRequest { Admin : true } ) . UpdateOne ( wf , wf . GetID ( ) )
return ws , wf , executions , nil
}
func ( ws * WorkflowSchedule ) CallDatacenter ( purchase utils . DBObject , destPeerID string , dt tools . DataType , request * tools . APIRequest , errCh chan error , m * sync . Mutex ) {
m . Lock ( )
c , err := getCallerCopy ( request , errCh )
if err != nil {
errCh <- err
return
}
m . Unlock ( )
if res , err := ( & peer . Peer { } ) . LaunchPeerExecution ( destPeerID , "" , dt , tools . POST , purchase . Serialize ( purchase ) , & c ) ; err != nil {
errCh <- err
return
} else {
data := res [ "data" ] . ( map [ string ] interface { } )
purchase . SetID ( fmt . Sprintf ( "%v" , data [ "id" ] ) )
}
errCh <- nil
}
/ *
BOOKING IMPLIED TIME , not of subscription but of execution
so is processing time execution time applied on computes
data can improve the processing time
time should implied a security time border ( 10 sec ) if not from the same executions
VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
* /
/ *
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution . WorkflowExecution
* /
func ( ws * WorkflowSchedule ) GetExecutions ( workflow * workflow . Workflow , isPreemptible bool ) ( [ ] * workflow_execution . WorkflowExecution , error ) {
workflows_executions := [ ] * workflow_execution . WorkflowExecution { }
dates , err := ws . GetDates ( )
if err != nil {
return workflows_executions , err
}
for _ , date := range dates {
obj := & workflow_execution . WorkflowExecution {
AbstractObject : utils . AbstractObject {
UUID : uuid . New ( ) . String ( ) , // set the uuid of the execution
Name : workflow . Name + "_execution_" + date . Start . String ( ) , // set the name of the execution
} ,
Priority : 1 ,
ExecutionsID : ws . UUID ,
ExecDate : date . Start , // set the execution date
EndDate : date . End , // set the end date
State : enum . DRAFT , // set the state to 1 (scheduled)
WorkflowID : workflow . GetID ( ) , // set the workflow id dependancy of the execution
}
if ws . BookingMode != booking . PLANNED {
obj . Priority = 0
}
if ws . BookingMode == booking . PREEMPTED && isPreemptible {
obj . Priority = 7
}
ws . SelectedStrategies = obj . SelectedStrategies
ws . SelectedPartnerships = obj . SelectedPartnerships
ws . SelectedBuyings = obj . SelectedBuyings
ws . SelectedInstances = obj . SelectedInstances
workflows_executions = append ( workflows_executions , obj )
}
return workflows_executions , nil
}
func ( ws * WorkflowSchedule ) GetDates ( ) ( [ ] Schedule , error ) {
schedule := [ ] Schedule { }
if len ( ws . Cron ) > 0 { // if cron is set then end date should be set
if ws . End == nil {
return schedule , errors . New ( "a cron task should have an end date" )
}
if ws . DurationS <= 0 {
ws . DurationS = ws . End . Sub ( ws . Start ) . Seconds ( )
}
cronStr := strings . Split ( ws . Cron , " " ) // split the cron string to treat it
if len ( cronStr ) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
return schedule , errors . New ( "Bad cron message: (" + ws . Cron + "). Should be at least ss mm hh dd MM dw" )
}
subCron := strings . Join ( cronStr [ : 6 ] , " " )
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
specParser := cron . NewParser ( cron . Second | cron . Minute | cron . Hour | cron . Dom | cron . Month | cron . Dow ) // create a new cron parser
sched , err := specParser . Parse ( subCron ) // parse the cron string
if err != nil {
return schedule , errors . New ( "Bad cron message: " + err . Error ( ) )
}
// loop through the cron schedule to set the executions
for s := sched . Next ( ws . Start ) ; ! s . IsZero ( ) && s . Before ( * ws . End ) ; s = sched . Next ( s ) {
e := s . Add ( time . Duration ( ws . DurationS ) * time . Second )
schedule = append ( schedule , Schedule {
Start : s ,
End : & e ,
} )
}
} else { // if no cron, set the execution to the start date
schedule = append ( schedule , Schedule {
Start : ws . Start ,
End : ws . End ,
} )
}
return schedule , nil
}
type Schedule struct {
Start time . Time
End * time . Time
}
/ *
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
* SET PROTECTION BORDER TIME
* /