Cancelling gRPC server-streaming call from client

When using gRPC server-streaming call, you may need to interrupt the execution of the request on the server from the client side. 

We have a service on the server:

public class YourServiceImpl extends YourService.YourServiceImplBase {
  @Override 
  public void yourMethod(ProtoRequest request, StreamObserver<ProtoResponse> reponseObserver) {
  	while (...) {
      Object result = heavyCompute();
      reponseObserver.onNext(result);
  	}
    reponseObserver.onCompleted()
  }
}

And the sendRequest() method on the client:

public void sendRequest(ProtoRequest request) {
  ManagedChannel channel = buildChannel();
  YourService.YourServiceBlockingStub stub = YourService.newBlockingStub(channel);
  stub.yourMethod(request);
}

Suppose, that at some moment we need to send a message from the client to the server that the request has been interrupted, and the while loop on the server has stopped. This is can be done using the Context class.

Let’s add the stopRequest() method to the client code for request interrupting:

public void stopRequest() {
  Context.current().withCancellation().cancel(new InterruptedException())
}

You can specify any exception as the parameter for the cancel(). 

Next, add a condition to the body of the while loop on the server:

public class YourServiceImpl extends YourService.YourServiceImplBase {
  @Override 
  public void yourMethod(ProtoRequest request, StreamObserver<ProtoResponse> reponseObserver) {
    while (...) {
      Object result = heavyCompute();
      reponseObserver.onNext(result);
      if (Context.current().isCancelled()) {
        return;
      }
    }
    reponseObserver.onCompleted()
  }
}
Telegram channel

If you still have any questions, feel free to ask me in the comments under this article or write me at promark33@gmail.com.

If I saved your day, you can support me 🤝

Leave a Reply

Your email address will not be published. Required fields are marked *