gRPC Client Streaming

Pankaj Kumar
7 min readJan 20, 2023
Photo by Joan Gamell on Unsplash

While REST is the de facto standard for public API, gRPC is increasingly becoming one of the most popular choices for inter-process microservices communication. gRPC is a strongly typed, polyglot, efficient, fast RPC framework that supports unary (request and reply) and streaming (client and server-side) communication patterns.

The gRPC client streaming RPC allows a client to send a stream of messages to a server. This differs from the unary RPC, where the client sends a single request, and the server sends back a single response. With client streaming, the client sends multiple messages to the server, and the server can process them as they arrive before sending a single response back to the client. This is useful for scenarios where the client has a lot of data to send to the server and wants to stream it in chunks rather than sending it all at once.

In this article, we will understand how to implement gRPC client streaming RPC in Java.

Let’s get started!

What is gRPC Client Streaming RPC?

In the client streaming RPC, a gRPC client sends a stream of messages, and the gRPC server responds with a single message. The streams of messages can be followed by a signal to notify the server about the end of the stream. The client sends this signal as an onComplete() method call (more about this later).

Client Streaming Use Cases

The gRPC client streaming can be used to address all streaming scenarios, such as:

  1. Real-time location data sharing.
  2. Multiplayer games.
  3. When a client needs to send lots of data to the server.
  4. Real-time driver location in ride-hailing apps.

The gRPC client streaming is used by Lyft to share real-time driver locations with the user.

Client Streaming Service Definition

Let’s consider a use case of a ride-hailing app. In this example, as soon as the driver starts the trip, the client application (mobile app used by the driver) sends the real-time location to the server. When the trip finishes, the server summarises and sends the total distance covered and the charge.

For the above use case, we can define the gRPC service with stream keyword in request as:

syntax = "proto3";
package ride_sharing.trip;

option java_package = "dev.techdozo.ride_sharing.trip";

message TripDataRequest {
RideType ride_type = 1;
double latitude = 2;
double longitude = 3;
}

message RideType {
string driver_id = 1;
string ride_id = 2;
}

message TripSummaryResponse {
double distance = 1;
double charge = 2;
}

service TripService {
rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse);
}

Code Generation

For the above protobuf service definition, we can generate client and server stubs using the potoc compiler. The protoc compiler supports code generation in many different languages.

For our example, we will use Protobuf Gradle Plugin to generate source code in Java. The protocol buffer plugin assembles the Protobuf Compiler ( protoc) command line and uses it to generate Java source files from the proto files. The generated java source files should be added to the , in the build.gradle, so they can be compiled along with Java classes.

sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}

Running the command gradlew build generates source code in the directory build/generated/source/proto/main/grpc and build/generated/source/proto/main/java.

gRPC Server

The gRPC server implements services defined in the proto files and exposes those as RPC API.

In our case, the gRPC server has one RPC method rpc SendTripData(stream TripDataRequest) returns (TripSummaryResponse). Let's, see how to implement server-side service gRPC API for client streaming.

Implementing Service Definition

Once the server stub is generated, we need to override sendTripData method in the autogenerated abstract class TripServiceGrpc.TripServiceImplBase and return StreamObserver object as:

public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

@Override
public StreamObserver<TripDataRequest> sendTripData(
StreamObserver<TripSummaryResponse> responseObserver) {

return new StreamObserver<TripDataRequest>() {
private String rideId;

@Override
public void onNext(TripDataRequest request) {
// called when a client sends a message
//....
}

@Override
public void onError(Throwable t) {
// called when a client sends an error
//....
}

@Override
public void onCompleted() {
// called when client completes
//....
}
};
}
}

In the above code,

  1. The StreamObserver's onNext method is called every time the client makes a streaming request. Here, we can store the streaming data in the database for trip summary calculation.
  2. The StreamObserver's onError method is when a client sends an error.
  3. The StreamObserver's onCompleted method is when a client completes the streaming call by calling tripDataRequestStreamObserver.onCompleted(). In this method, the server sends trip summary data to the client.

Server Implementation

public class RideSharingAPI extends TripServiceGrpc.TripServiceImplBase {

private final TripRepository tripRepository;
private final TripSummaryService tripSummaryService;

@Override
public StreamObserver<TripDataRequest> sendTripData(
StreamObserver<TripSummaryResponse> responseObserver) {

return new StreamObserver<TripDataRequest>() {
private String rideId;

@Override
public void onNext(TripDataRequest request) {
// Save Ride data
tripRepository.saveTripData(
new RideData(
request.getRideType().getDriverId(),
request.getRideType().getRideId(),
request.getLatitude(),
request.getLongitude()));
log.info("Driver Id {} - object {}", request.getRideType().getDriverId(), this);
this.rideId = request.getRideType().getRideId();
}

@Override
public void onError(Throwable t) {
log.error("Error while processing request ");
}

@Override
public void onCompleted() {
// Once the trip is completed then, generate trip summary
var tripSummary = tripSummaryService.getTripSummary(rideId);
responseObserver.onNext(
TripSummaryResponse.newBuilder()
.setDistance(tripSummary.getDistance())
.setCharge(tripSummary.getCharge())
.build());
log.info("Request completed");
}
};
}
}

Registering Service

To expose the RPC service RideSharingAPI, we need to create a gRPC server instance and register the service by calling the addService method. The server listens to the specified port and dispatches all requests to the relevant service.

public RideSharingServer(int port) {
this.port = port;
var rideSharingAPI = new RideSharingAPI(...);
this.server = ServerBuilder.forPort(port).addService(rideSharingAPI).build();
}

Implementing gRPC Client Stub

The first thing we need to do to implement a gRPC client is to generate client stubs using the proto files and then create a gRPC channel specifying the server address and port as ManagedChannelBuilder.forAddress(host, port).usePlaintext().build().

The channel represents a virtual connection to an endpoint to perform RPC.

You can create the client stub using the newly created channel:

var managedChannel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc
.newStub(managedChannel);

There are two types of client stubs:

  • Blocking: The BlockingStub, which waits until it receives a server response.
  • Non-Blocking: The NonBlockingStub doesn't wait for a server response but instead registers an observer to receive the response.

As the client streaming API is always async, we need to create an async stub as TripServiceGrpc.newStub(managedChannel).

public class RideSharingClient {
private final String host;
private final int port;

public RideSharingClient(String host, int port) {
this.host = host;
this.port = port;
}

public void callServer() {

log.info("Calling Server..");
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);
//...
//...

}

}

Similar to the unary asynchronous client stub, we can register a callback as:

StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
tripServiceStub.sendTripData(new TripSummaryCallback());

TripSummaryCallback is of type StreamObserver, which has three methods - onNext, onError and onCompleted. The callback methods are called when the server needs to send some data. For example, the onNext method is called when the server calls the onCompleted method.

TripSummaryCallback Implementation

private static class TripSummaryCallback implements StreamObserver<TripSummaryResponse> {

@Override
public void onNext(TripSummaryResponse tripSummaryResponse) {
log.info(
"Trip Summary : distance {}, charge {} ",
tripSummaryResponse.getDistance(),
tripSummaryResponse.getCharge());
}

@Override
public void onError(Throwable cause) {
log.error("Error occurred, cause {}", cause.getMessage());
}

@Override
public void onCompleted() {
log.info("Stream completed");
}
}

In the above code,

  1. The onNext method of this TripSummaryCallback is called once the server sends TripSummary via the call-back method onCompleted.
  2. The onError method is called when the server sends an error. For example, the server can throw an error if it detects the wrong latitude or longitude values.
  3. In the case of client streaming, the onCompleted method is never called. As the same API is used for bidirectional and client streaming, we can leave onCompleted unimplemented.

Client Implementation

public void callServer() {

log.info("Calling Server..");
var managedChannel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
TripServiceGrpc.TripServiceStub tripServiceStub = TripServiceGrpc.newStub(managedChannel);

StreamObserver<Service.TripDataRequest> tripDataRequestStreamObserver =
tripServiceStub.sendTripData(new TripSummaryCallback());

IntStream.range(0, 1000)
.mapToObj(
n ->
Service.TripDataRequest.newBuilder()
.setRideType(
Service.RideType.newBuilder()
.setDriverId("Driver_1")
.setRideId("Ride_" + n)
.build())
.setLatitude(ThreadLocalRandom.current().nextDouble(-90, 90))
.setLongitude(ThreadLocalRandom.current().nextDouble(-180, 180))
.build())
.forEach(tripDataRequestStreamObserver::onNext);

log.info("Calling complete..");
tripDataRequestStreamObserver.onCompleted();
Thread.sleep(30000);
}
  1. We have used IntStream.range(0, 1000)...forEach(tripDataRequestStreamObserver::onNext) to demonstrate the client streaming scenario. In real life, this call will be made by a mobile client by reading GPS data.
  2. Once the trip is completed, the client indicates to the server by calling tripDataRequestStreamObserver.onCompleted(). Then, the server computes the trip summary and calls the callback method onCompleted, and the client receives the trip summary in onNext method of TripSummaryCallback.
  3. Thread.sleep(30000) so the program doesn't exit before the client receives the callback.

Code Example

The working code example of this article is listed on GitHub. To run the example, clone the repository and import grpc-client-streaming as a project in your favorite IDE as a Gradle project.

Summary

gRPC has built-in streaming support. The gRPC client streaming can be used for many use cases, such as real-time location data sharing, multiplayer games, when a client needs to send lots of data to the server, etc.

Originally published at https://techdozo.dev on January 20, 2023.

--

--

Pankaj Kumar

Software Architect @ Schlumberger ``` Cloud | Microservices | Programming | Kubernetes | Architecture | Machine Learning | Java | Python ```