interp: add eval cancelation by semaphore

This commit is contained in:
Dan Kortschak
2019-10-30 01:48:04 +10:30
committed by Traefiker Bot
parent 75a696a5c8
commit 714253c1e6
9 changed files with 479 additions and 65 deletions

View File

@@ -84,8 +84,11 @@ func (interp *Interpreter) run(n *node, cf *frame) {
if cf == nil {
f = interp.frame
} else {
f = &frame{anc: cf, data: make([]reflect.Value, len(n.types))}
f = newFrame(cf, len(n.types), interp.runid())
}
interp.mutex.RLock()
f.done = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(interp.done)}
interp.mutex.RUnlock()
for i, t := range n.types {
f.data[i] = reflect.New(t).Elem()
@@ -98,17 +101,20 @@ func (interp *Interpreter) run(n *node, cf *frame) {
// runCfg executes a node AST by walking its CFG and running node builtin at each step
func runCfg(n *node, f *frame) {
defer func() {
f.mutex.Lock()
f.recovered = recover()
for _, val := range f.deferred {
val[0].Call(val[1:])
}
if f.recovered != nil {
fmt.Println(n.cfgErrorf("panic"))
f.mutex.Unlock()
panic(f.recovered)
}
f.mutex.Unlock()
}()
for exec := n.exec; exec != nil; {
for exec := n.exec; exec != nil && f.runid() == n.interp.runid(); {
exec = exec(f)
}
}
@@ -442,7 +448,7 @@ func genFunctionWrapper(n *node) func(*frame) reflect.Value {
}
return reflect.MakeFunc(n.typ.TypeOf(), func(in []reflect.Value) []reflect.Value {
// Allocate and init local frame. All values to be settable and addressable.
fr := frame{anc: f, data: make([]reflect.Value, len(def.types))}
fr := newFrame(f, len(def.types), f.runid())
d := fr.data
for i, t := range def.types {
d[i] = reflect.New(t).Elem()
@@ -471,7 +477,7 @@ func genFunctionWrapper(n *node) func(*frame) reflect.Value {
}
// Interpreter code execution
runCfg(start, &fr)
runCfg(start, fr)
result := fr.data[:numRet]
for i, r := range result {
@@ -679,7 +685,7 @@ func call(n *node) {
if def.frame != nil {
anc = def.frame
}
nf := frame{anc: anc, data: make([]reflect.Value, len(def.types))}
nf := newFrame(anc, len(def.types), anc.runid())
var vararg reflect.Value
// Init return values
@@ -744,10 +750,10 @@ func call(n *node) {
// Execute function body
if goroutine {
go runCfg(def.child[3].start, &nf)
go runCfg(def.child[3].start, nf)
return tnext
}
runCfg(def.child[3].start, &nf)
runCfg(def.child[3].start, nf)
// Handle branching according to boolean result
if fnext != nil && !nf.data[0].Bool() {
@@ -1049,10 +1055,10 @@ func getFunc(n *node) {
next := getExec(n.tnext)
n.exec = func(f *frame) bltn {
fr := *f
fr := f.clone()
nod := *n
nod.val = &nod
nod.frame = &fr
nod.frame = fr
dest(f).Set(reflect.ValueOf(&nod))
return next
}
@@ -1063,11 +1069,11 @@ func getMethod(n *node) {
next := getExec(n.tnext)
n.exec = func(f *frame) bltn {
fr := *f
fr := f.clone()
nod := *(n.val.(*node))
nod.val = &nod
nod.recv = n.recv
nod.frame = &fr
nod.frame = fr
f.data[i] = reflect.ValueOf(&nod)
return next
}
@@ -1082,11 +1088,11 @@ func getMethodByName(n *node) {
n.exec = func(f *frame) bltn {
val := value0(f).Interface().(valueInterface)
m, li := val.node.typ.lookupMethod(name)
fr := *f
fr := f.clone()
nod := *m
nod.val = &nod
nod.recv = &receiver{nil, val.value, li}
nod.frame = &fr
nod.frame = fr
f.data[i] = reflect.ValueOf(&nod)
return next
}
@@ -1618,7 +1624,10 @@ func rangeChan(n *node) {
tnext := getExec(n.tnext)
n.exec = func(f *frame) bltn {
v, ok := value(f).Recv()
chosen, v, ok := reflect.Select([]reflect.SelectCase{f.done, {Dir: reflect.SelectRecv, Chan: value(f)}})
if chosen == 0 {
return nil
}
if !ok {
return fnext
}
@@ -2050,19 +2059,63 @@ func recv(n *node) {
value := genValue(n.child[0])
tnext := getExec(n.tnext)
if n.fnext != nil {
fnext := getExec(n.fnext)
n.exec = func(f *frame) bltn {
if v, _ := value(f).Recv(); v.Bool() {
if n.interp.cancelChan {
// Cancellable channel read
if n.fnext != nil {
fnext := getExec(n.fnext)
n.exec = func(f *frame) bltn {
ch := value(f)
// Fast: channel read doesn't block
if x, ok := ch.TryRecv(); ok {
if x.Bool() {
return tnext
}
return fnext
}
// Slow: channel read blocks, allow cancel
chosen, v, _ := reflect.Select([]reflect.SelectCase{f.done, {Dir: reflect.SelectRecv, Chan: ch}})
if chosen == 0 {
return nil
}
if v.Bool() {
return tnext
}
return fnext
}
} else {
i := n.findex
n.exec = func(f *frame) bltn {
// Fast: channel read doesn't block
var ok bool
ch := value(f)
if f.data[i], ok = ch.TryRecv(); ok {
return tnext
}
// Slow: channel is blocked, allow cancel
var chosen int
chosen, f.data[i], _ = reflect.Select([]reflect.SelectCase{f.done, {Dir: reflect.SelectRecv, Chan: ch}})
if chosen == 0 {
return nil
}
return tnext
}
return fnext
}
} else {
i := n.findex
n.exec = func(f *frame) bltn {
f.data[i], _ = value(f).Recv()
return tnext
// Blocking channel read (less overhead)
if n.fnext != nil {
fnext := getExec(n.fnext)
n.exec = func(f *frame) bltn {
if v, _ := value(f).Recv(); v.Bool() {
return tnext
}
return fnext
}
} else {
i := n.findex
n.exec = func(f *frame) bltn {
f.data[i], _ = value(f).Recv()
return tnext
}
}
}
}
@@ -2073,11 +2126,33 @@ func recv2(n *node) {
vok := genValue(n.anc.child[1]) // status
tnext := getExec(n.tnext)
n.exec = func(f *frame) bltn {
v, ok := vchan(f).Recv()
vres(f).Set(v)
vok(f).SetBool(ok)
return tnext
if n.interp.cancelChan {
// Cancellable channel read
n.exec = func(f *frame) bltn {
ch, result, status := vchan(f), vres(f), vok(f)
// Fast: channel read doesn't block
if v, ok := ch.TryRecv(); ok {
result.Set(v)
status.SetBool(true)
return tnext
}
// Slow: channel is blocked, allow cancel
chosen, v, ok := reflect.Select([]reflect.SelectCase{f.done, {Dir: reflect.SelectRecv, Chan: ch}})
if chosen == 0 {
return nil
}
result.Set(v)
status.SetBool(ok)
return tnext
}
} else {
// Blocking channel read (less overhead)
n.exec = func(f *frame) bltn {
v, ok := vchan(f).Recv()
vres(f).Set(v)
vok(f).SetBool(ok)
return tnext
}
}
}
@@ -2099,9 +2174,27 @@ func send(n *node) {
convertLiteralValue(n.child[1], n.child[0].typ.val.TypeOf())
value1 := genValue(n.child[1]) // value to send
n.exec = func(f *frame) bltn {
value0(f).Send(value1(f))
return next
if n.interp.cancelChan {
// Cancellable send
n.exec = func(f *frame) bltn {
ch, data := value0(f), value1(f)
// Fast: send on channel doesn't block
if ok := ch.TrySend(data); ok {
return next
}
// Slow: send on channel blocks, allow cancel
chosen, _, _ := reflect.Select([]reflect.SelectCase{f.done, {Dir: reflect.SelectSend, Chan: ch, Send: data}})
if chosen == 0 {
return nil
}
return next
}
} else {
// Blocking send (less overhead)
n.exec = func(f *frame) bltn {
value0(f).Send(value1(f))
return next
}
}
}
@@ -2143,7 +2236,7 @@ func _select(n *node) {
chanValues := make([]func(*frame) reflect.Value, nbClause)
assignedValues := make([]func(*frame) reflect.Value, nbClause)
okValues := make([]func(*frame) reflect.Value, nbClause)
cases := make([]reflect.SelectCase, nbClause)
cases := make([]reflect.SelectCase, nbClause+1)
for i := 0; i < nbClause; i++ {
if len(n.child[i].child) > 1 {
@@ -2163,7 +2256,8 @@ func _select(n *node) {
}
n.exec = func(f *frame) bltn {
for i := range cases {
cases[nbClause] = f.done
for i := range cases[:nbClause] {
switch cases[i].Dir {
case reflect.SelectRecv:
cases[i].Chan = chanValues[i](f)
@@ -2175,6 +2269,9 @@ func _select(n *node) {
}
}
j, v, s := reflect.Select(cases)
if j == nbClause {
return nil
}
if cases[j].Dir == reflect.SelectRecv && assignedValues[j] != nil {
assignedValues[j](f).Set(v)
if ok[j] != nil {