本文记录了一个使用rpc进行简单文件共享的程序,仅用于p2p原理学习,关于这段代码中的异常处理等部分还有很多值得学习的地方。
server.py
# -*- coding=utf-8 -*- from xmlrpclib import ServerProxy, Fault from SimpleXMLRPCServer import SimpleXMLRPCServer from SocketServer import ThreadingMixIn import os import sys UNHANDLED = 100 # faultCode MAX_LENGTH = 12 # 多线程声明,增强并发能力 class ThreadXMLRPCServer(ThreadingMixIn, SimpleXMLRPCServer):pass # 异常类定义: class UnhandledQuery(Fault): """ 查找异常 """ def __init__(self, message='Not right query.'): Fault.__init__(self, UNHANDLED, message) class MyNode: def __init__(self, url): self.url = url self.know = set() # 创建集合 pass def _start(self): server = ThreadXMLRPCServer(("localhost", 8088), allow_none=True) server.register_instance(self) server.serve_forever() def query(self, query, queue=[]): """ 通过异常捕捉,判断本节点是否有该文件,如果找到则返回,找不到则查找其他已知节点 """ try: print 'search local file' return self._getfile(query) except UnhandledQuery: queue.append(self.url) if len(history) >= MAX_LENGTH: raise # 抛出异常 print 'Search other servers.' return self._broadcast(query, queue) def _getfile(self, query, queue=[]): """ 查找文件 """ dir = os.getcwd() filename = join(dir, query) if not os.path.isfile(filename): raise UnhandledQuery # 抛出异常 return open(filename).read() def _broadcast(self, query, history): """ 内部时使用,用于将查询广播到其他已知节点 """ for other in self.know.copy(): try: # 根据url创建远程节点的proxy,使用xml_rpc进行远程调用 print 'search ----', other server = ServerProxy(other) print 'start query', other return server.query(query, history) except Fault, f: if f.faultCode == UNHANDLED: pass else: self.know.remove(other) except: # 说明该server已经不可用 self.know.remove(other) # 如果在所有节点中没有找到,就返回异常 raise UnhandledQuery def hello(self, other): """ 认识其他节点 """ self.know.add(other) return 0 def fetch(self, query): """ 用于从节点下载数据 """ print 'server fetch' result = self.query(query) print 'get----' # 把查询到的数据写到本地 f = open(join(os.getcwd(), query), 'w') f.write(result) f.close() return 0 if __name__ == '__main__': url = sys.argv[1] node = MyNode(url) node._start() print "Server Node Start."
client.py
# -*- coding=utf-8 -*- from xmlrpclib import ServerProxy, Fault from string import lowercase from server import MyNode, UNHANDLED from threading import Thread from time import sleep import sys HEAD_START = 0.1 # second class Client1(): def __init__(self, url, urlfile): """ 启动服务器 """ print 'in' n = MyNode(url) t = Thread(target=n._start) t.setDaemon(1) t.start() # 等待服务器启动 sleep(HEAD_START) self.server = ServerProxy(url) self.server.hello(url) for line in open(urlfile): line = line.strip() def do_fetch(self, arg): "调用服务器的fetch方法" try: print 'do_fetch' self.server.fetch(arg) except Fault, f: print f if f.faultCode != UNHANDLED: raise print 'Could not find the file ', arg def main(): url, urlfile = sys.argv[1:] print url, urlfile print '---------' client = Client1(url, urlfile) if __name__ == '__main__': main()