|  |  | @@ -4,6 +4,7 @@ import ( | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"errors" |  |  |  | 	"errors" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"fmt" |  |  |  | 	"fmt" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"strings" |  |  |  | 	"strings" | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	"sync" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"time" |  |  |  | 	"time" | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"cloud.o-forge.io/core/oc-lib/logs" |  |  |  | 	"cloud.o-forge.io/core/oc-lib/logs" | 
			
		
	
	
		
		
			
				
					
					|  |  | @@ -15,6 +16,7 @@ import ( | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"cloud.o-forge.io/core/oc-lib/tools" |  |  |  | 	"cloud.o-forge.io/core/oc-lib/tools" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"github.com/google/uuid" |  |  |  | 	"github.com/google/uuid" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	"github.com/robfig/cron" |  |  |  | 	"github.com/robfig/cron" | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	"github.com/rs/zerolog" | 
			
		
	
		
		
			
				
					
					|  |  |  | ) |  |  |  | ) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | /* |  |  |  | /* | 
			
		
	
	
		
		
			
				
					
					|  |  | @@ -53,6 +55,8 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W | 
			
		
	
		
		
			
				
					
					|  |  |  | } |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { |  |  |  | func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l.Debug().Msg("Checking booking") | 
			
		
	
		
		
			
				
					
					|  |  |  | 	if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { |  |  |  | 	if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") |  |  |  | 		return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
	
		
		
			
				
					
					|  |  | @@ -71,29 +75,71 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) | 
			
		
	
		
		
			
				
					
					|  |  |  | 	if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { |  |  |  | 	if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" |  |  |  | 		ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l.Debug().Msg("Getting executions") | 
			
		
	
		
		
			
				
					
					|  |  |  | 	execs, err := ws.getExecutions(wf) |  |  |  | 	execs, err := ws.getExecutions(wf) | 
			
		
	
		
		
			
				
					
					|  |  |  | 	if err != nil { |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err |  |  |  | 		return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  | 	bookings := []*booking.Booking{} |  |  |  | 	bookings := []*booking.Booking{} | 
			
		
	
		
		
			
				
					
					|  |  |  | 	for _, exec := range execs { |  |  |  | 	for i, exec := range execs { | 
			
				
				
			
		
	
		
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		l.Debug().Msg("looping throughs execs : " + string(i)) | 
			
		
	
		
		
			
				
					
					|  |  |  | 		bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) |  |  |  | 		bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) | 
			
		
	
		
		
			
				
					
					|  |  |  | 		for _, b := range bookings { |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			meth := request.Caller.URLS[tools.BOOKING][tools.GET] |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			meth = strings.ReplaceAll(meth, ":id", b.ResourceID) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			request.Caller.URLS[tools.BOOKING][tools.GET] = meth |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			if err != nil { |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 				return false, wf, execs, bookings, err |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 			} |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 		} |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	errCh := make(chan error, len(bookings)) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	var m sync.Mutex | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	for _, b := range bookings { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		go getBooking(l, b, request, wf, execs, bookings, errCh, &m) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	for i := 0; i < len(bookings); i++ { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		if err := <-errCh; err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 			return false, wf, execs, bookings, err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 	return true, wf, execs, bookings, nil |  |  |  | 	return true, wf, execs, bookings, nil | 
			
		
	
		
		
			
				
					
					|  |  |  | } |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	m.Lock() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	c, err := getCallerCopy(request, errCh) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		errCh <- err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		return | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	m.Unlock() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	bl := l.With().Str("booking", b.UUID).Logger() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	meth := request.Caller.URLS[tools.BOOKING][tools.GET] | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	meth = strings.ReplaceAll(meth, ":id", b.ResourceID) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	request.Caller.URLS[tools.BOOKING][tools.GET] = meth | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		errCh <- err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		return | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	errCh <- nil | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	var c tools.HTTPCaller | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	err := request.Caller.DeepCopy(c) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		errCh <- err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		return tools.HTTPCaller{}, nil | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	c.URLS = request.Caller.URLS | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	return c, err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { |  |  |  | func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { | 
			
		
	
		
		
			
				
					
					|  |  |  | 	if request == nil { |  |  |  | 	if request == nil { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		return ws, nil, []*WorkflowExecution{}, errors.New("no request found") |  |  |  | 		return ws, nil, []*WorkflowExecution{}, errors.New("no request found") | 
			
		
	
	
		
		
			
				
					
					|  |  | @@ -112,15 +158,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* | 
			
		
	
		
		
			
				
					
					|  |  |  | 		return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) |  |  |  | 		return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  | 	ws.Workflow = wf |  |  |  | 	ws.Workflow = wf | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	var errCh = make(chan error, len(bookings)) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	var m sync.Mutex | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 	for _, booking := range bookings { |  |  |  | 	for _, booking := range bookings { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		l := logs.GetLogger() |  |  |  | 		go ws.BookExecs(booking, request, errCh, &m) | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 		l.Info().Msg("Booking on " + booking.DestPeerID) |  |  |  | 	} | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 		_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", |  |  |  | 	 | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 			tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) |  |  |  | 	for i := 0; i < len(bookings); i++ { | 
			
				
				
			
		
	
		
		
			
				
					
					|  |  |  | 		if err != nil { |  |  |  | 		if err := <- errCh ; err != nil { | 
			
				
				
			
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
	
		
		
			
				
					
					|  |  |  | 			return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) |  |  |  | 			return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) | 
			
		
	
		
		
			
				
					
					|  |  |  | 		} |  |  |  | 		} | 
			
		
	
		
		
			
				
					
					|  |  |  | 	} |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | 	fmt.Println("Schedules") |  |  |  | 	fmt.Println("Schedules") | 
			
		
	
		
		
			
				
					
					|  |  |  | 	for _, exec := range executions { |  |  |  | 	for _, exec := range executions { | 
			
		
	
		
		
			
				
					
					|  |  |  | 		err := exec.PurgeDraft(request) |  |  |  | 		err := exec.PurgeDraft(request) | 
			
		
	
	
		
		
			
				
					
					|  |  | @@ -134,6 +185,30 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* | 
			
		
	
		
		
			
				
					
					|  |  |  | 	return ws, wf, executions, nil |  |  |  | 	return ws, wf, executions, nil | 
			
		
	
		
		
			
				
					
					|  |  |  | } |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	m.Lock() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	c, err := getCallerCopy(request, errCh) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		errCh <- err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		return | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	m.Unlock() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		tools.BOOKING, tools.POST, booking.Serialize(booking), &c) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	 | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	if err != nil { | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		errCh <- err | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 		return | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	} | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | 	errCh <- nil | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  | } | 
			
		
	
		
		
			
				
					
					|  |  |  |  |  |  |  |  | 
			
		
	
		
		
			
				
					
					|  |  |  | /* |  |  |  | /* | 
			
		
	
		
		
			
				
					
					|  |  |  | BOOKING IMPLIED TIME, not of subscription but of execution |  |  |  | BOOKING IMPLIED TIME, not of subscription but of execution | 
			
		
	
		
		
			
				
					
					|  |  |  | so is processing time execution time applied on computes |  |  |  | so is processing time execution time applied on computes | 
			
		
	
	
		
		
			
				
					
					|  |  |   |