Concurrency test partially fixed
Appending results to queue scrambled them in the same order as they were completed processing.
This commit is contained in:
1
go.mod
1
go.mod
@@ -4,6 +4,7 @@ go 1.18
|
|||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/cybriq/interrupt v0.1.3
|
github.com/cybriq/interrupt v0.1.3
|
||||||
|
github.com/davecgh/go-spew v1.1.1
|
||||||
go.uber.org/atomic v1.9.0
|
go.uber.org/atomic v1.9.0
|
||||||
google.golang.org/grpc v1.46.0
|
google.golang.org/grpc v1.46.0
|
||||||
google.golang.org/protobuf v1.28.0
|
google.golang.org/protobuf v1.28.0
|
||||||
|
|||||||
@@ -185,7 +185,6 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
}
|
}
|
||||||
o += "\n"
|
o += "\n"
|
||||||
log.Println(o)
|
log.Println(o)
|
||||||
// t.FailNow()
|
|
||||||
|
|
||||||
// replace the hashedSeeds with the long version
|
// replace the hashedSeeds with the long version
|
||||||
hashedSeeds = longMessages
|
hashedSeeds = longMessages
|
||||||
@@ -200,10 +199,8 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
|
|
||||||
// encoded := "\nencodedStr := []string{\n"
|
// encoded := "\nencodedStr := []string{\n"
|
||||||
|
|
||||||
// var encodedLong []string
|
var encodedLong = make([]string, len(hashedSeeds))
|
||||||
var encodedLong atomic.Value
|
var elMx sync.Mutex
|
||||||
|
|
||||||
encodedLong.Store([]string{})
|
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
var qCount atomic.Uint32
|
var qCount atomic.Uint32
|
||||||
@@ -244,7 +241,11 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
// )
|
// )
|
||||||
// }
|
// }
|
||||||
|
|
||||||
encodedLong.Store(append(encodedLong.Load().([]string), encode))
|
elMx.Lock()
|
||||||
|
encodedLong[i] = encode
|
||||||
|
elMx.Unlock()
|
||||||
|
|
||||||
|
// encodedLong.Store(append(encodedLong.Load().([]string), encode))
|
||||||
// encoded += "\t\"" + encode + "\",\n"
|
// encoded += "\t\"" + encode + "\",\n"
|
||||||
wg.Done()
|
wg.Done()
|
||||||
qCount.Dec()
|
qCount.Dec()
|
||||||
@@ -252,15 +253,18 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
|
// To correctly test concurrency at max speed, we need to copy the values
|
||||||
|
// for each concurrent test unit
|
||||||
|
|
||||||
// encoded += "}\n"
|
// encoded += "}\n"
|
||||||
// t.Log(encoded)
|
// t.Log(encoded)
|
||||||
// Next, decode the encodedStr above, which should be the output of the
|
// Next, decode the encodedStr above, which should be the output of the
|
||||||
// original generated seeds, with the index mod 5 truncations performed on
|
// original generated seeds, with the index mod 5 truncations performed on
|
||||||
// each as was done to generate them.
|
// each as was done to generate them.
|
||||||
_ = dec
|
for i := range encodedLong {
|
||||||
for i := range encodedLong.Load().([]string) {
|
|
||||||
|
|
||||||
go func(i int) {
|
go func(i int, el string) {
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
qCount.Inc()
|
qCount.Inc()
|
||||||
@@ -269,9 +273,10 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
)
|
)
|
||||||
res := <-dec(
|
res := <-dec(
|
||||||
&proto.DecodeRequest{
|
&proto.DecodeRequest{
|
||||||
EncodedString: encodedLong.Load().([]string)[i],
|
EncodedString: el,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
qCount.Dec()
|
||||||
log.Println(
|
log.Println(
|
||||||
"decode message", i, "received back", qCount.Load(), "in queue",
|
"decode message", i, "received back", qCount.Load(), "in queue",
|
||||||
)
|
)
|
||||||
@@ -291,8 +296,7 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
qCount.Dec()
|
}(i, encodedLong[i])
|
||||||
}(i)
|
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
stopCli()
|
stopCli()
|
||||||
|
|||||||
Reference in New Issue
Block a user