Coverage for kea/device_hm.py: 76%

393 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 16:05 +0800

1import json 

2import logging 

3import os 

4import re 

5import subprocess 

6import sys 

7import time 

8from typing import IO 

9import typing 

10 

11if typing.TYPE_CHECKING: 

12 from .start import Setting 

13from .input_event import InputEvent, TouchEvent, LongTouchEvent, ScrollEvent, SetTextEvent, KeyEvent 

14 

15from .device import Device 

16from .adapter.hdc import HDC, HDC_EXEC 

17from .app_hm import AppHM 

18from .adapter.hilog import Hilog 

19from .intent import Intent 

20 

21DEFAULT_NUM = '1234567890' 

22DEFAULT_CONTENT = 'Hello world!' 

23 

24 

25class DeviceHM(Device): 

26 """ 

27 this class describes a connected device 

28 """ 

29 

30 def __init__(self, device_serial=None, is_emulator=False, output_dir=None, 

31 cv_mode=False, grant_perm=False, telnet_auth_token=None, 

32 enable_accessibility_hard=False, humanoid=None, ignore_ad=False, 

33 is_harmonyos=True, save_log=False, settings:"Setting"=None): 

34 """ 

35 initialize a device connection 

36 :param device_serial: serial number of target device 

37 :param is_emulator: boolean, type of device, True for emulator, False for real device 

38 :return: 

39 """ 

40 self.logger = logging.getLogger(self.__class__.__name__) 

41 self.cur_event_count = 0 

42 self.screenshot_path = None 

43 self.current_state = None 

44 

45 if "emulator" in device_serial and not is_emulator: 

46 self.logger.warning("Seems like you are using an emulator. If so, please add is_emulator option.") 

47 self.serial = device_serial 

48 self.is_emulator = is_emulator 

49 self.cv_mode = cv_mode 

50 self.output_dir = output_dir 

51 if output_dir is not None: 

52 if not os.path.isdir(output_dir): 

53 os.makedirs(output_dir) 

54 self.grant_perm = grant_perm 

55 self.enable_accessibility_hard = enable_accessibility_hard 

56 self.humanoid = humanoid 

57 self.ignore_ad = ignore_ad 

58 

59 # basic device information 

60 self.settings = settings 

61 self.display_info = None 

62 self._model_number = None 

63 self._device_name = None 

64 self.sdk_version = None 

65 self.release_version = None 

66 self.connected = True 

67 self.last_know_state = None 

68 self.__used_ports = [] 

69 self.pause_sending_event = False 

70 

71 self.is_harmonyos = is_harmonyos 

72 self.save_log = save_log 

73 

74 # adapters 

75 self.hdc = HDC(device=self) 

76 self.hilog = Hilog(device=self) 

77 

78 from hmdriver2.driver import Driver 

79 self.hm2 = Driver(serial=self.serial) 

80 

81 

82 self.logger.info("You're runing droidbot on HarmonyOS") 

83 self.adapters = { 

84 self.hdc: True, 

85 self.hilog: True if self.save_log else False 

86 } 

87 

88 def check_connectivity(self): 

89 """ 

90 check if the device is available 

91 """ 

92 for adapter in self.adapters: 

93 adapter_name = adapter.__class__.__name__ 

94 adapter_enabled = self.adapters[adapter] 

95 if not adapter_enabled: 

96 print("[CONNECTION] %s is not enabled." % adapter_name) 

97 else: 

98 if adapter.check_connectivity(): 

99 print("[CONNECTION] %s is enabled and connected." % adapter_name) 

100 else: 

101 print("[CONNECTION] %s is enabled but not connected." % adapter_name) 

102 

103 def set_up(self): 

104 """ 

105 Set connections on this device 

106 """ 

107 # wait for emulator to start 

108 # self.wait_for_device() 

109 for adapter in self.adapters: 

110 adapter_enabled = self.adapters[adapter] 

111 if not adapter_enabled: 

112 continue 

113 adapter.set_up() 

114 

115 def connect(self): 

116 """ 

117 establish connections on this device 

118 :return: 

119 """ 

120 for adapter in self.adapters: 

121 adapter_enabled = self.adapters[adapter] 

122 if not adapter_enabled: 

123 continue 

124 adapter.connect() 

125 

126 # self.get_sdk_version() 

127 # self.get_release_version() 

128 # self.get_ro_secure() 

129 # self.get_ro_debuggable() 

130 # self.get_display_info() 

131 

132 self.unlock() 

133 self.check_connectivity() 

134 self.connected = True 

135 

136 def disconnect(self): 

137 """ 

138 disconnect current device 

139 :return: 

140 """ 

141 self.connected = False 

142 for adapter in self.adapters: 

143 adapter_enabled = self.adapters[adapter] 

144 if not adapter_enabled: 

145 continue 

146 adapter.disconnect() 

147 

148 if self.output_dir is not None: 

149 temp_dir = os.path.join(self.output_dir, "temp") 

150 if os.path.exists(temp_dir): 

151 import shutil 

152 shutil.rmtree(temp_dir) 

153 

154 def tear_down(self): 

155 for adapter in self.adapters: 

156 adapter_enabled = self.adapters[adapter] 

157 if not adapter_enabled: 

158 continue 

159 adapter.tear_down() 

160 

161 def send_documents(self, app): 

162 self.logger.warning("send_documents Not implemented") 

163 

164 def is_foreground(self, app) -> bool: 

165 """ 

166 check if app is in foreground of device 

167 :param app: App 

168 :return: boolean 

169 """ 

170 if isinstance(app, str): 

171 package_name = app 

172 elif isinstance(app, AppHM): 

173 package_name = app.get_package_name() 

174 else: 

175 return False 

176 

177 top_activity_name = self.get_top_activity_name() 

178 if top_activity_name is None: 

179 return False 

180 return top_activity_name.startswith(package_name) 

181 

182 @property 

183 def model_number(self): 

184 """ 

185 Get model number 

186 """ 

187 if self._model_number is None: 

188 self._model_number = self.hdc.get_model_number() 

189 return self._model_number 

190 

191 @property 

192 def device_name(self): 

193 if self._device_name is None: 

194 self._device_name = self.hdc.get_device_name() 

195 return self._device_name 

196 

197 def get_sdk_version(self): 

198 pass 

199 

200 def get_release_version(self): 

201 """ 

202 Get version of current SDK 

203 """ 

204 if self.release_version is None: 

205 self.release_version = self.hdc.get_release_version() 

206 return self.release_version 

207 

208 def get_ro_secure(self): 

209 pass 

210 

211 def get_ro_debuggable(self): 

212 pass 

213 

214 def get_display_info(self, refresh=True): 

215 """ 

216 get device display information, including width, height, and density 

217 :param refresh: if set to True, refresh the display info instead of using the old values 

218 :return: dict, display_info 

219 """ 

220 r = self.hdc.shell("hidumper -s RenderService -a screen") 

221 pattern = r"activeMode: (?P<width>\d+)x(?P<height>\d+)" 

222 m = re.search(pattern, r) 

223 assert m, "Failed when getting screen resolution with hidumper" 

224 display_info = {"width":int(m.group("width")), "height":int(m.group("height"))} 

225 return display_info 

226 

227 def get_width(self, refresh=False): 

228 display_info = self.get_display_info(refresh=refresh) 

229 width = 0 

230 if "width" in display_info: 

231 width = display_info["width"] 

232 elif not refresh: 

233 width = self.get_width(refresh=True) 

234 else: 

235 self.logger.warning("get_width: width not in display_info") 

236 return width 

237 

238 def get_height(self, refresh=False): 

239 display_info = self.get_display_info(refresh=refresh) 

240 height = 0 

241 if "height" in display_info: 

242 height = display_info["height"] 

243 elif not refresh: 

244 height = self.get_width(refresh=True) 

245 else: 

246 self.logger.warning("get_height: height not in display_info") 

247 return height 

248 

249 def unlock(self): 

250 """ 

251 unlock screen 

252 skip first-use tutorials 

253 etc 

254 :return: 

255 """ 

256 self.hdc.unlock() 

257 

258 def send_intent(self, intent): 

259 """ 

260 send an intent to device via am (ActivityManager) 

261 :param intent: instance of Intent or str 

262 :return: 

263 """ 

264 assert self.hdc is not None 

265 assert intent is not None 

266 if isinstance(intent, Intent): 

267 cmd = intent.get_cmd() 

268 else: 

269 cmd = intent 

270 return self.hdc.shell(cmd) 

271 

272 def send_event(self, event:"InputEvent"): 

273 """ 

274 send one event to device 

275 :param event: the event to be sent 

276 :return: 

277 """ 

278 event.send(self) 

279 

280 def start_app(self, app): 

281 """ 

282 start an app on the device 

283 :param app: instance of App, or str of package name 

284 :return: 

285 """ 

286 if isinstance(app, str): 

287 package_name = app 

288 elif isinstance(app, AppHM): 

289 package_name = app.get_package_name() 

290 if app.get_main_activity(): 

291 package_name += "/%s" % app.get_main_activity() 

292 else: 

293 self.logger.warning("unsupported param " + app + " with type: ", type(app)) 

294 return 

295 intent = Intent(suffix=package_name) 

296 self.send_intent(intent) 

297 

298 def get_top_activity_name(self) -> str: 

299 """ 

300 Get current activity 

301 """ 

302 r = self.hdc.shell("aa dump --mission-list") 

303 

304 if r"#FOREGROUND" not in r: 

305 return None 

306 

307 mission_list = r.split("Mission ID") 

308 for mission in mission_list: 

309 if "state" not in mission: 

310 continue 

311 if "#BACKGROUND" in mission: 

312 continue 

313 

314 import re 

315 pattern = r"mission name #\[#(?P<bundleName>.+?):.+?:(?P<abilityName>.+?)\]" 

316 m = re.search(pattern, mission) 

317 if m: 

318 return m.group("bundleName") + "/" + m.group("abilityName") 

319 

320 return None 

321 

322 def get_current_activity_stack(self): 

323 """ 

324 Get current activity stack 

325 :return: a list of str, each str is an activity name, the first is the top activity name 

326 """ 

327 task_to_activities = self.get_task_activities() 

328 top_activity = self.get_top_activity_name() 

329 if top_activity: 

330 for task_id in task_to_activities: 

331 activities = task_to_activities[task_id] 

332 if len(activities) > 0 and activities[0] == top_activity: 

333 return activities 

334 self.logger.warning("Unable to get current activity stack.") 

335 return [top_activity] 

336 else: 

337 return None 

338 

339 

340 def install_app(self, app): 

341 """ 

342 install an app to device 

343 @param app: instance of App 

344 @return: 

345 """ 

346 assert isinstance(app, AppHM) 

347 package_name = app.get_package_name() 

348 if package_name not in self.hdc.get_installed_apps(): 

349 install_cmd = [HDC_EXEC, "-t", self.serial, "install", "-r"] 

350 install_cmd.append(HDC.get_relative_path(app.app_path)) 

351 install_p = subprocess.Popen(install_cmd, stdout=subprocess.PIPE) 

352 # print(" ".join(install_cmd)) 

353 

354 print("Please wait while installing the app...") 

355 output = install_p.stdout.readline() 

356 if output: 

357 print(output.strip().decode()) 

358 

359 if not self.connected: 

360 install_p.terminate() 

361 return 

362 # dump the package info through the bundleName 

363 # hdc shell bm dump -n [bundleName] 

364 dumpsys_p = subprocess.Popen([HDC_EXEC, "-t", self.serial, "shell", 

365 "bm","dump" ,"-n", package_name], stdout=subprocess.PIPE) 

366 dumpsys_lines = [] 

367 while True: 

368 line = dumpsys_p.stdout.readline() 

369 if not line: 

370 break 

371 if not isinstance(line, str): 

372 line = line.decode() 

373 dumpsys_lines.append(line) 

374 

375 if self.output_dir is not None: 

376 package_info_file_name = "%s/dumpsys_package_%s.txt" % (self.output_dir, app.get_package_name()) 

377 with open(package_info_file_name, "w") as fp: 

378 fp.writelines(dumpsys_lines) 

379 

380 with open(package_info_file_name, "r") as fp: 

381 app.dumpsys_main_activity = self.__parse_main_activity_from_dumpsys_lines(fp) 

382 

383 self.logger.info("App installed: %s" % package_name) 

384 self.logger.info("Main activity: %s" % app.get_main_activity()) 

385 

386 @staticmethod 

387 def __parse_main_activity_from_dumpsys_lines(fp:IO): 

388 """ 

389 """ 

390 

391 # jump the fist line (the bundleName, which dosen't follow the json format) 

392 # to correctly parse the file into json 

393 cur_package = fp.readline().replace(":", "").strip() 

394 

395 import json 

396 dumpsys = json.load(fp) 

397 

398 # main ability 

399 return dumpsys["hapModuleInfos"][0]["mainAbility"] 

400 

401 def uninstall_app(self, app): 

402 """ 

403 Uninstall an app from device. 

404 :param app: an instance of App or a package name 

405 """ 

406 if isinstance(app, AppHM): 

407 package_name = app.get_package_name() 

408 # if self.settings.is_package: 

409 # return 

410 else: 

411 package_name = app 

412 if package_name in self.hdc.get_installed_apps(): 

413 uninstall_cmd = [HDC_EXEC, "-t", self.serial, "uninstall", package_name] 

414 uninstall_p = subprocess.Popen(uninstall_cmd, stdout=subprocess.PIPE) 

415 while package_name in self.hdc.get_installed_apps(): 

416 print("Please wait while uninstalling the app...") 

417 time.sleep(2) 

418 uninstall_p.terminate() 

419 

420 def push_file(self, local_file, remote_dir="/sdcard/"): 

421 """ 

422 push file/directory to target_dir 

423 :param local_file: path to file/directory in host machine 

424 :param remote_dir: path to target directory in device 

425 :return: 

426 """ 

427 if not os.path.exists(local_file): 

428 self.logger.warning("push_file file does not exist: %s" % local_file) 

429 self.hdc.run_cmd(["file send", local_file, remote_dir]) 

430 

431 def pull_file(self, remote_file, local_file): 

432 r = self.hdc.run_cmd(["file", "recv", remote_file, local_file]) 

433 assert not r.startswith("[Fail]"), "Error with receiving file" 

434 

435 def take_screenshot(self): 

436 

437 if self.output_dir is None: 

438 return None 

439 

440 r = self.hdc.shell("snapshot_display") 

441 assert "success" in r, "Error when taking screenshot" 

442 

443 remote_path = r.splitlines()[0].split()[-1] 

444 file_name = os.path.basename(remote_path) 

445 temp_path = os.path.join(self.output_dir, "temp") 

446 local_path = os.path.join(os.getcwd(), temp_path, file_name) 

447 

448 self.pull_file(remote_path, HDC.get_relative_path(local_path)) 

449 

450 return local_path 

451 

452 def save_screenshot_for_report(self, event_name=None, event=None, current_state=None): 

453 """ 

454 save screenshot for report, save to "all_states" dir 

455 """ 

456 

457 self.cur_event_count += 1 

458 if current_state is None: 

459 self.current_state = self.get_current_state() 

460 else: 

461 self.current_state = current_state 

462 self.save_to_all_states_dir(self.screenshot_path, event_name=event_name, event=event) 

463 

464 def draw_event(self, event, event_name, screenshot_path): 

465 import cv2 

466 image = cv2.imread(screenshot_path) 

467 if event is not None and screenshot_path is not None: 

468 if isinstance(event, InputEvent): 

469 if isinstance(event, TouchEvent): 

470 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

471 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 0, 255), 5) 

472 elif isinstance(event, LongTouchEvent): 

473 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

474 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 255, 0), 5) 

475 elif isinstance(event, SetTextEvent): 

476 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

477 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 0, 0), 5) 

478 elif isinstance(event, ScrollEvent): 

479 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

480 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 255, 0), 5) 

481 elif isinstance(event, KeyEvent): 

482 cv2.putText(image, event.name, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA) 

483 else: 

484 return 

485 else: 

486 if event_name == "click": 

487 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (0, 0, 255), 5) 

488 elif event_name == "long_click": 

489 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (0, 255, 0), 5) 

490 elif event_name == "set_text": 

491 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (255, 0, 0), 5) 

492 elif event_name == "press": 

493 cv2.putText(image,event, (100,300), cv2.FONT_HERSHEY_SIMPLEX, 5,(0, 255, 0), 3, cv2.LINE_AA) 

494 else: 

495 return 

496 try: 

497 cv2.imwrite(screenshot_path, image) 

498 except Exception as e: 

499 self.logger.warning(e) 

500 

501 def save_to_all_states_dir(self, local_image_path, event, event_name = None): 

502 import shutil 

503 all_states_dir = os.path.join(self.output_dir, "all_states") 

504 if not os.path.exists(all_states_dir): 

505 os.makedirs(all_states_dir) 

506 

507 json_dir = os.path.join(self.output_dir, "report_screenshot.json") 

508 if not self.is_harmonyos: 

509 if self.adapters[self.minicap]: 

510 dest_screenshot_path = "%s/screen_%s.jpg" % (all_states_dir, self.cur_event_count) 

511 else: 

512 dest_screenshot_path = "%s/screen_%s.png" % (all_states_dir, self.cur_event_count) 

513 else: 

514 dest_screenshot_path = "%s/screen_%s.jpeg" % (all_states_dir, self.cur_event_count) 

515 

516 if self.current_state is not None: 

517 dest_state_json_path = "%s/state_%s.json" % (all_states_dir, self.cur_event_count) 

518 state_json_file = open(dest_state_json_path, "w") 

519 state_json_file.write(self.current_state.to_json()) 

520 state_json_file.close() 

521 

522 try: 

523 with open(json_dir, 'r') as json_file: 

524 report_screens = json.load(json_file) 

525 except FileNotFoundError: 

526 report_screens = [] 

527 if event_name is None: 

528 event_name = event.get_event_name() 

529 

530 img_file_name = os.path.basename(dest_screenshot_path) 

531 

532 report_screen = { 

533 "event": event_name, 

534 "event_index": str(self.cur_event_count), 

535 "screen_shoot": img_file_name 

536 } 

537 

538 report_screens.append(report_screen) 

539 with open(json_dir, 'w') as json_file: 

540 json.dump(report_screens, json_file, indent=4) 

541 if self.current_state is not None and local_image_path != dest_screenshot_path: 

542 self.current_state.screenshot_path = dest_screenshot_path 

543 shutil.move(local_image_path, dest_screenshot_path) 

544 

545 self.draw_event(event, event_name, dest_screenshot_path) 

546 

547 def get_current_state(self, action_count=None): 

548 self.logger.debug("getting current device state...") 

549 current_state = None 

550 try: 

551 foreground_activity = self.get_top_activity_name() 

552 # activity_stack = self.get_current_activity_stack() 

553 activity_stack = [foreground_activity] 

554 # background_services = self.get_service_names() 

555 screenshot_path = self.take_screenshot() 

556 self.screenshot_path = screenshot_path 

557 self.logger.debug("finish getting current device state...") 

558 # if there's no foreground activities (In home or lock screen) 

559 views = self.get_views() if foreground_activity is not None else [] 

560 from .device_state import DeviceState 

561 current_state = DeviceState(self, 

562 views=views, 

563 foreground_activity=foreground_activity, 

564 activity_stack=activity_stack, 

565 background_services=None, 

566 screenshot_path=screenshot_path, 

567 tag=action_count) 

568 except Exception as e: 

569 self.logger.warning("exception in get_current_state: %s" % e) 

570 import traceback 

571 traceback.print_exc() 

572 self.logger.debug("finish getting current device state...") 

573 self.last_know_state = current_state 

574 if not current_state: 

575 self.logger.warning("Failed to get current state!") 

576 return current_state 

577 

578 def get_last_known_state(self): 

579 return self.last_know_state 

580 

581 def view_touch(self, x, y): 

582 self.hdc.touch(x, y) 

583 

584 def view_long_touch(self, x, y, duration=2000): 

585 """ 

586 Long touches at (x, y) 

587 @param duration: duration in ms 

588 """ 

589 self.hdc.long_touch(x, y, duration) 

590 

591 def view_drag(self, start_xy, end_xy, duration): 

592 """ 

593 Sends drag event n PX (actually it's using C{input swipe} command. 

594 """ 

595 self.hdc.drag(start_xy, end_xy, duration) 

596 

597 def view_append_text(self, text): 

598 try: 

599 self.hm2.input_text() 

600 except: 

601 self.hdc.type(text) 

602 

603 def view_set_text(self, text): 

604 try: 

605 self.hm2.input_text(text) 

606 except: 

607 self.logger.warning( 

608 "Failed to input text with hmdriver2. Use `hdc` to append text instead." 

609 ) 

610 self.hdc.type(text) 

611 

612 def key_press(self, key_code): 

613 self.hdc.press(key_code) 

614 

615 def shutdown(self): 

616 pass 

617 # self.adb.shell("reboot -p") 

618 

619 # def get_views(self): 

620 # if self.hdc: 

621 # views = self.get_views() 

622 # if views: 

623 # return views 

624 # else: 

625 # self.logger.warning("Failed to get views using HDC.") 

626 

627 # self.logger.warning("failed to get current views!") 

628 # return None 

629 

630 def get_views(self): 

631 return self.hdc.get_views(self.output_dir) 

632 

633 def get_random_port(self): 

634 """ 

635 get a random port on host machine to establish connection 

636 :return: a port number 

637 """ 

638 import socket 

639 temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

640 temp_sock.bind(("", 0)) 

641 port = temp_sock.getsockname()[1] 

642 temp_sock.close() 

643 if port in self.__used_ports: 

644 return self.get_random_port() 

645 self.__used_ports.append(port) 

646 return port