Coverage for kea/device_hm.py: 76%
393 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-22 16:05 +0800
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-22 16:05 +0800
1import json
2import logging
3import os
4import re
5import subprocess
6import sys
7import time
8from typing import IO
9import typing
11if typing.TYPE_CHECKING:
12 from .start import Setting
13from .input_event import InputEvent, TouchEvent, LongTouchEvent, ScrollEvent, SetTextEvent, KeyEvent
15from .device import Device
16from .adapter.hdc import HDC, HDC_EXEC
17from .app_hm import AppHM
18from .adapter.hilog import Hilog
19from .intent import Intent
21DEFAULT_NUM = '1234567890'
22DEFAULT_CONTENT = 'Hello world!'
25class DeviceHM(Device):
26 """
27 this class describes a connected device
28 """
30 def __init__(self, device_serial=None, is_emulator=False, output_dir=None,
31 cv_mode=False, grant_perm=False, telnet_auth_token=None,
32 enable_accessibility_hard=False, humanoid=None, ignore_ad=False,
33 is_harmonyos=True, save_log=False, settings:"Setting"=None):
34 """
35 initialize a device connection
36 :param device_serial: serial number of target device
37 :param is_emulator: boolean, type of device, True for emulator, False for real device
38 :return:
39 """
40 self.logger = logging.getLogger(self.__class__.__name__)
41 self.cur_event_count = 0
42 self.screenshot_path = None
43 self.current_state = None
45 if "emulator" in device_serial and not is_emulator:
46 self.logger.warning("Seems like you are using an emulator. If so, please add is_emulator option.")
47 self.serial = device_serial
48 self.is_emulator = is_emulator
49 self.cv_mode = cv_mode
50 self.output_dir = output_dir
51 if output_dir is not None:
52 if not os.path.isdir(output_dir):
53 os.makedirs(output_dir)
54 self.grant_perm = grant_perm
55 self.enable_accessibility_hard = enable_accessibility_hard
56 self.humanoid = humanoid
57 self.ignore_ad = ignore_ad
59 # basic device information
60 self.settings = settings
61 self.display_info = None
62 self._model_number = None
63 self._device_name = None
64 self.sdk_version = None
65 self.release_version = None
66 self.connected = True
67 self.last_know_state = None
68 self.__used_ports = []
69 self.pause_sending_event = False
71 self.is_harmonyos = is_harmonyos
72 self.save_log = save_log
74 # adapters
75 self.hdc = HDC(device=self)
76 self.hilog = Hilog(device=self)
78 from hmdriver2.driver import Driver
79 self.hm2 = Driver(serial=self.serial)
82 self.logger.info("You're runing droidbot on HarmonyOS")
83 self.adapters = {
84 self.hdc: True,
85 self.hilog: True if self.save_log else False
86 }
88 def check_connectivity(self):
89 """
90 check if the device is available
91 """
92 for adapter in self.adapters:
93 adapter_name = adapter.__class__.__name__
94 adapter_enabled = self.adapters[adapter]
95 if not adapter_enabled:
96 print("[CONNECTION] %s is not enabled." % adapter_name)
97 else:
98 if adapter.check_connectivity():
99 print("[CONNECTION] %s is enabled and connected." % adapter_name)
100 else:
101 print("[CONNECTION] %s is enabled but not connected." % adapter_name)
103 def set_up(self):
104 """
105 Set connections on this device
106 """
107 # wait for emulator to start
108 # self.wait_for_device()
109 for adapter in self.adapters:
110 adapter_enabled = self.adapters[adapter]
111 if not adapter_enabled:
112 continue
113 adapter.set_up()
115 def connect(self):
116 """
117 establish connections on this device
118 :return:
119 """
120 for adapter in self.adapters:
121 adapter_enabled = self.adapters[adapter]
122 if not adapter_enabled:
123 continue
124 adapter.connect()
126 # self.get_sdk_version()
127 # self.get_release_version()
128 # self.get_ro_secure()
129 # self.get_ro_debuggable()
130 # self.get_display_info()
132 self.unlock()
133 self.check_connectivity()
134 self.connected = True
136 def disconnect(self):
137 """
138 disconnect current device
139 :return:
140 """
141 self.connected = False
142 for adapter in self.adapters:
143 adapter_enabled = self.adapters[adapter]
144 if not adapter_enabled:
145 continue
146 adapter.disconnect()
148 if self.output_dir is not None:
149 temp_dir = os.path.join(self.output_dir, "temp")
150 if os.path.exists(temp_dir):
151 import shutil
152 shutil.rmtree(temp_dir)
154 def tear_down(self):
155 for adapter in self.adapters:
156 adapter_enabled = self.adapters[adapter]
157 if not adapter_enabled:
158 continue
159 adapter.tear_down()
161 def send_documents(self, app):
162 self.logger.warning("send_documents Not implemented")
164 def is_foreground(self, app) -> bool:
165 """
166 check if app is in foreground of device
167 :param app: App
168 :return: boolean
169 """
170 if isinstance(app, str):
171 package_name = app
172 elif isinstance(app, AppHM):
173 package_name = app.get_package_name()
174 else:
175 return False
177 top_activity_name = self.get_top_activity_name()
178 if top_activity_name is None:
179 return False
180 return top_activity_name.startswith(package_name)
182 @property
183 def model_number(self):
184 """
185 Get model number
186 """
187 if self._model_number is None:
188 self._model_number = self.hdc.get_model_number()
189 return self._model_number
191 @property
192 def device_name(self):
193 if self._device_name is None:
194 self._device_name = self.hdc.get_device_name()
195 return self._device_name
197 def get_sdk_version(self):
198 pass
200 def get_release_version(self):
201 """
202 Get version of current SDK
203 """
204 if self.release_version is None:
205 self.release_version = self.hdc.get_release_version()
206 return self.release_version
208 def get_ro_secure(self):
209 pass
211 def get_ro_debuggable(self):
212 pass
214 def get_display_info(self, refresh=True):
215 """
216 get device display information, including width, height, and density
217 :param refresh: if set to True, refresh the display info instead of using the old values
218 :return: dict, display_info
219 """
220 r = self.hdc.shell("hidumper -s RenderService -a screen")
221 pattern = r"activeMode: (?P<width>\d+)x(?P<height>\d+)"
222 m = re.search(pattern, r)
223 assert m, "Failed when getting screen resolution with hidumper"
224 display_info = {"width":int(m.group("width")), "height":int(m.group("height"))}
225 return display_info
227 def get_width(self, refresh=False):
228 display_info = self.get_display_info(refresh=refresh)
229 width = 0
230 if "width" in display_info:
231 width = display_info["width"]
232 elif not refresh:
233 width = self.get_width(refresh=True)
234 else:
235 self.logger.warning("get_width: width not in display_info")
236 return width
238 def get_height(self, refresh=False):
239 display_info = self.get_display_info(refresh=refresh)
240 height = 0
241 if "height" in display_info:
242 height = display_info["height"]
243 elif not refresh:
244 height = self.get_width(refresh=True)
245 else:
246 self.logger.warning("get_height: height not in display_info")
247 return height
249 def unlock(self):
250 """
251 unlock screen
252 skip first-use tutorials
253 etc
254 :return:
255 """
256 self.hdc.unlock()
258 def send_intent(self, intent):
259 """
260 send an intent to device via am (ActivityManager)
261 :param intent: instance of Intent or str
262 :return:
263 """
264 assert self.hdc is not None
265 assert intent is not None
266 if isinstance(intent, Intent):
267 cmd = intent.get_cmd()
268 else:
269 cmd = intent
270 return self.hdc.shell(cmd)
272 def send_event(self, event:"InputEvent"):
273 """
274 send one event to device
275 :param event: the event to be sent
276 :return:
277 """
278 event.send(self)
280 def start_app(self, app):
281 """
282 start an app on the device
283 :param app: instance of App, or str of package name
284 :return:
285 """
286 if isinstance(app, str):
287 package_name = app
288 elif isinstance(app, AppHM):
289 package_name = app.get_package_name()
290 if app.get_main_activity():
291 package_name += "/%s" % app.get_main_activity()
292 else:
293 self.logger.warning("unsupported param " + app + " with type: ", type(app))
294 return
295 intent = Intent(suffix=package_name)
296 self.send_intent(intent)
298 def get_top_activity_name(self) -> str:
299 """
300 Get current activity
301 """
302 r = self.hdc.shell("aa dump --mission-list")
304 if r"#FOREGROUND" not in r:
305 return None
307 mission_list = r.split("Mission ID")
308 for mission in mission_list:
309 if "state" not in mission:
310 continue
311 if "#BACKGROUND" in mission:
312 continue
314 import re
315 pattern = r"mission name #\[#(?P<bundleName>.+?):.+?:(?P<abilityName>.+?)\]"
316 m = re.search(pattern, mission)
317 if m:
318 return m.group("bundleName") + "/" + m.group("abilityName")
320 return None
322 def get_current_activity_stack(self):
323 """
324 Get current activity stack
325 :return: a list of str, each str is an activity name, the first is the top activity name
326 """
327 task_to_activities = self.get_task_activities()
328 top_activity = self.get_top_activity_name()
329 if top_activity:
330 for task_id in task_to_activities:
331 activities = task_to_activities[task_id]
332 if len(activities) > 0 and activities[0] == top_activity:
333 return activities
334 self.logger.warning("Unable to get current activity stack.")
335 return [top_activity]
336 else:
337 return None
340 def install_app(self, app):
341 """
342 install an app to device
343 @param app: instance of App
344 @return:
345 """
346 assert isinstance(app, AppHM)
347 package_name = app.get_package_name()
348 if package_name not in self.hdc.get_installed_apps():
349 install_cmd = [HDC_EXEC, "-t", self.serial, "install", "-r"]
350 install_cmd.append(HDC.get_relative_path(app.app_path))
351 install_p = subprocess.Popen(install_cmd, stdout=subprocess.PIPE)
352 # print(" ".join(install_cmd))
354 print("Please wait while installing the app...")
355 output = install_p.stdout.readline()
356 if output:
357 print(output.strip().decode())
359 if not self.connected:
360 install_p.terminate()
361 return
362 # dump the package info through the bundleName
363 # hdc shell bm dump -n [bundleName]
364 dumpsys_p = subprocess.Popen([HDC_EXEC, "-t", self.serial, "shell",
365 "bm","dump" ,"-n", package_name], stdout=subprocess.PIPE)
366 dumpsys_lines = []
367 while True:
368 line = dumpsys_p.stdout.readline()
369 if not line:
370 break
371 if not isinstance(line, str):
372 line = line.decode()
373 dumpsys_lines.append(line)
375 if self.output_dir is not None:
376 package_info_file_name = "%s/dumpsys_package_%s.txt" % (self.output_dir, app.get_package_name())
377 with open(package_info_file_name, "w") as fp:
378 fp.writelines(dumpsys_lines)
380 with open(package_info_file_name, "r") as fp:
381 app.dumpsys_main_activity = self.__parse_main_activity_from_dumpsys_lines(fp)
383 self.logger.info("App installed: %s" % package_name)
384 self.logger.info("Main activity: %s" % app.get_main_activity())
386 @staticmethod
387 def __parse_main_activity_from_dumpsys_lines(fp:IO):
388 """
389 """
391 # jump the fist line (the bundleName, which dosen't follow the json format)
392 # to correctly parse the file into json
393 cur_package = fp.readline().replace(":", "").strip()
395 import json
396 dumpsys = json.load(fp)
398 # main ability
399 return dumpsys["hapModuleInfos"][0]["mainAbility"]
401 def uninstall_app(self, app):
402 """
403 Uninstall an app from device.
404 :param app: an instance of App or a package name
405 """
406 if isinstance(app, AppHM):
407 package_name = app.get_package_name()
408 # if self.settings.is_package:
409 # return
410 else:
411 package_name = app
412 if package_name in self.hdc.get_installed_apps():
413 uninstall_cmd = [HDC_EXEC, "-t", self.serial, "uninstall", package_name]
414 uninstall_p = subprocess.Popen(uninstall_cmd, stdout=subprocess.PIPE)
415 while package_name in self.hdc.get_installed_apps():
416 print("Please wait while uninstalling the app...")
417 time.sleep(2)
418 uninstall_p.terminate()
420 def push_file(self, local_file, remote_dir="/sdcard/"):
421 """
422 push file/directory to target_dir
423 :param local_file: path to file/directory in host machine
424 :param remote_dir: path to target directory in device
425 :return:
426 """
427 if not os.path.exists(local_file):
428 self.logger.warning("push_file file does not exist: %s" % local_file)
429 self.hdc.run_cmd(["file send", local_file, remote_dir])
431 def pull_file(self, remote_file, local_file):
432 r = self.hdc.run_cmd(["file", "recv", remote_file, local_file])
433 assert not r.startswith("[Fail]"), "Error with receiving file"
435 def take_screenshot(self):
437 if self.output_dir is None:
438 return None
440 r = self.hdc.shell("snapshot_display")
441 assert "success" in r, "Error when taking screenshot"
443 remote_path = r.splitlines()[0].split()[-1]
444 file_name = os.path.basename(remote_path)
445 temp_path = os.path.join(self.output_dir, "temp")
446 local_path = os.path.join(os.getcwd(), temp_path, file_name)
448 self.pull_file(remote_path, HDC.get_relative_path(local_path))
450 return local_path
452 def save_screenshot_for_report(self, event_name=None, event=None, current_state=None):
453 """
454 save screenshot for report, save to "all_states" dir
455 """
457 self.cur_event_count += 1
458 if current_state is None:
459 self.current_state = self.get_current_state()
460 else:
461 self.current_state = current_state
462 self.save_to_all_states_dir(self.screenshot_path, event_name=event_name, event=event)
464 def draw_event(self, event, event_name, screenshot_path):
465 import cv2
466 image = cv2.imread(screenshot_path)
467 if event is not None and screenshot_path is not None:
468 if isinstance(event, InputEvent):
469 if isinstance(event, TouchEvent):
470 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])),
471 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 0, 255), 5)
472 elif isinstance(event, LongTouchEvent):
473 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])),
474 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (0, 255, 0), 5)
475 elif isinstance(event, SetTextEvent):
476 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])),
477 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 0, 0), 5)
478 elif isinstance(event, ScrollEvent):
479 cv2.rectangle(image, (int(event.view['bounds'][0][0]), int(event.view['bounds'][0][1])),
480 (int(event.view['bounds'][1][0]), int(event.view['bounds'][1][1])), (255, 255, 0), 5)
481 elif isinstance(event, KeyEvent):
482 cv2.putText(image, event.name, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA)
483 else:
484 return
485 else:
486 if event_name == "click":
487 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (0, 0, 255), 5)
488 elif event_name == "long_click":
489 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (0, 255, 0), 5)
490 elif event_name == "set_text":
491 cv2.rectangle(image, (int(event.bounds.left), int(event.bounds.top)), (int(event.bounds.right), int(event.bounds.bottom)), (255, 0, 0), 5)
492 elif event_name == "press":
493 cv2.putText(image,event, (100,300), cv2.FONT_HERSHEY_SIMPLEX, 5,(0, 255, 0), 3, cv2.LINE_AA)
494 else:
495 return
496 try:
497 cv2.imwrite(screenshot_path, image)
498 except Exception as e:
499 self.logger.warning(e)
501 def save_to_all_states_dir(self, local_image_path, event, event_name = None):
502 import shutil
503 all_states_dir = os.path.join(self.output_dir, "all_states")
504 if not os.path.exists(all_states_dir):
505 os.makedirs(all_states_dir)
507 json_dir = os.path.join(self.output_dir, "report_screenshot.json")
508 if not self.is_harmonyos:
509 if self.adapters[self.minicap]:
510 dest_screenshot_path = "%s/screen_%s.jpg" % (all_states_dir, self.cur_event_count)
511 else:
512 dest_screenshot_path = "%s/screen_%s.png" % (all_states_dir, self.cur_event_count)
513 else:
514 dest_screenshot_path = "%s/screen_%s.jpeg" % (all_states_dir, self.cur_event_count)
516 if self.current_state is not None:
517 dest_state_json_path = "%s/state_%s.json" % (all_states_dir, self.cur_event_count)
518 state_json_file = open(dest_state_json_path, "w")
519 state_json_file.write(self.current_state.to_json())
520 state_json_file.close()
522 try:
523 with open(json_dir, 'r') as json_file:
524 report_screens = json.load(json_file)
525 except FileNotFoundError:
526 report_screens = []
527 if event_name is None:
528 event_name = event.get_event_name()
530 img_file_name = os.path.basename(dest_screenshot_path)
532 report_screen = {
533 "event": event_name,
534 "event_index": str(self.cur_event_count),
535 "screen_shoot": img_file_name
536 }
538 report_screens.append(report_screen)
539 with open(json_dir, 'w') as json_file:
540 json.dump(report_screens, json_file, indent=4)
541 if self.current_state is not None and local_image_path != dest_screenshot_path:
542 self.current_state.screenshot_path = dest_screenshot_path
543 shutil.move(local_image_path, dest_screenshot_path)
545 self.draw_event(event, event_name, dest_screenshot_path)
547 def get_current_state(self, action_count=None):
548 self.logger.debug("getting current device state...")
549 current_state = None
550 try:
551 foreground_activity = self.get_top_activity_name()
552 # activity_stack = self.get_current_activity_stack()
553 activity_stack = [foreground_activity]
554 # background_services = self.get_service_names()
555 screenshot_path = self.take_screenshot()
556 self.screenshot_path = screenshot_path
557 self.logger.debug("finish getting current device state...")
558 # if there's no foreground activities (In home or lock screen)
559 views = self.get_views() if foreground_activity is not None else []
560 from .device_state import DeviceState
561 current_state = DeviceState(self,
562 views=views,
563 foreground_activity=foreground_activity,
564 activity_stack=activity_stack,
565 background_services=None,
566 screenshot_path=screenshot_path,
567 tag=action_count)
568 except Exception as e:
569 self.logger.warning("exception in get_current_state: %s" % e)
570 import traceback
571 traceback.print_exc()
572 self.logger.debug("finish getting current device state...")
573 self.last_know_state = current_state
574 if not current_state:
575 self.logger.warning("Failed to get current state!")
576 return current_state
578 def get_last_known_state(self):
579 return self.last_know_state
581 def view_touch(self, x, y):
582 self.hdc.touch(x, y)
584 def view_long_touch(self, x, y, duration=2000):
585 """
586 Long touches at (x, y)
587 @param duration: duration in ms
588 """
589 self.hdc.long_touch(x, y, duration)
591 def view_drag(self, start_xy, end_xy, duration):
592 """
593 Sends drag event n PX (actually it's using C{input swipe} command.
594 """
595 self.hdc.drag(start_xy, end_xy, duration)
597 def view_append_text(self, text):
598 try:
599 self.hm2.input_text()
600 except:
601 self.hdc.type(text)
603 def view_set_text(self, text):
604 try:
605 self.hm2.input_text(text)
606 except:
607 self.logger.warning(
608 "Failed to input text with hmdriver2. Use `hdc` to append text instead."
609 )
610 self.hdc.type(text)
612 def key_press(self, key_code):
613 self.hdc.press(key_code)
615 def shutdown(self):
616 pass
617 # self.adb.shell("reboot -p")
619 # def get_views(self):
620 # if self.hdc:
621 # views = self.get_views()
622 # if views:
623 # return views
624 # else:
625 # self.logger.warning("Failed to get views using HDC.")
627 # self.logger.warning("failed to get current views!")
628 # return None
630 def get_views(self):
631 return self.hdc.get_views(self.output_dir)
633 def get_random_port(self):
634 """
635 get a random port on host machine to establish connection
636 :return: a port number
637 """
638 import socket
639 temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
640 temp_sock.bind(("", 0))
641 port = temp_sock.getsockname()[1]
642 temp_sock.close()
643 if port in self.__used_ports:
644 return self.get_random_port()
645 self.__used_ports.append(port)
646 return port