(资料图片)
就像一个浏览器一样,它会在同一个会话中自动处理cookie信息,不需要写任何额外的代码。
import requests
session = requests.Session() # 理解为就是一个浏览器
type(session)
requests.sessions.Session
session.post()# 登录session.get() # 获取某个数据,会自动携带上一步收到的cookie
# 课堂派案例headers = {"cookie": "FZ_STROAGE.ketangpai.com=eyJTRUVTSU9OSUQiOiIzMTI5MjRiNTU2MzNmMDUxIiwiU0VFU0lPTkRBVEUiOjE2Mzk1NzA0NDQ3Njd9; ARK_ID=undefined; ketangpai_home_slb=3fbda3fc94d5d1be63720d9c156288d0; PHPSESSID=kmugv5id4lcecie33asikt3p96; ketangpai_home_remember=think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKiGl7G2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6eDl36KYW0%22%2C%22sign%22%3A%2207f1bd0c97817e6d7ebe92bfe8e30fe9%22%7D", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36"}res = requests.get(url="https://v4.ketangpai.com/UserApi/getUserInfo")
res.status_code
200
res.cookies
res.json()
{"status": 0, "info": "您还未登陆!"}
session = requests.Session() # 1. 创建一个session对象
# 2. 登录login_url = "https://v4.ketangpai.com/UserApi/login"data = {"email": "877***9301@qq.com", "password": "Pyt***inlan", "remember": 0}# json data paramsresponse = session.post(url=login_url, data=data)
session.cookies
res = session.get(url="https://v4.ketangpai.com/UserApi/getUserInfo")
res.json()
{"status": 1, "data": {"id": "MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ", "username": "****", "avatar": "http://v4.ketangpai.com/Public/Common/img/40/26.png", "usertype": "1", "email": "877***01@qq.com", "stno": "", "atteststate": 0, "attestInfo": []}}
# 如果不用session对象,每一步都需要自己处理cookie
login_url = "https://v4.ketangpai.com/UserApi/login"data = {"email": 877***9301@qq.com", "password": "Pyt***inlan", "remember": 0}# 1.登录response = requests.post(url=login_url, data=data)
response.status_code
200
response.json()
{"status": 1, "url": "/Main/index.html", "token": "MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-haiZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug7d33n96YW0", "isenterprise": 0, "uid": "MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ"}
# 2.获取数据res = requests.get(url="https://v4.ketangpai.com/UserApi/getUserInfo", cookies=response.cookies)
res.json()
{"status": 1, "data": {"id": "MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ", "username": "****", "avatar": "http://v4.ketangpai.com/Public/Common/img/40/26.png", "usertype": "1", "email": "877***01@qq.com", "stno": "", "atteststate": 0, "attestInfo": []}}
requests库的session对象仅仅只是自动帮我们处理了cookie的携带问题。
思路:
import jsonimport unittestfrom jsonpath import jsonpathimport settingfrom common import logger, dbfrom common.data_handler import ( replace_args_by_re, generate_no_usr_phone)from common.encrypt_handler import generate_signimport requestsclass BaseCase(unittest.TestCase): """ 用例基类 """ db = db logger = logger setting = setting name = "base_case" session = requests.session() # 创建一个session对象用来处理session鉴权 @classmethod def setUpClass(cls) -> None: cls.logger.info("==========={}接口开始测试===========".format(cls.name)) @classmethod def tearDownClass(cls) -> None: cls.logger.info("==========={}接口结束测试===========".format(cls.name)) def flow(self, item): """ 测试流程 """ self.logger.info(">>>>>>>用例{}开始执行>>>>>>>>".format(item["title"])) # 把测试数据绑定到方法属性case上,其他也要把一些变量绑定到对象的属性上 self._case = item # 1. 处理测试数据 self.process_test() # 2. 发送请求 self.send_request() # 3. 断言 self.assert_all() def process_test(self): """ 测试数据的处理 """ # 1.1 生成测试数据 self.generate_test_data() # 1.2 替换依赖参数 self.replace_args() # 1.3 处理url self.process_url() # 1.4 鉴权处理 self.auth_process() def auth_process(self): """ v3版本鉴权处理 :return: """ request_data = self._case.get("request_data") if request_data: headers = request_data.get("headers") if headers: if headers.get("X-Lemonban-Media-Type") == "lemonban.v3": # 获取token token = self._case["request_data"]["headers"]["Authorization"].split(" ")[-1] # 生成签名 sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY) # 添加到请求数据中 self._case["request_data"]["json"]["sign"] = sign self._case["request_data"]["json"]["timestamp"] = timestamp # if self._case["request_data"]["headers"]["X-Lemonban-Media-Type"] == "lemonban.v3": # # 获取token # token = self._case["request_data"]["headers"]["Authorization"].split(" ")[-1] # # 生成签名 # sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY) # # 添加到请求数据中 # self._case["request_data"]["json"]["sign"] = sign # self._case["request_data"]["json"]["timestamp"] = timestamp def generate_test_data(self): """ 生成测试数据 """ """ 生成测试数据,不是固定流程,有不同可以复写 :return: """ self._case = json.dumps(self._case) if "$phone_number$" in self._case: phone = generate_no_usr_phone() self._case = self._case.replace("$phone_number$", phone) self._case = json.loads(self._case) def replace_args(self): """ 替换参数 """ self._case = json.dumps(self._case) # 把用例数据dumps成字符串,一次替换 self._case = replace_args_by_re(self._case, self) self._case = json.loads(self._case) # 再将request_data, expect_data loads为字典 try: self._case["request_data"] = json.loads(self._case["request_data"]) self._case["expect_data"] = json.loads(self._case["expect_data"]) except Exception as e: self.logger.error("{}用例的测试数据格式不正确".format(self._case["title"])) raise e def process_url(self): """ 处理url """ if self._case["url"].startswith("http"): # 是否是全地址 pass elif self._case["url"].startswith("/"): # 是否是短地址 self._case["url"] = self.setting.PROJECT_HOST + self._case["url"] else: # 接口名称 try: self._case["url"] = self.setting.INTERFACES[self._case["url"]] except Exception as e: self.logger.error("接口名称:{}不存在".format(self._case["url"])) raise e def send_request(self): """ 发送请求 @return: """ self._response = self.session.request( url=self._case["url"], method=self._case["method"], **self._case["request_data"]) # self._response = send_http_request(url=self._case["url"], method=self._case["method"], # **self._case["request_data"]) def assert_all(self): try: # 3.1 断言响应状态码 self.assert_status_code() # 3.2 断言响应数据 self.assert_response() # 响应结果断言成功后就提取依赖数据 self.extract_data() # 3.3 数据库断言后面的任务 self.assert_sql() except Exception as e: self.logger.error("++++++用例{}测试失败".format(self._case["title"])) raise e else: self.logger.info("<<<<<<<<<用例{}测试成功<<<<<<<".format(self._case["title"])) def assert_status_code(self): """ 断言响应状态码 """ try: self.assertEqual(self._case["status_code"], self._response.status_code) except AssertionError as e: self.logger.warning("用例【{}】响应状态码断言异常".format(self._case["title"])) raise e else: self.logger.info("用例【{}】响应状态码断言成功".format(self._case["title"])) def assert_response(self): """ 断言响应数据 """ if self._case["res_type"].lower() == "json": res = self._response.json() elif self._case["res_type"].lower() == "html": # 扩展思路 res = self._response.text try: self.assertEqual(self._case["expect_data"], {"code": res["code"], "msg": res["msg"]}) except AssertionError as e: self.logger.warning("用例【{}】响应数据断言异常".format(self._case["title"])) self.logger.warning("用例【{}】期望结果为:{}".format(self._case["title"], self._case["expect_data"])) self.logger.warning("用例【{}】的响应结果:{}".format(self._case["title"], res)) raise e else: self.logger.info("用例【{}】响应数据断言成功".format(self._case["title"])) def assert_sql(self): """ 断言数据库 """ if self._case.get("sql"): # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。 # 只有sql字段有sql的才需要校验数据库 # 只有sql字段有sql的才需要校验数据库 sqls = self._case["sql"].split(",") for sql in sqls: try: self.assertTrue(self.db.exist(sql)) except AssertionError as e: self.logger.warning("用例【{}】数据库断言异常,执行的sql为:{}".format(self._case["title"], sql)) raise e except Exception as e: self.logger.warning("用例【{}】数据库断言异常,执行的sql为:{}".format(self._case["title"], sql)) raise e def extract_data(self): """ 根据提取表达式提取对应的数据 :return: """ if self._case.get("extract"): if self._case["res_type"].lower() == "json": self.extract_data_from_json() elif self._case["res_type"].lower() == "html": self.extract_data_from_html() elif self._case["res_type"].lower() == "xml": self.extract_data_from_xml() else: raise ValueError("res_type类型不正确,只支持json,html,xml") def extract_data_from_json(self): """ 从json数据中提取数据并绑定到类属性中 :return: """ try: rules = json.loads(self._case.get("extract")) except Exception as e: raise ValueError("用例【{}】的extract字段数据:{}格式不正确".format(self._case["title"], self._case["extract"])) for rule in rules: # 类属性名 name = rule[0] # 提取表达式 exp = rule[1] # 根据jsonpath去响应中提取值 value = jsonpath(self._response.json(), exp) # 如果能提取到值 if value: # 把值绑定到对应的类属性上 setattr(self.__class__, name, value[0]) # 注意value是个列表 else: # 提取不到值,说明jsonpath写错了,或者是响应又问题 raise ValueError("用例【{}】的提取表达式{}提取不到数据".format(self._case["title"], self._case["extract"])) def extract_data_from_html(self): """ 从html数据中提取数据并绑定到类属性中 :return: """ raise ValueError("请实现此方法") def extract_data_from_xml(self): """ 从xml数据中提取数据并绑定到类属性中 :return: """ raise ValueError("请实现此方法")
from unittestreport import ddt, list_datafrom common.base_case import BaseCasecases = [ {"title": "课堂派登录", "method": "post", "url": "https://v4.ketangpai.com/UserApi/login", "request_data": "{"data": {"email": "877***01@qq.com", "password": "Pyt***inlan", "remember": 0}}", "status_code": 200, "res_type": "json", "expect_data": "{"status": 1}" }, { "title": "获取用户信息", "method": "get", "url": "https://v4.ketangpai.com/UserApi/getUserInfo", "request_data": "{}", "status_code": 200, "res_type": "json", "expect_data": "{"status": 1}" }]@ddtclass TestCourseFlow(BaseCase): name = "课堂派测试" @list_data(cases) def test_course(self, item): self.flow(item) def assert_response(self): """ 复写assert_response方法实现课堂派的断言 :return: """ if self._case["res_type"].lower() == "json": res = self._response.json() elif self._case["res_type"].lower() == "html": # 扩展思路 res = self._response.text try: self.assertEqual(self._case["expect_data"], {"status": res["status"]}) except AssertionError as e: self.logger.warning("用例【{}】响应数据断言异常".format(self._case["title"])) self.logger.warning("用例【{}】期望结果为:{}".format(self._case["title"], self._case["expect_data"])) self.logger.warning("用例【{}】的响应结果:{}".format(self._case["title"], res)) raise e else: self.logger.info("用例【{}】响应数据断言成功".format(self._case["title"]))if __name__ == "__main__": BaseCase.unittest.main()
标签: