
gRPC: synchronous and asynchronous Server streaming RPC

Pankaj Kumar


gRPC, one of the most popular RPC frameworks for inter-process communication between microservices, supports both unary and streaming RPC. Unlike a unary RPC, in a server streaming RPC the client sends a single request and, in return, the server sends a stream of messages.

In this article, we will see how to implement server streaming RPC and how to handle errors in a streaming response.

gRPC RPC types

gRPC supports four types of RPC:

  1. Unary RPC: the client sends a single request and receives a single response.
  2. Server streaming RPC: the client sends a single request and in return, the server sends a stream of messages.
  3. Client streaming RPC: the client sends a stream of messages and the server responds with a single message.
  4. Bidirectional streaming RPC: in bidirectional streaming, both client and server send a stream of messages.

Additionally, a gRPC RPC can be synchronous or asynchronous.

  • Synchronous: a client call waits for the server to respond.
  • Asynchronous: the client makes non-blocking calls to the server, and the server returns the response asynchronously.

What is gRPC server streaming RPC?

In a server streaming RPC, a gRPC client sends a single message, and in reply, the gRPC server sends a stream of messages. The stream of messages is followed by a signal that notifies the client about the end of the stream. This signal is sent by the server in the form of an onCompleted() method call (more about this later).

Server streaming service definition

Let’s consider a use case where a client calls the server with a list of ids, and the server returns a stream of messages instead of returning all responses in one go. For such use cases, you can define a service with the stream keyword in the return type, as returns (stream <message>). One example of such a service is shown below.
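
Here is a sketch of such a service definition. The service name follows the generated classes mentioned later in this article (ProductServiceGrpc, listProduct), but the request and response message names and fields are assumptions for illustration; check the .proto file in the repository for the exact definition.

syntax = "proto3";

option java_multiple_files = true;

service ProductService {
  // ListProduct generates the listProduct method in the Java stubs;
  // the stream keyword on the return type makes this a server streaming RPC
  rpc ListProduct(ListProductRequest) returns (stream GetProductResponse);
}

message ListProductRequest {
  // ids of the products the client is interested in
  repeated string product_ids = 1;
}

message GetProductResponse {
  Product product = 1;
}

message Product {
  string name = 1;
  string description = 2;
  double price = 3;
}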

With respect to the service definition, the only difference between unary and server streaming RPC is the use of the stream keyword; the rest remains the same.

Protobuf code generation

You can use the protoc compiler to generate the stubs for the client and server.

The protobuf Gradle plugin compiles protobuf (.proto) files and generates client and server code in Java.

Running the command gradlew build generates source code in the directories build/generated/source/proto/main/grpc and build/generated/source/proto/main/java. The generated Java source files should be added to the sourceSet so that they can be compiled along with the other Java sources.
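
A minimal build.gradle sketch of this setup is shown below; the plugin and library versions are assumptions, so check the repository for the exact ones.

plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.18'
}

dependencies {
    implementation 'io.grpc:grpc-netty-shaded:1.43.1'
    implementation 'io.grpc:grpc-protobuf:1.43.1'
    implementation 'io.grpc:grpc-stub:1.43.1'
}

protobuf {
    // protoc compiles the .proto files; the grpc plugin generates the service stubs
    protoc { artifact = 'com.google.protobuf:protoc:3.19.1' }
    plugins {
        grpc { artifact = 'io.grpc:protoc-gen-grpc-java:1.43.1' }
    }
    generateProtoTasks {
        all()*.plugins { grpc {} }
    }
}

// add the generated sources to the main source set so they compile with the handwritten Java sources
sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}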

Code Example

The working code example for this article is available on GitHub. To run the example, clone the repository and import grpc-server-streaming-rpc into your favorite IDE as a Gradle project.

To build the project and generate the client and server stubs, run the command .\gradlew clean build. You can start the gRPC server in the IDE by running the main method of the class GrpcServer. The gRPC server runs on localhost:3000.

Implementing server code

The gRPC server provides the implementation of service methods defined in the proto files and exposes those as RPC methods. Broadly, implementing server streaming code includes three steps.

  1. Generate server stub: the first step is to generate the server stubs. For this, run the command .\gradlew clean build. The Gradle protobuf plugin generates the server stub in the directory build/generated/source/proto/.
  2. Implement the service class: the next step is to implement the RPC service by extending the autogenerated class ProductServiceGrpc.ProductServiceImplBase and overriding the listProduct method (check the service implementation code below). You can delegate the call to business logic from the service class. For example, the service method listProduct can call a repository to fetch products from the database.
  3. Implement server code: the last step is to implement the server code and register the service. This can be done as ServerBuilder.forPort(port).addService(new ProductService()).build().

Service Implementation
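
The following is a minimal sketch of the service implementation. The generated base class ProductServiceGrpc.ProductServiceImplBase and the listProduct method come from the service definition above; the message names, field names, and the in-memory map standing in for a product repository are assumptions for illustration.

import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import java.util.HashMap;
import java.util.Map;

public class ProductService extends ProductServiceGrpc.ProductServiceImplBase {

  // Stand-in for the repository/database the real service would call
  private final Map<String, Product> products = new HashMap<>();

  @Override
  public void listProduct(
      ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
    for (String productId : request.getProductIdsList()) {
      Product product = products.get(productId);
      if (product == null) {
        // Business error: this terminates the stream (we will improve on this later)
        responseObserver.onError(new StatusException(Status.NOT_FOUND));
        return;
      }
      // Each onNext() call sends one message of the stream to the client
      responseObserver.onNext(GetProductResponse.newBuilder().setProduct(product).build());
    }
    // Signals the end of the stream to the client
    responseObserver.onCompleted();
  }
}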

In the above code, the call to responseObserver.onCompleted() indicates that the stream response has been completed.

Server code
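
A sketch of the server bootstrap code, registering the service on port 3000 as described above:

import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;

public class GrpcServer {
  public static void main(String[] args) throws IOException, InterruptedException {
    int port = 3000;
    // Register the service implementation and start the server on localhost:3000
    Server server = ServerBuilder.forPort(port).addService(new ProductService()).build();
    server.start();
    // Keep the main thread alive so the server continues serving requests
    server.awaitTermination();
  }
}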

Running gRPC server

You can start the gRPC server from the IDE. For that, run the main method of dev.techdozo.product.GrpcServer. However, for production, you may want to deploy the server as a container or as a stand-alone application.

Implementing gRPC client channel

A gRPC channel represents a virtual connection to an endpoint to perform RPC.

You can create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().

var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();

Creation of a channel is expensive, so make sure to create a channel once and reuse it.

gRPC supports two types of client stubs:

  • blocking/synchronous stub: in this stub, the RPC call waits for the server to respond.
  • non-blocking/asynchronous stub: the client makes non-blocking calls to the server, and the response is returned asynchronously.

Implementing blocking client stub

To create a synchronous/blocking client stub, use the newBlockingStub static method of the auto-generated ProductServiceGrpc class.
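
A minimal sketch of the blocking client follows; the request message and its builder methods assume the service definition sketched earlier.

var managedChannel = ManagedChannelBuilder.forAddress("localhost", 3000).usePlaintext().build();
var productServiceBlockingStub = ProductServiceGrpc.newBlockingStub(managedChannel);

var request = ListProductRequest.newBuilder()
    .addProductIds("product-1")
    .addProductIds("product-2")
    .build();

// For a server streaming RPC, the blocking stub returns an Iterator;
// hasNext() blocks until the next message (or the end of the stream) arrives
Iterator<GetProductResponse> responses = productServiceBlockingStub.listProduct(request);
responses.forEachRemaining(
    response -> System.out.println("Received Product from server, info " + response.getProduct()));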

To run the blocking client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcBlockingClient from the IDE. At the same time, make sure that the gRPC server is running. Once run, the client prints a log like:

[INFO ] 2021-12-19 15:11:19.618 [main] UnaryGrpcBlockingClient - Received Product from server, info product { name: "Apple iPhone 12 Pro (128GB)" description: "Apple iPhone 12 Pro (128GB) - Graphite" price: 1617.29 }

As you can infer from the logs, both the request and the response are handled in the same thread [main]. In other words, the client blocks until the responses are returned by the server.

Implementing asynchronous client stub

For most use cases, a blocking RPC suffices. However, a blocking RPC keeps the calling thread waiting until the server returns a response, so that thread cannot do any other work in the meantime. You can use an asynchronous client stub to overcome this problem by registering a callback. This callback is called on a different thread once the server sends the response.

To implement an asynchronous client stub, use the newStub static method of ProductServiceGrpc.

var productServiceAsyncStub = ProductServiceGrpc.newStub(managedChannel);

and register a callback as:
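
(A sketch; ListProductRequest and its builder methods assume the service definition above, and callback is the StreamObserver defined next.)

var request = ListProductRequest.newBuilder().addProductIds("product-1").build();
// The call returns immediately; messages are delivered to the callback on a gRPC executor thread
productServiceAsyncStub.listProduct(request, callback);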

where callback is defined as:
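
(Again a sketch; the client in the repository logs through a logger rather than System.out.)

StreamObserver<GetProductResponse> callback =
    new StreamObserver<GetProductResponse>() {
      @Override
      public void onNext(GetProductResponse response) {
        // Called once for every message in the stream
        System.out.println("Received product, product " + response.getProduct());
      }

      @Override
      public void onError(Throwable throwable) {
        // Called if the server terminates the stream with an error
        System.out.println("Error in stream: " + throwable.getMessage());
      }

      @Override
      public void onCompleted() {
        // Called after the server calls onCompleted() on its side
        System.out.println("Stream completed");
      }
    };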

To run the asynchronous client example, run the main method of the class dev.techdozo.order.client.UnaryGrpcAsynClient from the IDE. Once run, the client prints a log like:

[INFO ] 2021-12-19 15:16:11.798 [main] UnaryGrpcAsynClient - Calling Server..
[INFO ] 2021-12-19 15:16:14.046 [grpc-default-executor-1] UnaryGrpcAsynClient - Received product, product { name: "Apple iPhone 12 Pro Max (128GB)" description: "Apple iPhone 12 Pro (128GB) - Red" price: 1752.59 }

Did you notice that the callback happens in a different thread grpc-default-executor-1 than the main thread?

For the callback, gRPC uses a cached thread pool that creates new threads as needed but reuses previously constructed threads when they are available. If you want, you can provide your own thread pool as:

var executorService = Executors.newFixedThreadPool(10);
var managedChannel = ManagedChannelBuilder.forAddress(host, port)
    .executor(executorService)
    .usePlaintext()
    .build();

Handling errors in stream response

If you recall, we sent the error response Status.NOT_FOUND in case the product is not found for a given id. Sending an error response terminates the stream. This can be a problem if you want to read the complete stream response even in case of an error and handle the error separately.

Consider a scenario where you call the server with the product ids product-1, product-2, and product-3, where product-2 is NOT_FOUND on the server and the server responds with responseObserver.onError(new StatusException(Status.NOT_FOUND)). In this case, the server will throw an error for product-3, "Stream was terminated by error, no further calls are allowed", and on the client side, you will not get the response for product-3 even if you are interested in it.

SEVERE: Exception while executing runnable io.grpc.internal.Ser[email protected]42f8c429
java.lang.IllegalStateException: Stream was terminated by error, no further calls are allowed
    at com.google.common.base.Preconditions.checkState(Preconditions.java:511)

One important point to note here is that calling onError on the server just terminates the stream; it has no impact on the gRPC connection. If you want, you can reuse the same ManagedChannel, which represents a virtual connection to the server, and call the server again.

How to handle errors in the gRPC stream?

As you know by now, sending an error response terminates the stream. So clearly, you can’t call the responseObserver.onError method on the gRPC server in the case of business errors such as NOT_FOUND. To handle such errors, you need to send the error response as part of the stream message itself. This is where the protobuf oneof feature comes in handy.

Using protobuf oneof, you can specify that the server sends either an error response or a valid response, as:

// google.rpc.Status requires importing the common protos in the .proto file
import "google/rpc/status.proto";

message GetProductResponse {
  oneof product_response {
    Product product = 1;
    google.rpc.Status error = 2;
  }
}

On the gRPC server, you need to make changes to send either a valid response or an error response as:
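
A sketch of the changed service method, reusing the in-memory map from the earlier sketch; the error payload is built with com.google.rpc.Status and com.google.rpc.Code from the common protos, and the message text is illustrative:

@Override
public void listProduct(
    ListProductRequest request, StreamObserver<GetProductResponse> responseObserver) {
  for (String productId : request.getProductIdsList()) {
    Product product = products.get(productId);
    var response = GetProductResponse.newBuilder();
    if (product == null) {
      // Send the business error as data instead of calling onError(), so the stream continues
      response.setError(
          com.google.rpc.Status.newBuilder()
              .setCode(com.google.rpc.Code.NOT_FOUND.getNumber())
              .setMessage("Product not found: " + productId)
              .build());
    } else {
      response.setProduct(product);
    }
    responseObserver.onNext(response.build());
  }
  // The stream still completes normally even if some products were not found
  responseObserver.onCompleted();
}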

On the gRPC client, you can take appropriate action as:
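
For example, with the blocking stub, the client can branch on which side of the oneof is set (hasProduct() and hasError() are generated for the message fields inside the oneof):

var responses = productServiceBlockingStub.listProduct(request);
responses.forEachRemaining(
    response -> {
      if (response.hasProduct()) {
        System.out.println("Received product: " + response.getProduct());
      } else if (response.hasError()) {
        // The business error arrives as part of the stream; the remaining responses still follow
        System.out.println("Error from server: " + response.getError().getMessage());
      }
    });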

Summary

gRPC is a popular remote procedure call (RPC) framework that supports both unary RPC and streaming RPC. In a server streaming RPC, the client sends a single request and receives a stream of messages as the response.

A streaming RPC in gRPC can be synchronous or asynchronous. In synchronous RPC, a client call waits for the server to respond. As the name suggests, in asynchronous RPC the server returns the response asynchronously.

Handling business errors using the normal construct of calling responseObserver.onError() doesn’t work well, because it terminates the stream. Instead, we can use protobuf oneof to send error responses back to the client as part of the stream.

If you like this article, then please follow me on LinkedIn for more tips on #software architecture.

Originally published at https://techdozo.dev on December 30, 2021.
