HowTo: gRPC Java Client Side Load Balancing using Consul
gRPC is increasingly used instead of REST for communication between microservices.
Here, I will show you how to do client-side load balancing in a gRPC Java client, using Consul for service discovery.
Compile Hello World Service IDL
Let’s define the Hello World service IDL:
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.shunters.grpc.component.proto.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
    // Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}
To compile this IDL, you have to install protoc on your machine:
# download protoc compiler.
## for linux.
wget https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-linux-x86_64.zip;
## for windows.
wget https://github.com/google/protobuf/releases/download/v3.5.1/protoc-3.5.1-win32.zip;
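Add the protobuf Maven plugin (the ${os.detected.classifier} property used below is provided by the os-maven-plugin build extension):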
<plugin>
    <groupId>org.xolstice.maven.plugins</groupId>
    <artifactId>protobuf-maven-plugin</artifactId>
    <version>0.5.0</version>
    <configuration>
        <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
        <pluginId>grpc-java</pluginId>
        <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.9.0:exe:${os.detected.classifier}</pluginArtifact>
    </configuration>
</plugin>
Generate the Java classes with protoc:
# run protoc command for linux:
mvn protobuf:test-compile -DprotocExecutable="/usr/local/bin/protoc"
mvn protobuf:test-compile-custom -DprotocExecutable="/usr/local/bin/protoc"
# run protoc command for windows:
mvn protobuf:test-compile -DprotocExecutable="F:/dev/protoc/protoc-3.5.1-win32/bin/protoc.exe"
mvn protobuf:test-compile-custom -DprotocExecutable="F:/dev/protoc/protoc-3.5.1-win32/bin/protoc.exe"
Create gRPC Server
Implement the Hello World service and expose it from a gRPC server:
package io.shunters.grpc.component.grpc;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.shunters.grpc.component.proto.helloworld.GreeterGrpc;
import io.shunters.grpc.component.proto.helloworld.HelloReply;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Created by mykidong on 2018-01-10.
 */
public class HelloWorldServer {
    private static Logger log = LoggerFactory.getLogger(HelloWorldServer.class);

    private Server server;

    public void start() throws IOException {
        int port = 50051;
        server = ServerBuilder.forPort(port)
                .addService(new HelloWorldService())
                .build()
                .start();
        log.info("Server started, listening on " + port);

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.err.println("*** shutting down gRPC server since JVM is shutting down");
                HelloWorldServer.this.stop();
                System.err.println("*** server shut down");
            }
        });
    }

    public void stop() {
        if (server != null) {
            server.shutdown();
        }
    }

    /**
     * Await termination on the main thread since the grpc library uses daemon threads.
     */
    public void blockUntilShutdown() throws InterruptedException {
        if (server != null) {
            server.awaitTermination();
        }
    }

    public static void main(String[] args) throws Exception {
        // start server.
        final HelloWorldServer server = new HelloWorldServer();
        server.start();
        server.blockUntilShutdown();
    }

    private static class HelloWorldService extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(io.shunters.grpc.component.proto.helloworld.HelloRequest request,
                             io.grpc.stub.StreamObserver<io.shunters.grpc.component.proto.helloworld.HelloReply> responseObserver) {
            String name = request.getName();

            HelloReply response = HelloReply.newBuilder().setMessage("hello " + name).build();

            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    }
}
This Hello World service would be packaged as a Docker image, deployed to a container orchestrator such as Nomad, and registered in Consul under the service name “hello-world”. For details, see this resource: https://medium.com/@mykidong/howto-container-orchestration-with-nomad-and-consul-f99430abcc85
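Once the service is registered, its healthy instances can be looked up through Consul's HTTP API. The sketch below is only an illustration of that lookup, not the code used in this post: it builds the URL of the /v1/health/service/<name> endpoint with the passing filter and fetches the raw JSON. The class name ConsulHealthQuery and its method names are made up for this example, and it assumes a Consul agent that is reachable over plain HTTP.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration only; not part of the repo for this post. */
final class ConsulHealthQuery {

    /** Builds the Consul health endpoint URI that lists only passing instances. */
    static URI healthUri(String consulHost, int consulPort, String serviceName) {
        try {
            return new URI("http", null, consulHost, consulPort,
                    "/v1/health/service/" + serviceName, "passing=true", null);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("invalid consul address or service name", e);
        }
    }

    /** Fetches the raw JSON response; needs a running Consul agent. */
    static String fetch(URI uri) throws IOException {
        HttpURLConnection connection = (HttpURLConnection) uri.toURL().openConnection();
        connection.setConnectTimeout(2000);
        connection.setReadTimeout(2000);
        try (InputStream in = connection.getInputStream();
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) {
        // Only prints the URL, so it runs without a Consul agent.
        System.out.println(healthUri("localhost", 8500, "hello-world"));
    }
}
```

Each entry of the JSON array returned by this endpoint describes one instance, including its address and port, which is the information a name resolver needs.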
Create Consul Name Resolver to do client-side load balancing
To do client-side load balancing, let’s create a Consul name resolver:
package io.shunters.grpc.component.grpc;

import io.grpc.Attributes;
import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.NameResolverProvider;
import io.shunters.grpc.api.component.ServiceDiscovery;
import io.shunters.grpc.component.consul.ConsulServiceDiscovery;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class ConsulNameResolver extends NameResolver {

    private static Logger log = LoggerFactory.getLogger(ConsulNameResolver.class);

    private static final int DEFAULT_PAUSE_IN_SECONDS = 5;

    private URI uri;
    private String serviceName;
    private int pauseInSeconds;
    private boolean ignoreConsul;
    private List<String> hostPorts;

    private Listener listener;

    private List<ServiceDiscovery.ServiceNode> nodes;

    private ConnectionCheckTimer connectionCheckTimer;

    public ConsulNameResolver(URI uri, String serviceName, int pauseInSeconds, boolean ignoreConsul, List<String> hostPorts) {
        this.uri = uri;
        this.serviceName = serviceName;
        this.pauseInSeconds = pauseInSeconds;
        this.ignoreConsul = ignoreConsul;
        this.hostPorts = hostPorts;

        // run connection check timer.
        this.connectionCheckTimer = new ConnectionCheckTimer(this, this.pauseInSeconds);
        this.connectionCheckTimer.runTimer();
    }

    @Override
    public String getServiceAuthority() {
        return this.uri.getAuthority();
    }

    @Override
    public void start(Listener listener) {
        this.listener = listener;

        loadServiceNodes();
    }

    private void loadServiceNodes() {
        List<EquivalentAddressGroup> addrs = new ArrayList<>();

        if (!this.ignoreConsul) {
            // look up the healthy nodes of the service in consul.
            String consulHost = uri.getHost();
            int consulPort = uri.getPort();

            nodes = getServiceNodes(serviceName, consulHost, consulPort);
            if (nodes == null || nodes.size() == 0) {
                log.info("there is no node info for serviceName: [{}]...", serviceName);
                return;
            }

            for (ServiceDiscovery.ServiceNode node : nodes) {
                String host = node.getHost();
                int port = node.getPort();
                log.info("serviceName: [" + serviceName + "], host: [" + host + "], port: [" + port + "]");

                List<SocketAddress> sockaddrsList = new ArrayList<SocketAddress>();
                sockaddrsList.add(new InetSocketAddress(host, port));
                addrs.add(new EquivalentAddressGroup(sockaddrsList));
            }
        } else {
            // consul is ignored: use the static "host:port" list instead.
            nodes = new ArrayList<>();
            for (String hostPort : this.hostPorts) {
                String[] tokens = hostPort.split(":");

                String host = tokens[0];
                int port = Integer.parseInt(tokens[1]);
                log.info("static host: [" + host + "], port: [" + port + "]");

                nodes.add(new ServiceDiscovery.ServiceNode("", host, port));

                List<SocketAddress> sockaddrsList = new ArrayList<SocketAddress>();
                sockaddrsList.add(new InetSocketAddress(host, port));
                addrs.add(new EquivalentAddressGroup(sockaddrsList));
            }
        }

        // hand the resolved addresses over to the channel.
        if (addrs.size() > 0) {
            this.listener.onAddresses(addrs, Attributes.EMPTY);
        }
    }

    public List<ServiceDiscovery.ServiceNode> getNodes() {
        return this.nodes;
    }

    private List<ServiceDiscovery.ServiceNode> getServiceNodes(String serviceName, String consulHost, int consulPort) {
        ServiceDiscovery serviceDiscovery = ConsulServiceDiscovery.singleton(consulHost, consulPort);

        return serviceDiscovery.getHealthServices(serviceName);
    }

    @Override
    public void shutdown() {
        // stop the connection check timer so that its thread does not outlive the resolver.
        this.connectionCheckTimer.stop();
    }

    private static class ConnectionCheckTimer {
        private ConnectionCheckTimerTask timerTask;
        private int delay = 1000;
        private int pauseInSeconds;
        private Timer timer;
        private ConsulNameResolver consulNameResolver;

        public ConnectionCheckTimer(ConsulNameResolver consulNameResolver, int pauseInSeconds) {
            this.consulNameResolver = consulNameResolver;
            this.pauseInSeconds = pauseInSeconds;

            this.timerTask = new ConnectionCheckTimerTask(this.consulNameResolver);
            this.timer = new Timer();
        }

        public void runTimer() {
            this.timer.scheduleAtFixedRate(this.timerTask, delay, this.pauseInSeconds * 1000);
        }

        public void reset() {
            this.timerTask.cancel();
            this.timer.purge();
            this.timerTask = new ConnectionCheckTimerTask(consulNameResolver);
        }

        public void stop() {
            // cancels the scheduled task and terminates the timer thread.
            this.timer.cancel();
        }
    }

    private static class ConnectionCheckTimerTask extends TimerTask {
        private ConsulNameResolver consulNameResolver;

        public ConnectionCheckTimerTask(ConsulNameResolver consulNameResolver) {
            this.consulNameResolver = consulNameResolver;
        }

        @Override
        public void run() {
            List<ServiceDiscovery.ServiceNode> nodes = consulNameResolver.getNodes();
            if (nodes != null) {
                for (ServiceDiscovery.ServiceNode node : nodes) {
                    String host = node.getHost();
                    int port = node.getPort();
                    // try-with-resources closes the probe socket, which the check would otherwise leak.
                    try (Socket socketClient = new Socket(host, port)) {
                        // connection succeeded, so the node is still reachable.
                    } catch (IOException e) {
                        log.error(e.getMessage());
                        log.info("service nodes being reloaded...");

                        this.consulNameResolver.loadServiceNodes();
                        break;
                    }
                }
            } else {
                log.info("no service nodes...");
            }
        }
    }

    public static class ConsulNameResolverProvider extends NameResolverProvider {

        private String serviceName;
        private int pauseInSeconds;
        private boolean ignoreConsul;
        private List<String> hostPorts;

        public ConsulNameResolverProvider(String serviceName, int pauseInSeconds, boolean ignoreConsul, List<String> hostPorts) {
            this.serviceName = serviceName;
            this.pauseInSeconds = pauseInSeconds;
            this.ignoreConsul = ignoreConsul;
            this.hostPorts = hostPorts;
        }

        @Override
        protected boolean isAvailable() {
            return true;
        }

        @Override
        protected int priority() {
            return 5;
        }

        @Nullable
        @Override
        public NameResolver newNameResolver(URI uri, Attributes attributes) {
            return new ConsulNameResolver(uri, serviceName, pauseInSeconds, this.ignoreConsul, this.hostPorts);
        }

        @Override
        public String getDefaultScheme() {
            return "consul";
        }
    }
}
This Consul NameResolver retrieves the list of IP addresses and ports of the nodes registered in Consul under the given service name.
See the ConsulNameResolver source for more details: https://github.com/mykidong/grpc-java-load-balancer-using-consul/blob/master/src/main/java/io/shunters/grpc/component/grpc/ConsulNameResolver.java
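The resolver also runs a timer that tries to open a TCP connection to each known node and reloads the node list when a connection fails. Here is a minimal standalone sketch of such a reachability check. The class name TcpReachability and the explicit connect timeout are my assumptions for this illustration; the resolver itself connects without a timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical helper for illustration only. */
final class TcpReachability {

    /** Returns true if a TCP connection to host:port can be opened within the timeout. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        // try-with-resources closes the probe socket whether or not the connect succeeds.
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        int port;
        // Port 0 asks the OS for any free port, so the demo needs no fixed port.
        try (ServerSocket listener = new ServerSocket(0)) {
            port = listener.getLocalPort();
            System.out.println("while listening: " + isReachable("127.0.0.1", port, 1000));
        }
        System.out.println("after close: " + isReachable("127.0.0.1", port, 1000));
    }
}
```

Note that a successful TCP connect only shows that something is listening on the port. It does not prove that the gRPC service behind it is healthy, which is why the node list itself comes from Consul's health checks.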
Write a Client that does load balancing using Consul service discovery
To call the Hello World service, let’s write a client:
package io.shunters.grpc.component.grpc;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.util.RoundRobinLoadBalancerFactory;
import io.shunters.grpc.component.proto.helloworld.GreeterGrpc;
import io.shunters.grpc.component.proto.helloworld.HelloReply;
import io.shunters.grpc.component.proto.helloworld.HelloRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Created by mykidong on 2018-01-10.
 */
public class HelloWorldClientWithNameResolver {
    private static Logger log = LoggerFactory.getLogger(HelloWorldClientWithNameResolver.class);

    private final ManagedChannel channel;
    private final GreeterGrpc.GreeterBlockingStub blockingStub;

    /**
     * Consul NameResolver Usage.
     *
     * @param serviceName  consul service name.
     * @param consulHost   consul agent host.
     * @param consulPort   consul agent port.
     * @param ignoreConsul if true, consul is not used. instead, the static node list will be used.
     * @param hostPorts    the static node list, for instance, Arrays.asList("host1:port1", "host2:port2")
     */
    public HelloWorldClientWithNameResolver(String serviceName,
                                            String consulHost,
                                            int consulPort,
                                            boolean ignoreConsul,
                                            List<String> hostPorts) {

        String consulAddr = "consul://" + consulHost + ":" + consulPort;

        int pauseInSeconds = 5;

        channel = ManagedChannelBuilder
                .forTarget(consulAddr)
                .loadBalancerFactory(RoundRobinLoadBalancerFactory.getInstance())
                .nameResolverFactory(new ConsulNameResolver.ConsulNameResolverProvider(serviceName, pauseInSeconds, ignoreConsul, hostPorts))
                .usePlaintext(true)
                .build();

        blockingStub = GreeterGrpc.newBlockingStub(channel);
    }

    public void sayHello() {
        try {
            HelloRequest request = HelloRequest.newBuilder()
                    .setName("grpc load balancer")
                    .build();

            HelloReply response = blockingStub.sayHello(request);
            String message = response.getMessage();
            log.info("message: [{}]", message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
ConsulNameResolver supplies the addresses of the service nodes, and the round-robin load balancer distributes the calls across them.
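To make the round-robin idea concrete, here is a minimal sketch of picking addresses in rotation. This is not how gRPC implements its round-robin load balancer internally; RoundRobinPicker is a made-up class that only illustrates the selection order.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of round-robin selection; not gRPC's implementation. */
final class RoundRobinPicker {
    private final List<InetSocketAddress> addresses;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinPicker(List<InetSocketAddress> addresses) {
        if (addresses.isEmpty()) {
            throw new IllegalArgumentException("at least one address is required");
        }
        // Defensive copy so later changes to the caller's list do not affect the rotation.
        this.addresses = new ArrayList<>(addresses);
    }

    /** Returns the next address in rotation; safe to call from several threads. */
    InetSocketAddress pick() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(next.getAndIncrement(), addresses.size());
        return addresses.get(index);
    }

    public static void main(String[] args) {
        RoundRobinPicker picker = new RoundRobinPicker(Arrays.asList(
                InetSocketAddress.createUnresolved("host1", 50051),
                InetSocketAddress.createUnresolved("host2", 50051)));
        // Prints host1, host2, host1, host2 on separate lines.
        for (int i = 0; i < 4; i++) {
            System.out.println(picker.pick().getHostString());
        }
    }
}
```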
Let’s invoke an RPC:
// service name which is registered in consul service discovery.
String serviceName = "hello-world";

String consulHost = "localhost";
int consulPort = 8500;
boolean ignoreConsul = false;

// init. client which do load balancing using consul.
HelloWorldClientWithNameResolver client = new HelloWorldClientWithNameResolver(serviceName, consulHost, consulPort, ignoreConsul, null);

// call rpc.
client.sayHello();
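When ignoreConsul is true, the last argument must be a static list of "host:port" strings instead of null. As a rough sketch of how such a list can be turned into socket addresses, with a little more validation than a plain split(":"), consider the following. StaticNodeList is a hypothetical helper for illustration, and it creates unresolved addresses so that the example runs without DNS lookups.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration only. */
final class StaticNodeList {

    /** Parses entries such as "host1:50051" into socket addresses. */
    static List<InetSocketAddress> parse(List<String> hostPorts) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String hostPort : hostPorts) {
            int colon = hostPort.lastIndexOf(':');
            // Reject entries with no colon, an empty host, or an empty port.
            if (colon <= 0 || colon == hostPort.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + hostPort);
            }
            String host = hostPort.substring(0, colon);
            // parseInt throws NumberFormatException for a non-numeric port.
            int port = Integer.parseInt(hostPort.substring(colon + 1));
            // Unresolved on purpose: no DNS lookup happens here.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse(Arrays.asList("host1:50051", "host2:50052"))) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```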
Conclusion
Client-side load balancing lets a gRPC client spread its calls across all instances of a service without an intermediate proxy, which can improve the performance of RPC calls.
In a microservices architecture, most gRPC services are deployed on a container orchestrator and registered in a service discovery system such as Consul. A client microservice can retrieve the list of service IPs and ports from service discovery and use it to do load balancing, as shown above.
References
The complete code for this post can be found in this repo: