SoFunction
Updated on 2024-11-21

Python3+Requests+Excel complete interface automation test framework implementation

The framework as a whole uses Python3+Requests+Excel: includes access to real-time tokens

1、------base

-------

runmethond: encapsulation of different request methods

import json
import requests

.urllib3.disable_warnings()

class RunMethod:
  def post_main(self, url, data, header=None):
    res = None
    if header != None:
      res = (url=url, data=data, headers=header,verify=False)
    else:
      res = (url=url, data=data,verify=False)
    return ()

  def get_main(self, url, data=None, header=None):
    res = None
    if header != None:
      res = (url=url, params=data, headers=header, verify=False)
    else:
      res = (url=url, params=data, verify=False)
    return ()

  def run_main(self, method, url, data=None, header=None):
    res = None
    if method == 'Post':
      res = self.post_main(url, data, header)
    else:
      res = self.get_main(url, data, header)
    return (res, indent=2, sort_keys=True, ensure_ascii=False)


if __name__ == '__main__':
  url = '/post'
  data = {
    'cart': '11'
  }
  run = RunMethod()
  run_test = run.run_main(method="Post", url=url, data=data)
  print(run_test)

2、------data

------data_config.py

data_config: get data in excel module

class global_val:
  Id = '0'
  request_name = '1'
  url = '2'
  run = '3'
  request_way = '4'
  header = '5'
  case_depend = '6'
  data_depend = '7'
  field_depend = '8'
  data = '9'
  expect = '10'
  result = '11'


def get_id():
  """Get case_id"""
  return global_val.Id


def get_request_name():
  """Get request module name"""
  return global_val.request_name


def get_url():
  """Get request url"""
  return global_val.url


def get_run():
  """Get whether to run"""
  return global_val.run


def get_run_way():
  """Get request method"""
  return global_val.request_way


def get_header():
  """Get whether it carries a header"""
  return global_val.header


def get_case_depend():
  """case dependency"""
  return global_val.case_depend


def get_data_depend():
  """Dependent return data"""
  return global_val.data_depend


def get_field_depend():
  """Data dependent fields"""
  return global_val.field_depend


def get_data():
  """Get request data"""
  return global_val.data


def get_expect():
  """Obtaining expected results"""
  return global_val.expect


def get_result():
  """Get return results"""
  return global_val.result

3、-----data

-----dependent_data.py

dependent_data: solving data dependency problems

from util.operation_excel import OperationExcel
from  import RunMethod
from data.get_data import GetData
from jsonpath_rw import jsonpath, parse
import json


class DependentData:
  """Addressing Data Dependencies"""

  def __init__(self, case_id):
    self.case_id = case_id
    self.opera_excel = OperationExcel()
     = GetData()

  def get_case_line_data(self):
    """
    Get the entire row of data for the case_id by case_id
    :param case_id: case_id
    :return.
    """
    rows_data = self.opera_excel.get_row_data(self.case_id)
    return rows_data

  def run_dependent(self):
    """
    Execute the dependency test and get the result
    :return.
    """
    run_method = RunMethod()
    row_num = self.opera_excel.get_row_num(self.case_id)
    request_data = .get_data_for_json(row_num)
    # header = .is_header(row_num)
    method = .get_request_method(row_num)
    url = .get_request_url(row_num)
    res = run_method.run_main(method, url, request_data)
    return (res)

  def get_data_for_key(self, row):
    """
    Get the response that executes the dependent case based on the key of the dependency and return it.
    :return.
    """
    depend_data = .get_depend_key(row)
    response_data = self.run_dependent()
    return [ for match in parse(depend_data).find(response_data)][0]

4、-----data

-----get_data.py

get_data: get excel data

from util.operation_excel import OperationExcel
from data import data_config
from util.operation_json import OperationJson


class GetData:
  """Getting excel data"""

  def __init__(self):
    self.opera_excel = OperationExcel()

  def get_case_lines(self):
    """Get the number of excel rows, i.e. the number of cases."""
    return self.opera_excel.get_lines()

  def get_is_run(self, row):
    """Get whether to execute"""
    flag = None
    col = int(data_config.get_run())
    run_model = self.opera_excel.get_cell_value(row, col)
    if run_model == 'yes':
      flag = True
    else:
      flag = False
    return flag

  def is_header(self, row):
    """
    Whether to carry a header
    :param row: row number
    :return.
    """
    col = int(data_config.get_header())
    header = self.opera_excel.get_cell_value(row, col)
    if header != '':
      return header
    else:
      return None

  def get_request_method(self, row):
    """
    Get request method
    :param row: row number
    :return.
    """
    # col columns
    col = int(data_config.get_run_way())
    request_method = self.opera_excel.get_cell_value(row, col)
    return request_method

  def get_request_url(self, row):
    """
    Get url
    :param row: row number
    :return.
    """
    col = int(data_config.get_url())
    url = self.opera_excel.get_cell_value(row, col)
    return url

  def get_request_data(self, row):
    """
    Getting the request data
    :param row: row number
    :return.
    """
    col = int(data_config.get_data())
    data = self.opera_excel.get_cell_value(row, col)
    if data == '':
      return None
    return data

  def get_data_for_json(self, row):
    """
    Getting data data by keyword
    :param row.
    :return.
    """
    opera_json = OperationJson()
    request_data = opera_json.get_data(self.get_request_data(row))
    return request_data

  def get_expcet_data(self, row):
    """
    Getting the expected results
    :param row.
    :return.
    """
    col = int(data_config.get_expect())
    expect = self.opera_excel.get_cell_value(row, col)
    if expect == "":
      return None
    else:
      return expect

  def write_result(self, row, value):
    """
    Write the result data
    :param row.
    :param col.
    :return.
    """
    col = int(data_config.get_result())
    self.opera_excel.write_value(row, col, value)

  def get_depend_key(self, row):
    """
    Get the key of the dependent data
    :param row:row number
    :return.
    """
    col = int(data_config.get_data_depend())
    depend_key = self.opera_excel.get_cell_value(row, col)
    if depend_key == "":
      return None
    else:
      return depend_key

  def is_depend(self, row):
    """
    Determine if there is a case dependency
    :param row:row number
    :return.
    """
    col = int(data_config.get_case_depend()) # Get if there are data dependent columns
    depend_case_id = self.opera_excel.get_cell_value(row, col)
    if depend_case_id == "":
      return None
    else:
      return depend_case_id

  def get_depend_field(self, row):
    """
    Get the dependent fields
    :param row.
    :return.
    """
    col = int(data_config.get_field_depend())
    data = self.opera_excel.get_cell_value(row, col)
    if data == "":
      return None
    else:
      return data

5、-----dataconfig

-----

: Use case data

6、-----dataconfig

-----

: request data, according to their actual business, and is associated with the request data columns in the case layer.

{
 "user": {
  "username": "1111111",
  "password": "123456"
 },
 "filtrate": {
  "type_id": "2",
  "brand_id": "1",
  "model_id": "111"
 },
 "search": {
  "page": "1",
  "keyword": "oppo",
  "type": "12"
 },
 "token": {
  "token": ""
 }

7、-----dataconfig

-----

: Automatically writes the fetched token to this file in real time

{"data": {"token": "db6f0abee4e5040f5337f5c47a82879"}}

8、-----main

-----run_test.py

run_test: main runtime program

from  import RunMethod
from data.get_data import GetData
from util.common_util import CommonUtil
from data.dependent_data import DependentData
# from util.send_email import SendEmail
from util.operation_header import OperationHeader
from util.operation_json import OperationJson


class RunTest:

  def __init__(self):
    self.run_method = RunMethod()
     = GetData()
    self.com_util = CommonUtil()
    # self.send_email = SendEmail()

  def go_on_run(self):
    """Program execution"""
    pass_count = []
    fail_count = []
    res = None
    # Get the number of use cases
    rows_count = .get_case_lines()
    # The first line index is 0
    for i in range(1, rows_count):
      is_run = .get_is_run(i)
      if is_run:
        url = .get_request_url(i)
        method = .get_request_method(i)
        request_data = .get_data_for_json(i)
        expect = .get_expcet_data(i)
        header = .is_header(i)
        depend_case = .is_depend(i)

        if depend_case != None:
          self.depend_data = DependentData(depend_case)
          # Get dependent response data
          depend_response_data = self.depend_data.get_data_for_key(i)
          # Get the dependent key
          depend_key = .get_depend_field(i)
          # Update request fields
          request_data[depend_key] = depend_response_data
        # Writes the returned token from this interface to a file if the header field value is write, reads the file if it is yes
        if header == "write":
          res = self.run_method.run_main(method, url, request_data)
          op_header = OperationHeader(res)
          op_header.write_token()
        elif header == 'yes':
          op_json = OperationJson("../dataconfig/")
          token = op_json.get_data('data')
          request_data = dict(request_data, **token) # Merge request data with login token and use as request data

          res = self.run_method.run_main(method, url, request_data)
        else:
          res = self.run_method.run_main(method, url, request_data)

        if expect != None:
          if self.com_util.is_contain(expect, res):
            .write_result(i, "Pass")
            pass_count.append(i)
          else:
            .write_result(i, res)
            fail_count.append(i)
        else:
          print(f"use caseID:case-{i},The expected result cannot be null")

    # Send mail
    # self.send_email.send_main(pass_count, fail_count)

    print(f"通过use case数:{len(pass_count)}")
    print(f"失败use case数:{len(fail_count)}")


if __name__ == '__main__':
  run = RunTest()
  run.go_on_run()

9、-----util

-----common_util.py

common_util: for assertions

class CommonUtil:
  def is_contain(self, str_one, str_two):
    """
    Determine if a string is in another string
    :param str_one.
    :param str_two.
    :return.
    """
    flag = None
    if str_one in str_two:
      flag = True
    else:
      flag = False
    return flag

10、-----util

-----operation_excel.py

operation_excel: operation excel

import xlrd
from  import copy


class OperationExcel:
  """Operate excel"""

  def __init__(self, file_name=None, sheet_id=None):
    if file_name:
      self.file_name = file_name
      self.sheet_id = sheet_id
    else:
      self.file_name ='../dataconfig/'
      self.sheet_id = 0
     = self.get_data()

  def get_data(self):
    """
    Get the contents of sheets
    :return.
    """
    data = xlrd.open_workbook(self.file_name)
    tables = ()[self.sheet_id]
    return tables

  def get_lines(self):
    """
    Get the number of rows in a cell
    :return.
    """
    tables = 
    return 

  def get_cell_value(self, row, col):
    """
    Getting cell data
    :param row: row
    :param col: Column
    :return.
    """
    tables = 
    cell = tables.cell_value(row, col)
    return cell

  def write_value(self, row, col, value):
    """
    Write back data to excel
    :param row:Row
    :param col:column
    :param value: value
    :return.
    """
    read_data = xlrd.open_workbook(self.file_name)
    write_data = copy(read_data)
    sheet_data = write_data.get_sheet(0)
    sheet_data.write(row, col, value)
    write_data.save(self.file_name)

  def get_row_data(self, case_id):
    """
    Get the contents of the corresponding row based on the corresponding case_id
    :param case_id: case id
    :return.
    """
    row_num = self.get_row_num(case_id)
    row_data = self.get_row_value(row_num)
    return row_data

  def get_row_num(self, case_id):
    """
    Get the corresponding line number based on case_id
    :param case_id.
    :return.
    """
    num = 0
    cols_data = self.get_cols_data()
    for col_data in cols_data:
      if case_id in col_data:
        return num
      num = num + 1

  def get_row_value(self, row):
    """
     Based on the row number, find the contents of the row
    :param row:row number
    :return.

    """
    tables = 
    row_data = tables.row_values(row)
    return row_data

  def get_cols_data(self, col_id=None):
    """
    Get the contents of a column
    :param col_id:column number
    :return.
    """
    if col_id != None:
      cols = .col_values(col_id)
    else:
      cols = .col_values(0)
    return cols


if __name__ == '__main__':
  opera = OperationExcel()
  opera.get_data()
  print(opera.get_data().nrows)
  print(opera.get_lines())
  print(opera.get_cell_value(1, 2))

11、-----util

-----operation_header.py

operation_header: get login token in real time and write token to file

import json
from util.operation_json import OperationJson
from  import RunMethod
class OperationHeader:

  def __init__(self, response):
     = (response)

  def get_response_token(self):
    '''
    Get the token returned from login
    '''
    token = {"data":{"token":['data']['token']}}
    return token

  def write_token(self):
    op_json = OperationJson()
    op_json.write_data(self.get_response_token())

 

if __name__ == '__main__':

  url = "http://xxxxx"

  data = {
    "username": "1111",
    "password": "123456"
  }
  run_method=RunMethod()
  # res = ((url, data).json())
  res=run_method.run_main('Post', url, data)
  op = OperationHeader(res)
  op.write_token()

12、-----util

-----operation_json.py

operation_json: operation json file

import json


class OperationJson:
  """Manipulating json files"""

  def __init__(self,file_path=None):
    if file_path==None:
      self.file_path="../dataconfig/"
    else:
      self.file_path=file_path
     = self.read_data()

  def read_data(self):
    """
    Reading a json file
    :param file_name:file path
    :return.
    """
    with open(self.file_path) as fp:
      data = (fp)
      return data

  def get_data(self, id):
    """Get corresponding data based on keywords"""
    return [id]

  # Write to json
  def write_data(self, data):
    with open("../dataconfig/", 'w') as fp:
      ((data))


if __name__ == '__main__':
  # file_path = "../dataconfig/"
  opejson = OperationJson()
  print(opejson.read_data())
  print(opejson.get_data('filtrate'))

This is the whole content of this article.