retrying
tenacity
merry

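Overview

When writing a web crawler, many things can raise an exception: requests fails to fetch a page, the JSON fails to parse, a key is missing from the parsed JSON, and so on. The usual response is to catch the exception with try...except and add a retry mechanism.
Likewise, in automated testing, looking up an element can fail because of an upgrade prompt, a slow page load, or a pop-up that appears at unpredictable times. The element cannot be found, so the click or other action raises an exception. (Such cases are usually wrapped in an exception-handling method with retry logic.)
Catching exceptions and retrying is therefore a very common requirement in everyday projects.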

Motivation

When we write retry logic by hand, it sometimes looks like this:

# Randomly produce a number that is at least 5
import random

retry_times = 3  # Cap the number of retries to avoid RecursionError

def do_something_unreliable():
    global retry_times
    num = random.randint(0, 10)
    try:
        if num < 5:
            raise IOError(f"{num} is not i wanted number")
        else:
            return num
    except IOError as e:
        print(e)
        while retry_times:   # Bounded to avoid "RecursionError: maximum recursion depth exceeded while calling a Python object"
            print(f'start {4 - retry_times} retry')
            retry_times -= 1
            retry_result = do_something_unreliable()
            if retry_result:
                return retry_result

if __name__ == '__main__':
    print(do_something_unreliable())

Combining try...except with a while loop is not very elegant, is it? Next, let's try some powerful third-party libraries.
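Before turning to a library, it may help to see roughly what such a library does on our behalf. The following is only a minimal sketch of a hand-written retry decorator. The names simple_retry, max_attempts, wait_seconds and flaky are invented for this illustration and do not come from any of the libraries discussed here.

```python
import functools
import time


def simple_retry(max_attempts=3, wait_seconds=0.0, exceptions=(Exception,)):
    """Hypothetical helper: call the wrapped function up to max_attempts times."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    # Re-raise once the last allowed attempt has failed.
                    if attempt == max_attempts:
                        raise
                    time.sleep(wait_seconds)
        return wrapper
    return decorator


calls = []


@simple_retry(max_attempts=3, exceptions=(IOError,))
def flaky():
    # Fails on the first two calls and succeeds on the third.
    calls.append(1)
    if len(calls) < 3:
        raise IOError("not ready yet")
    return len(calls)


result = flaky()
print(result)
```

The loop replaces the recursion and the global counter of the hand-written version, which is essentially the boilerplate that the libraries below package up and make configurable.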

retrying

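retrying official documentation

Note: retrying has not been maintained for a long time. The tenacity library, introduced in a later section, is recommended instead.

Installing retrying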

pip3 install retrying

Using retrying

# Rewrite the example above
import random
from retrying import retry

@retry
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        raise IOError(f"{num} is not i wanted number")
    else:
        return num
if __name__ == '__main__':
    print(do_something_unreliable())
  • A bare @retry retries indefinitely, with no wait between attempts, until the call succeeds

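Configurable parameters of retrying

  • stop_max_attempt_number: maximum number of attempts. The library's internal default is 5, but the limit applies only when the parameter is set explicitly, which is why a bare @retry keeps retrying
  • stop_max_delay: maximum total time to keep retrying, in milliseconds (internal default 100 ms, again applied only when set explicitly)
  • wait_fixed: fixed interval between two attempts (default 1000 ms)
  • wait_random_min, wait_random_max: lower bound (default 0 ms) and upper bound (default 1000 ms) of a random interval between two attempts
  • wait_incrementing_increment: how much the wait grows with each retry (default 100 ms)
  • wait_exponential_multiplier, wait_exponential_max: produce the interval between two attempts exponentially. The value is 2^previous_attempt_number * wait_exponential_multiplier, where previous_attempt_number is the number of attempts made so far
    If this value exceeds wait_exponential_max, every later interval is wait_exponential_max
  • retry_on_exception: a function that receives the raised exception and returns True if the call should be retried; any other exception ends the retrying and propagates
  • wrap_exception: True or False (the default). When True, the exception is wrapped in a RetryError, so you see both the RetryError and the exception raised by your code
  • retry_on_result: a function that receives the return value; the call is retried when it returns True
  • stop_func: a function called with (attempts, delay) after each failed attempt; a truthy return value stops the retrying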
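To make the exponential wait concrete, here is a small sketch that simply evaluates the formula from the parameter list. The function name exponential_wait_ms and the sample values (a multiplier of 1000 ms, a cap of 10000 ms) are assumptions chosen for illustration; the code mirrors the formula in the text, not the library's source.

```python
def exponential_wait_ms(previous_attempt_number, multiplier, max_wait):
    """Hypothetical helper: 2^n * multiplier, capped at max_wait (milliseconds)."""
    wait = (2 ** previous_attempt_number) * multiplier
    return min(wait, max_wait)


# Waits after the 1st..5th attempt with a 1000 ms multiplier and a 10 s cap.
waits = [exponential_wait_ms(n, multiplier=1000, max_wait=10000) for n in range(1, 6)]
print(waits)  # [2000, 4000, 8000, 10000, 10000]
```

The wait doubles after each attempt until it reaches the cap and then stays there.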

Examples

  • Example 1: at most 3 attempts, waiting 2 s between attempts
import random
from retrying import retry

@retry(stop_max_attempt_number=3, wait_fixed=2000)
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        raise IOError(f"{num} is not i wanted number")
    else:
        return num


if __name__ == '__main__':
    print(do_something_unreliable())
  • Example 2: retry on IOError
import random
from retrying import retry

def retry_if_io_error(exception):
    return isinstance(exception, IOError)

@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
  • Example 3: retry if the return value is None
import random
from retrying import retry

def retry_if_result_none(result):
    return result is None

@retry(retry_on_result=retry_if_result_none)
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        return
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
  • Example 4: a function that runs after every failed attempt
import random
from retrying import retry

def stop_func(attempts, delay):
    print(attempts, delay)

@retry(wait_fixed=2000, stop_func=stop_func)
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
  • Try the other parameters and parameter combinations yourself

tenacity

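Because retrying is no longer maintained, the tenacity library builds on it, extending its functionality and fixing its bugs. In the words of the tenacity documentation: "It originates from a fork of retrying which is sadly no longer maintained. Tenacity isn't api compatible with retrying but adds significant new functionality and fixes a number of longstanding bugs." Although the API is not compatible, basic usage looks much the same.

tenacity official documentation

Installing tenacity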

pip3 install tenacity

Using tenacity

  • Basic usage
import random
from tenacity import retry

@retry
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())

This looks just like retrying.

Configurable parameters of tenacity

Setting the number of attempts or the time limit: stop

import random
# from tenacity import retry, stop_after_attempt, stop_after_delay
from tenacity import *

# Stop after 2 s or after 5 attempts, whichever comes first; the | operator combines conditions
@retry(stop=(stop_after_delay(2) | stop_after_attempt(5)))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        print(num, '~~~')
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
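How can two stop conditions be combined with the | operator? One plausible way, sketched below, is to overload __or__ on a small callable class. All class names here (StopCondition, StopAny, StopAfterAttempt, StopAfterDelay) are hypothetical and written for this illustration; this is not tenacity's source code.

```python
class StopCondition:
    """Hypothetical base class: a callable that decides whether to stop retrying."""

    def __call__(self, attempt_number, elapsed_seconds):
        raise NotImplementedError

    def __or__(self, other):
        # a | b stops as soon as either condition says stop.
        return StopAny(self, other)


class StopAny(StopCondition):
    def __init__(self, *conditions):
        self.conditions = conditions

    def __call__(self, attempt_number, elapsed_seconds):
        return any(c(attempt_number, elapsed_seconds) for c in self.conditions)


class StopAfterAttempt(StopCondition):
    def __init__(self, max_attempts):
        self.max_attempts = max_attempts

    def __call__(self, attempt_number, elapsed_seconds):
        return attempt_number >= self.max_attempts


class StopAfterDelay(StopCondition):
    def __init__(self, max_seconds):
        self.max_seconds = max_seconds

    def __call__(self, attempt_number, elapsed_seconds):
        return elapsed_seconds >= self.max_seconds


stop = StopAfterDelay(2) | StopAfterAttempt(5)
print(stop(1, 0.5))  # False: neither limit reached
print(stop(5, 0.5))  # True: attempt limit reached
print(stop(2, 2.5))  # True: time limit reached
```

Because each condition is just a callable object, the combined object can be handed to a retry loop in the same way as a single condition.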

Setting the wait between attempts: wait

import random
from tenacity import *

# @retry(wait=wait_fixed(2))
@retry(wait=wait_random(min=1, max=2))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        print(num, '~~~')
        raise IOError(f"{num} is not i wanted number")
    else:
        return num


if __name__ == '__main__':
    print(do_something_unreliable())

More wait strategies are available; see the API documentation.

Setting the retry condition: retry

# Example 1:
import random
from tenacity import *

@retry(retry=retry_if_exception_type(IOError))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        print(num, '~~~')
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())

# Example 2:
import random
from tenacity import *

def is_none_p(value):
    return value is None  # Retry only when this predicate returns True

@retry(retry=retry_if_result(is_none_p))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 5:
        print(num, '~~~~~')
        return
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())

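Handling the exception after retrying stops

  • reraise defaults to False, in which case a RetryError is raised once retrying stops
    • If set to True, the original exception is re-raised instead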
import random
from tenacity import *

# reraise: re-raise the original exception
# @retry(stop=stop_after_attempt(3))
@retry(reraise=True, stop=stop_after_attempt(3))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 10:
        print(num, '~~~')
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    try:
        print(do_something_unreliable())
    except RetryError as e:
        print(e)
        print('got unexpected num')
    except IOError as e:
        print(e, "~~~~~~")
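The difference between the two behaviours can be shown without any library. The sketch below is an assumption-laden illustration: GiveUpError, call_with_retry and outcome are hypothetical names, with GiveUpError playing the role that RetryError plays in the example above.

```python
class GiveUpError(Exception):
    """Hypothetical wrapper raised when every attempt has failed."""

    def __init__(self, last_exception):
        super().__init__(f"gave up after: {last_exception!r}")
        self.last_exception = last_exception


def call_with_retry(func, max_attempts=3, reraise=False):
    last_exc = None
    for _ in range(max_attempts):
        try:
            return func()
        except Exception as exc:
            last_exc = exc  # remember the most recent failure
    if reraise:
        raise last_exc  # surface the original exception
    raise GiveUpError(last_exc) from last_exc  # surface the wrapper


def always_fails():
    raise IOError("boom")


def outcome(reraise):
    # Report which kind of exception reaches the caller.
    try:
        call_with_retry(always_fails, reraise=reraise)
    except GiveUpError:
        return "wrapper"
    except IOError:
        return "original"


print(outcome(False))  # wrapper
print(outcome(True))   # original
```

With the wrapper, the caller needs one except clause for all exhausted retries; with re-raising, the caller keeps handling the exception types it already knows.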

Logging before and after retries: before, after, before_sleep

import logging
import random
import sys
from tenacity import *

logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)

@retry(stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG))
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 10:
        print(num, '~~~')
        raise IOError(f"{num} is not i wanted number")
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
------Output (from one run)--------------
DEBUG:__main__:Retrying __main__.do_something_unreliable in 0.0 seconds as it raised OSError: 5 is not i wanted number.
DEBUG:__main__:Retrying __main__.do_something_unreliable in 0.0 seconds as it raised OSError: 8 is not i wanted number.
5 ~~~
8 ~~~
10

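For the other parameters, see the examples in the official API documentation.

Custom callback: retry_error_callback

  • Besides raising an exception again once retrying has failed, we can supply a custom callback, which is invoked after all attempts have failed
  • The callback must accept a retry_state parameter, which carries all the information about the current retry call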
import random
from tenacity import *

def is_false(value):
    return value is False

def return_last_value(retry_state):
    # return retry_state.outcome.result()
    return "can't get i wanted number after 5 retries"

@retry(stop=stop_after_attempt(5), retry=retry_if_result(is_false), retry_error_callback=return_last_value)
def do_something_unreliable():
    num = random.randint(0, 10)
    if num < 11:
        print(num, '~~~')
        return False
    else:
        return num

if __name__ == '__main__':
    print(do_something_unreliable())
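The idea of a give-up callback that receives a state object can also be sketched in plain Python. Everything named here (RetryState, retry_with_callback, on_give_up) is hypothetical and far simpler than the retry_state object that tenacity passes to its callback.

```python
from dataclasses import dataclass


@dataclass
class RetryState:
    """Hypothetical state object handed to the give-up callback."""
    attempt_number: int
    last_result: object


def retry_with_callback(func, should_retry, max_attempts, on_give_up):
    state = RetryState(0, None)
    for attempt in range(1, max_attempts + 1):
        state = RetryState(attempt, func())
        if not should_retry(state.last_result):
            return state.last_result
    # All attempts used up: let the callback decide what to return.
    return on_give_up(state)


result = retry_with_callback(
    func=lambda: False,                   # always returns an unwanted value
    should_retry=lambda value: value is False,
    max_attempts=5,
    on_give_up=lambda s: f"gave up after {s.attempt_number} attempts",
)
print(result)  # gave up after 5 attempts
```

The callback's return value becomes the return value of the whole call, so the caller receives a fallback value rather than an exception.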

For more advanced usage, see the official API documentation.

merry

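As the tenacity section showed, after several failed retries we can either raise the exception again or process it further in a callback. merry is another third-party library for exception handling.

merry official documentation

Installing merry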

pip3 install merry

Basic usage of merry

  • Example: divide two numbers and write the result to a file
def write_to_file(num1, num2, filename):
    result = num1 / num2
    with open(filename, 'w') as f:
        f.write(str(result))

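Clearly the example above is far from rigorous. It does not check the types of num1 and num2, so non-numeric arguments raise TypeError. It does not check whether num2 is 0, which raises ZeroDivisionError. And it does not check whether the file's directory exists, so a path into a missing subfolder raises FileNotFoundError.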

# Run each of the following calls in turn
write_to_file(1, 2, 'result.txt')
write_to_file(1, 'a', 'result.txt')
write_to_file(1, 0, 'result.txt')
write_to_file(1, 2, 'myresult/result.txt')
  • Rewritten with try...except
def write_to_file(num1, num2, filename):
    try:
        result = num1 / num2
        with open(filename, 'w') as f:
            f.write(str(result))
    except ZeroDivisionError as e:
        print(e)
    except Exception as e:
        print(e)

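This works, but it reads poorly and looks bloated. A project may need the same handling in many places, which forces us to write the same code over and over.

  • Rewritten with merry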
from merry import Merry
merry = Merry()
# merry = Merry(debug=True) # When set to True, exceptions are re-raised
merry.logger.disabled = True

@merry._try
def write_to_file(num1, num2, filename):
    result = num1 / num2
    with open(filename, 'w') as f:
        f.write(str(result))

# @merry._except(ZeroDivisionError, debug=True) # Re-raise only this particular exception
@merry._except(ZeroDivisionError)
def zero_division_error(e):
    print(f'Error: {e}')

@merry._except(Exception)
def catch_all(e):
    print('other errors: ' + str(e))

write_to_file(1, 2, 'result.txt')
write_to_file(1, 'a', 'result.txt')
write_to_file(1, 0, 'result.txt')
write_to_file(1, 2, 'myresult/result.txt')

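The main logic no longer needs any try...except of its own, so it reads more cleanly. Because each exception handler is a separate function, the handlers are decoupled and reusable.

  • An exception handler takes one optional parameter (e), which is the exception object
  • merry.logger.disabled defaults to False; set it to True to suppress the logger output
  • debug controls whether exceptions (or one particular exception) are re-raised so that they can be handled separately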
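To see how handlers can be registered separately from the main logic, here is a much-simplified imitation of the pattern. MiniMerry and everything in it is hypothetical and written only for this illustration; in particular, it picks the first registered handler that matches, which may differ from how merry itself selects a handler.

```python
import functools


class MiniMerry:
    """Hypothetical, simplified imitation of the decorator-registry pattern."""

    def __init__(self):
        self._handlers = []  # list of (exception_type, handler) pairs

    def _try(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # Use the first registered handler whose type matches.
                for exc_type, handler in self._handlers:
                    if isinstance(exc, exc_type):
                        return handler(exc)
                raise  # no handler registered for this exception
        return wrapper

    def _except(self, *exc_types):
        def register(handler):
            for exc_type in exc_types:
                self._handlers.append((exc_type, handler))
            return handler
        return register


mini = MiniMerry()


@mini._try
def divide(a, b):
    return a / b


@mini._except(ZeroDivisionError)
def on_zero(e):
    return "zero division"


@mini._except(Exception)
def on_other(e):
    return "other error"


print(divide(4, 2))    # 2.0
print(divide(1, 0))    # zero division
print(divide(1, "a"))  # other error
```

Handlers are looked up when the exception occurs, not when the function is decorated, which is why they can be defined after the function they protect.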

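The else and finally decorators in merry

# Runs only when no exception was raised
@merry._else
def else_clause():
    print('No exceptions were raised!')

# Runs whether or not an exception was raised
@merry._finally
def finally_clause():
    print('Clean up time!')

For more advanced usage, such as logging of merry exceptions, see the official documentation.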

References