频道栏目
首页 > 资讯 > Python > 正文

Python基础-分布式进程

17-06-28        来源:[db:作者]  
收藏   我要投稿

Python基础-分布式进程,在Thread和Process中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

实现步骤

服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:
首先是产生任务

import random,time,queue,threading
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

#发送任务队列
task_queue = queue.Queue()

#接受结果队列
result_queue = queue.Queue()

#因为callable接受的是函数  但是不知为什么 lambda 会出错
def task_queue_callable():
    return task_queue
def result_queue_callable():
    return result_queue


#从BaseManager继承得到
class QueueManager(BaseManager):
    pass


def run_task():
    #注册2个Queue到网上 callable关联Queue对象
    #callable=lambda : task_queue
    QueueManager.register('get_task_queue', callable=task_queue_callable)
    QueueManager.register('get_result_queue', callable=result_queue_callable)

    #绑定端口为 60000 验证码 '123456'
    manager = QueueManager(address=('127.0.0.1',6000), authkey=b'123456')

    #启动Queue
    manager.start()

    #获得通过网络访问的Queue对象
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    #往里面放任务  等待计算结果:其他机器执行n*n操作 并放入结果队列
    for i in range(10):
        n = random.randint(0,1000)
        print('put task : %d' % n)
        task.put(n)

    print('wait for result')
    for i in range(10):
        r = result.get(timeout=100)
        print('result : %s' % r)

    #关闭
    manager.shutdown()
    print('master shutdown')


if __name__ == '__main__':

    freeze_support()
    thread = threading.Thread(target=run_task)
    thread.start()
    thread.join()

#假如不安装freeze_support这个模块 就会包以下错误
'''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
'''

接着是处理任务的:

import random
import time,sys,queue,threading
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

def run_worker():
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    server_addr = '127.0.0.1'
    server_port = 6000
    print('connect to server %s:%d' % (server_addr,server_port))
    manager = QueueManager(address=(server_addr,server_port), authkey=b'123456')

    #网络连接
    manager.connect()

    #获取Queue对象
    task = manager.get_task_queue()
    result = manager.get_result_queue()

    #取出task任务  完成计算  并写到result
    while True:
        try:
            n = task.get(timeout=10)
            print('do task %d * %d' % (n,n))
            r = '%d * %d = %d' % (n,n,n*n)
            time.sleep(random.random() % 2)
            result.put(r)
        except queue.Empty:
            print('task do over!')
    manager.shutdown
    print('worker shutdowm')

if __name__=='__main__':

    thread = threading.Thread(target=run_worker)
    thread.start()
    thread.join()

处理结果流程

#产生任务
put task : 125
put task : 621
put task : 996
put task : 815
put task : 172
put task : 425
put task : 163
put task : 132
put task : 474
put task : 211
wait for result

#运行处理任务后
connect to server 127.0.0.1:6000
do task 125 * 125
do task 621 * 621
do task 996 * 996
do task 815 * 815
do task 172 * 172
do task 425 * 425
do task 163 * 163
do task 132 * 132
do task 474 * 474
do task 211 * 211

#同样生产任务这边也会打印
result : 125 * 125 = 15625
result : 621 * 621 = 385641
result : 996 * 996 = 992016
result : 815 * 815 = 664225
result : 172 * 172 = 29584
result : 425 * 425 = 180625
result : 163 * 163 = 26569
result : 132 * 132 = 17424
result : 474 * 474 = 224676
result : 211 * 211 = 44521
master shutdown
相关TAG标签
上一篇:内部类与向上转型
下一篇:Bootstrap响应式实用工具
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站