[Python] 使用multiprocessing實作watchdog

今天有一個需求,當一個function執行太長的時候我希望可以把它停止

比如說我今天使用facebokk presto做資料倉儲的時候

當資料量過大導致執行時間太長,更甚的是OutOfMemoryError導致presto的worker node shutdown

因此我希望有個watchdog的功能當我functon call執行太長的時候自動timeout,類似如下的感覺

monitor(myfunc)

最開始的想法是希望使用python的threading去實作

但是python的threading並沒有直接kill thread的方法,儘管參考了網路上的做法使用Event.set
http://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread-in-python

但是似乎只要thread的function沒有執行完成,thread就不會停止.

這並非我想要的效果,我希望當timeout發生的時候立即停止function的執行,也因此後來選了multiprocessing

multiprocessing直接支援terminate的呼叫,因此我只要搭配定期檢查timeout就能完成一個簡單的watchdog

程市碼如下

watchdog.py
 
from multiprocessing import Process
from time import sleep, time
from functools import partial

class FunctionCallWatchDog(object):
    '''
    The watch dog will invkoe a delegate function and kill it when timeouta occours.
    '''

    def __init__(self, timeout):
        ''' timeout: Int, seconds '''
        self.timeout = timeout

    def monitor_and_invoke(self, delegate_func, *args, **keywords):
        invoked_func = partial(delegate_func, *args, **keywords)
        p = Process(target=invoked_func)
        p.start()
        start = time()
        while time() - start <= self.timeout:
            if p.is_alive():
                sleep(.1)
            else:
                break
        else:
            p.terminate()
            p.join()
            raise TimeoutError("Function has been executed over its time limit: %s seconds." % (self.timeout)) 


class TimeoutError(Exception):
    def __init__(self, value):
            self.value = value

測試

 from time import sleep
 def myfunc():
     sleep(11)
     
 watchdog = FunctionCallWatchDog(timeout=10)
 watchdog.monitor_and_invoke(myfunc)

姑且是達到了我所想要的效果,但是這段程式有個嚴重的問題,就是他無法捕捉exception而只能處理timeout exception

這樣一來function call失敗外部是無法處理的

後來參考了Pythonのmultiprocessing.PoolでExceptionを受け取る(日文的 請注意)

可以用multiprocess的Pool去改寫解決這個問題

使用apply_async函式得到一個future物件,當使用該物件的get function時候,除了結果之外還能正確捕捉到exception

pool = Pool(num_pool)
res = pool.apply_async(myfunc)
res.get(timeout=10) #it will catch exception

但這邊還有一個小問題,就是會跑出PicklingError

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

在multiprocess的時候如果有自創exception的話,會在打包階段出問題,請參考如下
http://stackoverflow.com/questions/25156768/cant-pickle-type-instancemethod-using-pythons-multiprocessing-pool-apply-a

在python可以被pick的對象,請參考https://docs.python.org/2/library/pickle.html#what-can-be-pickled-and-unpickled

這時候可以用copy_reg來解決這問題,新增一段如下的程市碼

def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

copy_reg.pickle(types.MethodType, _pickle_method)

這時候噴出另一個錯誤: typeerror

TypeError: ('__init__() takes exactly...

這個問題似乎由來已久,即便到現在2.7的版本仍有此問題
https://bugs.python.org/issue9400

會出現這問題是因為我自制的TimeoutError 建構子需要兩個參數self及value

而multiprocessing.pool.AsyncResult.get()沒辦法對超過一個以上參數exception進行初始化

因此這邊必須在做個小修正

class TimeoutError(Exception):
    def __init__(self, value=None): #value補上預設值
            self.value = value

如此一來才真正完成一個可用的watchdog,完整程式碼如下:

watchdog.py
import copy_reg, types
from multiprocessing import Process, Pool, TimeoutError
from time import sleep, time
from functools import partial


def _pickle_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)

#Avoid PicklingError, http://stackoverflow.com/questions/25156768/cant-pickle-type-instancemethod-using-pythons-multiprocessing-pool-apply-a

copy_reg.pickle(types.MethodType, _pickle_method)


class FunctionCallWatchDog(object):
    '''
    The watch dog will invoke a delegate function and kill it when timeout occours.
    '''

    def __init__(self, timeout):
        ''' timeout: Int, seconds '''
        self.timeout = timeout

    def monitor_and_invoke(self, delegate_func, *args, **keywords):
        invoked_func = partial(delegate_func, *args, **keywords)
        pool = Pool(1)
        result = pool.apply_async(invoked_func)
        try:
            result.get(timeout=self.timeout)
        except TimeoutError as e:
            pool.terminate()
            pool.close()
            pool.join()
            raise ExecutionTimeoutError("Function has been executed over its time limit: %s seconds." % (self.timeout)) 
        except:
            raise


class ExecutionTimeoutError(Exception):
    def __init__(self, value=None):
            self.value = value
comments powered by Disqus