频道栏目
首页 > 资讯 > 其他 > 正文

分布式共享锁简单Demo

17-07-10        来源:[db:作者]  
收藏   我要投稿

zookeeper是一个分布式协调服务,为用户的分布式应用提供协调服务。使用范围有:主从协调、服务器节点动态上下线、统一配置管理、分布式共享锁、统一名称服务……
文章利用zookeeper实现简单的分布式锁

zookeeper虽然提供的服务很多,但底层只有两个功能
1.管理程序提交的数据;
2.为程序提供数据节点监听服务

分布式锁的简单实现

分布式系统多个应用需要操作同一个资源,为资源操作线程安全问题,创建分布式锁,应用取得锁才能操作资源,否则阻塞直到分布锁到该应用节点时才能操作资源。
思路:多个应用在zookeeper上注册,zookeeper采用排序最小的分配锁,如果是最小就获取锁并操作数据,然后删除该节点。

zookeeper集群安装

zookeeper超过半数以上集群便可存活,因此适合安装奇数节个集群。 官网上下载zookeeper-3.x.x.tar.gz包上传到linux虚拟机中解压,在zoo.cfg文件中配置各集群节点的ip和端口。
# zoo.cfg配置
server.1=192.168.10.121:2888:3888
server.2=192.168.10.122:2888:3888
server.3=192.168.10.123:2888:3888
依次启动各群集节点的zookeeper:cd /usr/local/zookeeper ./bin zkServer.sh start启动zookeeper服务

Java实现

通过连接集群的Zookeeper对象操作节点,主要方法有create/delete/exists,创建zookeeper时可创建监听对象及监听回调方法,当节点状态改变时则触发事件(监听仅一次有效),因此可重新通过get(… true)使监听方法激活为true.

public class DistributedClient {

    //zookeeper主机
    private String hosts = "192.168.10.121:2181,192.168.10.122:2181,192.168.10.123:2181";

    //超时时间
    private int SESSION_TIME_OUT = 2000;

    private String subNode = "/subNode";
    private String groupNode = "/group";

    private ZooKeeper zk;
    private volatile String thisPath; //当前节点

    public void connectZookeeper() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIME_OUT, new Watcher() {
            //监听回调 
            public void process(WatchedEvent event) {
                if (event.getType() == EventType.NodeChildrenChanged 
                        && groupNode.equals(event.getPath())) {
                    //获取子节点,并对父节点进行监听
                    try {
                        List childrenList = zk.getChildren(groupNode, true);
                        String thisNode = thisPath.substring((groupNode + "/").length());
                        //比较自己是否是最小的节点
                        Collections.sort(childrenList);
                        if (0 == childrenList.indexOf(thisNode)) {
                            doSomething();  //特定的业务逻辑,用完锁之后需要删除
                            //重新注册一次锁, 带序号的锁
                            thisPath = zk.create(groupNode + subNode,
                                    null, 
                                    Ids.OPEN_ACL_UNSAFE,
                                    CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    } catch (Exception e) {
                        System.err.println("zookeeper is error");
                    }

                }
            }

        });

        if(null == zk.exists(groupNode, true)) {
            zk.create(groupNode, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //父节点必须为persistent才可创建子节点
        }
        //初始化创建锁
        thisPath = zk.create(groupNode + subNode,
                null, 
                Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);
        Thread.sleep(1000);
        List childrenNodes = zk.getChildren(groupNode, true);
        if (1 == childrenNodes.size()) {
            doSomething();
            thisPath = zk.create(groupNode + subNode,
                    null, 
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    }

    /**
     * 特写业务实现
     * @throws Exception 
     * @throws InterruptedException 
     */
    private void doSomething() throws Exception {
        System.out.println("get this lock" + thisPath);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            System.out.println("finish use this lock, will to release: " + thisPath);
            zk.delete(this.thisPath, -1); //-1表示不指定版本
        }

    }

    @Test
    public void testNode() {
        try {
            for (int i = 0; i < 5; i++)
            {
                new Thread() {
                    public void run() {
                        try {
                            DistributedClient dl = new DistributedClient();
                            dl.connectZookeeper();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }.start();

            }
            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            System.err.println("zookeeper connect error");
            e.printStackTrace();
        }
    }

}
相关TAG标签
上一篇:造轮子的方式去理解RxJava源码
下一篇:IDA动态调试so源码 F5伪代码查看
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站