Coverage for kea/device.py: 64%

647 statements  

« prev     ^ index     » next       coverage.py v7.6.9, created at 2024-12-22 16:05 +0800

1import json 

2import logging 

3import os 

4import re 

5import subprocess 

6import sys 

7import time 

8 

9import uiautomator2 

10import pkg_resources 

11from .adapter.uiautomator2_helper import Uiautomator2_Helper 

12 

13from .adapter.adb import ADB 

14 

15from .adapter.logcat import Logcat 

16from .adapter.minicap import Minicap 

17from .adapter.process_monitor import ProcessMonitor 

18from .adapter.telnet import TelnetConsole 

19from .adapter.user_input_monitor import UserInputMonitor 

20 

21from .app import App 

22from .intent import Intent 

23 

24from .input_event import InputEvent, SetTextAndSearchEvent, TouchEvent, LongTouchEvent, ScrollEvent, SetTextEvent, \ 

25 KeyEvent 

26 

27DEFAULT_NUM = '1234567890' 

28DEFAULT_CONTENT = 'Hello world!' 

29 

30 

31class Device(object): 

32 """ 

33 this class describes a connected device 

34 """ 

35 

36 def __init__( 

37 self, 

38 device_serial=None, 

39 is_emulator=False, 

40 output_dir=None, 

41 cv_mode=False, 

42 grant_perm=False, 

43 send_document=False, 

44 telnet_auth_token=None, 

45 enable_accessibility_hard=False, 

46 humanoid=None, 

47 ignore_ad=False, 

48 app_package_name=None, 

49 is_harmonyos=False 

50 ): 

51 """ 

52 initialize a device connection 

53 :param device_serial: serial number of target device 

54 :param is_emulator: boolean, type of device, True for emulator, False for real device 

55 :return: 

56 """ 

57 

58 self.logger = logging.getLogger(self.__class__.__name__) 

59 self.app_package_name = app_package_name 

60 if device_serial is None: 

61 from .utils import get_available_devices 

62 

63 all_devices = get_available_devices() 

64 if len(all_devices) == 0: 

65 self.logger.warning("ERROR: No device connected.") 

66 sys.exit(-1) 

67 device_serial = all_devices[0] 

68 if "emulator" in device_serial and not is_emulator: 

69 self.logger.warning( 

70 "Seems like you are using an emulator. If so, please add is_emulator option." 

71 ) 

72 self.serial = device_serial 

73 self.is_emulator = is_emulator 

74 self.cv_mode = cv_mode 

75 self.output_dir = output_dir 

76 if output_dir is not None: 

77 if not os.path.isdir(output_dir): 

78 os.makedirs(output_dir) 

79 self.grant_perm = grant_perm 

80 self.send_document = send_document 

81 self.enable_accessibility_hard = enable_accessibility_hard 

82 self.humanoid = humanoid 

83 self.ignore_ad = ignore_ad 

84 self.is_harmonyos = is_harmonyos 

85 self.cur_event_count = 0 

86 self.screenshot_path = None 

87 self.current_state = None 

88 

89 self.u2 = uiautomator2.connect(self.serial) 

90 # disable keyboard 

91 self.u2.set_fastinput_ime(True) 

92 

93 # basic device information 

94 self.settings = {} 

95 self.display_info = None 

96 self.model_number = None 

97 self.sdk_version = None 

98 self.release_version = None 

99 self.ro_debuggable = None 

100 self.ro_secure = None 

101 self.connected = True 

102 self.last_know_state = None 

103 self.__used_ports = [] 

104 self.pause_sending_event = False 

105 

106 # adapters 

107 self.adb = ADB(device=self) 

108 # Start the emulator console via Telnet. 

109 self.telnet = TelnetConsole(device=self, auth_token=telnet_auth_token) 

110 

111 # self.droidbot_app = DroidBotAppConn(device=self) 

112 

113 # Minicap is an open-source screen capture tool for Android, commonly used to capture screen content for automation testing or remote viewing. 

114 self.minicap = Minicap(device=self) 

115 # Logcat is a logging tool in the Android system used to capture various log messages that occur on the device. 

116 self.logcat = Logcat(device=self) 

117 # The UserInputMonitor class is typically used to monitor and handle user input events (such as touch, clicks, keyboard input, etc.) on Android devices. 

118 self.user_input_monitor = UserInputMonitor(device=self) 

119 # ProcessMonitor is a class used to monitor and manage processes on the device. 

120 self.process_monitor = ProcessMonitor(device=self) 

121 # Uiautomator2_Helper is a class used to handle the logic for interacting with UIAutomator2. 

122 self.uiautomator_helper = Uiautomator2_Helper(device=self) 

123 

124 # self.droidbot_ime = DroidBotIme(device=self) 

125 

126 self.adapters = { 

127 self.adb: True, 

128 self.telnet: False, 

129 # self.droidbot_app: True, 

130 self.minicap: True, 

131 self.logcat: True, 

132 self.user_input_monitor: True, 

133 self.process_monitor: True, 

134 # self.droidbot_ime: True, 

135 } 

136 

137 # minicap currently not working on emulators 

138 if self.is_emulator: 

139 self.logger.info("disable minicap on emulator") 

140 self.adapters[self.minicap] = False 

141 

142 # self.resource_path = "Document" 

143 self.resource_path = pkg_resources.resource_filename( 

144 "kea", "resources/Document" 

145 ) 

146 

147 def check_connectivity(self): 

148 """ 

149 check if the device is available 

150 """ 

151 for adapter in self.adapters: 

152 adapter_name = adapter.__class__.__name__ 

153 adapter_enabled = self.adapters[adapter] 

154 if not adapter_enabled: 

155 print("[CONNECTION] %s is not enabled." % adapter_name) 

156 else: 

157 if adapter.check_connectivity(): 

158 print("[CONNECTION] %s is enabled and connected." % adapter_name) 

159 else: 

160 print( 

161 "[CONNECTION] %s is enabled but not connected." % adapter_name 

162 ) 

163 

164 def wait_for_device(self): 

165 """ 

166 wait until the device is fully booted 

167 :return: 

168 """ 

169 self.logger.info("waiting for device") 

170 try: 

171 subprocess.check_call(["adb", "-s", self.serial, "wait-for-device"]) 

172 # while True: 

173 # out = subprocess.check_output( 

174 # ["adb", "-s", self.serial, "shell", "getprop", "init.svc.bootanim"]).split()[0] 

175 # if not isinstance(out, str): 

176 # out = out.decode() 

177 # if out == "stopped": 

178 # break 

179 # time.sleep(1) 

180 except: 

181 self.logger.warning("error waiting for device") 

182 

183 def set_up(self): 

184 """ 

185 Set connections on this device 

186 """ 

187 # wait for emulator to start 

188 self.wait_for_device() 

189 for adapter in self.adapters: 

190 adapter_enabled = self.adapters[adapter] 

191 if not adapter_enabled: 

192 continue 

193 adapter.set_up() 

194 

195 def connect(self): 

196 """ 

197 establish connections on this device 

198 :return: 

199 """ 

200 for adapter in self.adapters: 

201 adapter_enabled = self.adapters[adapter] 

202 if not adapter_enabled: 

203 continue 

204 adapter.connect() 

205 

206 self.get_sdk_version() 

207 self.get_release_version() 

208 self.get_ro_secure() 

209 self.get_ro_debuggable() 

210 self.get_display_info() 

211 

212 self.unlock() 

213 self.check_connectivity() 

214 self.connected = True 

215 

216 def disconnect(self): 

217 """ 

218 disconnect current device 

219 :return: 

220 """ 

221 self.connected = False 

222 for adapter in self.adapters: 

223 adapter_enabled = self.adapters[adapter] 

224 if not adapter_enabled: 

225 continue 

226 adapter.disconnect() 

227 

228 if self.output_dir is not None: 

229 temp_dir = os.path.join(self.output_dir, "temp") 

230 if os.path.exists(temp_dir): 

231 import shutil 

232 

233 shutil.rmtree(temp_dir) 

234 

235 def tear_down(self): 

236 for adapter in self.adapters: 

237 adapter_enabled = self.adapters[adapter] 

238 if not adapter_enabled: 

239 continue 

240 adapter.tear_down() 

241 

242 def is_foreground(self, app): 

243 """ 

244 check if app is in foreground of device 

245 :param app: App 

246 :return: boolean 

247 """ 

248 if isinstance(app, str): 

249 package_name = app 

250 elif isinstance(app, App): 

251 package_name = app.get_package_name() 

252 else: 

253 return False 

254 

255 top_activity_name = self.get_top_activity_name() 

256 if top_activity_name is None: 

257 return False 

258 return top_activity_name.startswith(package_name) 

259 

260 def get_model_number(self): 

261 """ 

262 Get model number 

263 """ 

264 if self.model_number is None: 

265 self.model_number = self.adb.get_model_number() 

266 return self.model_number 

267 

268 def get_sdk_version(self): 

269 """ 

270 Get version of current SDK 

271 """ 

272 if self.sdk_version is None: 

273 self.sdk_version = self.adb.get_sdk_version() 

274 return self.sdk_version 

275 

276 def get_release_version(self): 

277 """ 

278 Get version of current SDK 

279 """ 

280 if self.release_version is None: 

281 self.release_version = self.adb.get_release_version() 

282 return self.release_version 

283 

284 def get_ro_secure(self): 

285 if self.ro_secure is None: 

286 self.ro_secure = self.adb.get_ro_secure() 

287 return self.ro_secure 

288 

289 def get_ro_debuggable(self): 

290 if self.ro_debuggable is None: 

291 self.ro_debuggable = self.adb.get_ro_debuggable() 

292 return self.ro_debuggable 

293 

294 def get_display_info(self, refresh=True): 

295 """ 

296 get device display information, including width, height, and density 

297 :param refresh: if set to True, refresh the display info instead of using the old values 

298 :return: dict, display_info 

299 """ 

300 if self.display_info is None or refresh: 

301 self.display_info = self.adb.get_display_info() 

302 return self.display_info 

303 

304 def get_width(self, refresh=False): 

305 display_info = self.get_display_info(refresh=refresh) 

306 width = 0 

307 if "width" in display_info: 

308 width = display_info["width"] 

309 elif not refresh: 

310 width = self.get_width(refresh=True) 

311 else: 

312 self.logger.warning("get_width: width not in display_info") 

313 return width 

314 

315 def get_height(self, refresh=False): 

316 display_info = self.get_display_info(refresh=refresh) 

317 height = 0 

318 if "height" in display_info: 

319 height = display_info["height"] 

320 elif not refresh: 

321 height = self.get_width(refresh=True) 

322 else: 

323 self.logger.warning("get_height: height not in display_info") 

324 return height 

325 

326 def unlock(self): 

327 """ 

328 unlock screen 

329 skip first-use tutorials 

330 etc 

331 :return: 

332 """ 

333 self.adb.unlock() 

334 

335 def shake(self): 

336 """ 

337 shake the device 

338 """ 

339 # TODO the telnet-simulated shake event is not usable 

340 telnet = self.telnet 

341 if telnet is None: 

342 self.logger.warning("Telnet not connected, so can't shake the device.") 

343 sensor_xyz = [ 

344 (-float(v * 10) + 1, float(v) + 9.8, float(v * 2) + 0.5) 

345 for v in [1, -1, 1, -1, 1, -1, 0] 

346 ] 

347 for x, y, z in sensor_xyz: 

348 telnet.run_cmd("sensor set acceleration %f:%f:%f" % (x, y, z)) 

349 

350 def add_env(self, env): 

351 """ 

352 set env to the device 

353 :param env: instance of AppEnv 

354 """ 

355 self.logger.info("deploying env: %s" % env) 

356 env.deploy(self) 

357 

358 def add_contact(self, contact_data): 

359 """ 

360 add a contact to device 

361 :param contact_data: dict of contact, should have keys like name, phone, email 

362 :return: 

363 """ 

364 assert self.adb is not None 

365 contact_intent = Intent( 

366 prefix="start", 

367 action="android.intent.action.INSERT", 

368 mime_type="vnd.android.cursor.dir/contact", 

369 extra_string=contact_data, 

370 ) 

371 self.send_intent(intent=contact_intent) 

372 time.sleep(2) 

373 self.adb.press("BACK") 

374 time.sleep(2) 

375 self.adb.press("BACK") 

376 return True 

377 

378 def receive_call(self, phone=DEFAULT_NUM): 

379 """ 

380 simulate a income phonecall 

381 :param phone: str, phonenum 

382 :return: 

383 """ 

384 assert self.telnet is not None 

385 return self.telnet.run_cmd("gsm call %s" % phone) 

386 

387 def cancel_call(self, phone=DEFAULT_NUM): 

388 """ 

389 cancel phonecall 

390 :param phone: str, phonenum 

391 :return: 

392 """ 

393 assert self.telnet is not None 

394 return self.telnet.run_cmd("gsm cancel %s" % phone) 

395 

396 def accept_call(self, phone=DEFAULT_NUM): 

397 """ 

398 accept phonecall 

399 :param phone: str, phonenum 

400 :return: 

401 """ 

402 assert self.telnet is not None 

403 return self.telnet.run_cmd("gsm accept %s" % phone) 

404 

405 def call(self, phone=DEFAULT_NUM): 

406 """ 

407 simulate a outcome phonecall 

408 :param phone: str, phonenum 

409 :return: 

410 """ 

411 call_intent = Intent( 

412 prefix='start', 

413 action="android.intent.action.CALL", 

414 data_uri="tel:%s" % phone, 

415 ) 

416 return self.send_intent(intent=call_intent) 

417 

418 def send_sms(self, phone=DEFAULT_NUM, content=DEFAULT_CONTENT): 

419 """ 

420 send a SMS 

421 :param phone: str, phone number of receiver 

422 :param content: str, content of sms 

423 :return: 

424 """ 

425 send_sms_intent = Intent( 

426 prefix='start', 

427 action="android.intent.action.SENDTO", 

428 data_uri="sms:%s" % phone, 

429 extra_string={'sms_body': content}, 

430 extra_boolean={'exit_on_sent': 'true'}, 

431 ) 

432 self.send_intent(intent=send_sms_intent) 

433 time.sleep(2) 

434 self.adb.press('66') 

435 return True 

436 

437 def receive_sms(self, phone=DEFAULT_NUM, content=DEFAULT_CONTENT): 

438 """ 

439 receive a SMS 

440 :param phone: str, phone number of sender 

441 :param content: str, content of sms 

442 :return: 

443 """ 

444 assert self.telnet is not None 

445 return self.telnet.run_cmd("sms send %s '%s'" % (phone, content)) 

446 

447 def set_gps(self, x, y): 

448 """ 

449 set GPS positioning to x,y 

450 :param x: float 

451 :param y: float 

452 :return: 

453 """ 

454 return self.telnet.run_cmd("geo fix %s %s" % (x, y)) 

455 

456 def set_continuous_gps(self, center_x, center_y, delta_x, delta_y): 

457 import threading 

458 

459 gps_thread = threading.Thread( 

460 target=self.set_continuous_gps_blocked, 

461 args=(center_x, center_y, delta_x, delta_y), 

462 ) 

463 gps_thread.start() 

464 return True 

465 

466 def set_continuous_gps_blocked(self, center_x, center_y, delta_x, delta_y): 

467 """ 

468 simulate GPS on device via telnet 

469 this method is blocked 

470 @param center_x: x coordinate of GPS position 

471 @param center_y: y coordinate of GPS position 

472 @param delta_x: range of x coordinate 

473 @param delta_y: range of y coordinate 

474 """ 

475 import random 

476 

477 while self.connected: 

478 x = random.random() * delta_x * 2 + center_x - delta_x 

479 y = random.random() * delta_y * 2 + center_y - delta_y 

480 self.set_gps(x, y) 

481 time.sleep(3) 

482 

483 def get_settings(self): 

484 """ 

485 get device settings via adb 

486 """ 

487 db_name = "/data/data/com.android.providers.settings/databases/settings.db" 

488 

489 system_settings = {} 

490 out = self.adb.shell("sqlite3 %s \"select * from %s\"" % (db_name, "system")) 

491 out_lines = out.splitlines() 

492 for line in out_lines: 

493 segs = line.split('|') 

494 if len(segs) != 3: 

495 continue 

496 system_settings[segs[1]] = segs[2] 

497 

498 secure_settings = {} 

499 out = self.adb.shell("sqlite3 %s \"select * from %s\"" % (db_name, "secure")) 

500 out_lines = out.splitlines() 

501 for line in out_lines: 

502 segs = line.split('|') 

503 if len(segs) != 3: 

504 continue 

505 secure_settings[segs[1]] = segs[2] 

506 

507 self.settings['system'] = system_settings 

508 self.settings['secure'] = secure_settings 

509 return self.settings 

510 

511 def change_settings(self, table_name, name, value): 

512 """ 

513 dangerous method, by calling this, change settings.db in device 

514 be very careful for sql injection 

515 :param table_name: table name to work on, usually it is system or secure 

516 :param name: settings name to set 

517 :param value: settings value to set 

518 """ 

519 db_name = "/data/data/com.android.providers.settings/databases/settings.db" 

520 

521 self.adb.shell( 

522 "sqlite3 %s \"update '%s' set value='%s' where name='%s'\"" 

523 % (db_name, table_name, value, name) 

524 ) 

525 return True 

526 

527 def send_intent(self, intent): 

528 """ 

529 send an intent to device via am (ActivityManager) 

530 :param intent: instance of Intent or str 

531 :return: 

532 """ 

533 assert self.adb is not None 

534 assert intent is not None 

535 if isinstance(intent, Intent): 

536 cmd = intent.get_cmd() 

537 else: 

538 cmd = intent 

539 return self.adb.shell(cmd) 

540 

541 def send_event(self, event): 

542 """ 

543 send one event to device 

544 :param event: the event to be sent 

545 :return: 

546 """ 

547 event.send(self) 

548 

549 def start_app(self, app): 

550 """ 

551 start an app on the device 

552 :param app: instance of App, or str of package name 

553 :return: 

554 """ 

555 if isinstance(app, str): 

556 package_name = app 

557 elif isinstance(app, App): 

558 package_name = app.get_package_name() 

559 if app.get_main_activity(): 

560 package_name += "/%s" % app.get_main_activity() 

561 else: 

562 self.logger.warning("unsupported param " + app + " with type: ", type(app)) 

563 return 

564 intent = Intent(suffix=package_name) 

565 self.send_intent(intent) 

566 

567 def get_top_activity_name(self): 

568 """ 

569 Get current activity 

570 """ 

571 r = self.adb.shell("dumpsys activity activities") 

572 activity_line_re = re.compile( 

573 r'\* Hist[ ]+#\d+: ActivityRecord{[^ ]+ [^ ]+ ([^ ]+) t(\d+).*}' 

574 ) 

575 m = activity_line_re.search(r) 

576 if m: 

577 return m.group(1) 

578 # data = self.adb.shell("dumpsys activity top").splitlines() 

579 # regex = re.compile("\s*ACTIVITY ([A-Za-z0-9_.]+)/([A-Za-z0-9_.]+)") 

580 # m = regex.search(data[1]) 

581 # if m: 

582 # return m.group(1) + "/" + m.group(2) 

583 self.logger.warning("Unable to get top activity name.") 

584 return None 

585 

586 def get_current_activity_stack(self): 

587 """ 

588 Get current activity stack 

589 :return: a list of str, each str is an activity name, the first is the top activity name 

590 """ 

591 task_to_activities = self.get_task_activities() 

592 top_activity = self.get_top_activity_name() 

593 if top_activity: 

594 for task_id in task_to_activities: 

595 activities = task_to_activities[task_id] 

596 if len(activities) > 0 and activities[0] == top_activity: 

597 return activities 

598 self.logger.warning("Unable to get current activity stack.") 

599 return [top_activity] 

600 else: 

601 return None 

602 

603 def get_task_activities(self): 

604 """ 

605 Get current tasks and corresponding activities. 

606 :return: a dict mapping each task id to a list of activities, from top to down. 

607 """ 

608 task_to_activities = {} 

609 

610 lines = self.adb.shell("dumpsys activity activities").splitlines() 

611 activity_line_re = re.compile( 

612 r'\* Hist[ ]+#\d+: ActivityRecord{[^ ]+ [^ ]+ ([^ ]+) t(\d+)}' 

613 ) 

614 

615 for line in lines: 

616 line = line.strip() 

617 if line.startswith("Task id #"): 

618 task_id = line[9:] 

619 task_to_activities[task_id] = [] 

620 elif line.startswith("* Hist #") or line.startswith("* Hist #"): 

621 m = activity_line_re.match(line) 

622 if m: 

623 activity = m.group(1) 

624 task_id = m.group(2) 

625 if task_id not in task_to_activities: 

626 task_to_activities[task_id] = [] 

627 task_to_activities[task_id].append(activity) 

628 

629 return task_to_activities 

630 

631 def get_service_names(self): 

632 """ 

633 get current running services 

634 :return: list of services 

635 """ 

636 services = [] 

637 dat = self.adb.shell('dumpsys activity services') 

638 lines = dat.splitlines() 

639 service_re = re.compile('^.+ServiceRecord{.+ ([A-Za-z0-9_.]+)/([A-Za-z0-9_.]+)') 

640 

641 for line in lines: 

642 m = service_re.search(line) 

643 if m: 

644 package = m.group(1) 

645 service = m.group(2) 

646 services.append("%s/%s" % (package, service)) 

647 return services 

648 

649 def get_package_path(self, package_name): 

650 """ 

651 get installation path of a package (app) 

652 :param package_name: 

653 :return: package path of app in device 

654 """ 

655 dat = self.adb.shell('pm path %s' % package_name) 

656 package_path_re = re.compile('^package:(.+)$') 

657 m = package_path_re.match(dat) 

658 if m: 

659 path = m.group(1) 

660 return path.strip() 

661 return None 

662 

663 def start_activity_via_monkey(self, package): 

664 """ 

665 use monkey to start activity 

666 @param package: package name of target activity 

667 """ 

668 cmd = 'monkey' 

669 if package: 

670 cmd += ' -p %s' % package 

671 out = self.adb.shell(cmd) 

672 if re.search(r"(Error)|(Cannot find 'App')", out, re.IGNORECASE | re.MULTILINE): 

673 raise RuntimeError(out) 

674 

675 def send_documents(self, app): 

676 if not self.send_document: 

677 self.logger.info("No document need to be sent") 

678 return 

679 

680 self.logger.info("Sending documents.") 

681 for file in os.listdir(self.resource_path): 

682 if "anki" in app.package_name and file == "collection.anki2": 

683 self.mkdir("/storage/emulated/0/AnkiDroid/") 

684 self.push_file(os.path.join(self.resource_path, file), "/storage/emulated/0/AnkiDroid/") 

685 continue 

686 

687 if "activitydiary" in app.package_name and file == "ActivityDiary_Export.sqlite3": 

688 self.push_file(os.path.join(self.resource_path, file), "/storage/emulated/0/Download/") 

689 continue 

690 

691 self.push_file(os.path.join(self.resource_path, file), "/sdcard/") 

692 

693 def install_app(self, app): 

694 """ 

695 install an app to device 

696 @param app: instance of App 

697 @return: 

698 """ 

699 assert isinstance(app, App) 

700 # subprocess.check_call(["adb", "-s", self.serial, "uninstall", app.get_package_name()], 

701 # stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

702 package_name = app.get_package_name() 

703 if package_name not in self.adb.get_installed_apps(): 

704 install_cmd = ["adb", "-s", self.serial, "install", "-t", "-r"] 

705 if self.grant_perm and self.get_sdk_version() >= 23 and "amaze" not in package_name: 

706 print("Granting permissions for app %s" % package_name) 

707 install_cmd.append("-g") 

708 install_cmd.append(app.app_path) 

709 install_p = subprocess.Popen(install_cmd, stdout=subprocess.PIPE) 

710 while self.connected and package_name not in self.adb.get_installed_apps(): 

711 self.logger.info("Please wait while installing the app...") 

712 time.sleep(2) 

713 if not self.connected: 

714 install_p.terminate() 

715 return 

716 

717 dumpsys_p = subprocess.Popen( 

718 ["adb", "-s", self.serial, "shell", "dumpsys", "package", package_name], 

719 stdout=subprocess.PIPE, 

720 ) 

721 dumpsys_lines = [] 

722 while True: 

723 line = dumpsys_p.stdout.readline() 

724 if not line: 

725 break 

726 if not isinstance(line, str): 

727 line = line.decode() 

728 dumpsys_lines.append(line) 

729 if self.output_dir is not None: 

730 package_info_file_name = "%s/dumpsys_package_%s.txt" % ( 

731 self.output_dir, 

732 app.get_package_name(), 

733 ) 

734 package_info_file = open(package_info_file_name, "w") 

735 package_info_file.writelines(dumpsys_lines) 

736 package_info_file.close() 

737 # app.dumpsys_main_activity = self.__parse_main_activity_from_dumpsys_lines( 

738 # dumpsys_lines 

739 # ) 

740 

741 self.logger.info("App installed: %s" % package_name) 

742 self.logger.info("Main activity: %s" % app.get_main_activity()) 

743 

744 @staticmethod 

745 def __parse_main_activity_from_dumpsys_lines(lines): 

746 main_activity = None 

747 activity_line_re = re.compile("[^ ]+ ([^ ]+)/([^ ]+) filter [^ ]+") 

748 action_re = re.compile("Action: \"([^ ]+)\"") 

749 category_re = re.compile("Category: \"([^ ]+)\"") 

750 

751 activities = {} 

752 

753 cur_package = None 

754 cur_activity = None 

755 cur_actions = [] 

756 cur_categories = [] 

757 

758 for line in lines: 

759 line = line.strip() 

760 m = activity_line_re.match(line) 

761 if m: 

762 activities[cur_activity] = { 

763 "actions": cur_actions, 

764 "categories": cur_categories, 

765 } 

766 cur_package = m.group(1) 

767 cur_activity = m.group(2) 

768 if cur_activity.startswith("."): 

769 cur_activity = cur_package + cur_activity 

770 cur_actions = [] 

771 cur_categories = [] 

772 else: 

773 m1 = action_re.match(line) 

774 if m1: 

775 cur_actions.append(m1.group(1)) 

776 else: 

777 m2 = category_re.match(line) 

778 if m2: 

779 cur_categories.append(m2.group(1)) 

780 

781 if cur_activity is not None: 

782 activities[cur_activity] = { 

783 "actions": cur_actions, 

784 "categories": cur_categories, 

785 } 

786 

787 for activity in activities: 

788 if ( 

789 "android.intent.action.MAIN" in activities[activity]["actions"] 

790 and "android.intent.category.LAUNCHER" 

791 in activities[activity]["categories"] 

792 ): 

793 main_activity = activity 

794 return main_activity 

795 

796 def uninstall_app(self, app): 

797 """ 

798 Uninstall an app from device. 

799 :param app: an instance of App or a package name 

800 """ 

801 if isinstance(app, App): 

802 package_name = app.get_package_name() 

803 # Don't uninstall the app if launch with package name 

804 # if app.settings.is_package: 

805 # return 

806 else: 

807 package_name = app 

808 if package_name in self.adb.get_installed_apps(): 

809 uninstall_cmd = ["adb", "-s", self.serial, "uninstall", package_name] 

810 uninstall_p = subprocess.Popen(uninstall_cmd, stdout=subprocess.PIPE) 

811 while package_name in self.adb.get_installed_apps(): 

812 self.logger.info("Please wait while uninstalling the app...") 

813 time.sleep(2) 

814 uninstall_p.terminate() 

815 

816 def get_app_pid(self, app): 

817 if isinstance(app, App): 

818 package = app.get_package_name() 

819 else: 

820 package = app 

821 

822 name2pid = {} 

823 ps_out = self.adb.shell(["ps"]) 

824 ps_out_lines = ps_out.splitlines() 

825 ps_out_head = ps_out_lines[0].split() 

826 if ps_out_head[1] != "PID" or ps_out_head[-1] != "NAME": 

827 self.logger.warning("ps command output format error: %s" % ps_out_head) 

828 for ps_out_line in ps_out_lines[1:]: 

829 segs = ps_out_line.split() 

830 if len(segs) < 4: 

831 continue 

832 pid = int(segs[1]) 

833 name = segs[-1] 

834 name2pid[name] = pid 

835 

836 if package in name2pid: 

837 return name2pid[package] 

838 

839 possible_pids = [] 

840 for name in name2pid: 

841 if name.startswith(package): 

842 possible_pids.append(name2pid[name]) 

843 if len(possible_pids) > 0: 

844 return min(possible_pids) 

845 

846 return None 

847 

848 def push_file(self, local_file, remote_dir="/sdcard/"): 

849 """ 

850 push file/directory to target_dir 

851 :param local_file: path to file/directory in host machine 

852 :param remote_dir: path to target directory in device 

853 :return: 

854 """ 

855 if not os.path.exists(local_file): 

856 self.logger.warning("push_file file does not exist: %s" % local_file) 

857 self.adb.run_cmd(["push", local_file, remote_dir]) 

858 

859 def pull_file(self, remote_file, local_file): 

860 self.adb.run_cmd(["pull", remote_file, local_file], disable_log=True) 

861 

862 def mkdir(self, path): 

863 self.adb.run_cmd(["shell", "mkdir", path]) 

864 

865 def save_screenshot_for_report(self, event_name=None, event=None, current_state=None): 

866 """ 

867 save screenshot for report, save to "all_states" dir 

868 """ 

869 

870 self.cur_event_count += 1 

871 if current_state is None: 

872 self.current_state = self.get_current_state() 

873 else: 

874 self.current_state = current_state 

875 self.save_to_all_states_dir(self.screenshot_path, event_name=event_name, event=event) 

876 

877 def draw_event(self, event, event_name, screenshot_path): 

878 import cv2 

879 image = cv2.imread(screenshot_path) 

880 if event is not None and screenshot_path is not None: 

881 if isinstance(event, InputEvent): 

882 if isinstance(event, TouchEvent): 

883 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

884 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 0, 255), 5) 

885 elif isinstance(event, LongTouchEvent): 

886 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

887 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 255, 0), 5) 

888 elif isinstance(event, SetTextEvent): 

889 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

890 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 0, 0), 5) 

891 elif isinstance(event, ScrollEvent): 

892 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])), 

893 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 255, 0), 5) 

894 elif isinstance(event, KeyEvent): 

895 cv2.putText(image, event.name, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA) 

896 else: 

897 return 

898 else: 

899 if event_name == "click": 

900 cv2.rectangle(image, (int(event.info['bounds']['left']), int(event.info['bounds']['top'])), 

901 (int(event.info['bounds']['right']), int(event.info['bounds']['bottom'])), 

902 (0, 0, 255), 5) 

903 elif event_name == "long_click": 

904 cv2.rectangle(image, (int(event.info['bounds']['left']), int(event.info['bounds']['top'])), 

905 (int(event.info['bounds']['right']), int(event.info['bounds']['bottom'])), 

906 (0, 255, 0), 5) 

907 elif event_name == "set_text": 

908 cv2.rectangle(image, (int(event.info['bounds']['left']), int(event.info['bounds']['top'])), 

909 (int(event.info['bounds']['right']), int(event.info['bounds']['bottom'])), 

910 (255, 0, 0), 5) 

911 elif event_name == "press": 

912 cv2.putText(image, event, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA) 

913 else: 

914 return 

915 try: 

916 cv2.imwrite(screenshot_path, image) 

917 except Exception as e: 

918 self.logger.warning(e) 

919 

920 def take_screenshot(self): 

921 if self.output_dir is None: 

922 return None 

923 

924 from datetime import datetime 

925 

926 tag = datetime.now().strftime("%Y-%m-%d_%H%M%S") 

927 local_image_dir = os.path.join(self.output_dir, "temp") 

928 if not os.path.exists(local_image_dir): 

929 os.makedirs(local_image_dir) 

930 

931 if self.adapters[self.minicap] and self.minicap.last_screen: 

932 # minicap use jpg format 

933 local_image_path = os.path.join(local_image_dir, "screen_%s.jpg" % tag) 

934 with open(local_image_path, 'wb') as local_image_file: 

935 local_image_file.write(self.minicap.last_screen) 

936 return local_image_path 

937 else: 

938 # screencap use png format 

939 local_image_path = os.path.join(local_image_dir, "screen_%s.png" % tag) 

940 remote_image_path = "/sdcard/screen_%s.png" % tag 

941 self.adb.shell("screencap -p %s" % remote_image_path) 

942 self.pull_file(remote_image_path, local_image_path) 

943 self.adb.shell("rm %s" % remote_image_path) 

944 

945 return local_image_path 

946 

947 def save_to_all_states_dir(self, local_image_path, event, event_name=None): 

948 import shutil 

949 all_states_dir = os.path.join(self.output_dir, "all_states") 

950 if not os.path.exists(all_states_dir): 

951 os.makedirs(all_states_dir) 

952 

953 json_dir = os.path.join(self.output_dir, "report_screenshot.json") 

954 if not self.is_harmonyos: 

955 if self.adapters[self.minicap]: 

956 dest_screenshot_path = "%s/screen_%s.jpg" % (all_states_dir, self.cur_event_count) 

957 else: 

958 dest_screenshot_path = "%s/screen_%s.png" % (all_states_dir, self.cur_event_count) 

959 else: 

960 dest_screenshot_path = "%s/screen_%s.jpeg" % (all_states_dir, self.cur_event_count) 

961 

962 if self.current_state is not None: 

963 dest_state_json_path = "%s/state_%s.json" % (all_states_dir, self.cur_event_count) 

964 # self.current_state.screenshot_path = dest_screenshot_path 

965 state_json_file = open(dest_state_json_path, "w") 

966 state_json_file.write(self.current_state.to_json()) 

967 state_json_file.close() 

968 

969 try: 

970 with open(json_dir, 'r') as json_file: 

971 report_screens = json.load(json_file) 

972 except FileNotFoundError: 

973 report_screens = [] 

974 if event_name is None: 

975 event_name = event.get_event_name() 

976 

977 img_file_name = os.path.basename(dest_screenshot_path) 

978 

979 report_screen = { 

980 "event": event_name, 

981 "event_index": str(self.cur_event_count), 

982 "screen_shoot": img_file_name 

983 } 

984 

985 report_screens.append(report_screen) 

986 with open(json_dir, 'w') as json_file: 

987 json.dump(report_screens, json_file, indent=4) 

988 if self.current_state is not None and local_image_path != dest_screenshot_path: 

989 self.current_state.screenshot_path = dest_screenshot_path 

990 shutil.move(local_image_path, dest_screenshot_path) 

991 

992 self.draw_event(event, event_name, dest_screenshot_path) 

993 

994 def get_current_state(self): 

995 self.logger.debug("getting current device state...") 

996 current_state = None 

997 try: 

998 views = self.get_views() 

999 foreground_activity = self.get_top_activity_name() 

1000 activity_stack = self.get_current_activity_stack() 

1001 background_services = self.get_service_names() 

1002 screenshot_path = self.take_screenshot() 

1003 self.screenshot_path = screenshot_path 

1004 self.logger.debug("finish getting current device state...") 

1005 from .device_state import DeviceState 

1006 

1007 current_state = DeviceState( 

1008 self, 

1009 views=views, 

1010 foreground_activity=foreground_activity, 

1011 activity_stack=activity_stack, 

1012 background_services=background_services, 

1013 screenshot_path=screenshot_path, 

1014 tag=self.cur_event_count 

1015 ) 

1016 except Exception as e: 

1017 self.logger.warning("exception in get_current_state: %s" % e) 

1018 import traceback 

1019 

1020 traceback.print_exc() 

1021 self.logger.debug("finish getting current device state...") 

1022 self.last_know_state = current_state 

1023 if not current_state: 

1024 self.logger.warning("Failed to get current state!") 

1025 return current_state 

1026 

1027 def get_last_known_state(self): 

1028 return self.last_know_state 

1029 

1030 def view_touch(self, x, y): 

1031 self.adb.touch(x, y) 

1032 

1033 def view_long_touch(self, x, y, duration=2000): 

1034 """ 

1035 Long touches at (x, y) 

1036 @param duration: duration in ms 

1037 This workaround was suggested by U{HaMi<http://stackoverflow.com/users/2571957/hami>} 

1038 """ 

1039 self.adb.long_touch(x, y, duration) 

1040 

1041 def view_drag(self, start_xy, end_xy, duration): 

1042 """ 

1043 Sends drag event n PX (actually it's using C{input swipe} command. 

1044 """ 

1045 self.adb.drag(start_xy, end_xy, duration) 

1046 

1047 def view_append_text(self, text): 

1048 try: 

1049 self.u2.send_keys(text=text, clear=False) 

1050 except: 

1051 self.adb.type(text) 

1052 

1053 def view_set_text(self, text): 

1054 try: 

1055 self.u2.send_keys(text=text, clear=True) 

1056 except: 

1057 self.logger.warning( 

1058 "`adb shell input text` doesn't support setting text, appending instead." 

1059 ) 

1060 self.adb.type(text) 

1061 

1062 def key_press(self, key_code): 

1063 self.adb.press(key_code) 

1064 

1065 def shutdown(self): 

1066 self.adb.shell("reboot -p") 

1067 

1068 def get_views(self): 

1069 if self.cv_mode and self.adapters[self.minicap]: 

1070 # Get views using cv module 

1071 views = self.minicap.get_views() 

1072 if views: 

1073 return views 

1074 else: 

1075 self.logger.warning("Failed to get views using OpenCV.") 

1076 # if self.droidbot_app and self.adapters[self.droidbot_app]: 

1077 # views = self.droidbot_app.get_views() 

1078 # if views: 

1079 # return views 

1080 # else: 

1081 # self.logger.warning("Failed to get views using Accessibility.") 

1082 if self.uiautomator_helper: 

1083 views = self.uiautomator_helper.get_views() 

1084 if views: 

1085 return views 

1086 else: 

1087 self.logger.warning("Failed to get views using UiAutomator.") 

1088 

1089 self.logger.warning("failed to get current views!") 

1090 return None 

1091 

1092 def get_random_port(self): 

1093 """ 

1094 get a random port on host machine to establish connection 

1095 :return: a port number 

1096 """ 

1097 import socket 

1098 

1099 temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

1100 temp_sock.bind(("", 0)) 

1101 port = temp_sock.getsockname()[1] 

1102 temp_sock.close() 

1103 if port in self.__used_ports: 

1104 return self.get_random_port() 

1105 self.__used_ports.append(port) 

1106 return port 

1107 

1108 def handle_rotation(self): 

1109 if not self.adapters[self.minicap]: 

1110 return 

1111 self.pause_sending_event = True 

1112 if self.minicap.check_connectivity(): 

1113 self.minicap.disconnect() 

1114 self.minicap.connect() 

1115 

1116 if self.minicap.check_connectivity(): 

1117 print("[CONNECTION] %s is reconnected." % self.minicap.__class__.__name__) 

1118 self.pause_sending_event = False 

1119 

1120 def get_activity_short_name(self): 

1121 return self.get_top_activity_name().split(".")[-1] 

1122 

1123 def rotate_device_right(self): 

1124 self.adb.disable_auto_rotation() 

1125 time.sleep(1) 

1126 self.adb.rotate_right() 

1127 

1128 def rotate_device_neutral(self): 

1129 self.adb.disable_auto_rotation() 

1130 time.sleep(1) 

1131 self.adb.rotate_neutral() 

1132 

1133 # clear the app data (including user data and cache) 

1134 def clear_data(self, package_name): 

1135 """ 

1136 clear the app data (including user data and cache) 

1137 :param package_name: the app package name 

1138 """ 

1139 self.adb.clear_app_data(package_name)