欢迎访问昆山宝鼎软件有限公司网站! 设为首页 | 网站地图 | XML | RSS订阅 | 宝鼎邮箱 | 后台管理


新闻资讯

MENU

软件开发知识

都需要再 昆山软件开发 次watchNode

点击: 次  来源:宝鼎软件 时间:2017-06-01

原文出处: 阿凡卢

1、配景

最近在搜索Netty和Zookeeper方面的文章时,看到了这篇文章《轻量级漫衍式 RPC 框架》,作者用Zookeeper、Netty和Spring写了一个轻量级的漫衍式RPC框架。花了一些时间看了下他的代码,写的清洁简朴,写的RPC框架可以算是一个浅易版的dubbo。这个RPC框架虽小,可是麻雀虽小,五脏俱全,有乐趣的可以进修一下。

本人在这个浅易版的RPC上添加了如下特性:

  • 处事异法式用的支持,回调函数callback的支持
  • 客户端利用长毗连(在多次挪用共享毗连)
  • 处事端异步多线程处理惩罚RPC请求
  • 项目地点:https://github.com/luxiaoxun/NettyRpc

    2、简介

    RPC,即 Remote Procedure Call(长途进程挪用),挪用长途计较机上的处事,就像挪用当地处事一样。RPC可以很好的解耦系统,如WebService就是一种基于Http协议的RPC。

    这个RPC整体框架如下:

    都需要再 昆山软件开拓 次watchNode

    这个RPC框架利用的一些技能所办理的问题:

    处事宣布与订阅:处事端利用Zookeeper注册处事地点,劳务派遣管理系统,客户端从Zookeeper获取可用的处事地点。

    通信:利用Netty作为通信框架。

    Spring:利用Spring设置处事,加载Bean,扫描注解。

    动态署理:客户端利用署理模式透明化处事挪用。

    动静编解码:利用Protostuff序列化和反序列化动静。

    3、处事端宣布处事

    利用注解标注要宣布的处事

    处事注解

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface RpcService {
        Class<?> value();
    }

    一个处事接口:

    public interface HelloService {
    
        String hello(String name);
    
        String hello(Person person);
    }

    一个处事实现:利用注解标注

    @RpcService(HelloService.class)
    public class HelloServiceImpl implements HelloService {
    
        @Override
        public String hello(String name) {
            return "Hello! " + name;
        }
    
        @Override
        public String hello(Person person) {
            return "Hello! " + person.getFirstName() + " " + person.getLastName();
        }
    }

    处事在启动的时候扫描获得所有的处事接口及其实现:

    @Override
        public void setApplicationContext(ApplicationContext ctx) throws BeansException {
            Map<String, Object> serviceBeanMap = ctx.getBeansWithAnnotation(RpcService.class);
            if (MapUtils.isNotEmpty(serviceBeanMap)) {
                for (Object serviceBean : serviceBeanMap.values()) {
                    String interfaceName = serviceBean.getClass().getAnnotation(RpcService.class).value().getName();
                    handlerMap.put(interfaceName, serviceBean);
                }
            }
        }

    在Zookeeper集群上注册处事地点:

    public class ServiceRegistry {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(ServiceRegistry.class);
    
        private CountDownLatch latch = new CountDownLatch(1);
    
        private String registryAddress;
    
        public ServiceRegistry(String registryAddress) {
            this.registryAddress = registryAddress;
        }
    
        public void register(String data) {
            if (data != null) {
                ZooKeeper zk = connectServer();
                if (zk != null) {
                    AddRootNode(zk); // Add root node if not exist
                    createNode(zk, data);
                }
            }
        }
    
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(registryAddress, Constant.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown();
                        }
                    }
                });
                latch.await();
            } catch (IOException e) {
                LOGGER.error("", e);
            }
            catch (InterruptedException ex){
                LOGGER.error("", ex);
            }
            return zk;
        }
    
        private void AddRootNode(ZooKeeper zk){
            try {
                Stat s = zk.exists(Constant.ZK_REGISTRY_PATH, false);
                if (s == null) {
                    zk.create(Constant.ZK_REGISTRY_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                }
            } catch (KeeperException e) {
                LOGGER.error(e.toString());
            } catch (InterruptedException e) {
                LOGGER.error(e.toString());
            }
        }
    
        private void createNode(ZooKeeper zk, String data) {
            try {
                byte[] bytes = data.getBytes();
                String path = zk.create(Constant.ZK_DATA_PATH, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
                LOGGER.debug("create zookeeper node ({} => {})", path, data);
            } catch (KeeperException e) {
                LOGGER.error("", e);
            }
            catch (InterruptedException ex){
                LOGGER.error("", ex);
            }
        }
    }
    
    ServiceRegistry