// Copyright 2024 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. // // Package integration_test implements a client to exercise the pw_grpc server implementation package integration_test import ( "bufio" "context" "fmt" "hash/crc32" "io" "os/exec" "strconv" "strings" "testing" "time" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" pb "google.golang.org/grpc/examples/features/proto/echo" "google.golang.org/grpc/status" ) const port = "3402" func TestUnaryEcho(t *testing.T) { const num_connections = 1 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } defer conn.Close() go logServer(t, reader) testRPC(t, func(t *testing.T, ctx context.Context, msg string) { t.Logf("call UnaryEcho(%v)", msg) resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg}) if err != nil { t.Logf("... failed with error: %v", err.Error()) if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { t.Errorf("Error unexpected %v", err) } } else { t.Logf("... Recv %v", resp) if resp.Message != msg { t.Errorf("Unexpected response %v", resp) } } }) } func TestFragmentedMessage(t *testing.T) { // Test sending successively larger messages, larger than the maximum // HTTP2 data frame size (16384), ensuring messages are fragmented across // frames. const num_connections = 1 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } defer conn.Close() go logServer(t, reader) const num_calls = 4 for i := 0; i < num_calls; i++ { t.Run(fmt.Sprintf("%d of %d", i+1, num_calls), func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() msg := "crc32:" + strings.Repeat("testmessage!", 1500*(i+1)) checksum := strconv.FormatUint(uint64(crc32.ChecksumIEEE([]byte(msg))), 10) done := make(chan struct{}) go func() { t.Logf("call UnaryChecksum") resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: msg}) if err != nil { t.Logf("... failed with error: %v", err.Error()) if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { t.Errorf("Error unexpected %v", err) } } else { t.Logf("... Recv %v", resp) if resp.Message != checksum { t.Errorf("Unexpected response %v", resp) } } close(done) }() <-done }) } } func TestMultipleConnections(t *testing.T) { const num_connections = 3 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() go logServer(t, reader) for i := 0; i < num_connections; i++ { t.Run(fmt.Sprintf("connection %d of %d", i+1, num_connections), func(t *testing.T) { conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() resp, err := echo_client.UnaryEcho(ctx, &pb.EchoRequest{Message: "message0"}) if err != nil { t.Errorf("... failed with error: %v", err.Error()) } else { t.Logf("... Recv %v", resp) if resp.Message != "message0" { t.Errorf("Unexpected response %v", resp) } } conn.Close() }) } } func TestServerStreamingEcho(t *testing.T) { const num_connections = 1 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } defer conn.Close() go logServer(t, reader) testRPC(t, func(t *testing.T, ctx context.Context, msg string) { t.Logf("call ServerStreamingEcho(%v)", msg) client, err := echo_client.ServerStreamingEcho(ctx, &pb.EchoRequest{Message: msg}) if err != nil { t.Errorf("... failed with error: %v", err) return } for { resp, err := client.Recv() if err == io.EOF { t.Logf("... completed") return } if err != nil { t.Logf("... Recv failed with error: %v", err) if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { t.Errorf("Error unexpected %v", err) } return } t.Logf("... Recv %v", resp) if resp.Message != msg && resp.Message != "done" { t.Errorf("Unexpected response %v", resp) } } }) } func TestClientStreamingEcho(t *testing.T) { const num_connections = 1 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } defer conn.Close() go logServer(t, reader) testRPC(t, func(t *testing.T, ctx context.Context, msg string) { t.Logf("call ClientStreamingEcho()") client, err := echo_client.ClientStreamingEcho(ctx) if err != nil { t.Errorf("... failed with error: %v", err) return } for i := 0; i < 3; i++ { t.Logf("... Send %v", msg) if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil { t.Errorf("... Send failed with error: %v", err) return } } if err := client.CloseSend(); err != nil { t.Errorf("... CloseSend failed with error: %v", err) return } resp, err := client.CloseAndRecv() if err != nil { t.Logf("... CloseAndRecv failed with error: %v", err) if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { t.Errorf("Error unexpected %v", err) } } else { t.Logf("... CloseAndRecv %v", resp) if resp.Message != "done" { t.Errorf("Unexpected response %v", resp) } } }) } func TestBidirectionalStreamingEcho(t *testing.T) { const num_connections = 1 cmd, reader, err := launchServer(t, num_connections) if err != nil { t.Errorf("Failed to launch %v", err) } defer cmd.Wait() conn, echo_client, err := connectServer() if err != nil { t.Errorf("Failed to connect %v", err) } defer conn.Close() go logServer(t, reader) testRPC(t, func(t *testing.T, ctx context.Context, msg string) { t.Logf("call BidirectionalStreamingEcho()") client, err := echo_client.BidirectionalStreamingEcho(ctx) if err != nil { t.Logf("... failed with error: %v", err) return } for i := 0; i < 3; i++ { t.Logf("... Send %v", msg) if err := client.Send(&pb.EchoRequest{Message: msg}); err != nil { t.Errorf("... Send failed with error: %v", err) return } } if err := client.CloseSend(); err != nil { t.Logf("... CloseSend failed with error: %v", err) return } for { resp, err := client.Recv() if err == io.EOF { t.Logf("... completed") return } if err != nil { t.Logf("... Recv failed with error: %v", err) if msg != "quiet" || status.Convert(err).Code() != codes.Canceled { t.Errorf("Error unexpected %v", err) } return } t.Logf("... Recv %v", resp) if resp.Message != msg { t.Errorf("Unexpected response %v", resp) } } }) } func logServer(t *testing.T, reader *bufio.Reader) { for { line, err := reader.ReadString('\n') if err != nil { break } t.Logf("SERVER: %v", line) } } func launchServer(t *testing.T, num_connections int) (*exec.Cmd, *bufio.Reader, error) { cmd := exec.Command("./test_pw_rpc_server", port, strconv.Itoa(num_connections)) output, err := cmd.StdoutPipe() if err != nil { t.Errorf("Failed to get stdout of server %v", err) return nil, nil, err } if err := cmd.Start(); err != nil { t.Errorf("Failed to launch server %v", err) return nil, nil, err } reader := bufio.NewReader(output) for { line, _ := reader.ReadString('\n') if strings.Contains(line, "Accept") { break } } return cmd, reader, nil } func connectServer() (*grpc.ClientConn, pb.EchoClient, error) { addr := "localhost:" + port conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, nil, err } echo_client := pb.NewEchoClient(conn) return conn, echo_client, nil } func testRPC(t *testing.T, call func(t *testing.T, ctx context.Context, msg string)) { const num_calls = 30 for i := 0; i < num_calls; i++ { t.Run(fmt.Sprintf("%d of %d", i+1, num_calls), func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() msg := fmt.Sprintf("message%d", i) if i == num_calls-1 { msg = "quiet" } done := make(chan struct{}) go func() { call(t, ctx, msg) close(done) }() // Test cancellation. When we sent "quiet", the server won't echo anything // back and instead will hold onto the request. Sleep a bit to make sure // the server doesn't respond. Then cancel the request, which should // complete the RPC. if msg == "quiet" { time.Sleep(100 * time.Millisecond) cancel() } <-done }) } }