Coverage for kea/kea.py: 84%
202 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-22 16:05 +0800
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-22 16:05 +0800
1import logging
2import random
3import time
4import os
5import sys
6import importlib
7import inspect
8import attr
9from .utils import INITIALIZER_MARKER, MAINPATH_MARKER, RULE_MARKER
11from dataclasses import dataclass
12from typing import Dict, List, TYPE_CHECKING, Optional, Union
13from uiautomator2.exceptions import UiObjectNotFoundError
15if TYPE_CHECKING:
16 from .kea_test import Rule, MainPath, KeaTest
17 from .android_pdl_driver import Android_PDL_Driver
18 from .harmonyos_pdl_driver import HarmonyOS_PDL_Driver
@dataclass
class CHECK_RESULT:
    """Integer result codes used to report the outcome of an execution.

    None of the attributes below carries a type annotation, so ``@dataclass``
    generates no fields for this class; it is used as a plain namespace of
    constants that are read through the class, e.g. ``CHECK_RESULT.PASS``.
    """

    ASSERTION_FAILURE = 0
    PASS = 1
    UI_NOT_FOUND = 2
    PRECON_NOT_SATISFIED = 3
    # The historical (misspelled) name is kept so existing callers keep working.
    UNKNOWN_EXECPTION = 4
    # Correctly spelled alias of the code above; both names share one value.
    UNKNOWN_EXCEPTION = UNKNOWN_EXECPTION


# NOTE(review): not referenced by the code visible in this file; presumably
# the default name of the output directory -- confirm against the callers.
OUTPUT_DIR = "output"
@attr.s(frozen=True)
class Rule:
    """
    A rule corresponds to a property, including the preconditions,
    the interaction scenario, the postconditions (in the form of assertions).

    Instances are frozen; use :meth:`evolve` to derive a modified copy.
    """

    # `preconditions` denotes the preconditions annotated with `@precondition`
    preconditions = attr.ib()

    # `function` denotes the function of `@rule`.
    # This function includes the interaction scenario and the assertions
    # (i.e., the postconditions)
    function = attr.ib()

    def evolve(self, **changes) -> "Rule":
        """Return a copy of this rule with the given attributes replaced.

        :param changes: attribute names mapped to their new values.
        :return: a new instance of the same class as ``self``.
        """
        # attr.evolve builds the copy from the declared attributes and keeps
        # the concrete class of `self`, instead of hard-coding `Rule`.
        return attr.evolve(self, **changes)

    def __str__(self) -> str:
        """Describe the rule as ``<module>.<outermost qualname part>.Rule(function: <name>)``."""
        func = self.function
        # first component of the qualified name, e.g. the class that holds the function
        owner = func.__qualname__.split(".")[0]
        return f"{func.__module__}.{owner}.Rule(function: {func.__name__})"
@attr.s
class Initializer:
    """Holder for a function marked with `@initializer`."""

    # the function decorated with `@initializer`
    function = attr.ib()
@attr.s
class MainPath:
    """Holder for a function marked with `@mainPath` and its interaction steps."""

    # the function decorated with `@mainPath`
    function = attr.ib()

    # the interaction steps (events) that make up the main path
    path: List[str] = attr.ib()
class KeaTestElements:
    """
    Container for the elements (rules, initializers, main paths) found on one KeaTest.

    KeaTestElements cannot be accessed by the users to avoid information leakage.
    """

    def __init__(self, keaTest_name):
        """
        :param keaTest_name: name identifying the KeaTest these elements belong to.
        """
        self.keaTest_name = keaTest_name
        self.rules: List["Rule"] = []
        self.initializers: List["Initializer"] = []
        self.mainPaths: List["MainPath"] = []

    @staticmethod
    def _collect_marked(keaTest, marker, target):
        """Append to `target` every object stored under attribute `marker` on a member of `keaTest`.

        Elements already present in `target` are skipped, so scanning the same
        KeaTest more than once does not register its elements twice.
        """
        for _, member in inspect.getmembers(keaTest):
            element = getattr(member, marker, None)
            if element is not None and element not in target:
                target.append(element)

    def load_rules(self, keaTest: "KeaTest"):
        """
        Load the rules from the KeaTest class (user written property).
        """
        self._collect_marked(keaTest, RULE_MARKER, self.rules)

    def load_initializers(self, keaTest: "KeaTest"):
        """
        Load the initializers from the KeaTest class (user written property).
        """
        self._collect_marked(keaTest, INITIALIZER_MARKER, self.initializers)

    def load_mainPaths(self, keaTest: "KeaTest"):
        """
        Load the mainPaths from the KeaTest class (user written property).
        """
        self._collect_marked(keaTest, MAINPATH_MARKER, self.mainPaths)
class Kea:
    """Kea class

    Kea class is a manager of all the user defined app properties. It manages all the
    properties at runtime and provides a set of methods for reading and executing these properties.

    In Kea, one kea test denotes one app property file, which includes the elements
    of a property (i.e., the property, the main path, the initializer).
    """
    # the database storing all kea tests (i.e., all the app properties to be tested)
    _KeaTest_DB: Dict["KeaTest", "KeaTestElements"] = {}
    # the driver for executing kea tests; stays None until `set_pdl_driver` is called
    _pdl_driver: Optional[Union["Android_PDL_Driver", "HarmonyOS_PDL_Driver"]] = None
    # cache filled on first access of `all_rules`
    _all_rules_list = None

    def __init__(self):
        self.logger = logging.getLogger(self.__class__.__name__)

    @property
    def all_rules(self) -> List["Rule"]:
        """
        :return: the rules of all loaded KeaTests.

        The list is built on first access and cached on the instance; kea tests
        loaded after that first access are not reflected in the cached list.
        """
        if self._all_rules_list is None:
            self._all_rules_list = []
            for keaTestElements in self._KeaTest_DB.values():
                self._all_rules_list.extend(keaTestElements.rules)
        return self._all_rules_list

    @property
    def initializer(self) -> Optional["Initializer"]:
        """
        By default, one app only has one initializer.

        :return: the first initializer of the first KeaTest that declares one,
                 or None (after logging a warning) when no KeaTest has any.
        """
        for keaTest, keaTestElements in self._KeaTest_DB.items():
            if len(keaTestElements.initializers) > 0:
                self.logger.info(f"Successfully found an initializer in {keaTest}")
                return keaTestElements.initializers[0]

        self.logger.warning("No initializer found for current apps.")
        return None

    @property
    def all_mainPaths(self):
        """
        :return: a new list with the main paths of all loaded KeaTests.
        """
        all_mainPaths = []
        for keaTestElements in self._KeaTest_DB.values():
            all_mainPaths.extend(keaTestElements.mainPaths)
        return all_mainPaths

    @classmethod
    def set_pdl_driver(cls, driver: Optional[Union["Android_PDL_Driver", "HarmonyOS_PDL_Driver"]]):
        """set the driver

        :param driver: the driver stored on the class and later injected into the property modules.
        """
        cls._pdl_driver = driver

    @classmethod
    def load_app_properties(cls, property_files):
        """load the app properties to be tested

        load each property file and instantiate the corresponding test case

        :param property_files: paths of the property files; relative paths are
                               resolved against the current working directory.
        :raises FileNotFoundError: when a property file does not exist.
        """
        workspace_path = os.path.abspath(os.getcwd())

        # remove duplicated property files while keeping the given order, so
        # the kea tests are always loaded in the same sequence
        property_files = list(dict.fromkeys(property_files))

        for file in property_files:

            # get the absolute path of the property file
            file_abspath = file if os.path.isabs(file) else os.path.join(workspace_path, file)
            if not os.path.exists(file_abspath):
                raise FileNotFoundError(f"{file} not exists.")

            module_dir = os.path.dirname(file_abspath)

            # load the module dir into the system path
            if module_dir not in sys.path:
                sys.path.insert(0, module_dir)

            module_name, extension_name = os.path.splitext(os.path.basename(file_abspath))
            if extension_name != ".py":
                print(f"{file} is not a property file... skipping this file")
                continue

            # dynamically change the workspace to make sure
            # the import of the user properties work correctly
            os.chdir(module_dir)
            try:
                module = importlib.import_module(module_name)

                #! IMPORTANT: set the pdl driver in the modules (the user written properties)
                module.d = cls._pdl_driver

                from .kea_test import KeaTest

                # find all kea tests in the module and attempt to load them.
                for _, obj in inspect.getmembers(module):
                    if inspect.isclass(obj) and issubclass(obj, KeaTest) and obj is not KeaTest:
                        print(f"Loading property {obj.__name__} from {file}")
                        cls.load_KeaTest(obj)

            except ModuleNotFoundError as e:
                print(f"Error importing module {module_name}: {e}")
            finally:
                # always go back to the original workspace, even when loading
                # a property raises
                os.chdir(workspace_path)

    @classmethod
    def load_KeaTest(cls, keaTest: "KeaTest"):
        """load kea tests from the app properties

        :param keaTest: the KeaTest class whose initializers, rules and main paths are loaded.
        :raises Exception: when the KeaTest defines no rule.
        """
        keaTestElements = cls.init_KeaTestElements(keaTest)
        keaTestElements.load_initializers(keaTest)
        keaTestElements.load_rules(keaTest)
        keaTestElements.load_mainPaths(keaTest)

        if len(keaTestElements.rules) == 0:
            # name the offending KeaTest (`cls` is always the Kea manager itself)
            raise Exception(f"No rule defined in {keaTest.__name__}")

    @classmethod
    def init_KeaTestElements(cls, keaTest: "KeaTest") -> "KeaTestElements":
        """
        Init the KeaTestElements for current KeaTest.
        If the KeaTestElements for current KeaTest class has already been initialized. Find it and return it.

        :return: the KeaTestElements registered for `keaTest`
        """
        # use a dict to store the KeaTestElements obj and make sure every
        # KeaTestElements obj can only be instantiate once.
        keaTestElements = cls._KeaTest_DB.get(keaTest)
        if keaTestElements is None:
            keaTest_name = keaTest.__module__ + '.' + keaTest.__name__
            keaTestElements = KeaTestElements(keaTest_name)
            cls._KeaTest_DB[keaTest] = keaTestElements
        return keaTestElements

    def execute_rules(self, rules):
        '''
        random choose a rule, if the rule has preconditions, check the preconditions.
        if the preconditions are satisfied, execute the rule.

        :return: CHECK_RESULT.PRECON_NOT_SATISFIED when `rules` is empty,
                 otherwise the result of `execute_rule` for the chosen rule.
        '''
        if len(rules) == 0:
            return CHECK_RESULT.PRECON_NOT_SATISFIED
        rule_to_check = random.choice(rules)
        # NOTE(review): the rule runs with keaTest=None, so its preconditions
        # and function receive None instead of the KeaTest that owns the rule
        # -- confirm this is intended.
        return self.execute_rule(rule_to_check, keaTest=None)

    def _log_ui_object_not_found(self, error):
        """Log where a UiObjectNotFoundError was raised inside a user function."""
        import traceback

        self.logger.info("Could not find the UI object.")
        frames = traceback.extract_tb(error.__traceback__)
        if not frames:
            return

        # frames[0] is the Kea method that called the user function and
        # frames[1] the first frame below it; fall back to the innermost
        # frame when the traceback is shorter than that.
        call_site = frames[1] if len(frames) > 1 else frames[-1]
        line_number = call_site.lineno
        file_name = call_site.filename
        # `line` is None when the source text is not available
        code_context = (call_site.line or "").strip()

        # Print the line number and code content of the error.
        self.logger.warning(f"Error occurred in file {file_name} on line {line_number}:")
        self.logger.warning(f"Code causing the error: {code_context}")

    def execute_rule(self, rule: "Rule", keaTest: "KeaTest"):
        """
        execute a rule and return the execution result

        :param rule: the rule to execute.
        :param keaTest: the object passed to every precondition and to the rule function.
        :return: one of the CHECK_RESULT codes.
        """
        self.logger.info(f"executing rule:\n{rule}")
        if len(rule.preconditions) > 0:
            if not all(precond(keaTest) for precond in rule.preconditions):
                return CHECK_RESULT.PRECON_NOT_SATISFIED
        # try to execute the rule and catch the exception if assertion error throws
        try:
            time.sleep(1)
            # execute the interaction scenario I
            rule.function(keaTest)
            time.sleep(1)
        except UiObjectNotFoundError as e:
            self._log_ui_object_not_found(e)
            return CHECK_RESULT.UI_NOT_FOUND
        except AssertionError as e:
            # the postcondition Q is violated
            self.logger.error("Assertion failed: " + str(e))
            return CHECK_RESULT.ASSERTION_FAILURE
        except Exception as e:
            self.logger.error("Unexpected exeception during executing rule: "+str(e))
            return CHECK_RESULT.UNKNOWN_EXECPTION

        return CHECK_RESULT.PASS

    def execute_initializer(self, initializer: "Initializer"):
        """
        execute an initializer and return the execution result

        :param initializer: the initializer whose function is called with this Kea instance.
        :return: CHECK_RESULT.PASS, CHECK_RESULT.UI_NOT_FOUND or CHECK_RESULT.UNKNOWN_EXECPTION.
        """
        try:
            initializer.function(self)
        except UiObjectNotFoundError as e:
            self._log_ui_object_not_found(e)
            return CHECK_RESULT.UI_NOT_FOUND
        except Exception as e:
            self.logger.error("Unexpected exeception during executing rule: "+str(e))
            return CHECK_RESULT.UNKNOWN_EXECPTION
        return CHECK_RESULT.PASS

    def execute_event_from_main_path(self, executable_script):
        """Run one main-path step given as Python source text.

        :param executable_script: Python source; it can refer to the PDL driver as `d`.
        """
        # d for PDL driver. Set the d as a local var to make it available in exectuable_scripts
        d = self._pdl_driver
        # SECURITY NOTE: exec runs the text as arbitrary Python code; only
        # feed it scripts derived from trusted property files.
        exec(executable_script)

    def get_rules_whose_preconditions_are_satisfied(self) -> Dict["Rule", "KeaTest"]:
        '''Check all rules and return the rules that have preconditions and currently satisfy all of them.

        :return: a dict mapping each such rule to the KeaTest it was loaded from.
        '''
        rules_passed_precondition: Dict["Rule", "KeaTest"] = {}

        for keaTest, keaTestElements in self._KeaTest_DB.items():
            for target_rule in keaTestElements.rules:
                # rules without preconditions are reported by
                # `get_rules_without_preconditions` instead
                if len(target_rule.preconditions) > 0:
                    if all(precond(keaTest) for precond in target_rule.preconditions):
                        rules_passed_precondition[target_rule] = keaTest

        return rules_passed_precondition

    def get_rules_without_preconditions(self) -> Dict["Rule", "KeaTest"]:
        '''Return the rules that do not have preconditions.

        When a rule does not have preconditions, its preconditions are always true

        :return: a dict mapping each such rule to the KeaTest it was loaded from.
        '''
        rules_without_precondition: Dict["Rule", "KeaTest"] = {}

        for keaTest, keaTestElements in self._KeaTest_DB.items():
            for target_rule in keaTestElements.rules:
                if len(target_rule.preconditions) == 0:
                    rules_without_precondition[target_rule] = keaTest
        return rules_without_precondition

    def teardown(self):
        """Called after a run has finished executing to clean up any necessary
        state.
        Does nothing by default.
        """
        ...