Coverage for kea/kea.py: 84%

202 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 16:05 +0800

1import logging 

2import random 

3import time 

4import os 

5import sys 

6import importlib 

7import inspect 

8import attr 

9from .utils import INITIALIZER_MARKER, MAINPATH_MARKER, RULE_MARKER 

10 

11from dataclasses import dataclass 

12from typing import Dict, List, TYPE_CHECKING, Optional, Union 

13from uiautomator2.exceptions import UiObjectNotFoundError 

14 

15if TYPE_CHECKING: 

16 from .kea_test import Rule, MainPath, KeaTest 

17 from .android_pdl_driver import Android_PDL_Driver 

18 from .harmonyos_pdl_driver import HarmonyOS_PDL_Driver 

19 

@dataclass
class CHECK_RESULT:
    """Integer status codes reported after running a rule or an initializer.

    The members carry no type annotations, so ``@dataclass`` generates no
    fields for them; they are plain class attributes meant to be read
    directly from the class (e.g. ``CHECK_RESULT.PASS``).
    """

    # the executed function raised an AssertionError
    ASSERTION_FAILURE = 0
    # the executed function finished without raising
    PASS = 1
    # a UiObjectNotFoundError was raised during execution
    UI_NOT_FOUND = 2
    # the preconditions did not hold (or there was no rule to pick)
    PRECON_NOT_SATISFIED = 3
    # any other exception was raised during execution
    UNKNOWN_EXECPTION = 4
    # Correctly spelled alias. The misspelled name above is kept because
    # existing callers reference it.
    UNKNOWN_EXCEPTION = UNKNOWN_EXECPTION

27 

28OUTPUT_DIR = "output" 

29 

@attr.s(frozen=True)
class Rule:
    """
    An immutable description of one property: its preconditions plus the
    function that holds the interaction scenario and the postconditions
    (written as assertions).
    """

    # the preconditions attached through `@precondition`
    preconditions = attr.ib()

    # the function decorated with `@rule`; it performs the interaction
    # scenario and contains the assertions (i.e., the postconditions)
    function = attr.ib()

    def evolve(self, **changes) -> "Rule":
        """Build a new Rule from this one, overriding the fields named in ``changes``."""
        fields = dict(self.__dict__)
        fields.update(changes)
        return Rule(**fields)

    def __str__(self) -> str:
        """Render as ``<module>.<outermost qualname part>.Rule(function: <name>)``."""
        func = self.function
        module = func.__module__
        owner = func.__qualname__.split('.')[0]
        name = func.__name__
        return f"{module}.{owner}.Rule(function: {name})"

50 

@attr.s
class Initializer:
    """Holder for the function registered through `@initializer`."""

    # `function` denotes the function of `@initializer`.
    function = attr.ib()

56 

@attr.s
class MainPath:
    """Holder for the function registered through `@mainPath` and its steps."""

    # `function` denotes the function of `@mainPath`.
    function = attr.ib()

    # the interaction steps (events) that make up the main path
    path: List[str] = attr.ib()

65 

66 

class KeaTestElements:
    """
    Holds the rules, initializers and main paths collected from one KeaTest.

    KeaTestElements cannot be accessed by the users to avoid information leakage.
    """
    def __init__(self, keaTest_name):
        self.keaTest_name = keaTest_name
        self.rules: List["Rule"] = []
        self.initializers: List["Initializer"] = []
        self.mainPaths: List["MainPath"] = []

    @staticmethod
    def _marked_members(keaTest, marker):
        """Yield the non-None `marker` attribute of every member of `keaTest`."""
        for _, member in inspect.getmembers(keaTest):
            element = getattr(member, marker, None)
            if element is not None:
                yield element

    def load_rules(self, keaTest: "KeaTest"):
        """
        Collect the rules of the KeaTest class (user written property).
        """
        self.rules.extend(self._marked_members(keaTest, RULE_MARKER))

    def load_initializers(self, keaTest: "KeaTest"):
        """
        Collect the initializers of the KeaTest class (user written property).
        """
        self.initializers.extend(self._marked_members(keaTest, INITIALIZER_MARKER))

    def load_mainPaths(self, keaTest: "KeaTest"):
        """
        Collect the mainPaths of the KeaTest class (user written property).
        """
        self.mainPaths.extend(self._marked_members(keaTest, MAINPATH_MARKER))

104 

class Kea:
    """Kea class

    Kea class is a manager of all the user defined app properties. It manages all the
    properties at runtime and provides a set of methods for reading and executing these properties.

    In Kea, one kea test denotes one app property file, which includes the elements
    of a property (i.e., the property, the main path, the initializer).
    """
    # the database storing all kea tests (i.e., all the app properties to be tested);
    # it is a class attribute, so every Kea instance shares it
    _KeaTest_DB: Dict["KeaTest", "KeaTestElements"] = {}
    # the driver for executing kea tests; stays None until `set_pdl_driver` is called
    _pdl_driver: Optional[Union["Android_PDL_Driver", "HarmonyOS_PDL_Driver"]] = None
    # cache filled by the first access of `all_rules`
    _all_rules_list = None

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @property
    def all_rules(self) -> List["Rule"]:
        """
        Collect the rules of all loaded kea tests.

        The list is built on first access and cached on the instance, so
        kea tests loaded afterwards are not reflected in it.

        :return: the rules of all kea tests
        """
        if self._all_rules_list is None:
            self._all_rules_list = list()
            for keaTestElements in self._KeaTest_DB.values():
                self._all_rules_list.extend(keaTestElements.rules)
        return self._all_rules_list

    @property
    def initializer(self) -> Optional["Initializer"]:
        """
        By default, one app only has one initializer.

        :return: the first initializer of the first kea test that defines one,
            or None when no loaded kea test has an initializer
        """
        for keaTest, keaTestElements in self._KeaTest_DB.items():
            if len(keaTestElements.initializers) > 0:
                self.logger.info(f"Successfully found an initializer in {keaTest}")
                return keaTestElements.initializers[0]

        self.logger.warning("No initializer found for current apps.")
        return None

    @property
    def all_mainPaths(self):
        """
        :return: a new list with the main paths of all loaded kea tests
        """
        all_mainPaths = []
        for keaTestElements in self._KeaTest_DB.values():
            all_mainPaths.extend(keaTestElements.mainPaths)
        return all_mainPaths

    @classmethod
    def set_pdl_driver(cls, driver:Optional[Union["Android_PDL_Driver", "HarmonyOS_PDL_Driver"]]):
        """set the driver
        """
        cls._pdl_driver = driver

    @classmethod
    def load_app_properties(cls, property_files):
        """load the app properties to be tested

        load each property file and instantiate the corresponding test case

        :param property_files: paths of the property files; relative paths are
            resolved against the working directory at call time
        :raises FileNotFoundError: if a property file does not exist
        """
        workspace_path = os.path.abspath(os.getcwd())

        # remove duplicated property files while keeping the given order,
        # so the load order is deterministic
        property_files = list(dict.fromkeys(property_files))

        for file in property_files:

            # get the absolute path of the property file
            file_abspath = file if os.path.isabs(file) else os.path.join(workspace_path, file)
            if not os.path.exists(file_abspath):
                raise FileNotFoundError(f"{file} not exists.")

            module_dir = os.path.dirname(file_abspath)

            # load the module dir into the system path
            if module_dir not in sys.path:
                sys.path.insert(0, module_dir)

            module_name, extension_name = os.path.splitext(os.path.basename(file_abspath))
            if extension_name != ".py":
                # skipped before the working directory is touched
                print(f"{file} is not a property file... skipping this file")
                continue

            # dynamically change the workspace to make sure
            # the import of the user properties work correctly
            os.chdir(module_dir)
            try:
                module = importlib.import_module(module_name)

                #! IMPORTANT: set the pdl driver in the modules (the user written properties)
                module.d = cls._pdl_driver

                from .kea_test import KeaTest

                # find all kea tests in the module and attempt to load them.
                for _, obj in inspect.getmembers(module):
                    if inspect.isclass(obj) and issubclass(obj, KeaTest) and obj is not KeaTest:
                        print(f"Loading property {obj.__name__} from {file}")
                        cls.load_KeaTest(obj)

            except ModuleNotFoundError as e:
                print(f"Error importing module {module_name}: {e}")
            finally:
                # always go back, even when loading raised, so the caller's
                # working directory is never left inside a property directory
                os.chdir(workspace_path)

    @classmethod
    def load_KeaTest(cls, keaTest:"KeaTest"):
        """load kea tests from the app properties

        :param keaTest: the KeaTest class whose initializers, rules and main paths are loaded
        :raises Exception: if the kea test defines no rule
        """
        keaTestElements = cls.init_KeaTestElements(keaTest)
        keaTestElements.load_initializers(keaTest)
        keaTestElements.load_rules(keaTest)
        keaTestElements.load_mainPaths(keaTest)

        if len(keaTestElements.rules) == 0:
            # name the offending kea test, not the Kea manager class
            raise Exception(f"No rule defined in {keaTest.__name__}")

    @classmethod
    def init_KeaTestElements(cls, keaTest:"KeaTest") -> "KeaTestElements":
        """
        Init the KeaTestElements for current KeaTest.
        If the KeaTestElements for current KeaTest class has already been initialized. Find it and return it.

        :return: KeaTestElements
        """
        # use a dict to store the KeaTestElements obj and make sure every
        # KeaTestElements obj can only be instantiate once.
        keaTestElements = cls._KeaTest_DB.get(keaTest)
        if keaTestElements is None:
            keaTest_name = keaTest.__module__ + '.' + keaTest.__name__
            keaTestElements = KeaTestElements(keaTest_name)
            cls._KeaTest_DB[keaTest] = keaTestElements
        return keaTestElements

    def execute_rules(self, rules):
        '''
        random choose a rule, if the rule has preconditions, check the preconditions.
        if the preconditions are satisfied, execute the rule.

        :param rules: a sequence of rules, or a dict mapping each rule to its
            kea test (the shape returned by the `get_rules_*` methods)
        :return: a CHECK_RESULT code
        '''

        if len(rules) == 0:
            return CHECK_RESULT.PRECON_NOT_SATISFIED
        if isinstance(rules, dict):
            # a rule -> kea test mapping: run the rule against its own kea test
            rule_to_check = random.choice(list(rules))
            return self.execute_rule(rule_to_check, keaTest=rules[rule_to_check])
        rule_to_check = random.choice(rules)
        # NOTE(review): a plain sequence carries no kea test, so None is passed on
        return self.execute_rule(rule_to_check, keaTest=None)

    def _log_ui_not_found(self, error):
        """Log where a UiObjectNotFoundError was raised, without ever raising itself."""
        self.logger.info("Could not find the UI object.")
        import traceback
        tb = traceback.extract_tb(error.__traceback__)
        if not tb:
            return

        # tb[0] is the call site inside this class and tb[1] the first frame
        # below it (normally inside the user's function). Fall back to the
        # only frame available when nothing was recorded below the call site.
        last_call = tb[1] if len(tb) > 1 else tb[-1]
        line_number = last_call.lineno
        file_name = last_call.filename
        # the source line is None when the source text is unavailable
        code_context = (last_call.line or "").strip()

        # Print the line number and code content of the error.
        self.logger.warning(f"Error occurred in file {file_name} on line {line_number}:")
        self.logger.warning(f"Code causing the error: {code_context}")

    def execute_rule(self, rule:"Rule", keaTest:"KeaTest"):
        """
        execute a rule and return the execution result

        :param rule: the rule to execute
        :param keaTest: passed as the only argument to every precondition and to the rule's function
        :return: a CHECK_RESULT code
        """
        self.logger.info(f"executing rule:\n{rule}")
        # all() over no preconditions is True, so rules without preconditions always run
        if not all(precond(keaTest) for precond in rule.preconditions):
            return CHECK_RESULT.PRECON_NOT_SATISFIED
        # try to execute the rule and catch the exception if assertion error throws
        try:
            time.sleep(1)
            # execute the interaction scenario I
            rule.function(keaTest)
            time.sleep(1)
        except UiObjectNotFoundError as e:
            self._log_ui_not_found(e)
            return CHECK_RESULT.UI_NOT_FOUND
        except AssertionError as e:
            # the postcondition Q is violated
            self.logger.error("Assertion failed: " + str(e))
            return CHECK_RESULT.ASSERTION_FAILURE
        except Exception as e:
            self.logger.error("Unexpected exeception during executing rule: "+str(e))
            return CHECK_RESULT.UNKNOWN_EXECPTION

        return CHECK_RESULT.PASS

    def execute_initializer(self, initializer: "Initializer"):
        """
        execute an initializer and return the execution result

        :param initializer: the initializer whose function is called with this Kea instance
        :return: a CHECK_RESULT code
        """
        try:
            initializer.function(self)
        except UiObjectNotFoundError as e:
            self._log_ui_not_found(e)
            return CHECK_RESULT.UI_NOT_FOUND
        except Exception as e:
            self.logger.error("Unexpected exeception during executing initializer: "+str(e))
            return CHECK_RESULT.UNKNOWN_EXECPTION
        return CHECK_RESULT.PASS

    def execute_event_from_main_path(self, executable_script):
        """
        execute one event of a main path

        :param executable_script: Python source text that may refer to the driver as `d`
        """
        # d for PDL driver. Set the d as a local var to make it available in exectuable_scripts
        d = self._pdl_driver
        # NOTE(review): exec runs arbitrary code; the script must come from a trusted property file
        exec(executable_script)

    def get_rules_whose_preconditions_are_satisfied(self) -> Dict["Rule", "KeaTest"]:
        '''Check all rules and return the rules that meet their preconditions.

        Rules without preconditions are not included.

        :return: a dict mapping each satisfied rule to its kea test
        '''
        rules_passed_precondition:Dict["Rule", "KeaTest"] = {}

        for keaTest, keaTestElements in self._KeaTest_DB.items():
            for target_rule in keaTestElements.rules:
                if len(target_rule.preconditions) > 0:
                    if all(precond(keaTest) for precond in target_rule.preconditions):
                        rules_passed_precondition[target_rule] = keaTest

        return rules_passed_precondition

    def get_rules_without_preconditions(self) -> Dict["Rule", "KeaTest"]:
        '''Return the rules that do not have preconditions.

        When a rule does not have preconditions, its preconditions are always true

        :return: a dict mapping each such rule to its kea test
        '''
        rules_without_precondition:Dict["Rule", "KeaTest"] = {}

        for keaTest, keaTestElements in self._KeaTest_DB.items():
            for target_rule in keaTestElements.rules:
                if len(target_rule.preconditions) == 0:
                    rules_without_precondition[target_rule] = keaTest
        return rules_without_precondition

    def teardown(self):
        """Called after a run has finished executing to clean up any necessary
        state.
        Does nothing by default.
        """
        ...

351