1、配景
最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级漫衍式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的漫衍式RPC框架。花了一些时间看了下他的代码,写的清洁简朴,写的RPC框架可以算是一个浅易版的dubbo。这个RPC框架虽小,可是麻雀虽小,五脏俱全,有乐趣的可以进修一下。
本人在这个浅易版的RPC上添加了如下特性:
项目地点:https://github.com/luxiaoxun/NettyRpc
2、简介
RPC,即 Remote Procedure Call(长途进程挪用),挪用长途计较机上的处事,就像挪用当地处事一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。
这个RPC整体框架如下:

这个RPC框架利用的一些技能所办理的问题:
处事宣布与订阅:处事端利用Zookeeper注册处事地点,劳务派遣管理系统,客户端从Zookeeper获取可用的处事地点。
通信:利用Netty作为通信框架。
Spring:利用Spring设置处事,加载Bean,扫描注解。
动态署理:客户端利用署理模式透明化处事挪用。
动静编解码:利用Protostuff序列化和反序列化动静。
3、处事端宣布处事
利用注解标注要宣布的处事
处事注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcService {
Class<?> value();
}
一个处事接口:
public interface HelloService {
String hello(String name);
String hello(Person person);
}
一个处事实现:利用注解标注
@RpcService(HelloService.class)
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String name) {
return "Hello! " + name;
}
@Override
public String hello(Person person) {
return "Hello! " + person.getFirstName() + " " + person.getLastName();
}
}
处事在启动的时候扫描获得所有的处事接口及其实现:
@Override
public void setApplicationContext(ApplicationContext ctx) throws BeansException {
Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
if (MapUtils.isNotEmpty(serviceBeanMap)) {
for (Object serviceBean : serviceBeanMap.values()) {
String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
handlerMap.put(interfaceName, serviceBean);
}
}
}
在Zookeeper集群上注册处事地点:
public class ServiceRegistry {
private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
private CountDownLatch latch = new CountDownLatch(1);
private String registryAddress;
public ServiceRegistry(String registryAddress) {
this.registryAddress = registryAddress;
}
public void register(String data) {
if (data != null) {
ZooKeeper zk = connectServer();
if (zk != null) {
AddRootNode(zk); // Add root node if not exist
createNode(zk, data);
}
}
}
private ZooKeeper connectServer() {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.SyncConnected) {
latch.countDown();
}
}
});
latch.await();
} catch (IOException e) {
LOGGER.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
}
return zk;
}
private void AddRootNode(ZooKeeper zk){
try {
Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
if (s == null) {
zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
LOGGER.error(e.toString());
} catch (InterruptedException e) {
LOGGER.error(e.toString());
}
}
private void createNode(ZooKeeper zk, String data) {
try {
byte[] bytes = data.getBytes();
String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
LOGGER.debug("create zookeeper node ({} => {})", path, data);
} catch (KeeperException e) {
LOGGER.error("", e);
}
catch (InterruptedException ex){
LOGGER.error("", ex);
}
}
}
ServiceRegistry