diff --git a/pkg/grpc/concurrency_test.go b/pkg/grpc/concurrency_test.go index b778084..8551f21 100644 --- a/pkg/grpc/concurrency_test.go +++ b/pkg/grpc/concurrency_test.go @@ -27,6 +27,11 @@ type SequencedString struct { str string } +type SequencedBytes struct { + seq int + byt []byte +} + // TestGRPCCodecConcurrency deliberately intersperses extremely long messages // and spawns tests concurrently in order to ensure the client correctly returns // responses to the thread that requested them @@ -115,12 +120,14 @@ func TestGRPCCodecConcurrency(t *testing.T) { // We will keep track of ins and outs for log prints var qCount atomic.Uint32 + log.Println("encoding received items") + wg.Add(1) for i := range generated { go func(i int) { - log.Println("processing item", i) + log.Println("encode processing item", i) // we need to wait until all messages process before collating the // results @@ -169,7 +176,7 @@ func TestGRPCCodecConcurrency(t *testing.T) { for item := range stringChan { counter++ - log.Println("collating item", item.seq, "items done:", counter) + log.Println("collating encode item", item.seq, "items done:", counter) // place items back in the sequence position they were created in encoded[item.seq] = item.str if counter >= testItems { @@ -177,6 +184,75 @@ func TestGRPCCodecConcurrency(t *testing.T) { } } + // To create a collection that can be sorted easily after creation back into + // an ordered slice, we create a buffered channel with enough buffers to + // hold all of the items we will feed into it + bytesChan := make(chan SequencedBytes, testItems) + + log.Println("decoding received items") + + wg.Add(1) + for i := range encoded { + + go func(i int) { + + log.Println("decode processing item", i) + + // we need to wait until all messages process before collating the + // results + wg.Add(1) + qCount.Inc() + + log.Println( + "decode request", i, "sending,", + qCount.Load(), "items in queue", + ) + + // send out the query and wait for the response + decRes := <-dec( + &proto.DecodeRequest{ + EncodedString: encoded[i], + }, + ) + + // push the returned result into our channel buffer with the item + // sequence number, so it can be reordered correctly + bytesChan <- SequencedBytes{ + seq: i, + byt: decRes.GetData(), + } + + wg.Done() + qCount.Dec() + + log.Println( + "decode request", i, "response received,", + qCount.Load(), "items in queue", + ) + + }(i) + } + wg.Done() + + // Wait until all results are back so we can assemble them in order for + // checking + wg.Wait() + + decoded := make([][]byte, testItems) + + counter = 0 + + for item := range bytesChan { + + counter++ + log.Println("collating decode item", item.seq, "items done:", counter) + // place items back in the sequence position they were created in + decoded[item.seq] = item.byt + if counter >= testItems { + break + } + } + log.Println("shutting down client and server") // wg.Wait() stopCli()