On se demande souvent comment accélérer gRPC. gRPC permet des appels RPC à haute performance, mais la manière d'atteindre ces performances n'est pas toujours évidente. J'ai donc décidé de montrer mon raisonnement lors de l'optimisation de programmes.
"-", . . . , , . gRPC . , .
KvClient — "-". , , . , .
KvService — "-". . 10 50 , .
KvRunner — . , , , . Runner 60 , RPC.
kvstore.proto — Protocol Buffers . , . Create, Retrieve, Update Delete ( CRUD). , . REST, .
Protocol Buffers gRPC — , . gRPC. gRPC-, , (stub, ).
, , , , . , RPC. , create:
/**
 * Sends one blocking Create RPC for a newly generated random key.
 *
 * <p>An {@code ALREADY_EXISTS} status is tolerated: the key is removed from {@code knownKeys}
 * and the event is logged at INFO level. Every other RPC failure propagates to the caller.
 *
 * @param stub blocking stub used to issue the request
 * @throws RuntimeException if the reply differs from the default {@code CreateResponse}
 * @throws StatusRuntimeException for any RPC failure other than {@code ALREADY_EXISTS}
 */
private void doCreate(KeyValueServiceBlockingStub stub) {
  final ByteString key = createRandomKey();
  try {
    final CreateRequest request =
        CreateRequest.newBuilder()
            .setKey(key)
            .setValue(randomBytes(MEAN_VALUE_SIZE))
            .build();
    final CreateResponse response = stub.create(request);
    // A successful create is expected to carry an empty (default) response.
    final boolean isDefault = response.equals(CreateResponse.getDefaultInstance());
    if (!isDefault) {
      throw new RuntimeException("Invalid response");
    }
  } catch (StatusRuntimeException e) {
    // Only a duplicate key is recoverable here; anything else is rethrown unchanged.
    if (e.getStatus().getCode() != Code.ALREADY_EXISTS) {
      throw e;
    }
    knownKeys.remove(key);
    logger.log(Level.INFO, "Key already existed", e);
  }
}
. , . , , , , , . , , , . , . , - , . .
gRPC API, . gRPC-, . , RPC.
/** In-memory key-value storage, keyed and valued by read-only byte buffers. */
private final Map<ByteBuffer, ByteBuffer> store = new HashMap<>();

/**
 * Handles the Create RPC: inserts the request's value under its key unless the key is
 * already present.
 *
 * <p>The method is {@code synchronized}, so the simulated write delay and the map update both
 * happen while holding this object's monitor.
 *
 * @param request carries the key and value to store
 * @param responseObserver receives a default {@code CreateResponse} followed by completion on
 *     success, or an {@code ALREADY_EXISTS} error if the key was already stored
 */
@Override
public synchronized void create(
    CreateRequest request, StreamObserver<CreateResponse> responseObserver) {
  final ByteBuffer keyBuf = request.getKey().asReadOnlyByteBuffer();
  final ByteBuffer valueBuf = request.getValue().asReadOnlyByteBuffer();
  simulateWork(WRITE_DELAY_MILLIS);

  // putIfAbsent returns the previous mapping; non-null means the key was already taken.
  final boolean alreadyPresent = store.putIfAbsent(keyBuf, valueBuf) != null;
  if (alreadyPresent) {
    responseObserver.onError(Status.ALREADY_EXISTS.asRuntimeException());
    return;
  }
  responseObserver.onNext(CreateResponse.getDefaultInstance());
  responseObserver.onCompleted();
}
ByteBuffer
. , , synchronized
. Map
.
, . onNext()
responseObserver
. onCompleted()
.
â , . Ubuntu, 12- 32 . :
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 1:10:07 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 1:11:07 PM io.grpc.examples.KvRunner runClient
INFO: Did 16.55 RPCs/s
real 1m0.927s
user 0m10.688s
sys 0m1.456s
! 16 RPC . , , . , .
- , , . , , . , .
/**
 * Client work loop: repeatedly picks a random command and issues the matching RPC on a
 * blocking stub until {@code done} becomes true.
 *
 * @param done flag polled before every iteration; the loop exits once it reads true
 */
void doClientWork(AtomicBoolean done) {
Random random = new Random();
// One blocking stub is created up front and reused for every RPC in the loop.
KeyValueServiceBlockingStub stub = KeyValueServiceGrpc.newBlockingStub(channel);
while (!done.get()) {
// Pick a random CRUD action to take.
int command = random.nextInt(4);
if (command == 0) {
doCreate(stub);
// NOTE(review): as shown, this continue skips the rpcCount++ below, so creates are
// not counted here — confirm against the elided code.
continue;
}
/* ... */
// Count the RPC issued in this iteration.
rpcCount++;
}
}
, RPC. . RPC? , 50 . 20 :
20 RPC/s = 1000 ms/s / (50 ms/RPC)
16 , . , time
, . simulateWork (sleep). , , RPC.
, (real) (user). , 10 . 16% . , , , , RPC.
. â , . .
gRPC- Java : , ListenableFuture
. . ListenableFuture API â , , . , , , RPC, .
ListenableFuture
. , . , , . , RPC ( ). , RPC , RPC .
. , . . - . doCreate():
/**
 * Starts an asynchronous Create RPC for a newly generated random key and returns without
 * waiting for the result.
 *
 * <p>Because the call does not block, failures cannot be thrown to the caller; the first
 * unexpected failure is recorded in {@code error} instead. Completion of the RPC (success or
 * failure) increments {@code rpcCount}.
 *
 * @param stub future stub used to issue the request
 * @param error holder for the first unexpected failure; set at most once via compare-and-set
 */
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error) {
  ByteString key = createRandomKey();
  ListenableFuture<CreateResponse> res = stub.create(
      CreateRequest.newBuilder()
          .setKey(key)
          .setValue(randomBytes(MEAN_VALUE_SIZE))
          .build());
  // Count the RPC as soon as the future completes, regardless of outcome.
  res.addListener(() -> rpcCount.incrementAndGet(), MoreExecutors.directExecutor());
  Futures.addCallback(
      res,
      new FutureCallback<CreateResponse>() {
        @Override
        public void onSuccess(CreateResponse result) {
          if (!result.equals(CreateResponse.getDefaultInstance())) {
            error.compareAndSet(null, new RuntimeException("Invalid response"));
          }
          // knownKeys is shared with other in-flight callbacks, so guard every access.
          synchronized (knownKeys) {
            knownKeys.add(key);
          }
        }

        @Override
        public void onFailure(Throwable t) {
          Status status = Status.fromThrowable(t);
          if (status.getCode() == Code.ALREADY_EXISTS) {
            synchronized (knownKeys) {
              knownKeys.remove(key);
            }
            logger.log(Level.INFO, "Key already existed", t);
          } else {
            error.compareAndSet(null, t);
          }
        }
      },
      // The executor-less addCallback overload was deprecated and later removed from Guava;
      // pass the direct executor explicitly so the callback still runs on the completing thread.
      MoreExecutors.directExecutor());
}
KeyValueServiceFutureStub
, Future
. gRPC Java ListenableFuture
, Future
. . , RPC . , .
RPC. , RPC.
RPC , , . doCreate()
RPC, , throw. . , , .
, , knownKeys
, RPC , , , . , , . : knownKeys
, . knownKeys
, knownKeys
, RPC, . , . , . , .
, , :
WARNING: An exception was thrown by io.grpc.netty.NettyClientStream$Sink$1.operationComplete()
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
...
?! , ? . . , . , . : "unable to create new native thread" ( ), . , . OOM , Java , . , , .
, ? RPC , . , RPC, . ListenableFuture
.
RPC. RPC . , RPC. , , . RPC ( ), . (Semaphore):
// Caps the number of in-flight RPCs: 100 permits, one taken per started RPC.
private final Semaphore limiter = new Semaphore(100);
/**
 * Starts an asynchronous Create RPC, first waiting for a permit from {@code limiter} so that
 * only a bounded number of RPCs are active at once.
 *
 * @param stub future stub used to issue the request
 * @param error holder for failures (used by the elided remainder of the method)
 * @throws InterruptedException if the thread is interrupted while waiting for a permit
 */
private void doCreate(KeyValueServiceFutureStub stub, AtomicReference<Throwable> error)
throws InterruptedException {
// Blocks until one of the permits is available.
limiter.acquire();
ByteString key = createRandomKey();
ListenableFuture<CreateResponse> res = stub.create(
CreateRequest.newBuilder()
.setKey(key)
.setValue(randomBytes(MEAN_VALUE_SIZE))
.build());
// When the RPC completes: count it and hand the permit back so another RPC can start.
res.addListener(() -> {
rpcCount.incrementAndGet();
limiter.release();
}, MoreExecutors.directExecutor());
/* ... */
}
.
:
$ ./gradlew installDist
$ time ./build/install/kvstore/bin/kvstore
Feb 26, 2018 2:40:47 PM io.grpc.examples.KvRunner runClient
INFO: Starting
Feb 26, 2018 2:41:47 PM io.grpc.examples.KvRunner runClient
INFO: Did 24.283 RPCs/s
real 1m0.923s
user 0m12.772s
sys 0m1.572s
46% RPC , . , 20% . , . . - .
? , 1/4 (create, update delete). 1/4 . RPC :
.25 * 50ms (create)
.25 * 10ms (retrieve)
.25 * 50ms (update)
+.25 * 50ms (delete)
------------
40ms
40 RPC , RPC :
25 RPC/s = 1000 ms/s / (40 ms/RPC)
, . , . , , .
gRPC-. , . , . .
Java Developer. Basic. , , .