Compare commits
9 Commits
feature/ad
...
bugfix/sch
Author | SHA1 | Date | |
---|---|---|---|
|
d4139fa270 | ||
|
db09d2ea89 | ||
|
e79e909805 | ||
|
f7e4e11705 | ||
|
77554cbcf5 | ||
|
0783395121 | ||
|
41ebcf150a | ||
|
1499def6ad | ||
|
adbab0f5d7 |
@@ -93,10 +93,10 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
|||||||
}
|
}
|
||||||
mypeer.AddExecution(*pexec)
|
mypeer.AddExecution(*pexec)
|
||||||
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
||||||
return nil, errors.New("peer is not reachable")
|
return nil, errors.New("peer is " + peerID + " not reachable")
|
||||||
} else {
|
} else {
|
||||||
if mypeer == nil {
|
if mypeer == nil {
|
||||||
return nil, errors.New("peer not found")
|
return nil, errors.New("peer " + peerID + " not found")
|
||||||
}
|
}
|
||||||
// If the peer is reachable, launch the execution
|
// If the peer is reachable, launch the execution
|
||||||
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
|
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
|
||||||
|
@@ -112,6 +112,8 @@ func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool {
|
|||||||
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
|
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
|
||||||
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
|
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
|
||||||
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
|
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
|
||||||
|
booking = append(booking,d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
|
||||||
|
booking = append(booking,d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
|
||||||
return booking
|
return booking
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,11 +1,13 @@
|
|||||||
package workflow_execution
|
package workflow_execution
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"cloud.o-forge.io/core/oc-lib/logs"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
"cloud.o-forge.io/core/oc-lib/models/booking"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
@@ -112,6 +114,8 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
|
|||||||
}
|
}
|
||||||
ws.Workflow = wf
|
ws.Workflow = wf
|
||||||
for _, booking := range bookings {
|
for _, booking := range bookings {
|
||||||
|
l := logs.GetLogger()
|
||||||
|
l.Info().Msg("Booking on " + booking.DestPeerID)
|
||||||
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
|
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
|
||||||
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
|
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -150,6 +154,9 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf
|
|||||||
return workflows_executions, err
|
return workflows_executions, err
|
||||||
}
|
}
|
||||||
for _, date := range dates {
|
for _, date := range dates {
|
||||||
|
fmt.Println("============")
|
||||||
|
fmt.Println("Date : " + fmt.Sprint(date))
|
||||||
|
fmt.Println("============")
|
||||||
obj := &WorkflowExecution{
|
obj := &WorkflowExecution{
|
||||||
AbstractObject: utils.AbstractObject{
|
AbstractObject: utils.AbstractObject{
|
||||||
UUID: uuid.New().String(), // set the uuid of the execution
|
UUID: uuid.New().String(), // set the uuid of the execution
|
||||||
@@ -186,8 +193,17 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return schedule, errors.New("Bad cron message: " + err.Error())
|
return schedule, errors.New("Bad cron message: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toto, _ := json.MarshalIndent(sched,"","")
|
||||||
|
fmt.Println(string(toto))
|
||||||
|
s := sched.Next(ws.Start)
|
||||||
|
fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " + fmt.Sprint(s.Before(*ws.End)) )
|
||||||
|
tata, _ := json.MarshalIndent(s,"","")
|
||||||
|
fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
|
||||||
|
|
||||||
// loop through the cron schedule to set the executions
|
// loop through the cron schedule to set the executions
|
||||||
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
||||||
|
fmt.Println("next execution :" + fmt.Sprint(s))
|
||||||
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
||||||
schedule = append(schedule, Schedule{
|
schedule = append(schedule, Schedule{
|
||||||
Start: s,
|
Start: s,
|
||||||
|
Reference in New Issue
Block a user