-
Notifications
You must be signed in to change notification settings - Fork 0
/
job.go
156 lines (131 loc) · 2.92 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package jobmanager
import (
"errors"
"reflect"
"runtime"
"time"
)
// Sentinel errors returned by the job scheduling helpers in this file.
var (
// ErrParamsNotAdapted is returned by callJobFuncWithParams when the params
// supplied for a job do not fit the parameter list of the job function.
ErrParamsNotAdapted = errors.New("the number of params is not adapted")
// ErrNotAFunction is returned by (*Job).do when the value handed over as the
// job is not a function.
ErrNotAFunction = errors.New("only functions can be schedule into the job queue")
//ErrParameterCannotBeNil = errors.New("nil paramaters cannot be used with reflection")
)
// Status is the state of a Job, stored in Job.Status. The values declared for
// it in this file are Pending, Running, Done and Cancelled.
type Status int
// Job describes one schedulable unit of work: the function to call, the
// params to call it with, and bookkeeping about its execution.
type Job struct {
// ID is the identifier of the job.
ID string
// Status is the current state of the job (see the Status constants).
Status Status
// Result is the outcome recorded for the job; isCancelled inspects Result.Err.
Result JobResult
// Tags are labels attached to the job; addTag appends to this slice.
Tags []string
// StartDatetime is set by run, in UTC, right before the job function is called.
StartDatetime time.Time
// EndDatetime is set by run, in UTC, when run returns.
EndDatetime time.Time
// JobFunc is the name of the function registered through do; it is the key
// used to look the function and its params up in funcs and fparams.
JobFunc string
funcs map[string]interface{} // registered job functions, keyed by function name
fparams map[string][]interface{} // params to call each registered function with, keyed by function name
// done is the channel wait receives from; closeDoneChannel closes it and
// resetState replaces it with a fresh one.
done chan interface{}
}
// JobResult pairs a value with an error and is the type of Job.Result.
// NOTE(review): presumably filled from the (value, error) pair returned by
// (*Job).run — the assignment is not part of this file section; confirm.
type JobResult struct {
// Value is the result value of the job, if any.
Value interface{}
// Err is the error recorded for the job; isCancelled matches it against
// ErrJobCancelled.
Err error
}
// Values of Status, numbered from 0 by iota in declaration order.
const (
// Pending is the zero value of Status (0); resetState moves a Done job back
// to this state.
Pending Status = iota
// Running state (1).
Running
// Done state (2); it is the only state from which resetState rearms a job.
Done
// Cancelled state (3).
Cancelled
)
// addTag appends tag to j.Tags and returns j so that calls can be chained.
// No de-duplication is performed: adding the same tag twice stores it twice.
func (j *Job) addTag(tag string) *Job {
j.Tags = append(j.Tags, tag)
return j
}
// do registers jobFun, together with the params it should later be called
// with, as the function this job executes.
//
// The function is stored in j.funcs under the name reported by
// getFunctionName, the params are stored in j.fparams under the same name,
// and that name becomes j.JobFunc. The params are not checked against the
// function's signature here.
//
// It returns ErrNotAFunction when jobFun is nil, is not a function, or is a
// nil function value.
func (j *Job) do(jobFun interface{}, params ...interface{}) error {
	// reflect.TypeOf(nil) returns a nil Type and calling Kind on it panics,
	// so an untyped nil has to be rejected before the Kind check. A typed nil
	// func passes the Kind check but could never be called, so it is rejected
	// as well.
	typ := reflect.TypeOf(jobFun)
	if typ == nil || typ.Kind() != reflect.Func || reflect.ValueOf(jobFun).IsNil() {
		return ErrNotAFunction
	}

	// A Job created as a bare struct value has nil maps, and writing to a nil
	// map panics; allocate them on first use.
	if j.funcs == nil {
		j.funcs = make(map[string]interface{})
	}
	if j.fparams == nil {
		j.fparams = make(map[string][]interface{})
	}

	fname := getFunctionName(jobFun)
	j.funcs[fname] = jobFun
	j.fparams[fname] = params
	j.JobFunc = fname
	return nil
}
// wait blocks until a receive from j.done succeeds, which happens either when
// a value is sent on the channel or when the channel is closed (see
// closeDoneChannel).
func (j *Job) wait() {
<-j.done
}
// resetState rearms a finished job: when the job is in the Done state it is
// put back to Pending and gets a fresh, open done channel. Jobs in any other
// state are left untouched.
func (j *Job) resetState() {
	if j.Status != Done {
		return
	}
	j.done = make(chan interface{})
	j.Status = Pending
}
// run calls the function registered under j.JobFunc with its stored params
// and returns whatever callJobFuncWithParams yields for it.
//
// StartDatetime is stamped (UTC) before the call and EndDatetime (UTC) when
// run returns.
func (j *Job) run() (interface{}, error) {
	j.StartDatetime = time.Now().UTC()
	// Deferred, so the end time is recorded on every exit path of run.
	defer func() { j.EndDatetime = time.Now().UTC() }()

	name := j.JobFunc
	return callJobFuncWithParams(j.funcs[name], j.fparams[name])
}
// getFunctionName returns the name the Go runtime reports for the function
// value fn (runtime.FuncForPC on the function's entry address).
//
// fn is expected to hold a function: reflect.Value.Pointer panics for kinds
// it does not support.
func getFunctionName(fn interface{}) string {
	entry := reflect.ValueOf(fn).Pointer()
	return runtime.FuncForPC(entry).Name()
}
// callJobFuncWithParams calls jobFunc through reflection with params as its
// arguments and folds the results into a (value, error) pair.
//
// Arguments:
//   - A non-variadic function needs exactly as many params as it declares
//     parameters; a variadic function needs at least its fixed parameters and
//     accepts any number of trailing ones. Otherwise ErrParamsNotAdapted is
//     returned and the function is not called.
//   - A nil param is passed as the zero value of the corresponding parameter
//     when that parameter can hold nil (pointer, interface, map, slice, chan,
//     func, unsafe pointer).
//
// Results:
//   - no result: (nil, nil)
//   - one result: (nil, err) if it is a non-nil error, otherwise (value, nil)
//   - two or more: only the first two are considered. If the first is a
//     non-nil error it is returned as the error and the second as the value;
//     otherwise, if the second is a non-nil error, the pair is returned as
//     (first, second); otherwise (first, nil).
//
// jobFunc must be a non-nil function: reflection panics otherwise. It also
// panics when an argument is not assignable to its parameter, which includes
// a nil param for a parameter type that cannot hold nil.
func callJobFuncWithParams(jobFunc interface{}, params []interface{}) (interface{}, error) {
	f := reflect.ValueOf(jobFunc)
	ft := f.Type()

	numIn := ft.NumIn()
	variadic := ft.IsVariadic()
	if variadic {
		// The trailing variadic parameter may receive zero or more arguments;
		// reflect's Call assembles the slice itself.
		if len(params) < numIn-1 {
			return nil, ErrParamsNotAdapted
		}
	} else if len(params) != numIn {
		return nil, ErrParamsNotAdapted
	}

	in := make([]reflect.Value, len(params))
	for k, param := range params {
		if param != nil {
			in[k] = reflect.ValueOf(param)
			continue
		}

		// reflect.ValueOf(nil) is the invalid zero Value, which Call rejects.
		// Build a typed nil from the declared parameter type instead.
		var pt reflect.Type
		if variadic && k >= numIn-1 {
			pt = ft.In(numIn - 1).Elem()
		} else {
			pt = ft.In(k)
		}
		switch pt.Kind() {
		case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map,
			reflect.Ptr, reflect.Slice, reflect.UnsafePointer:
			in[k] = reflect.Zero(pt)
		}
		// For any other kind in[k] stays invalid and Call panics, exactly as
		// it does for every other argument/parameter type mismatch.
	}

	res := f.Call(in)
	switch len(res) {
	case 0:
		return nil, nil
	case 1:
		value := res[0].Interface()
		if checkIfIsError(value) {
			return nil, value.(error)
		}
		return value, nil
	}

	first, second := res[0].Interface(), res[1].Interface()
	if checkIfIsError(first) {
		return second, first.(error)
	}
	if checkIfIsError(second) {
		return first, second.(error)
	}
	return first, nil
}
// checkIfIsError reports whether value is a non-nil interface value whose
// dynamic type implements error. A typed nil pointer of an error type still
// counts, because the interface holding it is not nil.
func checkIfIsError(value interface{}) bool {
	if value == nil {
		return false
	}
	_, isErr := value.(error)
	return isErr
}
// closeDoneChannel closes j.done unless a non-blocking receive shows that it
// is already closed, so calling it again afterwards does not panic.
//
// The probe is a real receive: if a value happens to be pending on the
// channel it is consumed before the channel is closed.
func (j *Job) closeDoneChannel() {
	select {
	case _, open := <-j.done:
		if !open {
			// Already closed; closing again would panic.
			return
		}
	default:
		// Nothing to receive: the channel is open and empty.
	}
	close(j.done)
}
// isCancelled reports whether the error recorded in j.Result.Err matches
// ErrJobCancelled according to errors.Is, so a wrapped ErrJobCancelled counts
// as well.
func (j *Job) isCancelled() bool {
return errors.Is(j.Result.Err, ErrJobCancelled)
}