# Coverage for kea/device.py: 64% (647 statements)
# Report generated by coverage.py v7.6.9, created at 2024-12-22 16:05 +0800
import json
import logging
import os
import random
import re
import shutil
import socket
import subprocess
import sys
import threading
import time
import traceback
from datetime import datetime

import uiautomator2
import pkg_resources

from .adapter.uiautomator2_helper import Uiautomator2_Helper
from .adapter.adb import ADB
from .adapter.logcat import Logcat
from .adapter.minicap import Minicap
from .adapter.process_monitor import ProcessMonitor
from .adapter.telnet import TelnetConsole
from .adapter.user_input_monitor import UserInputMonitor
from .app import App
from .intent import Intent
from .input_event import InputEvent, SetTextAndSearchEvent, TouchEvent, LongTouchEvent, ScrollEvent, SetTextEvent, \
    KeyEvent
# Default phone number / SMS body used by the telephony helpers below.
DEFAULT_NUM = '1234567890'
DEFAULT_CONTENT = 'Hello world!'


class Device(object):
    """
    this class describes a connected device
    """

    def __init__(
        self,
        device_serial=None,
        is_emulator=False,
        output_dir=None,
        cv_mode=False,
        grant_perm=False,
        send_document=False,
        telnet_auth_token=None,
        enable_accessibility_hard=False,
        humanoid=None,
        ignore_ad=False,
        app_package_name=None,
        is_harmonyos=False
    ):
        """
        initialize a device connection
        :param device_serial: serial number of target device; when None the first
            entry returned by ``get_available_devices()`` is used, and the process
            exits with status -1 if that list is empty
        :param is_emulator: boolean, type of device, True for emulator, False for real device
        :param output_dir: directory for output files, created when missing
        :param cv_mode: when True (and minicap is enabled) ``get_views`` tries minicap first
        :param grant_perm: when True ``install_app`` may add ``-g`` to ``adb install``
        :param send_document: when True ``send_documents`` pushes the bundled resources
        :param telnet_auth_token: forwarded to ``TelnetConsole``
        :param enable_accessibility_hard: stored as-is on the instance
        :param humanoid: stored as-is on the instance
        :param ignore_ad: stored as-is on the instance
        :param app_package_name: stored as-is on the instance
        :param is_harmonyos: selects the ``.jpeg`` report screenshot naming
        :return:
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.app_package_name = app_package_name
        if device_serial is None:
            # imported lazily, exactly as in the original code
            from .utils import get_available_devices

            all_devices = get_available_devices()
            if not all_devices:
                self.logger.warning("ERROR: No device connected.")
                sys.exit(-1)
            device_serial = all_devices[0]
        if "emulator" in device_serial and not is_emulator:
            self.logger.warning(
                "Seems like you are using an emulator. If so, please add is_emulator option."
            )
        self.serial = device_serial
        self.is_emulator = is_emulator
        self.cv_mode = cv_mode
        self.output_dir = output_dir
        if output_dir is not None:
            os.makedirs(output_dir, exist_ok=True)
        self.grant_perm = grant_perm
        self.send_document = send_document
        self.enable_accessibility_hard = enable_accessibility_hard
        self.humanoid = humanoid
        self.ignore_ad = ignore_ad
        self.is_harmonyos = is_harmonyos
        self.cur_event_count = 0
        self.screenshot_path = None
        self.current_state = None

        self.u2 = uiautomator2.connect(self.serial)
        # disable keyboard
        self.u2.set_fastinput_ime(True)

        # basic device information (filled lazily by the get_* methods)
        self.settings = {}
        self.display_info = None
        self.model_number = None
        self.sdk_version = None
        self.release_version = None
        self.ro_debuggable = None
        self.ro_secure = None
        self.connected = True
        self.last_know_state = None
        self.__used_ports = []
        self.pause_sending_event = False

        # adapters
        self.adb = ADB(device=self)
        # Emulator console via Telnet.
        self.telnet = TelnetConsole(device=self, auth_token=telnet_auth_token)
        # Minicap: screen capture adapter.
        self.minicap = Minicap(device=self)
        # Logcat: device log adapter.
        self.logcat = Logcat(device=self)
        # UserInputMonitor: user input event adapter.
        self.user_input_monitor = UserInputMonitor(device=self)
        # ProcessMonitor: device process adapter.
        self.process_monitor = ProcessMonitor(device=self)
        # Uiautomator2_Helper: UIAutomator2 interaction helper (not part of self.adapters).
        self.uiautomator_helper = Uiautomator2_Helper(device=self)

        # Maps each adapter to an "enabled" flag; only enabled adapters are
        # set up / connected / disconnected / torn down.
        self.adapters = {
            self.adb: True,
            self.telnet: False,
            self.minicap: True,
            self.logcat: True,
            self.user_input_monitor: True,
            self.process_monitor: True,
        }

        # minicap currently not working on emulators
        if self.is_emulator:
            self.logger.info("disable minicap on emulator")
            self.adapters[self.minicap] = False

        self.resource_path = pkg_resources.resource_filename(
            "kea", "resources/Document"
        )

    def _enabled_adapters(self):
        """Return the adapters whose flag in ``self.adapters`` is truthy, in dict order."""
        return [adapter for adapter, enabled in self.adapters.items() if enabled]

    def check_connectivity(self):
        """
        check if the device is available; prints one status line per adapter
        """
        for adapter, enabled in self.adapters.items():
            adapter_name = adapter.__class__.__name__
            if not enabled:
                print("[CONNECTION] %s is not enabled." % adapter_name)
            elif adapter.check_connectivity():
                print("[CONNECTION] %s is enabled and connected." % adapter_name)
            else:
                print(
                    "[CONNECTION] %s is enabled but not connected." % adapter_name
                )

    def wait_for_device(self):
        """
        wait until ``adb wait-for-device`` returns for this serial.
        Failures are logged and otherwise ignored (best effort).
        :return:
        """
        self.logger.info("waiting for device")
        try:
            subprocess.check_call(["adb", "-s", self.serial, "wait-for-device"])
        except Exception:
            # Narrowed from a bare ``except`` so Ctrl-C / SystemExit still propagate.
            self.logger.warning("error waiting for device")

    def set_up(self):
        """
        Set connections on this device
        """
        # wait for emulator to start
        self.wait_for_device()
        for adapter in self._enabled_adapters():
            adapter.set_up()

    def connect(self):
        """
        establish connections on this device: connect every enabled adapter,
        cache basic device properties, unlock the screen and report connectivity
        :return:
        """
        for adapter in self._enabled_adapters():
            adapter.connect()

        self.get_sdk_version()
        self.get_release_version()
        self.get_ro_secure()
        self.get_ro_debuggable()
        self.get_display_info()

        self.unlock()
        self.check_connectivity()
        self.connected = True

    def disconnect(self):
        """
        disconnect current device and remove ``<output_dir>/temp`` if it exists
        :return:
        """
        self.connected = False
        for adapter in self._enabled_adapters():
            adapter.disconnect()

        if self.output_dir is not None:
            temp_dir = os.path.join(self.output_dir, "temp")
            if os.path.exists(temp_dir):
                shutil.rmtree(temp_dir)

    def tear_down(self):
        """
        call ``tear_down()`` on every enabled adapter
        """
        for adapter in self._enabled_adapters():
            adapter.tear_down()

    def is_foreground(self, app):
        """
        check if app is in foreground of device
        :param app: App, or str of package name
        :return: boolean; False for unsupported ``app`` types or when the top
            activity cannot be determined
        """
        if isinstance(app, str):
            package_name = app
        elif isinstance(app, App):
            package_name = app.get_package_name()
        else:
            return False

        top_activity_name = self.get_top_activity_name()
        if top_activity_name is None:
            return False
        return top_activity_name.startswith(package_name)

    def get_model_number(self):
        """
        Get model number (queried from adb once, then cached)
        """
        if self.model_number is None:
            self.model_number = self.adb.get_model_number()
        return self.model_number

    def get_sdk_version(self):
        """
        Get version of current SDK (queried from adb once, then cached)
        """
        if self.sdk_version is None:
            self.sdk_version = self.adb.get_sdk_version()
        return self.sdk_version

    def get_release_version(self):
        """
        Get release version (queried from adb once, then cached)
        """
        if self.release_version is None:
            self.release_version = self.adb.get_release_version()
        return self.release_version

    def get_ro_secure(self):
        """
        Get the value reported by ``adb.get_ro_secure()`` (cached after first call)
        """
        if self.ro_secure is None:
            self.ro_secure = self.adb.get_ro_secure()
        return self.ro_secure

    def get_ro_debuggable(self):
        """
        Get the value reported by ``adb.get_ro_debuggable()`` (cached after first call)
        """
        if self.ro_debuggable is None:
            self.ro_debuggable = self.adb.get_ro_debuggable()
        return self.ro_debuggable

    def get_display_info(self, refresh=True):
        """
        get device display information, including width, height, and density
        :param refresh: if set to True, refresh the display info instead of using the old values
        :return: dict, display_info
        """
        if self.display_info is None or refresh:
            self.display_info = self.adb.get_display_info()
        return self.display_info

    def get_width(self, refresh=False):
        """
        get display width
        :param refresh: if True, re-query the display info first
        :return: the width, or 0 when it is missing even after a refresh
        """
        display_info = self.get_display_info(refresh=refresh)
        if "width" in display_info:
            return display_info["width"]
        if not refresh:
            # cached info lacks the key: retry once with fresh data
            return self.get_width(refresh=True)
        self.logger.warning("get_width: width not in display_info")
        return 0

    def get_height(self, refresh=False):
        """
        get display height
        :param refresh: if True, re-query the display info first
        :return: the height, or 0 when it is missing even after a refresh
        """
        display_info = self.get_display_info(refresh=refresh)
        if "height" in display_info:
            return display_info["height"]
        if not refresh:
            # cached info lacks the key: retry once with fresh data
            # (the previous code retried via get_width and returned the width)
            return self.get_height(refresh=True)
        self.logger.warning("get_height: height not in display_info")
        return 0

    def unlock(self):
        """
        unlock screen
        skip first-use tutorials
        etc
        :return:
        """
        self.adb.unlock()

    def shake(self):
        """
        shake the device by sending a series of acceleration values via telnet
        """
        # TODO the telnet-simulated shake event is not usable
        telnet = self.telnet
        if telnet is None:
            self.logger.warning("Telnet not connected, so can't shake the device.")
            # nothing to send the sensor commands to
            return
        sensor_xyz = [
            (-float(v * 10) + 1, float(v) + 9.8, float(v * 2) + 0.5)
            for v in [1, -1, 1, -1, 1, -1, 0]
        ]
        for x, y, z in sensor_xyz:
            telnet.run_cmd("sensor set acceleration %f:%f:%f" % (x, y, z))

    def add_env(self, env):
        """
        set env to the device
        :param env: instance of AppEnv
        """
        self.logger.info("deploying env: %s" % env)
        env.deploy(self)

    def add_contact(self, contact_data):
        """
        add a contact to device
        :param contact_data: dict of contact, should have keys like name, phone, email
        :return: True
        """
        assert self.adb is not None
        contact_intent = Intent(
            prefix="start",
            action="android.intent.action.INSERT",
            mime_type="vnd.android.cursor.dir/contact",
            extra_string=contact_data,
        )
        self.send_intent(intent=contact_intent)
        # press BACK twice, pausing 2s before each press
        for _ in range(2):
            time.sleep(2)
            self.adb.press("BACK")
        return True

    def receive_call(self, phone=DEFAULT_NUM):
        """
        simulate an incoming phone call
        :param phone: str, phonenum
        :return: result of the telnet command
        """
        assert self.telnet is not None
        return self.telnet.run_cmd("gsm call %s" % phone)

    def cancel_call(self, phone=DEFAULT_NUM):
        """
        cancel phonecall
        :param phone: str, phonenum
        :return: result of the telnet command
        """
        assert self.telnet is not None
        return self.telnet.run_cmd("gsm cancel %s" % phone)

    def accept_call(self, phone=DEFAULT_NUM):
        """
        accept phonecall
        :param phone: str, phonenum
        :return: result of the telnet command
        """
        assert self.telnet is not None
        return self.telnet.run_cmd("gsm accept %s" % phone)

    def call(self, phone=DEFAULT_NUM):
        """
        simulate an outgoing phone call
        :param phone: str, phonenum
        :return: result of ``send_intent``
        """
        call_intent = Intent(
            prefix='start',
            action="android.intent.action.CALL",
            data_uri="tel:%s" % phone,
        )
        return self.send_intent(intent=call_intent)

    def send_sms(self, phone=DEFAULT_NUM, content=DEFAULT_CONTENT):
        """
        send a SMS
        :param phone: str, phone number of receiver
        :param content: str, content of sms
        :return: True
        """
        send_sms_intent = Intent(
            prefix='start',
            action="android.intent.action.SENDTO",
            data_uri="sms:%s" % phone,
            extra_string={'sms_body': content},
            extra_boolean={'exit_on_sent': 'true'},
        )
        self.send_intent(intent=send_sms_intent)
        time.sleep(2)
        # key code 66 — presumably KEYCODE_ENTER, to confirm sending
        self.adb.press('66')
        return True

    def receive_sms(self, phone=DEFAULT_NUM, content=DEFAULT_CONTENT):
        """
        receive a SMS
        :param phone: str, phone number of sender
        :param content: str, content of sms
        :return: result of the telnet command
        """
        assert self.telnet is not None
        return self.telnet.run_cmd("sms send %s '%s'" % (phone, content))

    def set_gps(self, x, y):
        """
        set GPS positioning to x,y
        :param x: float
        :param y: float
        :return: result of the telnet command
        """
        return self.telnet.run_cmd("geo fix %s %s" % (x, y))

    def set_continuous_gps(self, center_x, center_y, delta_x, delta_y):
        """
        start a thread running ``set_continuous_gps_blocked`` with the same arguments
        :return: True
        """
        gps_thread = threading.Thread(
            target=self.set_continuous_gps_blocked,
            args=(center_x, center_y, delta_x, delta_y),
        )
        gps_thread.start()
        return True

    def set_continuous_gps_blocked(self, center_x, center_y, delta_x, delta_y):
        """
        simulate GPS on device via telnet
        this method is blocked: it loops (one fix every 3s) while ``self.connected``
        @param center_x: x coordinate of GPS position
        @param center_y: y coordinate of GPS position
        @param delta_x: range of x coordinate
        @param delta_y: range of y coordinate
        """
        while self.connected:
            x = random.random() * delta_x * 2 + center_x - delta_x
            y = random.random() * delta_y * 2 + center_y - delta_y
            self.set_gps(x, y)
            time.sleep(3)

    def _read_settings_table(self, db_name, table_name):
        """Dump one table of the settings db and return {column 2: column 3} of 3-column rows."""
        rows = {}
        out = self.adb.shell("sqlite3 %s \"select * from %s\"" % (db_name, table_name))
        for line in out.splitlines():
            segs = line.split('|')
            if len(segs) != 3:
                continue
            rows[segs[1]] = segs[2]
        return rows

    def get_settings(self):
        """
        get device settings via adb
        :return: ``self.settings`` with its 'system' and 'secure' entries refreshed
        """
        db_name = "/data/data/com.android.providers.settings/databases/settings.db"

        # query order kept as before: system first, then secure
        system_settings = self._read_settings_table(db_name, "system")
        secure_settings = self._read_settings_table(db_name, "secure")

        self.settings['system'] = system_settings
        self.settings['secure'] = secure_settings
        return self.settings

    def change_settings(self, table_name, name, value):
        """
        dangerous method, by calling this, change settings.db in device
        be very careful for sql injection: the arguments are interpolated
        into the SQL statement without any escaping
        :param table_name: table name to work on, usually it is system or secure
        :param name: settings name to set
        :param value: settings value to set
        :return: True
        """
        db_name = "/data/data/com.android.providers.settings/databases/settings.db"

        self.adb.shell(
            "sqlite3 %s \"update '%s' set value='%s' where name='%s'\""
            % (db_name, table_name, value, name)
        )
        return True

    def send_intent(self, intent):
        """
        send an intent to device via am (ActivityManager)
        :param intent: instance of Intent or str
        :return: result of the adb shell command
        """
        assert self.adb is not None
        assert intent is not None
        cmd = intent.get_cmd() if isinstance(intent, Intent) else intent
        return self.adb.shell(cmd)

    def send_event(self, event):
        """
        send one event to device
        :param event: the event to be sent
        :return:
        """
        event.send(self)

    def start_app(self, app):
        """
        start an app on the device
        :param app: instance of App, or str of package name
        :return:
        """
        if isinstance(app, str):
            package_name = app
        elif isinstance(app, App):
            package_name = app.get_package_name()
            if app.get_main_activity():
                package_name += "/%s" % app.get_main_activity()
        else:
            # The previous message was built with ``"..." + app``, which raised
            # TypeError for exactly the non-str inputs this branch handles.
            self.logger.warning("unsupported param %s with type: %s", app, type(app))
            return
        intent = Intent(suffix=package_name)
        self.send_intent(intent)

    def get_top_activity_name(self):
        """
        Get current activity: the first ``* Hist #N: ActivityRecord{...}`` entry
        found in ``dumpsys activity activities``
        :return: activity name, or None if no entry is found
        """
        r = self.adb.shell("dumpsys activity activities")
        activity_line_re = re.compile(
            r'\* Hist[ ]+#\d+: ActivityRecord{[^ ]+ [^ ]+ ([^ ]+) t(\d+).*}'
        )
        m = activity_line_re.search(r)
        if m:
            return m.group(1)
        self.logger.warning("Unable to get top activity name.")
        return None

    def get_current_activity_stack(self):
        """
        Get current activity stack
        :return: a list of str, each str is an activity name, the first is the top activity name;
            ``[top_activity]`` when no task starts with it; None when there is no top activity
        """
        task_to_activities = self.get_task_activities()
        top_activity = self.get_top_activity_name()
        if not top_activity:
            return None
        for activities in task_to_activities.values():
            if len(activities) > 0 and activities[0] == top_activity:
                return activities
        self.logger.warning("Unable to get current activity stack.")
        return [top_activity]

    def get_task_activities(self):
        """
        Get current tasks and corresponding activities.
        :return: a dict mapping each task id to a list of activities, from top to down.
        """
        task_to_activities = {}

        lines = self.adb.shell("dumpsys activity activities").splitlines()
        activity_line_re = re.compile(
            r'\* Hist[ ]+#\d+: ActivityRecord{[^ ]+ [^ ]+ ([^ ]+) t(\d+)}'
        )

        for line in lines:
            line = line.strip()
            if line.startswith("Task id #"):
                task_id = line[9:]
                task_to_activities[task_id] = []
                continue
            # The regex itself accepts one or more spaces between "Hist" and "#",
            # so no separate (and previously duplicated) prefix test is needed.
            m = activity_line_re.match(line)
            if m:
                activity = m.group(1)
                task_id = m.group(2)
                task_to_activities.setdefault(task_id, []).append(activity)

        return task_to_activities

    def get_service_names(self):
        """
        get current running services
        :return: list of services, each formatted as ``package/service``
        """
        services = []
        dat = self.adb.shell('dumpsys activity services')
        service_re = re.compile(r'^.+ServiceRecord{.+ ([A-Za-z0-9_.]+)/([A-Za-z0-9_.]+)')

        for line in dat.splitlines():
            m = service_re.search(line)
            if m:
                services.append("%s/%s" % (m.group(1), m.group(2)))
        return services

    def get_package_path(self, package_name):
        """
        get installation path of a package (app)
        :param package_name:
        :return: package path of app in device, or None if ``pm path`` output does not match
        """
        dat = self.adb.shell('pm path %s' % package_name)
        m = re.match(r'^package:(.+)$', dat)
        if m:
            return m.group(1).strip()
        return None

    def start_activity_via_monkey(self, package):
        """
        use monkey to start activity
        @param package: package name of target activity
        @raise RuntimeError: when the monkey output contains "Error" or "Cannot find 'App'"
        """
        cmd = 'monkey'
        if package:
            cmd += ' -p %s' % package
        out = self.adb.shell(cmd)
        if re.search(r"(Error)|(Cannot find 'App')", out, re.IGNORECASE | re.MULTILINE):
            raise RuntimeError(out)

    def send_documents(self, app):
        """
        push every file under ``self.resource_path`` to the device
        (no-op unless ``self.send_document`` is set)
        :param app: object exposing ``package_name``; selects app-specific destinations
        """
        if not self.send_document:
            self.logger.info("No document need to be sent")
            return

        self.logger.info("Sending documents.")
        for file in os.listdir(self.resource_path):
            local_path = os.path.join(self.resource_path, file)
            if "anki" in app.package_name and file == "collection.anki2":
                self.mkdir("/storage/emulated/0/AnkiDroid/")
                self.push_file(local_path, "/storage/emulated/0/AnkiDroid/")
                continue

            if "activitydiary" in app.package_name and file == "ActivityDiary_Export.sqlite3":
                self.push_file(local_path, "/storage/emulated/0/Download/")
                continue

            self.push_file(local_path, "/sdcard/")

    def install_app(self, app):
        """
        install an app to device (skipped when already installed), then dump
        ``dumpsys package`` output to ``output_dir`` when one is configured
        @param app: instance of App
        @return:
        """
        assert isinstance(app, App)
        package_name = app.get_package_name()
        if package_name not in self.adb.get_installed_apps():
            install_cmd = ["adb", "-s", self.serial, "install", "-t", "-r"]
            # -g is skipped for packages whose name contains "amaze"
            if self.grant_perm and self.get_sdk_version() >= 23 and "amaze" not in package_name:
                print("Granting permissions for app %s" % package_name)
                install_cmd.append("-g")
            install_cmd.append(app.app_path)
            install_p = subprocess.Popen(install_cmd, stdout=subprocess.PIPE)
            # poll until the package shows up or the device is disconnected
            while self.connected and package_name not in self.adb.get_installed_apps():
                self.logger.info("Please wait while installing the app...")
                time.sleep(2)
            if not self.connected:
                install_p.terminate()
                return

        dumpsys_lines = []
        # The context manager closes the pipe and waits for adb to exit.
        with subprocess.Popen(
            ["adb", "-s", self.serial, "shell", "dumpsys", "package", package_name],
            stdout=subprocess.PIPE,
        ) as dumpsys_p:
            for line in dumpsys_p.stdout:
                if not isinstance(line, str):
                    line = line.decode()
                dumpsys_lines.append(line)
        if self.output_dir is not None:
            package_info_file_name = "%s/dumpsys_package_%s.txt" % (
                self.output_dir,
                app.get_package_name(),
            )
            with open(package_info_file_name, "w") as package_info_file:
                package_info_file.writelines(dumpsys_lines)

        self.logger.info("App installed: %s" % package_name)
        self.logger.info("Main activity: %s" % app.get_main_activity())

    @staticmethod
    def __parse_main_activity_from_dumpsys_lines(lines):
        """
        find the activity declaring both action MAIN and category LAUNCHER
        :param lines: iterable of ``dumpsys package`` output lines
        :return: activity name (the last match wins), or None
        """
        main_activity = None
        activity_line_re = re.compile("[^ ]+ ([^ ]+)/([^ ]+) filter [^ ]+")
        action_re = re.compile("Action: \"([^ ]+)\"")
        category_re = re.compile("Category: \"([^ ]+)\"")

        activities = {}

        cur_activity = None
        cur_actions = []
        cur_categories = []

        for line in lines:
            line = line.strip()
            m = activity_line_re.match(line)
            if m:
                # flush the previous activity (there is none before the first header)
                if cur_activity is not None:
                    activities[cur_activity] = {
                        "actions": cur_actions,
                        "categories": cur_categories,
                    }
                cur_package = m.group(1)
                cur_activity = m.group(2)
                if cur_activity.startswith("."):
                    cur_activity = cur_package + cur_activity
                cur_actions = []
                cur_categories = []
                continue
            m1 = action_re.match(line)
            if m1:
                cur_actions.append(m1.group(1))
                continue
            m2 = category_re.match(line)
            if m2:
                cur_categories.append(m2.group(1))

        if cur_activity is not None:
            activities[cur_activity] = {
                "actions": cur_actions,
                "categories": cur_categories,
            }

        for activity, info in activities.items():
            if (
                "android.intent.action.MAIN" in info["actions"]
                and "android.intent.category.LAUNCHER" in info["categories"]
            ):
                main_activity = activity
        return main_activity

    def uninstall_app(self, app):
        """
        Uninstall an app from device.
        :param app: an instance of App or a package name
        """
        if isinstance(app, App):
            package_name = app.get_package_name()
        else:
            package_name = app
        if package_name in self.adb.get_installed_apps():
            uninstall_cmd = ["adb", "-s", self.serial, "uninstall", package_name]
            uninstall_p = subprocess.Popen(uninstall_cmd, stdout=subprocess.PIPE)
            # poll until the package disappears from the installed list
            while package_name in self.adb.get_installed_apps():
                self.logger.info("Please wait while uninstalling the app...")
                time.sleep(2)
            uninstall_p.terminate()

    def get_app_pid(self, app):
        """
        look up the pid of an app in the output of ``ps``
        :param app: an instance of App or a package name
        :return: pid of the process named exactly like the package; otherwise the
            smallest pid among processes whose name starts with it; otherwise None
        """
        if isinstance(app, App):
            package = app.get_package_name()
        else:
            package = app

        name2pid = {}
        ps_out_lines = self.adb.shell(["ps"]).splitlines()
        if not ps_out_lines:
            # empty ps output used to raise IndexError on the header lookup
            self.logger.warning("ps command returned no output")
            return None
        ps_out_head = ps_out_lines[0].split()
        if len(ps_out_head) < 2 or ps_out_head[1] != "PID" or ps_out_head[-1] != "NAME":
            self.logger.warning("ps command output format error: %s" % ps_out_head)
        for ps_out_line in ps_out_lines[1:]:
            segs = ps_out_line.split()
            if len(segs) < 4:
                continue
            name2pid[segs[-1]] = int(segs[1])

        if package in name2pid:
            return name2pid[package]

        possible_pids = [
            pid for name, pid in name2pid.items() if name.startswith(package)
        ]
        if possible_pids:
            return min(possible_pids)

        return None

    def push_file(self, local_file, remote_dir="/sdcard/"):
        """
        push file/directory to target_dir
        :param local_file: path to file/directory in host machine
        :param remote_dir: path to target directory in device
        :return:
        """
        if not os.path.exists(local_file):
            # only a warning: the push is still attempted, as before
            self.logger.warning("push_file file does not exist: %s" % local_file)
        self.adb.run_cmd(["push", local_file, remote_dir])

    def pull_file(self, remote_file, local_file):
        """
        pull a file from the device via ``adb pull`` (with adb logging disabled)
        """
        self.adb.run_cmd(["pull", remote_file, local_file], disable_log=True)

    def mkdir(self, path):
        """
        create a directory on the device via ``adb shell mkdir``
        """
        self.adb.run_cmd(["shell", "mkdir", path])

    def save_screenshot_for_report(self, event_name=None, event=None, current_state=None):
        """
        save screenshot for report, save to "all_states" dir
        :param event_name: name recorded in the report; derived from ``event`` when None
        :param event: the event to draw on the screenshot
        :param current_state: state to record; when None a fresh one is captured
        """
        self.cur_event_count += 1
        if current_state is None:
            self.current_state = self.get_current_state()
        else:
            self.current_state = current_state
        self.save_to_all_states_dir(self.screenshot_path, event_name=event_name, event=event)

    def draw_event(self, event, event_name, screenshot_path):
        """
        annotate the screenshot at ``screenshot_path`` in place with the event:
        a rectangle around the target view, or the key name as text
        :param event: an InputEvent, or (otherwise) an object exposing
            ``info['bounds']`` / a str for ``event_name == "press"``
        :param event_name: selects the annotation for non-InputEvent events
        :param screenshot_path: path of the image to annotate
        """
        if event is None or screenshot_path is None:
            # nothing to draw; avoid re-encoding the image for no change
            return

        import cv2

        image = cv2.imread(screenshot_path)
        if image is None:
            # cv2.imread returns None when the file cannot be read
            self.logger.warning("draw_event: unable to read screenshot: %s", screenshot_path)
            return

        if isinstance(event, InputEvent):
            # checked in this order, first match wins
            view_event_colors = (
                (TouchEvent, (0, 0, 255)),
                (LongTouchEvent, (0, 255, 0)),
                (SetTextEvent, (255, 0, 0)),
                (ScrollEvent, (255, 255, 0)),
            )
            for event_type, color in view_event_colors:
                if isinstance(event, event_type):
                    bounds = event.view['bounds']
                    cv2.rectangle(
                        image,
                        (int(bounds[0][0]), int(bounds[0][1])),
                        (int(bounds[1][0]), int(bounds[1][1])),
                        color,
                        5,
                    )
                    break
            else:
                if not isinstance(event, KeyEvent):
                    return
                cv2.putText(image, event.name, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA)
        else:
            name_colors = {
                "click": (0, 0, 255),
                "long_click": (0, 255, 0),
                "set_text": (255, 0, 0),
            }
            if event_name in name_colors:
                bounds = event.info['bounds']
                cv2.rectangle(
                    image,
                    (int(bounds['left']), int(bounds['top'])),
                    (int(bounds['right']), int(bounds['bottom'])),
                    name_colors[event_name],
                    5,
                )
            elif event_name == "press":
                cv2.putText(image, event, (100, 300), cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 255, 0), 3, cv2.LINE_AA)
            else:
                return
        try:
            cv2.imwrite(screenshot_path, image)
        except Exception as e:
            self.logger.warning(e)

    def take_screenshot(self):
        """
        capture the screen into ``<output_dir>/temp``
        :return: local path of the image (jpg from minicap, png from screencap),
            or None when no output_dir is configured
        """
        if self.output_dir is None:
            return None

        tag = datetime.now().strftime("%Y-%m-%d_%H%M%S")
        local_image_dir = os.path.join(self.output_dir, "temp")
        if not os.path.exists(local_image_dir):
            os.makedirs(local_image_dir)

        if self.adapters[self.minicap] and self.minicap.last_screen:
            # minicap use jpg format
            local_image_path = os.path.join(local_image_dir, "screen_%s.jpg" % tag)
            with open(local_image_path, 'wb') as local_image_file:
                local_image_file.write(self.minicap.last_screen)
            return local_image_path

        # screencap use png format
        local_image_path = os.path.join(local_image_dir, "screen_%s.png" % tag)
        remote_image_path = "/sdcard/screen_%s.png" % tag
        self.adb.shell("screencap -p %s" % remote_image_path)
        self.pull_file(remote_image_path, local_image_path)
        self.adb.shell("rm %s" % remote_image_path)

        return local_image_path

    def save_to_all_states_dir(self, local_image_path, event, event_name=None):
        """
        record the current step under ``<output_dir>/all_states``: write the state
        json, append an entry to ``report_screenshot.json``, move the screenshot
        into place and draw the event on it
        :param local_image_path: path of the screenshot to move
        :param event: the event that was sent
        :param event_name: name recorded in the report; ``event.get_event_name()`` when None
        """
        all_states_dir = os.path.join(self.output_dir, "all_states")
        if not os.path.exists(all_states_dir):
            os.makedirs(all_states_dir)

        json_dir = os.path.join(self.output_dir, "report_screenshot.json")
        if self.is_harmonyos:
            extension = "jpeg"
        elif self.adapters[self.minicap]:
            extension = "jpg"
        else:
            extension = "png"
        dest_screenshot_path = "%s/screen_%s.%s" % (all_states_dir, self.cur_event_count, extension)

        if self.current_state is not None:
            dest_state_json_path = "%s/state_%s.json" % (all_states_dir, self.cur_event_count)
            with open(dest_state_json_path, "w") as state_json_file:
                state_json_file.write(self.current_state.to_json())

        try:
            with open(json_dir, 'r') as json_file:
                report_screens = json.load(json_file)
        except FileNotFoundError:
            # first entry of this run
            report_screens = []
        if event_name is None:
            event_name = event.get_event_name()

        report_screens.append({
            "event": event_name,
            "event_index": str(self.cur_event_count),
            "screen_shoot": os.path.basename(dest_screenshot_path)
        })
        with open(json_dir, 'w') as json_file:
            json.dump(report_screens, json_file, indent=4)

        if local_image_path is None:
            # no screenshot was captured; shutil.move(None, ...) would raise
            self.logger.warning("save_to_all_states_dir: no screenshot to save")
        elif self.current_state is not None and local_image_path != dest_screenshot_path:
            self.current_state.screenshot_path = dest_screenshot_path
            shutil.move(local_image_path, dest_screenshot_path)

        self.draw_event(event, event_name, dest_screenshot_path)

    def get_current_state(self):
        """
        capture views, activities, services and a screenshot into a DeviceState
        :return: the new DeviceState, or None if any step raised (the error is logged);
            the result is also stored as the last known state
        """
        self.logger.debug("getting current device state...")
        current_state = None
        try:
            views = self.get_views()
            foreground_activity = self.get_top_activity_name()
            activity_stack = self.get_current_activity_stack()
            background_services = self.get_service_names()
            screenshot_path = self.take_screenshot()
            self.screenshot_path = screenshot_path
            self.logger.debug("finish getting current device state...")
            # imported lazily, exactly as in the original code
            from .device_state import DeviceState

            current_state = DeviceState(
                self,
                views=views,
                foreground_activity=foreground_activity,
                activity_stack=activity_stack,
                background_services=background_services,
                screenshot_path=screenshot_path,
                tag=self.cur_event_count
            )
        except Exception as e:
            self.logger.warning("exception in get_current_state: %s" % e)
            traceback.print_exc()
        self.logger.debug("finish getting current device state...")
        self.last_know_state = current_state
        if not current_state:
            self.logger.warning("Failed to get current state!")
        return current_state

    def get_last_known_state(self):
        """
        :return: the result of the most recent ``get_current_state`` call (may be None)
        """
        return self.last_know_state

    def view_touch(self, x, y):
        """
        touch at (x, y) via adb
        """
        self.adb.touch(x, y)

    def view_long_touch(self, x, y, duration=2000):
        """
        Long touches at (x, y)
        @param duration: duration in ms
        This workaround was suggested by U{HaMi<http://stackoverflow.com/users/2571957/hami>}
        """
        self.adb.long_touch(x, y, duration)

    def view_drag(self, start_xy, end_xy, duration):
        """
        Sends drag event n PX (actually it's using C{input swipe} command.
        """
        self.adb.drag(start_xy, end_xy, duration)

    def view_append_text(self, text):
        """
        append text to the focused input via uiautomator2, falling back to adb typing
        """
        try:
            self.u2.send_keys(text=text, clear=False)
        except Exception:
            self.adb.type(text)

    def view_set_text(self, text):
        """
        replace the text of the focused input via uiautomator2; on failure the
        text is appended through adb instead
        """
        try:
            self.u2.send_keys(text=text, clear=True)
        except Exception:
            self.logger.warning(
                "`adb shell input text` doesn't support setting text, appending instead."
            )
            self.adb.type(text)

    def key_press(self, key_code):
        """
        press a key via adb
        """
        self.adb.press(key_code)

    def shutdown(self):
        """
        power off the device (``reboot -p``)
        """
        self.adb.shell("reboot -p")

    def get_views(self):
        """
        get the views of the current screen: minicap first (only in cv_mode with
        minicap enabled), then the uiautomator helper
        :return: the views, or None when every source failed
        """
        if self.cv_mode and self.adapters[self.minicap]:
            # Get views using cv module
            views = self.minicap.get_views()
            if views:
                return views
            self.logger.warning("Failed to get views using OpenCV.")
        if self.uiautomator_helper:
            views = self.uiautomator_helper.get_views()
            if views:
                return views
            self.logger.warning("Failed to get views using UiAutomator.")

        self.logger.warning("failed to get current views!")
        return None

    def get_random_port(self):
        """
        get a random port on host machine to establish connection
        :return: a port number not handed out by this instance before
        """
        while True:
            # binding to port 0 lets the OS pick a free port
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as temp_sock:
                temp_sock.bind(("", 0))
                port = temp_sock.getsockname()[1]
            if port not in self.__used_ports:
                self.__used_ports.append(port)
                return port

    def handle_rotation(self):
        """
        reconnect minicap (when enabled) while event sending is paused
        """
        if not self.adapters[self.minicap]:
            return
        self.pause_sending_event = True
        if self.minicap.check_connectivity():
            self.minicap.disconnect()
            self.minicap.connect()

        if self.minicap.check_connectivity():
            print("[CONNECTION] %s is reconnected." % self.minicap.__class__.__name__)
        self.pause_sending_event = False

    def get_activity_short_name(self):
        """
        :return: the part of the top activity name after its last ".",
            or None when the top activity is unknown
        """
        top_activity_name = self.get_top_activity_name()
        if top_activity_name is None:
            return None
        return top_activity_name.split(".")[-1]

    def rotate_device_right(self):
        """
        disable auto rotation, wait 1s, then rotate right via adb
        """
        self.adb.disable_auto_rotation()
        time.sleep(1)
        self.adb.rotate_right()

    def rotate_device_neutral(self):
        """
        disable auto rotation, wait 1s, then rotate back to neutral via adb
        """
        self.adb.disable_auto_rotation()
        time.sleep(1)
        self.adb.rotate_neutral()

    def clear_data(self, package_name):
        """
        clear the app data (including user data and cache)
        :param package_name: the app package name
        """
        self.adb.clear_app_data(package_name)