| 
							
							
							
						 |  |  | @@ -1,10 +1,10 @@ | 
		
	
		
			
				|  |  |  |  | package workflow_execution | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | import ( | 
		
	
		
			
				|  |  |  |  | 	"encoding/json" | 
		
	
		
			
				|  |  |  |  | 	"errors" | 
		
	
		
			
				|  |  |  |  | 	"fmt" | 
		
	
		
			
				|  |  |  |  | 	"strings" | 
		
	
		
			
				|  |  |  |  | 	"sync" | 
		
	
		
			
				|  |  |  |  | 	"time" | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	"cloud.o-forge.io/core/oc-lib/logs" | 
		
	
	
		
			
				
					
					|  |  |  | @@ -16,7 +16,6 @@ import ( | 
		
	
		
			
				|  |  |  |  | 	"cloud.o-forge.io/core/oc-lib/tools" | 
		
	
		
			
				|  |  |  |  | 	"github.com/google/uuid" | 
		
	
		
			
				|  |  |  |  | 	"github.com/robfig/cron" | 
		
	
		
			
				|  |  |  |  | 	"github.com/rs/zerolog" | 
		
	
		
			
				|  |  |  |  | ) | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | /* | 
		
	
	
		
			
				
					
					|  |  |  | @@ -55,8 +54,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { | 
		
	
		
			
				|  |  |  |  | 	l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() | 
		
	
		
			
				|  |  |  |  | 	l.Debug().Msg("Checking booking") | 
		
	
		
			
				|  |  |  |  | 	if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { | 
		
	
		
			
				|  |  |  |  | 		return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
	
		
			
				
					
					|  |  |  | @@ -75,69 +72,27 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) | 
		
	
		
			
				|  |  |  |  | 	if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { | 
		
	
		
			
				|  |  |  |  | 		ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	l.Debug().Msg("Getting executions") | 
		
	
		
			
				|  |  |  |  | 	execs, err := ws.getExecutions(wf) | 
		
	
		
			
				|  |  |  |  | 	if err != nil { | 
		
	
		
			
				|  |  |  |  | 		return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	bookings := []*booking.Booking{} | 
		
	
		
			
				|  |  |  |  | 	for i, exec := range execs { | 
		
	
		
			
				|  |  |  |  | 		l.Debug().Msg("looping throughs execs : " + string(i)) | 
		
	
		
			
				|  |  |  |  | 	for _, exec := range execs { | 
		
	
		
			
				|  |  |  |  | 		bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	errCh := make(chan error, len(bookings)) | 
		
	
		
			
				|  |  |  |  | 	var m sync.Mutex | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 		for _, b := range bookings { | 
		
	
		
			
				|  |  |  |  | 		go getBooking(l, b, request, wf, execs, bookings, errCh, &m) | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	for i := 0; i < len(bookings); i++ { | 
		
	
		
			
				|  |  |  |  | 		if err := <-errCh; err != nil { | 
		
	
		
			
				|  |  |  |  | 			return false, wf, execs, bookings, err | 
		
	
		
			
				|  |  |  |  | 		} | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	return true, wf, execs, bookings, nil | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	m.Lock() | 
		
	
		
			
				|  |  |  |  | 	c, err := getCallerCopy(request, errCh) | 
		
	
		
			
				|  |  |  |  | 	if err != nil { | 
		
	
		
			
				|  |  |  |  | 		errCh <- err | 
		
	
		
			
				|  |  |  |  | 		return | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	m.Unlock() | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	bl := l.With().Str("booking", b.UUID).Logger() | 
		
	
		
			
				|  |  |  |  | 			meth := request.Caller.URLS[tools.BOOKING][tools.GET] | 
		
	
		
			
				|  |  |  |  | 			meth = strings.ReplaceAll(meth, ":id", b.ResourceID) | 
		
	
		
			
				|  |  |  |  | 			meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) | 
		
	
		
			
				|  |  |  |  | 			meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) | 
		
	
		
			
				|  |  |  |  | 			request.Caller.URLS[tools.BOOKING][tools.GET] = meth | 
		
	
		
			
				|  |  |  |  | 	bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) | 
		
	
		
			
				|  |  |  |  | 	_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) | 
		
	
		
			
				|  |  |  |  | 	bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID) | 
		
	
		
			
				|  |  |  |  | 			_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) | 
		
	
		
			
				|  |  |  |  | 			if err != nil { | 
		
	
		
			
				|  |  |  |  | 		errCh <- err | 
		
	
		
			
				|  |  |  |  | 		return | 
		
	
		
			
				|  |  |  |  | 				return false, wf, execs, bookings, err | 
		
	
		
			
				|  |  |  |  | 			} | 
		
	
		
			
				|  |  |  |  | 		} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	errCh <- nil | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { | 
		
	
		
			
				|  |  |  |  | 	var c tools.HTTPCaller | 
		
	
		
			
				|  |  |  |  | 	err := request.Caller.DeepCopy(c) | 
		
	
		
			
				|  |  |  |  | 	if err != nil { | 
		
	
		
			
				|  |  |  |  | 		errCh <- err | 
		
	
		
			
				|  |  |  |  | 		return tools.HTTPCaller{}, nil | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	c.URLS = request.Caller.URLS | 
		
	
		
			
				|  |  |  |  | 	return c, err | 
		
	
		
			
				|  |  |  |  | 	return true, wf, execs, bookings, nil | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { | 
		
	
	
		
			
				
					
					|  |  |  | @@ -158,20 +113,15 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* | 
		
	
		
			
				|  |  |  |  | 		return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	ws.Workflow = wf | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	var errCh = make(chan error, len(bookings)) | 
		
	
		
			
				|  |  |  |  | 	var m sync.Mutex | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	for _, booking := range bookings { | 
		
	
		
			
				|  |  |  |  | 		go ws.BookExecs(booking, request, errCh, &m) | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	 | 
		
	
		
			
				|  |  |  |  | 	for i := 0; i < len(bookings); i++ { | 
		
	
		
			
				|  |  |  |  | 		if err := <- errCh ; err != nil { | 
		
	
		
			
				|  |  |  |  | 		l := logs.GetLogger() | 
		
	
		
			
				|  |  |  |  | 		l.Info().Msg("Booking on " + booking.DestPeerID) | 
		
	
		
			
				|  |  |  |  | 		_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | 
		
	
		
			
				|  |  |  |  | 			tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) | 
		
	
		
			
				|  |  |  |  | 		if err != nil { | 
		
	
		
			
				|  |  |  |  | 			return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) | 
		
	
		
			
				|  |  |  |  | 		} | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	fmt.Println("Schedules") | 
		
	
		
			
				|  |  |  |  | 	for _, exec := range executions { | 
		
	
		
			
				|  |  |  |  | 		err := exec.PurgeDraft(request) | 
		
	
	
		
			
				
					
					|  |  |  | @@ -185,30 +135,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* | 
		
	
		
			
				|  |  |  |  | 	return ws, wf, executions, nil | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	m.Lock() | 
		
	
		
			
				|  |  |  |  | 	c, err := getCallerCopy(request, errCh) | 
		
	
		
			
				|  |  |  |  | 	if err != nil { | 
		
	
		
			
				|  |  |  |  | 		errCh <- err | 
		
	
		
			
				|  |  |  |  | 		return | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	m.Unlock() | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() | 
		
	
		
			
				|  |  |  |  | 	l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) | 
		
	
		
			
				|  |  |  |  | 	_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | 
		
	
		
			
				|  |  |  |  | 		tools.BOOKING, tools.POST, booking.Serialize(booking), &c) | 
		
	
		
			
				|  |  |  |  | 	l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) | 
		
	
		
			
				|  |  |  |  | 	 | 
		
	
		
			
				|  |  |  |  | 	if err != nil { | 
		
	
		
			
				|  |  |  |  | 		errCh <- err | 
		
	
		
			
				|  |  |  |  | 		return | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 	errCh <- nil | 
		
	
		
			
				|  |  |  |  | } | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | /* | 
		
	
		
			
				|  |  |  |  | BOOKING IMPLIED TIME, not of subscription but of execution | 
		
	
		
			
				|  |  |  |  | so is processing time execution time applied on computes | 
		
	
	
		
			
				
					
					|  |  |  | @@ -228,6 +154,9 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf | 
		
	
		
			
				|  |  |  |  | 		return workflows_executions, err | 
		
	
		
			
				|  |  |  |  | 	} | 
		
	
		
			
				|  |  |  |  | 	for _, date := range dates { | 
		
	
		
			
				|  |  |  |  | 		fmt.Println("============") | 
		
	
		
			
				|  |  |  |  | 		fmt.Println("Date : " + fmt.Sprint(date)) | 
		
	
		
			
				|  |  |  |  | 		fmt.Println("============") | 
		
	
		
			
				|  |  |  |  | 		obj := &WorkflowExecution{ | 
		
	
		
			
				|  |  |  |  | 			AbstractObject: utils.AbstractObject{ | 
		
	
		
			
				|  |  |  |  | 				UUID: uuid.New().String(),                                 // set the uuid of the execution | 
		
	
	
		
			
				
					
					|  |  |  | @@ -264,8 +193,17 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) { | 
		
	
		
			
				|  |  |  |  | 		if err != nil { | 
		
	
		
			
				|  |  |  |  | 			return schedule, errors.New("Bad cron message: " + err.Error()) | 
		
	
		
			
				|  |  |  |  | 		} | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 		toto, _ := json.MarshalIndent(sched,"","") | 
		
	
		
			
				|  |  |  |  | 		fmt.Println(string(toto)) | 
		
	
		
			
				|  |  |  |  | 		s := sched.Next(ws.Start) | 
		
	
		
			
				|  |  |  |  | 		fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " +  fmt.Sprint(s.Before(*ws.End)) ) | 
		
	
		
			
				|  |  |  |  | 		tata, _ := json.MarshalIndent(s,"","") | 
		
	
		
			
				|  |  |  |  | 		fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End)) | 
		
	
		
			
				|  |  |  |  |  | 
		
	
		
			
				|  |  |  |  | 		// loop through the cron schedule to set the executions | 
		
	
		
			
				|  |  |  |  | 		for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { | 
		
	
		
			
				|  |  |  |  | 			fmt.Println("next execution :" + fmt.Sprint(s)) | 
		
	
		
			
				|  |  |  |  | 			e := s.Add(time.Duration(ws.DurationS) * time.Second) | 
		
	
		
			
				|  |  |  |  | 			schedule = append(schedule, Schedule{ | 
		
	
		
			
				|  |  |  |  | 				Start: s, | 
		
	
	
		
			
				
					
					|  |  |  |   |