| 
						 
							
							
							
						 
					 | 
				
			
			 | 
			 | 
			
				@@ -1,12 +1,13 @@
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				package workflow_execution
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				import (
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"encoding/json"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"errors"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"fmt"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"strings"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"sync"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"time"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"cloud.o-forge.io/core/oc-lib/logs"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"cloud.o-forge.io/core/oc-lib/models/booking"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"cloud.o-forge.io/core/oc-lib/models/common/enum"
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					"cloud.o-forge.io/core/oc-lib/models/peer"
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@@ -78,60 +79,22 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					bookings := []*booking.Booking{}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for _, exec := range execs {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					errCh := make(chan error, len(bookings))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					var m sync.Mutex
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for _, b := range bookings {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						go getBooking(b, request, wf, execs, bookings, errCh, &m)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for i := 0; i < len(bookings); i++ {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						if err := <-errCh; err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							return false, wf, execs, bookings, err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						for _, b := range bookings {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							meth := request.Caller.URLS[tools.BOOKING][tools.GET]
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							request.Caller.URLS[tools.BOOKING][tools.GET] = meth
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
								return false, wf, execs, bookings, err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					return true, wf, execs, bookings, nil
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					m.Lock()
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					c, err := getCallerCopy(request, errCh)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						errCh <- err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					m.Unlock()
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					meth := c.URLS[tools.BOOKING][tools.GET]
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					c.URLS[tools.BOOKING][tools.GET] = meth
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						errCh <- err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					errCh <- nil
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					var c tools.HTTPCaller
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					err := request.Caller.DeepCopy(c)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						errCh <- err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return tools.HTTPCaller{}, nil
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					c.URLS = request.Caller.URLS
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					return c, err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if request == nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@@ -150,20 +113,15 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					ws.Workflow = wf
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					var errCh = make(chan error, len(bookings))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					var m sync.Mutex
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for _, booking := range bookings {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						go ws.BookExecs(booking, request, errCh, &m)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for i := 0; i < len(bookings); i++ {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						if err := <- errCh ; err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						l := logs.GetLogger()
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						l.Info().Msg("Booking on " + booking.DestPeerID)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					fmt.Println("Schedules")
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for _, exec := range executions {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						err := exec.PurgeDraft(request)
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@@ -177,27 +135,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					return ws, wf, executions, nil
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					m.Lock()
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					c, err := getCallerCopy(request, errCh)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						errCh <- err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					m.Unlock()
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						errCh <- err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					errCh <- nil
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				/*
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				BOOKING IMPLIED TIME, not of subscription but of execution
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				so is processing time execution time applied on computes
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@@ -217,6 +154,9 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						return workflows_executions, err
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
					for _, date := range dates {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println("============")
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println("Date : " + fmt.Sprint(date))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println("============")
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						obj := &WorkflowExecution{
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							AbstractObject: utils.AbstractObject{
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
								UUID: uuid.New().String(),                                 // set the uuid of the execution
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				@@ -253,8 +193,17 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						if err != nil {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							return schedule, errors.New("Bad cron message: " + err.Error())
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						}
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						toto, _ := json.MarshalIndent(sched,"","")
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println(string(toto))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						s := sched.Next(ws.Start)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " +  fmt.Sprint(s.Before(*ws.End)) )
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						tata, _ := json.MarshalIndent(s,"","")
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
				
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						// loop through the cron schedule to set the executions
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
						for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							fmt.Println("next execution :" + fmt.Sprint(s))
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							e := s.Add(time.Duration(ws.DurationS) * time.Second)
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
							schedule = append(schedule, Schedule{
 | 
			
		
		
	
		
			
				 | 
				 | 
			
			 | 
			 | 
			
								Start: s,
 | 
			
		
		
	
	
		
			
				
					
					| 
						
					 | 
				
			
			 | 
			 | 
			
				 
 |