温馨提示:本文需要结合上一篇 gRPC 文章 一起食用,否则可能看不懂。
前面一篇文章松哥和大家聊了 gRPC 的基本用法,今天我们再来稍微深入一点点,来看下 gRPC 中四种不同的通信模式。
gRPC 中四种不同的通信模式分别是:
一元 RPC
服务端流 RPC
客户端流 RPC
双向流 RPC
接下来松哥就通过四个完整的案例,来分别和向伙伴们演示这四种不同的通信模式。
1. 准备工作 关于 gRPC 的基础知识我们就不啰嗦了,咱们直接来看我今天的 proto 文件,如下:
这次我新建了一个名为 book.proto 的文件,这里主要定义了一些图书相关的方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 syntax = "proto3" ; option java_multiple_files = true ;option java_package = "org.javaboy.grpc.demo" ;option java_outer_classname = "BookServiceProto" ;import "google/protobuf/wrappers.proto" ;package book;service BookService { rpc addBook(Book) returns (google.protobuf.StringValue) ; rpc getBook(google.protobuf.StringValue) returns (Book) ; rpc searchBooks(google.protobuf.StringValue) returns (stream Book) ; rpc updateBooks(stream Book) returns (google.protobuf.StringValue) ; rpc processBooks(stream google.protobuf.StringValue) returns (stream BookSet) ; } message Book { string id = 1 ; repeated string tags = 2 ; string name = 3 ; float price = 4 ; string author = 5 ; } message BookSet { string id = 1 ; repeated Book bookList = 3 ; }
这个文件中,有一些内容我们在上篇文章 中都讲过了,讲过的我就不再重复了,我说一些上篇文章 没有涉及到的东西:
由于我们在这个文件中,引用了 Google 提供的 StringValue(google.protobuf.StringValue
),所以这个文件上面我们首先用 import 导入相关的文件,导入之后,才可以使用。
在方法参数和返回值中出现的 stream,就表示这个方法的参数或者返回值是流的形式(其实就是数据可以多次传输)。
message 中出现了一个上篇文章 没有的关键字 repeated,这个表示这个字段可以重复,可以简单理解为这就是我们 Java 中的数组。
好了,和上篇文章 相比,本文主要就是这几个地方不一样。
proto 文件写好之后,按照上篇文章 介绍的方法进行编译,生成对应的代码,这里就不再重复了。
2. 一元 RPC 一元 RPC 是一种比较简单的 RPC 模式,其实说白了我们上篇文章 和大家介绍的就是一种一元 RPC,也就是客户端发起一个请求,服务端给出一个响应,然后请求结束。
上面我们定义的五个方法中,addBook 和 getBook 都算是一种一元 RPC。
2.1 addBook 先来看 addBook 方法,这个方法的逻辑很简单,我们提前在服务端准备一个 Map 用来保存 Book,addBook 调用的时候,就把 book 对象存入到 Map 中,并且将 book 的 ID 返回,大家就这样一件事,来看看服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void addBook (Book request, StreamObserver<StringValue> responseObserver) { bookMap.put(request.getId(), request); responseObserver.onNext(StringValue.newBuilder().setValue(request.getId()).build()); responseObserver.onCompleted(); } }
看过上篇文章 的小伙伴,我觉得这段代码应该很好理解。
客户端调用方式如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); addBook(stub); } private static void addBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.addBook(Book.newBuilder().setPrice(99 ).setId("100" ).setName("java" ).setAuthor("javaboy" ).build(), new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { System.out.println("stringValue.getValue() = " + stringValue.getValue()); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("添加完毕" ); } }); countDownLatch.await(); } }
这里我使用了 CountDownLatch 来实现线程等待,等服务端给出响应之后,客户端再结束。这里在回调的 onNext 方法中,我们就可以拿到服务端的返回值。
2.2 getBook getBook 跟上面的 addBook 类似,先来看服务端代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void getBook (StringValue request, StreamObserver<Book> responseObserver) { String id = request.getValue(); Book book = bookMap.get(id); if (book != null ) { responseObserver.onNext(book); responseObserver.onCompleted(); } else { responseObserver.onCompleted(); } } }
这个 getBook 就是根据客户端传来的 id,从 Map 中查询到一个 Book 并返回。
客户端调用代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); getBook(stub); } private static void getBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.getBook(StringValue.newBuilder().setValue("2" ).build(), new StreamObserver <Book>() { @Override public void onNext (Book book) { System.out.println("book = " + book); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("查询完毕" ); } }); countDownLatch.await(); } }
小伙伴们大概也能看出来,addBook 和 getBook 基本上操作套路是一模一样的。
3. 服务端流 RPC 前面的一元 RPC,客户端发起一个请求,服务端给出一个响应,请求就结束了。服务端流则是客户端发起一个请求,服务端给一个响应序列,这个响应序列组成一个流。
上面我们给出的 searchBook 就是这样一个例子,searchBook 是传递图书的 tags 参数,然后在服务端查询哪些书的 tags 满足条件,将满足条件的书全部都返回去。
我们来看下服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public void searchBooks (StringValue request, StreamObserver<Book> responseObserver) { Set<String> keySet = bookMap.keySet(); String tags = request.getValue(); for (String key : keySet) { Book book = bookMap.get(key); int tagsCount = book.getTagsCount(); for (int i = 0 ; i < tagsCount; i++) { String t = book.getTags(i); if (t.equals(tags)) { responseObserver.onNext(book); break ; } } } responseObserver.onCompleted(); } }
小伙伴们看下,这段 Java 代码应该很好理解:
首先从 request 中提取客户端传来的 tags 参数。
遍历 bookMap,查看每一本书的 tags 是否等于客户端传来的 tags,如果相等,说明添加匹配,则通过 responseObserver.onNext(book);
将这本书写回到客户端。
等所有操作都完成后,执行 responseObserver.onCompleted();
,表示服务端的响应序列结束了,这样客户端也就知道请求结束了。
我们来看看客户端的代码,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); searchBook(stub); } private static void searchBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); stub.searchBooks(StringValue.newBuilder().setValue("明清小说" ).build(), new StreamObserver <Book>() { @Override public void onNext (Book book) { System.out.println(book); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { countDownLatch.countDown(); System.out.println("查询完毕!" ); } }); countDownLatch.await(); } }
客户端的代码好理解,搜索的关键字是 明清小说
,每当服务端返回一次数据的时候,客户端回调的 onNext 方法就会被触发一次,当服务端之行了 responseObserver.onCompleted();
之后,客户端的 onCompleted 方法也会被触发。
这个就是服务端流,客户端发起一个请求,服务端通过 onNext 可以多次写回数据。
4. 客户端流 RPC 客户端流则是客户端发起多个请求,服务端只给出一个响应。
上面的 updateBooks 就是一个客户端流的案例,客户端想要修改图书,可以发起多个请求修改多本书,服务端则收集多次修改的结果,将之汇总然后一次性返回给客户端。
我们先来看看服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public StreamObserver<Book> updateBooks (StreamObserver<StringValue> responseObserver) { StringBuilder sb = new StringBuilder ("更新的图书 ID 为:" ); return new StreamObserver <Book>() { @Override public void onNext (Book book) { bookMap.put(book.getId(), book); sb.append(book.getId()) .append("," ); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { responseObserver.onNext(StringValue.newBuilder().setValue(sb.toString()).build()); responseObserver.onCompleted(); } }; } }
客户端每发送一本书来,就会触发服务端的 onNext 方法,然后我们在这方法中进行图书的更新操作,并记录更新结果。最后,我们在 onCompleted 方法中,将更新结果汇总返回给客户端,基本上就是这样一个流程。
我们再来看看客户端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); updateBook(stub); } private static void updateBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); StreamObserver<Book> request = stub.updateBooks(new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { System.out.println("stringValue.getValue() = " + stringValue.getValue()); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { System.out.println("更新完毕" ); countDownLatch.countDown(); } }); request.onNext(Book.newBuilder().setId("1" ).setName("a" ).setAuthor("b" ).build()); request.onNext(Book.newBuilder().setId("2" ).setName("c" ).setAuthor("d" ).build()); request.onCompleted(); countDownLatch.await(); } }
在客户端这块,updateBooks 方法会返回一个 StreamObserver 对象,调用该对象的 onNext 方法就是给服务端传递数据了,可以传递多个数据,调用该对象的 onCompleted 方法就是告诉服务端数据传递结束了,此时也会触发服务端的 onCompleted 方法,服务端的 onCompleted 方法执行之后,进而触发了客户端的 onCompleted 方法。
5. 双向流 RPC 双向流其实就是 3、4 小节的合体。即客户端多次发送数据,服务端也多次响应数据。
我们先来看下服务端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class BookServiceImpl extends BookServiceGrpc .BookServiceImplBase { private Map<String, Book> bookMap = new HashMap <>(); private List<Book> books = new ArrayList <>(); public BookServiceImpl () { Book b1 = Book.newBuilder().setId("1" ).setName("三国演义" ).setAuthor("罗贯中" ).setPrice(30 ).addTags("明清小说" ).addTags("通俗小说" ).build(); Book b2 = Book.newBuilder().setId("2" ).setName("西游记" ).setAuthor("吴承恩" ).setPrice(40 ).addTags("志怪小说" ).addTags("通俗小说" ).build(); Book b3 = Book.newBuilder().setId("3" ).setName("水浒传" ).setAuthor("施耐庵" ).setPrice(50 ).addTags("明清小说" ).addTags("通俗小说" ).build(); bookMap.put("1" , b1); bookMap.put("2" , b2); bookMap.put("3" , b3); } @Override public StreamObserver<StringValue> processBooks (StreamObserver<BookSet> responseObserver) { return new StreamObserver <StringValue>() { @Override public void onNext (StringValue stringValue) { Book b = Book.newBuilder().setId(stringValue.getValue()).build(); books.add(b); if (books.size() == 3 ) { BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build(); responseObserver.onNext(bookSet); books.clear(); } } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { BookSet bookSet = BookSet.newBuilder().addAllBookList(books).build(); responseObserver.onNext(bookSet); books.clear(); responseObserver.onCompleted(); } }; } }
这段代码没有实际意义,单纯为了给小伙伴们演示双向流,我的操作逻辑是客户端传递多个 ID 到服务端,然后服务端根据这些 ID 构建对应的 Book 对象,然后三个三个一组,再返回给客户端。客户端每次发送一个请求,都会触发服务端的 onNext 方法,我们在这个方法中对请求分组返回。最后如果还有剩余的请求,我们在 onCompleted() 方法中返回。
再来看看客户端的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 public class BookServiceClient { public static void main (String[] args) throws InterruptedException { ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost" , 50051 ) .usePlaintext() .build(); BookServiceGrpc.BookServiceStub stub = BookServiceGrpc.newStub(channel); processBook(stub); } private static void processBook (BookServiceGrpc.BookServiceStub stub) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch (1 ); StreamObserver<StringValue> request = stub.processBooks(new StreamObserver <BookSet>() { @Override public void onNext (BookSet bookSet) { System.out.println("bookSet = " + bookSet); System.out.println("=============" ); } @Override public void onError (Throwable throwable) { } @Override public void onCompleted () { System.out.println("处理完毕!" ); countDownLatch.countDown(); } }); request.onNext(StringValue.newBuilder().setValue("a" ).build()); request.onNext(StringValue.newBuilder().setValue("b" ).build()); request.onNext(StringValue.newBuilder().setValue("c" ).build()); request.onNext(StringValue.newBuilder().setValue("d" ).build()); request.onCompleted(); countDownLatch.await(); } }
这个客户端的代码跟第四小节一模一样,不再赘述了。
好啦,这就是松哥和小伙伴们介绍的 gRPC 的四种不同的通信模式,文章中只给出了一些关键代码,如果小伙伴们没看明白,建议结合上篇文章 一起阅读就懂啦~