// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_rpc/pwpb/server_reader_writer.h" #include #include "gtest/gtest.h" #include "pw_rpc/pwpb/fake_channel_output.h" #include "pw_rpc/pwpb/test_method_context.h" #include "pw_rpc/service.h" #include "pw_rpc_test_protos/test.rpc.pwpb.h" namespace pw::rpc { namespace { namespace TestRequest = ::pw::rpc::test::pwpb::TestRequest; namespace TestResponse = ::pw::rpc::test::pwpb::TestResponse; namespace TestStreamResponse = ::pw::rpc::test::pwpb::TestStreamResponse; class TestServiceImpl final : public test::pw_rpc::pwpb::TestService::Service { public: Status TestUnaryRpc(const TestRequest::Message&, TestResponse::Message&) { return OkStatus(); } void TestAnotherUnaryRpc(const TestRequest::Message&, PwpbUnaryResponder&) {} void TestServerStreamRpc(const TestRequest::Message&, PwpbServerWriter&) {} void TestClientStreamRpc( PwpbServerReader&) {} void TestBidirectionalStreamRpc( PwpbServerReaderWriter&) {} }; template struct ReaderWriterTestContext { using Info = internal::MethodInfo; static constexpr uint32_t kChannelId = 1; ReaderWriterTestContext() : channel(Channel::Create(&output)), server(span(&channel, 1)) {} TestServiceImpl service; PwpbFakeChannelOutput<4> output; Channel channel; Server server; }; using test::pw_rpc::pwpb::TestService; TEST(PwpbUnaryResponder, DefaultConstructed) { PwpbUnaryResponder call; ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus())); call.set_on_error([](Status) {}); } TEST(PwpbServerWriter, DefaultConstructed) { PwpbServerWriter call; ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Write({})); EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus())); call.set_on_error([](Status) {}); } TEST(PwpbServerReader, DefaultConstructed) { PwpbServerReader call; ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus())); call.set_on_next([](const TestRequest::Message&) {}); call.set_on_error([](Status) {}); } TEST(PwpbServerReaderWriter, DefaultConstructed) { PwpbServerReaderWriter call; ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Write({})); EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus())); call.set_on_next([](const TestRequest::Message&) {}); call.set_on_error([](Status) {}); } TEST(PwpbUnaryResponder, Closed) { ReaderWriterTestContext ctx; PwpbUnaryResponder call = PwpbUnaryResponder::Open< TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus())); ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus())); call.set_on_error([](Status) {}); } TEST(PwpbServerWriter, Closed) { ReaderWriterTestContext ctx; PwpbServerWriter call = PwpbServerWriter::Open< TestService::TestServerStreamRpc>( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), call.Finish(OkStatus())); ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Write({})); EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus())); call.set_on_error([](Status) {}); } TEST(PwpbServerReader, Closed) { ReaderWriterTestContext ctx; PwpbServerReader call = PwpbServerReader::Open< TestService::TestClientStreamRpc>( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), call.Finish({}, OkStatus())); ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Finish({}, OkStatus())); call.set_on_next([](const TestRequest::Message&) {}); call.set_on_error([](Status) {}); } TEST(PwpbServerReaderWriter, Closed) { ReaderWriterTestContext ctx; PwpbServerReaderWriter call = PwpbServerReaderWriter:: Open( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), call.Finish(OkStatus())); ASSERT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); EXPECT_EQ(Status::FailedPrecondition(), call.Write({})); EXPECT_EQ(Status::FailedPrecondition(), call.Finish(OkStatus())); call.set_on_next([](const TestRequest::Message&) {}); call.set_on_error([](Status) {}); } TEST(PwpbUnaryResponder, Open_ReturnsUsableResponder) { ReaderWriterTestContext ctx; PwpbUnaryResponder responder = PwpbUnaryResponder::Open< TestService::TestUnaryRpc>(ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), responder.Finish({.value = 4321, .repeated_field = {}})); EXPECT_EQ(ctx.output.last_response().value, 4321); EXPECT_EQ(ctx.output.last_status(), OkStatus()); } TEST(PwpbServerWriter, Open_ReturnsUsableWriter) { ReaderWriterTestContext ctx; PwpbServerWriter responder = PwpbServerWriter::Open< TestService::TestServerStreamRpc>( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321})); ASSERT_EQ(OkStatus(), responder.Finish()); EXPECT_EQ(ctx.output.last_response().number, 321u); EXPECT_EQ(ctx.output.last_status(), OkStatus()); } TEST(PwpbServerReader, Open_ReturnsUsableReader) { ReaderWriterTestContext ctx; PwpbServerReader responder = PwpbServerReader::Open< TestService::TestClientStreamRpc>( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), responder.Finish({.chunk = {}, .number = 321})); EXPECT_EQ(ctx.output.last_response().number, 321u); } TEST(PwpbServerReaderWriter, Open_ReturnsUsableReaderWriter) { ReaderWriterTestContext ctx; PwpbServerReaderWriter responder = PwpbServerReaderWriter:: Open( ctx.server, ctx.channel.id(), ctx.service); ASSERT_EQ(OkStatus(), responder.Write({.chunk = {}, .number = 321})); ASSERT_EQ(OkStatus(), responder.Finish(Status::NotFound())); EXPECT_EQ(ctx.output.last_response() .number, 321u); EXPECT_EQ(ctx.output.last_status(), Status::NotFound()); } TEST(RawServerReaderWriter, Open_UnknownChannel) { ReaderWriterTestContext ctx; ASSERT_EQ(OkStatus(), ctx.server.CloseChannel(ctx.kChannelId)); PwpbServerReaderWriter call = PwpbServerReaderWriter:: Open( ctx.server, ctx.kChannelId, ctx.service); EXPECT_TRUE(call.active()); EXPECT_EQ(call.channel_id(), ctx.kChannelId); EXPECT_EQ(Status::Unavailable(), call.Write({})); ASSERT_EQ(OkStatus(), ctx.server.OpenChannel(ctx.kChannelId, ctx.output)); EXPECT_EQ(OkStatus(), call.Write({})); EXPECT_TRUE(call.active()); EXPECT_EQ(OkStatus(), call.Finish()); EXPECT_FALSE(call.active()); EXPECT_EQ(call.channel_id(), Channel::kUnassignedChannelId); } TEST(RawServerReaderWriter, Open_MultipleTimes_CancelsPrevious) { ReaderWriterTestContext ctx; PwpbServerReaderWriter one = PwpbServerReaderWriter:: Open( ctx.server, ctx.kChannelId, ctx.service); std::optional error; one.set_on_error([&error](Status status) { error = status; }); ASSERT_TRUE(one.active()); PwpbServerReaderWriter two = PwpbServerReaderWriter:: Open( ctx.server, ctx.kChannelId, ctx.service); EXPECT_FALSE(one.active()); EXPECT_TRUE(two.active()); EXPECT_EQ(Status::Cancelled(), error); } TEST(PwpbServerReader, CallbacksMoveCorrectly) { PW_PWPB_TEST_METHOD_CONTEXT(TestServiceImpl, TestClientStreamRpc) ctx; PwpbServerReader call_1 = ctx.reader(); ASSERT_TRUE(call_1.active()); TestRequest::Message received_request = {.integer = 12345678, .status_code = 1}; call_1.set_on_next([&received_request](const TestRequest::Message& value) { received_request = value; }); PwpbServerReader call_2; call_2 = std::move(call_1); constexpr TestRequest::Message request{.integer = 600613, .status_code = 2}; ctx.SendClientStream(request); EXPECT_EQ(request.integer, received_request.integer); EXPECT_EQ(request.status_code, received_request.status_code); } } // namespace } // namespace pw::rpc