gRPC: synchronous and asynchronous Server streaming RPC
gRPC, one of the most popular RPC frameworks for inter-process microservices communication, supports both unary and streaming RPC. Unlike unary RPC, in a server streaming RPC the client sends a single request and the server replies with a stream of messages.
In this article, we will see how to implement server streaming RPC and how to handle errors in a streaming response.
gRPC RPC types
gRPC supports four types of RPC:
- Unary RPC: the client sends a single request and receives a single response.
- Server streaming RPC: the client sends a single request and in return, the server sends a stream of messages.
- Client streaming RPC: the client sends a stream of messages and the server responds with a single message.
- Bidirectional streaming RPC: both the client and the server send a stream of messages.
Additionally, a gRPC RPC can be synchronous or asynchronous.
- Synchronous: a client call waits for the server to respond.
- Asynchronous: the client makes non-blocking calls to the server, and the server returns the response asynchronously.
What is gRPC server streaming RPC?
In a server streaming RPC, a gRPC client sends a single message and, in reply, the gRPC server sends a stream of messages. The stream is followed by a signal that notifies the client about the end of the stream. The server sends this signal by calling the onCompleted() method (more about this later).
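The message flow described above can be sketched without any gRPC dependency. In this minimal sketch, StreamObserver is a local stand-in that only copies the three method names of io.grpc.stub.StreamObserver, and listProducts, collectEvents and the message strings are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class StreamContractSketch {

    /** Local stand-in (assumption): same three methods as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Plays the server role: one request in, many messages out, then the end-of-stream signal. */
    static void listProducts(List<String> productIds, StreamObserver<String> responseObserver) {
        for (String id : productIds) {
            responseObserver.onNext("product for " + id); // one stream message per id
        }
        responseObserver.onCompleted(); // tells the client that no more messages follow
    }

    /** Plays the client role and records every event it observes, in order. */
    static List<String> collectEvents(List<String> productIds) {
        List<String> events = new ArrayList<>();
        listProducts(productIds, new StreamObserver<String>() {
            @Override public void onNext(String value) { events.add("onNext: " + value); }
            @Override public void onError(Throwable t) { events.add("onError: " + t.getMessage()); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        collectEvents(List.of("product-1", "product-2")).forEach(System.out::println);
    }
}
```

The client sees one onNext event per message and a single onCompleted event at the end, which is the ordering the rest of the article relies on.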
Server streaming service definition
Let’s consider a use case where a client calls the server with a list of ids and the server returns a stream of messages instead of returning all responses in one go. For such use cases, you can define a service with the stream keyword in the return type, as in returns (stream <message>). One example of such a service is the ProductService used in the rest of this article.
With respect to the service definition, the only difference between unary and server streaming is the use of the stream keyword; the rest remains the same.
Protobuf code generation
You can use the protoc compiler to generate the stubs for the client and server.
The protobuf Gradle plugin compiles protobuf (.proto) files and generates client and server code in Java.
Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java. The generated Java source files should be added to the sourceSet so that they can be compiled along with the other Java sources.
Code Example
The working code example of this article is listed on GitHub. To run the example, clone the repository and import grpc-server-streaming-rpc into your favorite IDE as a Gradle project.
To build the project and generate client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in the IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.
Implementing server code
The gRPC server provides the implementation of the service methods defined in the proto files and exposes them as RPC methods. Broadly, implementing server streaming code includes three steps.
- Generate server stub: the first step is to generate server stubs. For this, run the command .\gradlew clean build. The Gradle protobuf plugin generates a server stub in the directory build/generated/source/proto/.
- Implement service class: the next step is to implement the RPC service by extending the autogenerated class ProductServiceGrpc.ProductServiceImplBase and overriding the listProduct method (check the service implementation code below). You can delegate the call to business logic from the service class. For example, the service method listProduct can call a repository to fetch products from a database.
- Implement server code: the last step is to implement the server code and register the service. This can be done as ServerBuilder.forPort(port).addService(new ProductService()).build().
Service Implementation
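The original listing is not reproduced here, so what follows is only a dependency-free sketch of the shape such a service method might take. StreamObserver is a local stand-in for io.grpc.stub.StreamObserver, REPOSITORY is a hypothetical in-memory map in place of a database, and the IllegalArgumentException stands in for a StatusException built from Status.NOT_FOUND. Returning right after onError is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ProductServiceSketch {

    /** Local stand-in (assumption): same three methods as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical in-memory repository standing in for the database. */
    static final Map<String, String> REPOSITORY = new LinkedHashMap<>();
    static {
        REPOSITORY.put("product-1", "Apple iPhone 12 Pro (128GB)");
        REPOSITORY.put("product-2", "Apple iPhone 12 Pro Max (128GB)");
    }

    /** Mirrors the shape of an overridden listProduct(request, responseObserver) method. */
    static void listProduct(List<String> productIds, StreamObserver<String> responseObserver) {
        for (String id : productIds) {
            String product = REPOSITORY.get(id); // delegate the lookup to the repository
            if (product == null) {
                // A real service would pass a StatusException created from Status.NOT_FOUND.
                responseObserver.onError(new IllegalArgumentException("NOT_FOUND: " + id));
                return; // the error ends the stream, so nothing more is sent
            }
            responseObserver.onNext(product); // stream one message per product found
        }
        responseObserver.onCompleted(); // signal the end of the stream
    }

    /** Calls listProduct with a recording observer and returns what the client would see. */
    static List<String> collect(List<String> productIds) {
        List<String> events = new ArrayList<>();
        listProduct(productIds, new StreamObserver<String>() {
            @Override public void onNext(String value) { events.add("onNext: " + value); }
            @Override public void onError(Throwable t) { events.add("onError: " + t.getMessage()); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        collect(List.of("product-1", "product-2")).forEach(System.out::println);
    }
}
```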
In the above code, the call to responseObserver.onCompleted() indicates that the stream response has been completed.
Server code
Running gRPC server
You can start the gRPC server from the IDE by running the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.
Implementing gRPC client channel
A gRPC channel represents a virtual connection to an endpoint to perform RPC.
You can create a gRPC channel by specifying the server address and port:
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
Creating a channel is expensive, so create it once and reuse it.
gRPC supports two types of client stubs:
- blocking/synchronous stub: the RPC call waits for the server to respond.
- non-blocking/asynchronous stub: the client makes non-blocking calls to the server, and the response is returned asynchronously.
Implementing blocking client stub
To create a synchronous/blocking client stub, use the newBlockingStub static method of the auto-generated ProductServiceGrpc class.
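In grpc-java, a blocking stub exposes a server streaming method as an Iterator over the responses, and the calling thread waits inside the iterator until the next message arrives. The sketch below imitates that behavior with plain JDK classes. The "fake-server" thread, the queue, and the listProduct and consume helpers are all hypothetical and not part of any gRPC API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingStreamSketch {

    /** Starts a fake server thread and returns an iterator whose hasNext() blocks until data arrives. */
    static Iterator<String> listProduct(List<String> productIds) {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        Thread server = new Thread(() -> {
            for (String id : productIds) {
                queue.add(Optional.of("product for " + id));
            }
            queue.add(Optional.empty()); // an empty Optional marks the end of the stream
        }, "fake-server");
        server.setDaemon(true);
        server.start();

        return new Iterator<String>() {
            private Optional<String> fetched; // null until the next element has been taken

            @Override public boolean hasNext() {
                if (fetched == null) {
                    try {
                        fetched = queue.take(); // the calling thread waits here
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting", e);
                    }
                }
                return fetched.isPresent();
            }

            @Override public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String value = fetched.get();
                fetched = null; // force the next hasNext() to fetch again
                return value;
            }
        };
    }

    /** Drains the stream on the calling thread, the way a blocking client would. */
    static List<String> consume(List<String> productIds) {
        List<String> received = new ArrayList<>();
        Iterator<String> responses = listProduct(productIds);
        while (responses.hasNext()) {
            received.add(responses.next());
        }
        return received;
    }

    public static void main(String[] args) {
        for (String product : consume(List.of("product-1", "product-2"))) {
            System.out.println("[" + Thread.currentThread().getName() + "] Received " + product);
        }
    }
}
```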
To run the blocking client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcBlockingClient from the IDE, making sure that the gRPC server is running. Once run, the client prints a log like:
[INFO ] 2021-12-19 15:11:19.618 [main] UnaryGrpcBlockingClient - Received Product from server, info product { name: "Apple iPhone 12 Pro (128GB)" description: "Apple iPhone 12 Pro (128GB) - Graphite" price: 1617.29 }
As you can infer from the log, both the request and the response are handled in the same thread [main]. In other words, the client blocks until the response is returned by the server.
Implementing asynchronous client stub
For most use cases, a blocking RPC suffices. However, a blocking RPC keeps the calling thread waiting until the server returns a response, so that thread cannot do any other work in the meantime. You can overcome this problem by using an asynchronous client stub and registering a callback. This callback is called in a different thread once the server sends the response.
To implement an asynchronous client stub, use the newStub static method of ProductServiceGrpc.
var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);
and register a callback, an implementation of StreamObserver, when you make the call.
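A callback-based call can be sketched with plain JDK classes as follows. This is not the generated stub. StreamObserver is a local stand-in for io.grpc.stub.StreamObserver, and ProductCallback, the "callback-thread" name, and the listProduct helper are hypothetical. With a real asynchronous stub, the call would take the request and such a callback object as its arguments.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncCallbackSketch {

    /** Local stand-in (assumption): same three methods as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Callback that records which thread delivered each message. */
    static class ProductCallback implements StreamObserver<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onNext(String value) {
            events.add(Thread.currentThread().getName() + " received " + value);
        }
        @Override public void onError(Throwable t) {
            events.add("error: " + t.getMessage());
            done.countDown(); // an error also ends the stream
        }
        @Override public void onCompleted() {
            done.countDown(); // end of stream
        }
    }

    /** Non-blocking call: hands the work to the executor and returns immediately. */
    static void listProduct(List<String> ids, StreamObserver<String> callback, ExecutorService executor) {
        executor.execute(() -> {
            for (String id : ids) {
                callback.onNext("product for " + id);
            }
            callback.onCompleted();
        });
    }

    static List<String> run(List<String> productIds) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            ProductCallback callback = new ProductCallback();
            listProduct(productIds, callback, executor);
            // The caller could do other work here; this sketch simply waits for the stream to end.
            if (!callback.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the stream");
            }
            return callback.events;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("[" + Thread.currentThread().getName() + "] Calling Server..");
        run(List.of("product-1", "product-2")).forEach(System.out::println);
    }
}
```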
To run the asynchronous client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcAsynClient from the IDE. Once run, the client prints a log like:
[INFO ] 2021-12-19 15:16:11.798 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-12-19 15:16:14.046 [grpc-default-executor-1] UnaryGrpcAsynClient - Received product, product { name: "Apple iPhone 12 Pro Max (128GB)" description: "Apple iPhone 12 Pro (128GB) - Red" price: 1752.59 }
Did you notice that the callback happens in a different thread grpc-default-executor-1 than the main thread?
For the callback, gRPC uses a cached thread pool that creates new threads as needed but reuses previously constructed threads when they are available. If you want, you can provide your own thread pool as:
var executorService = Executors.newFixedThreadPool(10);
var managedChannel = ManagedChannelBuilder.forAddress(host, port)
    .executor(executorService)
    .usePlaintext()
    .build();
Handling errors in stream response
If you recall, we sent the error response Status.NOT_FOUND in case the product is not found for a given id. Sending an error response terminates the stream. This can be a problem if you want to read the complete stream response even in case of an error and handle the error separately.
Consider a scenario where you call a server with product ids product-1, product-2, product-3, where product-2 is NOT_FOUND on the server and the server responds with responseObserver.onError(new StatusException(Status.NOT_FOUND)). In this case, the server will throw an error for product-3 with the message "Stream was terminated by error, no further calls are allowed", and on the client side you will not get the response for product-3 even if you are interested in it.
SEVERE: Exception while executing runnable io.grpc.internal.Ser[email protected]42f8c429
java.lang.IllegalStateException: Stream was terminated by error, no further calls are allowed
at com.google.common.base.Preconditions.checkState(Preconditions.java:511)
One important point to note here is that calling onError on the server just terminates the stream and it has no impact on the gRPC connection. If you want, you can reuse the same ManagedChannel, which represents a virtual connection to the server, and call the server again.
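The product-1, product-2, product-3 scenario can be replayed with a small dependency-free sketch. GuardedObserver is a hypothetical wrapper that loosely imitates the guard behind the log message above. It is not gRPC's actual implementation, and StreamObserver is again a local stand-in for io.grpc.stub.StreamObserver.

```java
import java.util.ArrayList;
import java.util.List;

class TerminatedStreamSketch {

    /** Local stand-in (assumption): same three methods as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hypothetical wrapper that rejects every call made after onError. */
    static class GuardedObserver<T> implements StreamObserver<T> {
        private final StreamObserver<T> delegate;
        private boolean terminatedByError;

        GuardedObserver(StreamObserver<T> delegate) {
            this.delegate = delegate;
        }

        private void checkOpen() {
            if (terminatedByError) {
                throw new IllegalStateException(
                        "Stream was terminated by error, no further calls are allowed");
            }
        }

        @Override public void onNext(T value) { checkOpen(); delegate.onNext(value); }
        @Override public void onError(Throwable t) { checkOpen(); terminatedByError = true; delegate.onError(t); }
        @Override public void onCompleted() { checkOpen(); delegate.onCompleted(); }
    }

    /** Replays the scenario and returns what the client saw plus where the server failed. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        StreamObserver<String> client = new StreamObserver<String>() {
            @Override public void onNext(String value) { events.add("client onNext: " + value); }
            @Override public void onError(Throwable t) { events.add("client onError: " + t.getMessage()); }
            @Override public void onCompleted() { events.add("client onCompleted"); }
        };
        StreamObserver<String> responseObserver = new GuardedObserver<>(client);

        for (String id : List.of("product-1", "product-2", "product-3")) {
            try {
                if (id.equals("product-2")) {
                    responseObserver.onError(new RuntimeException("NOT_FOUND: " + id));
                } else {
                    responseObserver.onNext(id);
                }
            } catch (IllegalStateException e) {
                events.add("server failed on " + id + ": " + e.getMessage());
            }
        }
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The client receives product-1 and the error for product-2, while the attempt to send product-3 fails on the server and never reaches the client.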
How to handle errors in the gRPC stream?
As you know by now, sending an error response terminates the stream. So clearly, you can’t call the responseObserver.onError method on the gRPC server in the case of business errors such as NOT_FOUND. To handle such errors, you need to send the error response as part of the stream message itself. This is where the protobuf oneof feature comes in handy.
Using the protobuf oneof, you can specify that the server sends either an error response or a valid response as:
message GetProductResponse {
  oneof product_response {
    Product product = 1;
    google.rpc.Status error = 2;
  }
}
On the gRPC server, you then send either a valid response or an error response as a regular stream message through onNext, instead of calling onError.
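The server-side change can be sketched without gRPC as follows. GetProductResponse here is a hand-written stand-in for the generated message, with a plain String error instead of google.rpc.Status. REPOSITORY and the product names are hypothetical sample data, and StreamObserver is a local stand-in for io.grpc.stub.StreamObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class OneofServerSketch {

    /** Local stand-in (assumption): same three methods as io.grpc.stub.StreamObserver. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Hand-written stand-in for the generated GetProductResponse: exactly one field is set. */
    static final class GetProductResponse {
        final String product; // set for a valid response
        final String error;   // set for a business error

        private GetProductResponse(String product, String error) {
            this.product = product;
            this.error = error;
        }
        static GetProductResponse ofProduct(String product) { return new GetProductResponse(product, null); }
        static GetProductResponse ofError(String error) { return new GetProductResponse(null, error); }
        boolean hasError() { return error != null; }
    }

    /** Hypothetical sample data; product-2 is deliberately missing. */
    static final Map<String, String> REPOSITORY = Map.of(
            "product-1", "Apple iPhone 12 Pro (128GB)",
            "product-3", "Apple iPhone 12 Pro Max (128GB)");

    static void listProduct(List<String> ids, StreamObserver<GetProductResponse> responseObserver) {
        for (String id : ids) {
            String product = REPOSITORY.get(id);
            if (product == null) {
                // The error travels as a normal message, so the stream stays open.
                responseObserver.onNext(GetProductResponse.ofError("NOT_FOUND: " + id));
            } else {
                responseObserver.onNext(GetProductResponse.ofProduct(product));
            }
        }
        responseObserver.onCompleted();
    }

    /** Runs the call with a recording observer and returns what the client would see. */
    static List<String> run(List<String> ids) {
        List<String> events = new ArrayList<>();
        listProduct(ids, new StreamObserver<GetProductResponse>() {
            @Override public void onNext(GetProductResponse r) {
                events.add(r.hasError() ? "error: " + r.error : "product: " + r.product);
            }
            @Override public void onError(Throwable t) { events.add("stream failed: " + t.getMessage()); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run(List.of("product-1", "product-2", "product-3")).forEach(System.out::println);
    }
}
```

Unlike the onError variant, the response for product-3 is still delivered after the error for product-2.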
On the gRPC client, you check which of the two fields is set in each message and take the appropriate action.
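For a oneof named product_response, protoc's Java output typically exposes a getProductResponseCase() accessor that returns an enum, which makes the client-side check a simple switch. The sketch below imitates that shape with hand-written classes. The enum, the GetProductResponse stand-in, and the handle method are assumptions for illustration and are not generated code.

```java
import java.util.ArrayList;
import java.util.List;

class OneofClientSketch {

    /** Stand-in for the enum that protoc would generate for the product_response oneof. */
    enum ProductResponseCase { PRODUCT, ERROR, PRODUCTRESPONSE_NOT_SET }

    /** Hand-written stand-in for the generated GetProductResponse message. */
    static final class GetProductResponse {
        final String product;
        final String error;

        private GetProductResponse(String product, String error) {
            this.product = product;
            this.error = error;
        }
        static GetProductResponse ofProduct(String product) { return new GetProductResponse(product, null); }
        static GetProductResponse ofError(String error) { return new GetProductResponse(null, error); }
        static GetProductResponse empty() { return new GetProductResponse(null, null); }

        ProductResponseCase getProductResponseCase() {
            if (product != null) {
                return ProductResponseCase.PRODUCT;
            }
            if (error != null) {
                return ProductResponseCase.ERROR;
            }
            return ProductResponseCase.PRODUCTRESPONSE_NOT_SET;
        }
    }

    /** Decides what to do with one stream message; this would run inside the client's onNext. */
    static String handle(GetProductResponse response) {
        switch (response.getProductResponseCase()) {
            case PRODUCT:
                return "Received product: " + response.product;
            case ERROR:
                return "Product lookup failed: " + response.error; // handle it and keep reading
            default:
                return "Empty response";
        }
    }

    /** Applies handle to every message of a stream, in order. */
    static List<String> handleAll(List<GetProductResponse> stream) {
        List<String> log = new ArrayList<>();
        for (GetProductResponse response : stream) {
            log.add(handle(response));
        }
        return log;
    }

    public static void main(String[] args) {
        handleAll(List.of(
                GetProductResponse.ofProduct("Apple iPhone 12 Pro (128GB)"),
                GetProductResponse.ofError("NOT_FOUND: product-2"))).forEach(System.out::println);
    }
}
```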
Summary
gRPC is a popular remote procedure call (RPC) framework. gRPC supports both unary RPC and streaming RPC. In a server streaming RPC, a client sends a single request and receives a stream of messages as the response.
A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.
Handling business errors by calling responseObserver.onError() doesn’t work well for streams, because it terminates the stream. Instead, we can use the protobuf oneof feature to send error responses back to the client as part of the stream.
Originally published at https://techdozo.dev on December 30, 2021.