Coverage for kea/input_policy.py: 64%

568 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 16:05 +0800

1from dataclasses import dataclass 

2import os 

3import logging 

4import random 

5import copy 

6import re 

7import time 

8from .utils import Time, generate_report 

9from abc import abstractmethod 

10from .input_event import ( 

11 KEY_RotateDeviceToPortraitEvent, 

12 KEY_RotateDeviceToLandscapeEvent, 

13 KeyEvent, 

14 IntentEvent, 

15 ReInstallAppEvent, 

16 RotateDevice, 

17 RotateDeviceToPortraitEvent, 

18 RotateDeviceToLandscapeEvent, 

19 KillAppEvent, 

20 KillAndRestartAppEvent, 

21 SetTextEvent, 

22) 

23from .utg import UTG 

24 

25# from .kea import utils 

26from .kea import CHECK_RESULT 

27from typing import TYPE_CHECKING, Dict 

28 

29if TYPE_CHECKING: 

30 from .input_manager import InputManager 

31 from .kea import Kea 

32 from .app import App 

33 from .device import Device 

34 

35# Max number of restarts 

36MAX_NUM_RESTARTS = 5 

37# Max number of steps outside the app 

38MAX_NUM_STEPS_OUTSIDE = 10 

39MAX_NUM_STEPS_OUTSIDE_KILL = 10 

40# Max number of replay tries 

41MAX_REPLY_TRIES = 5 

42START_TO_GENERATE_EVENT_IN_POLICY = 2 

43# Max number of LLM queries

44MAX_NUM_QUERY_LLM = 10 

45 

46# Some input event flags 

47EVENT_FLAG_STARTED = "+started" 

48EVENT_FLAG_START_APP = "+start_app" 

49EVENT_FLAG_STOP_APP = "+stop_app" 

50EVENT_FLAG_EXPLORE = "+explore" 

51EVENT_FLAG_NAVIGATE = "+navigate" 

52EVENT_FLAG_TOUCH = "+touch" 

53 

54# Policy taxonomy

55POLICY_GUIDED = "guided" 

56POLICY_RANDOM = "random" 

57POLICY_NONE = "none" 

58POLICY_LLM = "llm" 

59 

60 

@dataclass
class RULE_STATE:
    """Names of the per-rule counters stored in ``statistics_of_rules``."""

    # NOTE: the attributes are deliberately left unannotated, so they stay
    # plain class constants and do not become dataclass fields.

    # counted when a rule is among the candidates that can be checked
    PRECONDITION_SATISFIED = "#satisfy pre"
    # counted when a rule is picked and executed
    PROPERTY_CHECKED = "#check property"
    # counted when executing the rule ends in an assertion failure
    POSTCONDITION_VIOLATED = "#postcondition is violated"

66 

67 

class InputInterruptedException(Exception):
    """Raised to make the event loop in ``start`` stop sending events."""

70 

71 

72class InputPolicy(object): 

73 """ 

74 This class is responsible for generating events to stimulate more app behaviour 

75 It should call AppEventManager.send_event method continuously 

76 """ 

77 

def __init__(self, device: "Device", app: "App", allow_to_generate_utg=False):
    """
    Set up the state shared by every input policy.

    :param device: device the events are sent to
    :param app: app under test
    :param allow_to_generate_utg: when true, ``start`` calls ``update_utg``
        after every event it sends
    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.time_recoder = Time()
    self.utg = UTG(device=device, app=app)
    self.device, self.app = device, app
    self.allow_to_generate_utg = allow_to_generate_utg

    # number of loop iterations performed by start()
    self.event_count = 0

    # the last sent event and the states observed before / after it
    self.last_event = None
    self.from_state = None
    self.to_state = None

    # data handed to generate_report()
    self.triggered_bug_information = []
    self.time_needed_to_satisfy_precondition = []

    # bookkeeping used by move_the_app_to_foreground_if_needed()
    self._num_restarts = 0
    self._num_steps_outside = 0
    self._event_trace = ""

96 

def start(self, input_manager: "InputManager"):
    """
    Run the event loop: produce one event per iteration and send it.

    Iteration 0 sends a KillAppEvent and iteration 1 starts the app through
    its start intent; every later iteration asks ``generate_event`` for the
    event. A report is generated after every iteration that did not raise.

    The loop ends when ``input_manager.enabled`` becomes false, after
    ``input_manager.event_count`` iterations, or on KeyboardInterrupt,
    InputInterruptedException or RuntimeError. Any other exception is
    logged and the loop continues. ``tear_down`` is called at the end.

    :param input_manager: instance of InputManager
    """
    import traceback

    # number of events that have been executed
    self.event_count = 0
    while input_manager.enabled and self.event_count < input_manager.event_count:
        try:
            self.logger.info("Exploration event count: %d", self.event_count)

            if self.to_state is not None:
                self.from_state = self.to_state
            else:
                self.from_state = self.device.get_current_state()

            if self.event_count == 0:
                # If the application is running, close the application.
                event = KillAppEvent(app=self.app)
            elif self.event_count == 1:
                # start the application
                event = IntentEvent(self.app.get_start_intent())
            else:
                event = self.generate_event()

            if event is not None:
                try:
                    self.device.save_screenshot_for_report(
                        event=event, current_state=self.from_state
                    )
                except Exception as e:
                    # retry once against a freshly read state
                    self.logger.error("SaveScreenshotForReport failed: %s", e)
                    self.from_state = self.device.get_current_state()
                    self.device.save_screenshot_for_report(
                        event=event, current_state=self.from_state
                    )
                input_manager.add_event(event)
                self.to_state = self.device.get_current_state()
                self.last_event = event
                if self.allow_to_generate_utg:
                    self.update_utg()
            else:
                # No event was sent, but the policy may have driven the UI
                # itself (e.g. executed a rule). The remembered to_state
                # would then be stale, so drop it and let the next
                # iteration read the current state from the device.
                self.to_state = None

            bug_report_path = os.path.join(self.device.output_dir, "all_states")
            # TODO this function signature is too long?
            generate_report(
                bug_report_path,
                self.device.output_dir,
                self.triggered_bug_information,
                self.time_needed_to_satisfy_precondition,
                self.device.cur_event_count,
                self.time_recoder.get_time_duration(),
            )

        except KeyboardInterrupt:
            break
        except InputInterruptedException as e:
            self.logger.info("stop sending events: %s", e)
            self.logger.info("action count: %d", self.event_count)
            break
        except RuntimeError as e:
            self.logger.info("RuntimeError: %s, stop sending events", e)
            break
        except Exception as e:
            # best effort: report the failure and keep exploring
            self.logger.warning("exception during sending events: %s", e)
            traceback.print_exc()
        self.event_count += 1
    self.tear_down()

170 

def update_utg(self):
    """Record the last sent event as a transition from ``from_state`` to ``to_state``."""
    event = self.last_event
    source_state, target_state = self.from_state, self.to_state
    self.utg.add_transition(event, source_state, target_state)

173 

def move_the_app_to_foreground_if_needed(self, current_state):
    """
    Return an event that tries to bring the app back, or None.

    The decision is based on ``current_state.get_app_activity_depth``:
    negative means the app is not in the activity stack, positive means it
    is in the stack but not in foreground, anything else is treated as
    "in foreground".
    """
    depth = current_state.get_app_activity_depth(self.app)

    if depth < 0:
        # The app is not in the activity stack.
        start_app_intent = self.app.get_start_intent()

        # It seems the app is stuck at some state. It has been
        # 1) force stopped (START, STOP)
        #    just start the app again by increasing self._num_restarts
        # 2) started at least once and cannot be started (START)
        #    pass to let viewclient deal with this case
        # 3) nothing
        #    a normal start. clear self._num_restarts.
        trace = self._event_trace
        just_started = trace.endswith(EVENT_FLAG_START_APP)
        started_then_stopped = trace.endswith(
            EVENT_FLAG_START_APP + EVENT_FLAG_STOP_APP
        )
        if started_then_stopped or just_started:
            self._num_restarts += 1
            self.logger.info(
                "The app had been restarted %d times.", self._num_restarts
            )
        else:
            self._num_restarts = 0

        # pass (START) through
        if just_started:
            return None
        if self._num_restarts > MAX_NUM_RESTARTS:
            # If the app had been restarted too many times, enter random mode
            self.logger.info(
                "The app had been restarted too many times. Entering random mode."
            )
            return None
        # Start the app
        self._event_trace += EVENT_FLAG_START_APP
        self.logger.info("Trying to start the app...")
        return IntentEvent(intent=start_app_intent)

    if depth > 0:
        # The app is in the activity stack but is not in foreground.
        self._num_steps_outside += 1
        if self._num_steps_outside <= MAX_NUM_STEPS_OUTSIDE:
            return None
        # The app has not been in foreground for too long, try to go back.
        if self._num_steps_outside > MAX_NUM_STEPS_OUTSIDE_KILL:
            go_back_event = IntentEvent(self.app.get_stop_intent())
        else:
            go_back_event = KeyEvent(name="BACK")
        self._event_trace += EVENT_FLAG_NAVIGATE
        self.logger.info("Going back to the app...")
        return go_back_event

    # The app is in foreground.
    self._num_steps_outside = 0
    return None

229 

@abstractmethod
def tear_down(self):
    """Hook called by ``start`` once the event loop has ended; does nothing here."""

234 

@abstractmethod
def generate_event(self):
    """
    Produce the next event for the loop in ``start``.

    @return: nothing in this base implementation
    """

242 

@abstractmethod
def generate_random_event_based_on_current_state(self):
    """
    Produce a random event for the current state.

    @return: nothing in this base implementation
    """

250 

251 

252class KeaInputPolicy(InputPolicy): 

253 """ 

254 state-based input policy 

255 """ 

256 

def __init__(self, device, app, kea: "Kea" = None, allow_to_generate_utg=False):
    """
    Store the Kea instance and create one counter record per rule.

    :param kea: object providing ``all_rules``; it is dereferenced right
        away, so the ``None`` default cannot actually be used
    """
    super(KeaInputPolicy, self).__init__(device, app, allow_to_generate_utg)
    self.kea = kea

    # retrieve all the rules from the provided properties; every rule gets
    # its own set of counters, keyed by str(rule)
    self.statistics_of_rules = {
        str(rule): {
            RULE_STATE.PRECONDITION_SATISFIED: 0,
            RULE_STATE.PROPERTY_CHECKED: 0,
            RULE_STATE.POSTCONDITION_VIOLATED: 0,
        }
        for rule in self.kea.all_rules
    }

272 

def run_initializer(self):
    """Execute the Kea initializer, if one is defined, and log the outcome."""
    initializer = self.kea.initializer
    if initializer is None:
        self.logger.warning("No initializer")
        return

    outcome = self.kea.execute_initializer(initializer)
    # anything other than PASS is reported as a failed initialization
    if outcome != CHECK_RESULT.PASS:
        self.logger.error("-------initialize failed-----------")
    else:
        self.logger.info("-------initialize successfully-----------")

285 

def check_rule_whose_precondition_are_satisfied(self):
    """
    Pick one checkable rule at random, execute it and record the outcome.

    Candidates are the rules whose preconditions are satisfied plus the
    rules without preconditions. Every candidate gets its
    PRECONDITION_SATISFIED counter incremented; only the chosen rule is
    executed. An assertion failure is appended to
    ``triggered_bug_information``.

    TODO should split the function

    :raises AttributeError: if ``execute_rule`` returns an unknown result
    """
    # dict: rule -> object passed to execute_rule as ``keaTest``
    ready_rules = self.kea.get_rules_whose_preconditions_are_satisfied()
    ready_rules.update(self.kea.get_rules_without_preconditions())
    if len(ready_rules) == 0:
        self.logger.debug("No rules match the precondition")
        return

    candidates = list(ready_rules.keys())
    for candidate in candidates:
        self.statistics_of_rules[str(candidate)][
            RULE_STATE.PRECONDITION_SATISFIED
        ] += 1

    # randomly select a rule to check
    rule_to_check = random.choice(candidates)
    if rule_to_check is None:
        return

    self.logger.info(f"-------Check Property : {rule_to_check}------")
    self.statistics_of_rules[str(rule_to_check)][RULE_STATE.PROPERTY_CHECKED] += 1
    # device event counter before the rule runs
    pre_id = self.device.cur_event_count
    # check rule, record relevant info and output log
    result = self.kea.execute_rule(
        rule=rule_to_check, keaTest=ready_rules[rule_to_check]
    )

    if result == CHECK_RESULT.ASSERTION_FAILURE:
        self.logger.error(
            f"-------Postcondition failed. Assertion error, Property:{rule_to_check}------"
        )
        self.logger.debug(
            "-------time from start : %s-----------"
            % str(self.time_recoder.get_time_duration())
        )
        self.statistics_of_rules[str(rule_to_check)][
            RULE_STATE.POSTCONDITION_VIOLATED
        ] += 1
        # device event counter after the rule ran
        post_id = self.device.cur_event_count
        bug_record = (
            (pre_id, post_id),
            self.time_recoder.get_time_duration(),
            rule_to_check.function.__name__,
        )
        self.triggered_bug_information.append(bug_record)
        return

    if result == CHECK_RESULT.PASS:
        self.logger.info(
            f"-------Post condition satisfied. Property:{rule_to_check} pass------"
        )
        self.logger.debug(
            "-------time from start : %s-----------"
            % str(self.time_recoder.get_time_duration())
        )
        return

    if result == CHECK_RESULT.UI_NOT_FOUND:
        self.logger.error(
            f"-------Execution failed: UiObjectNotFound during exectution. Property:{rule_to_check}-----------"
        )
        return

    if result == CHECK_RESULT.PRECON_NOT_SATISFIED:
        self.logger.info("-------Precondition not satisfied-----------")
        return

    raise AttributeError(f"Invalid property checking result {result}")

354 

def generate_event(self):
    """
    Placeholder; the concrete policies override this.

    @return: None
    """
    return None

361 

def update_utg(self):
    """Add the transition (last_event: from_state -> to_state) to the UTG."""
    transition = (self.last_event, self.from_state, self.to_state)
    self.utg.add_transition(*transition)

364 

365 

366class RandomPolicy(KeaInputPolicy): 

367 """ 

368 generate random event based on current app state 

369 """ 

370 

def __init__(
    self,
    device,
    app,
    kea=None,
    restart_app_after_check_property=False,
    number_of_events_that_restart_app=100,
    clear_and_reinstall_app=False,
    allow_to_generate_utg=False,
):
    """
    :param restart_app_after_check_property: return a KillAppEvent after a
        property has been checked
    :param number_of_events_that_restart_app: the app is restarted (or
        reinstalled) whenever the event count is a multiple of this value
    :param clear_and_reinstall_app: reinstall instead of kill-and-restart
        at those points
    """
    super(RandomPolicy, self).__init__(device, app, kea, allow_to_generate_utg)
    self.logger = logging.getLogger(self.__class__.__name__)

    self.restart_app_after_check_property = restart_app_after_check_property
    self.number_of_events_that_restart_app = number_of_events_that_restart_app
    self.clear_and_reinstall_app = clear_and_reinstall_app

    # key of the most recent rotation; used to alternate the direction
    self.last_rotate_events = KEY_RotateDeviceToPortraitEvent

387 

def generate_event(self):
    """
    Produce the next event of the random policy.

    @return: an event to send, or None when a property was checked instead
    """
    first_policy_step = self.event_count == START_TO_GENERATE_EVENT_IN_POLICY
    if first_policy_step or isinstance(self.last_event, ReInstallAppEvent):
        # run the initializer, then re-read the state it left behind
        self.run_initializer()
        self.from_state = self.device.get_current_state()

    if self.from_state is None:
        time.sleep(5)
        return KeyEvent(name="BACK")

    restart_interval = self.number_of_events_that_restart_app
    if self.event_count % restart_interval == 0:
        if self.clear_and_reinstall_app:
            self.logger.info(
                "clear and reinstall app after %s events" % restart_interval
            )
            return ReInstallAppEvent(self.app)
        self.logger.info("restart app after %s events" % restart_interval)
        return KillAndRestartAppEvent(app=self.app)

    rules_to_check = self.kea.get_rules_whose_preconditions_are_satisfied()
    if len(rules_to_check) > 0:
        self.time_needed_to_satisfy_precondition.append(
            self.time_recoder.get_time_duration()
        )
        self.logger.debug(
            "has rule that matches the precondition and the time duration is "
            + self.time_recoder.get_time_duration()
        )
        # a satisfied precondition is only checked about half of the time
        if random.random() >= 0.5:
            self.logger.info("Don't check the property due to the randomness")
        else:
            self.logger.info("Check property")
            self.check_rule_whose_precondition_are_satisfied()
            if self.restart_app_after_check_property:
                self.logger.debug("restart app after check property")
                return KillAppEvent(app=self.app)
            return None

    return self.generate_random_event_based_on_current_state()

439 

def generate_random_event_based_on_current_state(self):
    """
    Pick a random event among the inputs possible in ``from_state``.

    @return: InputEvent
    """
    state = self.from_state
    self.logger.debug("Current state: %s" % state.state_str)

    recovery_event = self.move_the_app_to_foreground_if_needed(state)
    if recovery_event is not None:
        return recovery_event

    # BACK and a rotation are always among the candidates
    candidates = state.get_possible_input()
    candidates.append(KeyEvent(name="BACK"))
    candidates.append(RotateDevice())

    self._event_trace += EVENT_FLAG_EXPLORE

    chosen = random.choice(candidates)
    if not isinstance(chosen, RotateDevice):
        return chosen

    # select a rotate event with different direction than last time
    if self.last_rotate_events == KEY_RotateDeviceToPortraitEvent:
        self.last_rotate_events = KEY_RotateDeviceToLandscapeEvent
        return RotateDeviceToLandscapeEvent()
    self.last_rotate_events = KEY_RotateDeviceToPortraitEvent
    return RotateDeviceToPortraitEvent()

469 

470 

471class GuidedPolicy(KeaInputPolicy): 

472 """ 

473 generate events around the main path 

474 """ 

475 

def __init__(self, device, app, kea=None, allow_to_generate_utg=False):
    """Initialise the main-path / mutation bookkeeping of the guided policy."""
    super(GuidedPolicy, self).__init__(device, app, kea, allow_to_generate_utg)
    self.logger = logging.getLogger(self.__class__.__name__)

    number_of_main_paths = len(self.kea.all_mainPaths)
    if number_of_main_paths:
        self.logger.info("Found %d mainPaths" % number_of_main_paths)
    else:
        self.logger.error("No mainPath found")

    # the main path is chosen later by select_main_path()
    self.main_path = None
    # True: replay the main path; False: mutate around it
    self.execute_main_path = True

    # position of the replay on the main path
    self.current_index_on_main_path = 0
    # index at which the replay stops and mutation starts
    self.mutate_node_index_on_main_path = 0
    # limit and counter for the mutation steps at one node
    self.max_number_of_mutate_steps_on_single_node = 20
    self.current_number_of_mutate_steps_on_single_node = 0
    # state of the attempt to get back onto the main path after mutating
    self.number_of_events_that_try_to_find_event_on_main_path = 0
    self.index_on_main_path_after_mutation = -1

    self.last_random_text = None
    self.last_rotate_events = KEY_RotateDeviceToPortraitEvent

497 

def select_main_path(self):
    """
    Choose one main path at random and reset the values derived from it.

    Does nothing except logging an error when Kea provides no main path.
    """
    available_paths = self.kea.all_mainPaths
    if len(available_paths) == 0:
        self.logger.error("No mainPath")
        return

    chosen = random.choice(available_paths)
    self.main_path = chosen
    self.path_func, self.main_path = chosen.function, chosen.path
    number_of_steps = len(self.main_path)
    self.logger.info(
        f"Select the {number_of_steps} steps mainPath function: {self.path_func}"
    )
    # work on a private copy of the event list
    self.main_path_list = copy.deepcopy(self.main_path)
    self.max_number_of_events_that_try_to_find_event_on_main_path = min(
        10, number_of_steps
    )
    # mutation starts at the end of the main path
    self.mutate_node_index_on_main_path = number_of_steps

513 

def generate_event(self):
    """
    Produce the next event of the guided policy.

    Order of preference: an event that brings the app back; the next step
    of the main path (executed through Kea, so None is returned); an event
    from ``mutate_the_main_path``.
    """
    # Return relevant events based on whether the application is in the foreground.
    recovery_event = self.move_the_app_to_foreground_if_needed(self.from_state)
    if recovery_event is not None:
        return recovery_event

    needs_setup = (
        self.event_count == START_TO_GENERATE_EVENT_IN_POLICY
    ) or isinstance(self.last_event, ReInstallAppEvent)
    if needs_setup:
        self.select_main_path()
        self.run_initializer()
        time.sleep(2)
        self.from_state = self.device.get_current_state()

    if self.execute_main_path:
        event_str = self.get_next_event_from_main_path()
        if event_str:
            self.logger.info("*****main path running*****")
            self.kea.execute_event_from_main_path(event_str)
            return None

    # generate an event around the state on the main path
    return self.mutate_the_main_path()

540 

def stop_mutation(self):
    """
    End the current mutation round and go back to replaying the main path.

    The mutate index moves one node towards the start of the path. Once it
    has passed the first node it is reset to the end of the path and a
    ReInstallAppEvent is returned; otherwise a KillAndRestartAppEvent.
    """
    # reset the per-round state
    self.execute_main_path = True
    self.current_index_on_main_path = 0
    self.current_number_of_mutate_steps_on_single_node = 0
    self.number_of_events_that_try_to_find_event_on_main_path = 0
    self.index_on_main_path_after_mutation = -1

    self.mutate_node_index_on_main_path -= 1
    if self.mutate_node_index_on_main_path != -1:
        self.logger.info(
            "reach the max number of mutate steps on single node, restart the app"
        )
        return KillAndRestartAppEvent(app=self.app)

    self.mutate_node_index_on_main_path = len(self.main_path)
    return ReInstallAppEvent(app=self.app)

555 

def mutate_the_main_path(self):
    """
    Produce one step of the mutation phase.

    While the per-node step limit has not been reached: check a rule if a
    precondition holds (returning None), otherwise return a random event.
    After that: try to continue on the main path from the current state,
    and call ``stop_mutation`` when this fails or the end of the path has
    been reached.
    """
    self.current_number_of_mutate_steps_on_single_node += 1
    steps_done = self.current_number_of_mutate_steps_on_single_node

    if steps_done < self.max_number_of_mutate_steps_on_single_node:
        self.index_on_main_path_after_mutation = -1
        if len(self.kea.get_rules_whose_preconditions_are_satisfied()) > 0:
            # if the property has been checked, don't return any event
            self.check_rule_whose_precondition_are_satisfied()
            return None
        return self.generate_random_event_based_on_current_state()

    # try to find an event from the main path that can be executed on current state
    tries = self.number_of_events_that_try_to_find_event_on_main_path
    if tries > self.max_number_of_events_that_try_to_find_event_on_main_path:
        return self.stop_mutation()
    self.number_of_events_that_try_to_find_event_on_main_path += 1

    # if reach the state that satisfies the precondition, check the rule
    # and turn to execute the main path.
    if self.index_on_main_path_after_mutation == len(self.main_path_list):
        self.logger.info(
            "reach the end of the main path that could satisfy the precondition"
        )
        rules_to_check = self.kea.get_rules_whose_preconditions_are_satisfied()
        if len(rules_to_check) > 0:
            elapsed = self.time_recoder.get_time_duration()
            self.time_needed_to_satisfy_precondition.append(elapsed)
            self.check_rule_whose_precondition_are_satisfied()
        return self.stop_mutation()

    # find if there is any event in the main path that could be executed
    # on the current state
    event_str = self.get_event_from_main_path()
    try:
        self.kea.execute_event_from_main_path(event_str)
        self.logger.info("find the event in the main path")
        return None
    except Exception:
        self.logger.info("can't find the event in the main path")
        return self.stop_mutation()

605 

def get_next_event_from_main_path(self):
    """
    Return the next non-None event of the main path and advance the index.

    Returns None, and switches ``execute_main_path`` off, once the current
    index has reached the mutate index.
    """
    while True:
        position = self.current_index_on_main_path
        if position == self.mutate_node_index_on_main_path:
            self.logger.info(
                "reach the mutate index, start mutate on the node %d"
                % self.mutate_node_index_on_main_path
            )
            self.execute_main_path = False
            return None

        self.logger.info("execute node index on main path: %d" % position)
        u2_event_str = self.main_path_list[position]
        self.current_index_on_main_path = position + 1
        if u2_event_str is not None:
            return u2_event_str
        # empty node: report it and look at the next one
        self.logger.warning("event is None on main path node %d" % position)

630 

def get_ui_element_dict(self, ui_element_str: str) -> Dict[str, str]:
    """
    Parse the selector arguments of an event string into a dict.

    The text after the first "(" up to the next ")" outside quotes is read
    as a comma-separated list of ``name=value`` pairs; surrounding quotes
    are removed from the values. For example
    ``d(resourceId="a:id/b", text="OK").click()`` gives
    ``{"resourceId": "a:id/b", "text": "OK"}``.

    Quoted values may contain ",", ")" and "=". Arguments without a name
    (``d.press("back")``) and empty argument lists are skipped instead of
    raising ValueError. A string without "(" is parsed as a whole.

    :param ui_element_str: event string taken from the main path
    :return: attribute name -> attribute value
    """
    # find() gives -1 when there is no "(", so the start becomes 0
    start_index = ui_element_str.find("(") + 1

    # split on top-level commas and stop at the first ")" outside quotes
    segments = []
    current = []
    open_quote = None
    for char in ui_element_str[start_index:]:
        if open_quote is not None:
            current.append(char)
            if char == open_quote:
                open_quote = None
        elif char in "\"'":
            open_quote = char
            current.append(char)
        elif char == ",":
            segments.append("".join(current))
            current = []
        elif char == ")":
            break
        else:
            current.append(char)
    segments.append("".join(current))

    ui_elements_dict = {}
    for segment in segments:
        # partition() keeps any further "=" inside the value
        attribute_name, separator, attribute_value = segment.partition("=")
        attribute_name = attribute_name.strip()
        if not separator or not attribute_name:
            continue
        attribute_value = attribute_value.strip()
        is_quoted = (
            len(attribute_value) >= 2
            and attribute_value[0] == attribute_value[-1]
            and attribute_value[0] in "\"'"
        )
        if is_quoted:
            attribute_value = attribute_value[1:-1]
        else:
            attribute_value = attribute_value.strip('"')
        ui_elements_dict[attribute_name] = attribute_value
    return ui_elements_dict

650 

def get_event_from_main_path(self):
    """
    Find a main-path event whose target view exists in ``from_state``.

    When no position has been found yet (index -1) the path is searched
    from its end towards its start; otherwise only the event at the
    remembered position is tried. On success the remembered position is
    set to the index following the returned event.

    :return: the event string, or None when no matching view was found
    """
    state = self.from_state
    position = self.index_on_main_path_after_mutation

    if position != -1:
        event_str = self.main_path_list[position]
        attributes = self.get_ui_element_dict(event_str)
        if state.get_view_by_attribute(attributes) is None:
            return None
        self.index_on_main_path_after_mutation = position + 1
        return event_str

    for index in reversed(range(len(self.main_path_list))):
        event_str = self.main_path_list[index]
        attributes = self.get_ui_element_dict(event_str)
        if state.get_view_by_attribute(attributes) is not None:
            self.index_on_main_path_after_mutation = index + 1
            return event_str
    return None

675 

def generate_random_event_based_on_current_state(self):
    """
    Pick a random event among the inputs possible in ``from_state``.

    @return: InputEvent
    """
    state = self.from_state
    self.logger.info("Current state: %s" % state.state_str)

    recovery_event = self.move_the_app_to_foreground_if_needed(state)
    if recovery_event is not None:
        return recovery_event

    # All possible input events, plus BACK and a rotation
    candidates = state.get_possible_input()
    candidates.append(KeyEvent(name="BACK"))
    candidates.append(RotateDevice())

    self._event_trace += EVENT_FLAG_EXPLORE

    chosen = random.choice(candidates)
    if not isinstance(chosen, RotateDevice):
        return chosen

    # alternate the direction with respect to the previous rotation
    if self.last_rotate_events == KEY_RotateDeviceToPortraitEvent:
        self.last_rotate_events = KEY_RotateDeviceToLandscapeEvent
        return RotateDeviceToLandscapeEvent()
    self.last_rotate_events = KEY_RotateDeviceToPortraitEvent
    return RotateDeviceToPortraitEvent()

707 

708 

709class LLMPolicy(RandomPolicy): 

710 """ 

711 use LLM to generate input when detected ui tarpit 

712 """ 

713 

def __init__(
    self,
    device,
    app,
    kea=None,
    restart_app_after_check_property=False,
    number_of_events_that_restart_app=100,
    clear_and_restart_app_data_after_100_events=False,
    allow_to_generate_utg=False,
):
    """
    :param restart_app_after_check_property: forwarded to RandomPolicy
    :param number_of_events_that_restart_app: forwarded to RandomPolicy
    :param clear_and_restart_app_data_after_100_events: forwarded to
        RandomPolicy as ``clear_and_reinstall_app``
    :param allow_to_generate_utg: forwarded to RandomPolicy
    """
    # Forward every option; previously only device, app and kea were
    # passed on, so the remaining arguments were silently ignored.
    super(LLMPolicy, self).__init__(
        device,
        app,
        kea,
        restart_app_after_check_property=restart_app_after_check_property,
        number_of_events_that_restart_app=number_of_events_that_restart_app,
        clear_and_reinstall_app=clear_and_restart_app_data_after_100_events,
        allow_to_generate_utg=allow_to_generate_utg,
    )
    self.logger = logging.getLogger(self.__class__.__name__)
    self.__action_history = []
    self.__all_action_history = set()
    self.__activity_history = set()
    # Both attributes are read by generate_llm_event_based_on_utg; without
    # these initial values the first read raises AttributeError.
    self.__random_explore = False
    self.__num_steps_outside = 0
    self.from_state = None
    self.task = "You are an expert in App GUI testing. Please guide the testing tool to enhance the coverage of functional scenarios in testing the App based on your extensive App testing experience. "

731 

def start(
    self, input_manager: "InputManager"
):  # TODO do not need to write start here?
    """
    Run the event loop, switching to LLM-generated events in a UI tarpit.

    Iteration 0 sends a KillAppEvent and iteration 1 starts the app. Later
    iterations ask ``input_manager.sim_calculator`` whether a UI tarpit was
    detected: if so, an LLM event is generated, or BACK is sent once more
    than MAX_NUM_QUERY_LLM queries have been counted; otherwise the event
    comes from ``generate_event``.

    The loop ends when ``input_manager.enabled`` becomes false, after
    ``input_manager.event_count`` iterations, or on KeyboardInterrupt,
    InputInterruptedException or RuntimeError. Any other exception is
    logged and the loop continues. ``tear_down`` is called at the end.

    :param input_manager: instance of InputManager
    """
    import traceback

    self.event_count = 0
    self.input_manager = input_manager
    sim_calculator = input_manager.sim_calculator if False else None  # placeholder, see below
    while input_manager.enabled and self.event_count < input_manager.event_count:
        try:
            if self.device.is_harmonyos is False and hasattr(self.device, "u2"):
                self.device.u2.set_fastinput_ime(True)

            self.logger.info("Exploration action count: %d", self.event_count)

            if self.to_state is not None:
                self.from_state = self.to_state
            else:
                self.from_state = self.device.get_current_state()

            if self.event_count == 0:
                # If the application is running, close the application.
                event = KillAppEvent(app=self.app)
            elif self.event_count == 1:
                event = IntentEvent(self.app.get_start_intent())
            elif input_manager.sim_calculator.detected_ui_tarpit(input_manager):
                # If detected a ui tarpit
                if input_manager.sim_calculator.sim_count > MAX_NUM_QUERY_LLM:
                    # If query LLM too much
                    self.logger.info("query too much. go back!")
                    event = KeyEvent(name="BACK")
                    self.clear_action_history()
                    input_manager.sim_calculator.sim_count = 0
                else:
                    # stop random policy, start query LLM
                    event = self.generate_llm_event()
            else:
                event = self.generate_event()

            if event is not None:
                self.device.save_screenshot_for_report(
                    event=event, current_state=self.from_state
                )
                input_manager.add_event(event)
                self.to_state = self.device.get_current_state()
                self.last_event = event
                if self.allow_to_generate_utg:
                    self.update_utg()
            else:
                # No event was sent, but the policy may have driven the UI
                # itself (e.g. executed a rule). The remembered to_state
                # would then be stale, so drop it and let the next
                # iteration read the current state from the device.
                self.to_state = None

            bug_report_path = os.path.join(self.device.output_dir, "all_states")
            generate_report(
                bug_report_path,
                self.device.output_dir,
                self.triggered_bug_information,
                self.time_needed_to_satisfy_precondition,
                self.device.cur_event_count,
                self.time_recoder.get_time_duration(),
            )
        except KeyboardInterrupt:
            break
        except InputInterruptedException as e:
            self.logger.info("stop sending events: %s", e)
            self.logger.info("action count: %d", self.event_count)
            break
        except RuntimeError as e:
            self.logger.info("RuntimeError: %s, stop sending events", e)
            break
        except Exception as e:
            # best effort: report the failure and keep exploring
            self.logger.warning("exception during sending events: %s", e)
            traceback.print_exc()
        self.event_count += 1
    self.tear_down()

809 

def generate_llm_event(self):
    """
    Produce the next event while the LLM is driving the exploration.

    Mirrors ``RandomPolicy.generate_event`` for the initializer, the
    reinstall interval and the random property check, but the exploration
    event comes from ``generate_llm_event_based_on_utg``.

    @return: an event to send, or None when a property was checked instead
    """
    if self.event_count == START_TO_GENERATE_EVENT_IN_POLICY or isinstance(
        self.last_event, ReInstallAppEvent
    ):
        # run the initializer, then re-read the state it left behind
        self.run_initializer()
        self.from_state = self.device.get_current_state()

    if self.from_state is None:
        time.sleep(5)
        return KeyEvent(name="BACK")

    if (
        self.event_count % self.number_of_events_that_restart_app == 0
        and self.clear_and_reinstall_app
    ):
        self.logger.info(
            "clear and restart app after %s events",
            self.number_of_events_that_restart_app,
        )
        return ReInstallAppEvent(self.app)

    rules_to_check = self.kea.get_rules_whose_preconditions_are_satisfied()
    if len(rules_to_check) > 0:
        # read the elapsed time once, so the recorded and the logged value
        # agree; %s formatting also works if the value is not a str
        duration = self.time_recoder.get_time_duration()
        self.time_needed_to_satisfy_precondition.append(duration)
        self.logger.debug(
            "has rule that matches the precondition and the time duration is %s",
            duration,
        )
        # a satisfied precondition is only checked about half of the time
        if random.random() < 0.5:
            self.logger.info("Check property")
            self.check_rule_whose_precondition_are_satisfied()
            if self.restart_app_after_check_property:
                self.logger.debug("restart app after check property")
                return KillAppEvent(app=self.app)
            return None
        self.logger.info(
            "Found exectuable property in current state. No property will be checked now according to the random checking policy."
        )

    event = self.generate_llm_event_based_on_utg()

    if isinstance(event, RotateDevice):
        # alternate the direction with respect to the previous rotation
        if self.last_rotate_events == KEY_RotateDeviceToPortraitEvent:
            self.last_rotate_events = KEY_RotateDeviceToLandscapeEvent
            event = RotateDeviceToLandscapeEvent()
        else:
            self.last_rotate_events = KEY_RotateDeviceToPortraitEvent
            event = RotateDeviceToPortraitEvent()

    return event

871 

def generate_llm_event_based_on_utg(self):
    """
    Generate the next event for the current state, using the LLM to pick an action.

    The app's depth in the activity stack decides what happens first:
      * depth < 0 (app not in the activity stack): (re)start the app, or switch
        to random mode once it has been restarted more than MAX_NUM_RESTARTS times;
      * depth > 0 (app in the stack but not in foreground): after too many steps
        outside the app, navigate back to it;
      * depth == 0 (app in foreground): reset the outside-step counter.
    Unless one of these branches returns an event, the LLM is asked to choose
    among the candidate actions of the current state. If it chooses nothing,
    a random candidate is tried when random mode is on; otherwise the app is
    stopped so that it can be restarted.

    @return: InputEvent
    """
    current_state = self.from_state
    self.logger.info("Current state: %s", current_state.state_str)

    # Query the depth once; both branches below test the same value.
    app_depth = current_state.get_app_activity_depth(self.app)

    if app_depth < 0:
        # The app is not in the activity stack.
        start_app_intent = self.app.get_start_intent()

        # The app seems to be stuck. The tail of the event trace tells which case applies:
        # 1) force stopped (START, STOP):
        #    count one more restart and start the app again
        # 2) started at least once and cannot be started (START):
        #    count one more restart and pass through without re-sending the intent
        # 3) anything else:
        #    a normal start, so the restart counter is cleared
        restart_suffixes = (
            EVENT_FLAG_START_APP + EVENT_FLAG_STOP_APP,
            EVENT_FLAG_START_APP,
        )
        if self._event_trace.endswith(restart_suffixes):
            self._num_restarts += 1
            self.logger.info(
                "The app had been restarted %d times.", self._num_restarts
            )
        else:
            self._num_restarts = 0

        # pass (START) through
        if not self._event_trace.endswith(EVENT_FLAG_START_APP):
            if self._num_restarts > MAX_NUM_RESTARTS:
                # Restarted too many times: enter random mode
                msg = "The app had been restarted too many times. Entering random mode."
                self.logger.info(msg)
                self.__random_explore = True
            else:
                # Start the app; the action history restarts from this point
                self._event_trace += EVENT_FLAG_START_APP
                self.logger.info("Trying to start the app...")
                self.__action_history = [f"- start the app {self.app.app_name}"]
                return IntentEvent(intent=start_app_intent)

    elif app_depth > 0:
        # The app is in the activity stack but is not in foreground.
        self.__num_steps_outside += 1

        if self.__num_steps_outside > MAX_NUM_STEPS_OUTSIDE:
            # The app has been out of foreground for too long, try to go back.
            # NOTE: while MAX_NUM_STEPS_OUTSIDE_KILL equals MAX_NUM_STEPS_OUTSIDE
            # the BACK branch below is never taken.
            if self.__num_steps_outside > MAX_NUM_STEPS_OUTSIDE_KILL:
                stop_app_intent = self.app.get_stop_intent()
                go_back_event = IntentEvent(stop_app_intent)
            else:
                go_back_event = KeyEvent(name="BACK")
            self._event_trace += EVENT_FLAG_NAVIGATE
            self.logger.info("Going back to the app...")
            self.__action_history.append("- go back")
            return go_back_event
    else:
        # The app is in foreground.
        self.__num_steps_outside = 0

    action, candidate_actions = self._get_action_with_LLM(
        current_state,
        self.__action_history,
        self.__activity_history,
    )
    if action is not None:
        action_desc = current_state.get_action_desc(action)
        self.__action_history.append(action_desc)
        self.__all_action_history.add(action_desc)
        return action

    # random.choice raises IndexError on an empty sequence, so only try a
    # random event when the state actually offers candidates; otherwise fall
    # through and restart the app.
    if self.__random_explore and candidate_actions:
        self.logger.info("Trying random event...")
        action = random.choice(candidate_actions)
        action_desc = current_state.get_action_desc(action)
        self.__action_history.append(action_desc)
        self.__all_action_history.add(action_desc)
        return action

    # Couldn't find an exploration target: stop the app
    stop_app_intent = self.app.get_stop_intent()
    self.logger.info("Cannot find an exploration target. Trying to restart app...")
    self.__action_history.append("- stop the app")
    self.__all_action_history.add("- stop the app")
    self._event_trace += EVENT_FLAG_STOP_APP
    return IntentEvent(intent=stop_app_intent)

959 

def _query_llm(self, prompt, model_name="gpt-3.5-turbo"):
    """
    Send `prompt` as a single user message to a chat-completion endpoint and
    return the message content of the first returned choice.

    The endpoint URL and API key are empty placeholders in this method.

    @param prompt: text sent as the content of the user message
    @param model_name: model identifier passed to the completion request
    @return: content of the first choice's message
    """
    # TODO: replace with your own LLM
    from openai import OpenAI

    llm_client = OpenAI(base_url="", api_key="")

    reply = llm_client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model=model_name,
        timeout=30,
    )
    first_choice = reply.choices[0]
    return first_choice.message.content

974 

def _get_action_with_LLM(self, current_state, action_history, activity_history):
    """
    Ask the LLM which candidate action of `current_state` to perform next.

    The prompt combines the task description, the described GUI state, the
    activities visited so far and the actions already tried on this page.
    The reply is expected to be a candidate action id, or -1 when no more
    action is needed. If the chosen action is a SetTextEvent, the LLM is
    queried a second time for the text to enter.

    @param current_state: state providing foreground_activity,
        get_described_actions() and get_view_desc()
    @param action_history: descriptions of the steps already tried on this page
    @param activity_history: names of the activities already visited
    @return: (selected action or None, candidate actions). The action is None
        when the reply is empty, contains no integer, is negative (e.g. -1)
        or does not index a candidate action.
    """
    activity = current_state.foreground_activity
    task_prompt = (
        self.task
        + f"Currently, the App is stuck on the {activity} page, unable to explore more features. You task is to select an action based on the current GUI Infomation to perform next and help the app escape the UI tarpit."
    )
    visited_page_prompt = (
        "I have already visited the following activities: \n"
        + "\n".join(activity_history)
    )
    history_prompt = (
        f"I have already completed the following steps to leave {activity} page but failed: \n "
        + ";\n ".join(action_history)
    )
    state_prompt, candidate_actions = current_state.get_described_actions()
    question = "Which action should I choose next? Just return the action id and nothing else.\nIf no more action is needed, return -1."
    prompt = f"{task_prompt}\n{state_prompt}\n{visited_page_prompt}\n{history_prompt}\n{question}"
    print(prompt)
    response = self._query_llm(prompt)
    print(f"response: {response}")

    # Keep the sign: the prompt allows "-1" for "no action", which an
    # unsigned pattern would misread as action id 1. The reply may also be
    # None, so fall back to an empty string before searching.
    match = re.search(r"-?\d+", response or "")
    if not match:
        return None, candidate_actions
    idx = int(match.group(0))
    # Reject "-1" and any id that does not index a candidate action.
    if not 0 <= idx < len(candidate_actions):
        return None, candidate_actions

    selected_action = candidate_actions[idx]
    if isinstance(selected_action, SetTextEvent):
        view_text = current_state.get_view_desc(selected_action.view)
        question = f"What text should I enter to the {view_text}? Just return the text and nothing else."
        prompt = f"{task_prompt}\n{state_prompt}\n{question}"
        print(prompt)
        response = self._query_llm(prompt)
        print(f"response: {response}")
        # An empty/None reply results in an empty text input.
        selected_action.text = (response or "").replace('"', "")
        if len(selected_action.text) > 30:  # heuristically disable long text input
            selected_action.text = ""
    return selected_action, candidate_actions

1013 

def get_last_state(self):
    """
    Return the state currently stored in `from_state`.

    @return: the value of self.from_state
    """
    last_state = self.from_state
    return last_state

1016 

def clear_action_history(self):
    """
    Reset the action history to a new empty list.
    """
    self.__action_history = list()