retrying
tenacity
merry
概述
在做爬虫时,如requests获取网页出错、解析JSON出错、提取JSON中的某个key对应的值出错等都会导致程序抛出异常,这时一般会加上try...excep来捕获异常和重试机制
或者,在自动化测试中,在查找元素时,一些异常原因如升级弹窗,页面加载缓慢,不定时的弹窗(一般会封装成一个异常处理方法并加上重试逻辑)导致元素获取不到而不能进行点击或其他操作而抛出异常
因此捕获异常并重试在平时的项目中是很常见的需求
引子
如果写重试,我们有时候会这么写:
# 随机出大于5的数字
import random
retry_times = 3 # 避免RecursionError,加上重试次数
def do_something_unreliable():
global retry_times
num = random.randint(0, 10)
try:
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
except IOError as e:
print(e)
while retry_times: # 避免RecursionError: maximum recursion depth exceeded while calling a Python object
print(f'start {4 - retry_times} retry')
retry_times -= 1
retry_result = do_something_unreliable()
if retry_result:
return retry_result
if __name__ == '__main__':
print(do_something_unreliable())
try...except在加上while,是不是很不优雅?接下来,我们来试试强大的第三方库
retrying
retrying官方文档
retrying的安装
pip3 install retrying
retrying的使用
# 改写上面的例子
import random
from retrying import retry
@retry
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
@retry
会一直重试,无等待,直到满足条件为止
retrying可配置参数
- stop_max_attempt_number:最大重试次数,默认为5次
- stop_max_delay:最大重试时间,默认为100ms
- wait_fixed:两次重试调用之间的时间间隔,默认1000ms
- wait_random_min、wait_random_max:两次重试调用之间的时间间隔最小值(默认为0ms)及最大值(默认为1000ms)
- wait_incrementing_increment:每重试一次会增加的时长,默认100毫秒
- wait_exponential_multiplier、wait_exponential_max:以指数的形式产生两次重试的时间间隔,产生的值为2^previous_attempt_number * wait_exponential_multiplier,previous_attempt_number是前面设置的重试的=次数
如果产生的这个值超过了wait_exponential_max的大小,那么之后两次重试的时间间隔都为wait_exponential_max - retry_on_exception:指定一个函数,如果此函数返回指定异常,则会重试,如果不是指定的异常则会退出
- wrap_exception:参数设置为True/False(默认),如果指定的异常类型,包裹在RetryError中,会看到RetryError和程序抛的Exception error
- retry_on_result:指定函数返回值为真时来作为重试的条件
- stop_func:每次抛出异常时都会执行的函数
演示示例
- 例1:重试3次,每次重试时间间隔为2s
import random
from retrying import retry
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
- 例2: 如果ioerror则重试
import random
from retrying import retry
def retry_if_io_error(exception):
return isinstance(exception, IOError)
@retry(retry_on_exception=retry_if_io_error, wrap_exception=True)
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
- 例3:如果返回None则重试
import random
from retrying import retry
def retry_if_result_none(result):
return result is None
@retry(retry_on_result=retry_if_result_none)
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
return
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
- 例4:每次抛出异常时执行的指定函数
import random
from retrying import retry
def stop_func(attempts, delay):
print(attempts, delay)
@retry(wait_fixed=2000, stop_func=stop_func)
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
- 其他参数及参数组合请自行尝试
tenacity
由于retrying年久失修,retenacity库在retrying库的基础上对其功能进行了扩展及问题修复(It originates from a fork of retrying which is sadly no longer maintained. Tenacity isn't api compatible with retrying but adds significant new functionality and fixes a number of longstanding bugs.),而且使用方式基本一致
tenacity官方文档
tenacity安装
pip3 install tenacity
tenacity使用
- 简单使用
import random
from tenacity import retry
@retry
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
使用看起来跟retrying一样
tenacity可配置参数
设置重试次数或时间stop
import random
# from tenacity import retry, stop_after_attempt, stop_after_delay
from tenacity import *
# 最多重试2s或者重试5次, | 操作符可组合多个条件
@retry(stop=(stop_after_delay(2) | stop_after_attempt(5)))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
print(num, '~~~')
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
设置重试间隔wait
import random
from tenacity import *
# @retry(wait=wait_fixed(2))
@retry(wait=wait_random(min=1, max=2))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
print(num, '~~~')
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
还有更多wait的方法提供使用,请参考api文档
设置重试的条件retry
# 例1:
import random
from tenacity import *
@retry(retry=retry_if_exception_type(IOError))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
print(num, '~~~')
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
# 例2:
import random
from tenacity import *
def is_none_p(value):
return value is None # 返回值为真时才会重试
@retry(retry=retry_if_result(is_none_p))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 5:
print(num, '~~~~~')
return
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
重试停止后的异常处理
- reraise,默认为False,会抛出默认的 RetryError异常
- 若设置为True,重新抛出原异常
import random
from tenacity import *
# reraise 重新抛出异常
# @retry(stop=stop_after_attempt(3))
@retry(reraise=True, stop=stop_after_attempt(3))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 10:
print(num, '~~~')
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
try:
print(do_something_unreliable())
except RetryError as e:
print(e)
print('got unexpected num')
except IOError as e:
print(e, "~~~~~~")
在重试前和重试后日志记录before,after,before_sleep
import logging
import random
from tenacity import *
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
logger = logging.getLogger(__name__)
@retry(stop=stop_after_attempt(5), before_sleep=before_sleep_log(logger, logging.DEBUG))
def do_something_unreliable():
num = random.randint(0, 10)
if num < 10:
print(num, '~~~')
raise IOError(f"{num} is not i wanted number")
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
------输出结果(当次运行)--------------
DEBUG:__main__:Retrying __main__.do_something_unreliable in 0.0 seconds as it raised OSError: 5 is not i wanted number.
DEBUG:__main__:Retrying __main__.do_something_unreliable in 0.0 seconds as it raised OSError: 8 is not i wanted number.
5 ~~~
8 ~~~
10
其他参数参看官方api文档示例
自定义回调retry_error_callback
- 我们在重试失败后再次抛出异常,也可以自定义回调(在所有的尝试失败后)
- callback函数参数retry_state必须有,这个参数包含当前重试调用的所有信息
import random
from tenacity import *
def is_false(value):
return value is False
def return_last_value(retry_state):
# return retry_state.outcome.result()
return "can't get i wanted number after 5 retries"
@retry(stop=stop_after_attempt(5), retry=retry_if_result(is_false), retry_error_callback=return_last_value)
def do_something_unreliable():
num = random.randint(0, 10)
if num < 11:
print(num, '~~~')
return False
else:
return num
if __name__ == '__main__':
print(do_something_unreliable())
更多高级用法参考官方api文档
merry
通过tenacity的介绍,对于异常处理,我们可以在多次重试后再次抛出异常,或者通过回调函数来进一步处理。而merry也是另一种异常处理的第三方库。
merry官方文档
merry安装
pip3 install merry
merry的基本使用
- 例:两个数相除,并将结果写入文件
def write_to_file(num1, num2, filename):
result = num1 / num2
with open(filename, 'w') as f:
f.write(str(result))
显然,上面的例子是很不严谨的,没有判断 num1、num2 的类型,如果不是数字类型,那会抛出 TypeError;没有判断 num2 是不是 0,如果是 0,那么会抛出 ZeroDivisionError;没有判断文件路径是否存在,如果是子文件夹下的路径,文件夹不存在的话,会抛出 FileNotFoundError
# 分别执行如下调用 write_to_file(1, 2, 'result.txt') write_to_file(1, 'a', 'result.txt') write_to_file(1, 0, 'result.txt') write_to_file(1, 2, 'myresult/result.txt')
- 用try...except来改写
def write_to_file(num1, num2, filename):
try:
result = num1 / num2
with open(filename, 'w') as f:
f.write(str(result))
except ZeroDivisionError as e:
print(e)
except Exception as e:
print(e)
这样处理也行,但是可读性很差,而且显得很臃肿,在项目中针对这样的处理可能还有很多,这样,我们就不得不重复的去写很多相同的代码
- 使用merry来改写
from merry import Merry
merry = Merry()
# merry = Merry(debug=True) # 设置为True则异常会被抛出
merry.logger.disabled = True
@merry._try
def write_to_file(num1, num2, filename):
result = num1 / num2
with open(filename, 'w') as f:
f.write(str(result))
# @merry._except(ZeroDivisionError, debug=True) # 单独设置某一特定异常被抛出
@merry._except(ZeroDivisionError)
def zero_division_error(e):
print(f'Error: {e}')
@merry._except(Exception)
def catch_all(e):
print('other errors: ' + str(e))
write_to_file(1, 2, 'result.txt')
write_to_file(1, 'a', 'result.txt')
write_to_file(1, 0, 'result.txt')
write_to_file(1, 2, 'myresult/result.txt')
主逻辑里面不用额外加异常处理代码了,显得更简洁;主逻辑不需要再用try except;每个异常处理方法单独分开了,可以实现解耦和重用
- 异常处理函数有一个可选的参数(e),该参数为异常对象
- merry.logger.disabled,默认为False,设置为True,则会忽略logger的打印
- debug设置(某一)异常是否被抛出,进而单独来处理
merry处理异常else和finally装饰器
# 没有任何异常发生时则执行
@merry._else
def else_clause():
print('No exceptions where raised!')
# 不管是否发生异常,都要执行
@merry._finally
def finally_clause():
print('Clean up time!')
更多高级用法参考官方文档,如merry异常的日志处理等