频道栏目
首页 > 资讯 > 其他综合 > 正文

zkClient实现master选举

17-01-23        来源:[db:作者]  
收藏   我要投稿

master选举

master选举
master选举

zkClient实现maste选举

public class MasterServer {

    private ZkClient zkClient;
    //争抢的一个master节点
    private final String MASTER_NODE = "/master";
    //server链接字符串
    private static final String CONNECTION_STRING = "139.199.182.26:5000,139.199.182.26:5001,139.199.182.26:5002";
    //超时时间
    private static final int SESSION_TIMEOUT = 5000;
    //争抢master节点服务器信息
    private ServerData serverData;
    //争抢到的master节点服务器信息
    private ServerData masterData;
    //服务器是否启动
    private volatile boolean running = false;
    //master节点的监听事件
    IZkDataListener dataListener;

    private ScheduledExecutorService scheService = Executors.newScheduledThreadPool(1);


    public MasterServer(ZkClient zkClient, ServerData serverData) {
        this.zkClient = zkClient;
        this.serverData = serverData;//标识来争抢master节点的服务
        dataListener = new IZkDataListener() {
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {

            }

            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                //监听节点删除时间
                scheService.schedule(new Runnable() {
                    @Override
                    public void run() {
                        takeMaster();
                    }
                }, 5, TimeUnit.SECONDS);
            }
        };
    }

    //开始争抢master的方法
    public void start() {
        if (running) {
            throw new RuntimeException("服务已经启动了");

        }
        running = true;
        zkClient.subscribeDataChanges(MASTER_NODE, dataListener);
        takeMaster();
    }

    //停止
    public void stop() {
        if (!running) {
            throw new RuntimeException("服务已经停止了");
        }
        running = false;
        zkClient.subscribeDataChanges(MASTER_NODE, dataListener);
        releaseMaster();
    }

    private void takeMaster() {
        if (!running) {
            return;
        }
        try {
            System.out.println(serverData.getServerName() + "来抢master节点");
            //用到了一个临时节点的特性
            zkClient.createEphemeral(MASTER_NODE, serverData);
            masterData = serverData;
            System.out.println(serverData.getServerName() + "争抢到了master节点");
            scheService.schedule(new Runnable() {
                @Override
                public void run() {
                    if (checkMaster()) {
                        zkClient.delete(MASTER_NODE);
                    }
                }
            }, 5, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) {
            //如果节点创建过程中提示节点已经存在的异常,那这个时候意味着master节点是存在的(特性,节点唯一性)
            //读取当前master节点的服务器信息
            ServerData serverData = zkClient.readData(MASTER_NODE);

            if (serverData == null) {
                //在读取过程中,发现master节点已经被释放
                takeMaster();
            } else {
                masterData = serverData;
            }
        }

    }

    //释放master
    private void releaseMaster() {
        if (checkMaster()) {
            zkClient.delete(MASTER_NODE);
        }
    }

    //校验当前的服务器是不是master
    private boolean checkMaster() {
        try {
            //读取当前master节点的数据,并赋值给masterdata
            ServerData ms = zkClient.readData(MASTER_NODE);
            masterData = ms;
            //这个时候,如果master节点的数据和当前过来争抢master节点的服务器的数据是一样的话,那么意味
            //当前的serverData就是master
            if (masterData.getServerName().equals(serverData.getServerName())) {
                return true;
            }
            return false;
        } catch (ZkNoNodeException e) {
            return false;
        } catch (ZkInterruptedException e) {
            return checkMaster();
        } catch (ZkException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(10);
        for (int i = 0; i < 10; i++) {
            final int idx = i;
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                        //初始化一个zkclient的连接
                        ZkClient zk = new ZkClient(CONNECTION_STRING,
                                SESSION_TIMEOUT, SESSION_TIMEOUT,
                                new SerializableSerializer());
                        //定义一台争抢master节点的服务器
                        ServerData serverData = new ServerData();
                        serverData.setServerId(idx);
                        serverData.setServerName("#server-" + idx);
                        //初始化一个争抢master节点的服务
                        MasterServer ms = new MasterServer(zk, serverData);
                        ms.start();
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            service.execute(runnable);
        }
        service.shutdown();
    }
}

/**
 * 每台服务器自身属性
 *@author hao.wang
 *@date 2017/1/20 15:37
 */
public class ServerData implements Serializable{

    private int serverId;

    private String serverName;

    public int getServerId() {
        return serverId;
    }

    public void setServerId(int serverId) {
        this.serverId = serverId;
    }

    public String getServerName() {
        return serverName;
    }

    public void setServerName(String serverName) {
        this.serverName = serverName;
    }
}
相关TAG标签
上一篇:Zookeeper实现分布式锁
下一篇:Mac环境下配置adb
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站