completed encode and decode steps of test
This commit is contained in:
@@ -27,6 +27,11 @@ type SequencedString struct {
|
||||
str string
|
||||
}
|
||||
|
||||
type SequencedBytes struct {
|
||||
seq int
|
||||
byt []byte
|
||||
}
|
||||
|
||||
// TestGRPCCodecConcurrency deliberately intersperses extremely long messages
|
||||
// and spawns tests concurrently in order to ensure the client correctly returns
|
||||
// responses to the thread that requested them
|
||||
@@ -115,12 +120,14 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
||||
// We will keep track of ins and outs for log prints
|
||||
var qCount atomic.Uint32
|
||||
|
||||
log.Println("encoding received items")
|
||||
|
||||
wg.Add(1)
|
||||
for i := range generated {
|
||||
|
||||
go func(i int) {
|
||||
|
||||
log.Println("processing item", i)
|
||||
log.Println("encode processing item", i)
|
||||
|
||||
// we need to wait until all messages process before collating the
|
||||
// results
|
||||
@@ -169,7 +176,7 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
||||
for item := range stringChan {
|
||||
|
||||
counter++
|
||||
log.Println("collating item", item.seq, "items done:", counter)
|
||||
log.Println("collating encode item", item.seq, "items done:", counter)
|
||||
// place items back in the sequence position they were created in
|
||||
encoded[item.seq] = item.str
|
||||
if counter >= testItems {
|
||||
@@ -177,6 +184,75 @@ func TestGRPCCodecConcurrency(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// To create a collection that can be sorted easily after creation back into
|
||||
// an ordered slice, we create a buffered channel with enough buffers to
|
||||
// hold all of the items we will feed into it
|
||||
bytesChan := make(chan SequencedBytes, testItems)
|
||||
|
||||
log.Println("decoding received items")
|
||||
|
||||
wg.Add(1)
|
||||
for i := range encoded {
|
||||
|
||||
go func(i int) {
|
||||
|
||||
log.Println("decode processing item", i)
|
||||
|
||||
// we need to wait until all messages process before collating the
|
||||
// results
|
||||
wg.Add(1)
|
||||
qCount.Inc()
|
||||
|
||||
log.Println(
|
||||
"decode request", i, "sending,",
|
||||
qCount.Load(), "items in queue",
|
||||
)
|
||||
|
||||
// send out the query and wait for the response
|
||||
decRes := <-dec(
|
||||
&proto.DecodeRequest{
|
||||
EncodedString: encoded[i],
|
||||
},
|
||||
)
|
||||
|
||||
// push the returned result into our channel buffer with the item
|
||||
// sequence number, so it can be reordered correctly
|
||||
bytesChan <- SequencedBytes{
|
||||
seq: i,
|
||||
byt: decRes.GetData(),
|
||||
}
|
||||
|
||||
wg.Done()
|
||||
qCount.Dec()
|
||||
|
||||
log.Println(
|
||||
"decode request", i, "response received,",
|
||||
qCount.Load(), "items in queue",
|
||||
)
|
||||
|
||||
}(i)
|
||||
}
|
||||
wg.Done()
|
||||
|
||||
// Wait until all results are back so we can assemble them in order for
|
||||
// checking
|
||||
wg.Wait()
|
||||
|
||||
decoded := make([][]byte, testItems)
|
||||
|
||||
counter = 0
|
||||
|
||||
for item := range bytesChan {
|
||||
|
||||
counter++
|
||||
log.Println("collating decode item", item.seq, "items done:", counter)
|
||||
// place items back in the sequence position they were created in
|
||||
decoded[item.seq] = item.byt
|
||||
if counter >= testItems {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
log.Println("shutting down client and server")
|
||||
// wg.Wait()
|
||||
stopCli()
|
||||
|
||||
Reference in New Issue
Block a user