Improve USB device reset logic

BUG=webrtc:7203
NOTRY=True

Review-Url: https://codereview.webrtc.org/2789533002
Cr-Commit-Position: refs/heads/master@{#17656}
This commit is contained in:
jansson 2017-04-11 07:40:26 -07:00 committed by Commit bot
parent b213a16b28
commit 80ff00cd2b
3 changed files with 113 additions and 33 deletions

View file

@ -471,6 +471,7 @@ def _RunPythonTests(input_api, output_api):
test_directories = [
Join('webrtc', 'tools', 'py_event_log_analyzer'),
Join('webrtc', 'tools'),
Join('webrtc', 'audio', 'test', 'unittests'),
] + [
root for root, _, files in os.walk(Join('tools-webrtc'))

View file

@ -7,14 +7,14 @@
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import glob
import optparse
import os
import shutil
import subprocess
import sys
import time
import glob
import re
import shutil
# Used to time-stamp output files and directories
CURRENT_TIME = time.strftime("%d_%m_%Y-%H:%M:%S")
@ -158,47 +158,67 @@ def CreateRecordingDirs(options):
return record_paths
def RestartMagewellDevices(ref_video_device, test_video_device):
"""Reset the USB ports where Magewell capture devices are connected to.
def FindUsbPortForV4lDevices(ref_video_device, test_video_device):
"""Tries to find the usb port for ref_video_device and test_video_device.
Tries to find the provided ref_video_device and test_video_device devices
which use video4linux and then do a soft reset by using USB unbind and bind.
Args:
ref_device(string): reference recording device path.
test_device(string): test recording device path
Returns:
usb_ports(list): USB ports(string) for the devices found.
"""
# Find the device location including USB and USB Bus ID's. Use the usb1
# in the path since the driver folder is a symlink which contains all the
# usb device port mappings and it's the same in all usbN folders. Tested
# on Ubuntu 14.04.
v4l_device_path = '/sys/bus/usb/devices/usb1/1-1/driver/**/**/video4linux/'
v4l_ref_device = glob.glob('%s%s' % (v4l_device_path, ref_video_device))
v4l_test_device = glob.glob('%s%s' % (v4l_device_path, test_video_device))
usb_ports = []
paths = []
# Split on the driver folder first since we are only interested in the
# folders thereafter.
ref_path = str(v4l_ref_device).split('driver')[1].split('/')
test_path = str(v4l_test_device).split('driver')[1].split('/')
paths.append(ref_path)
paths.append(test_path)
for path in paths:
for usb_id in path:
# Look for : separator and then use the first element in the list.
# E.g 3-3.1:1.0 split on : and [0] becomes 3-3.1 which can be used
# for bind/unbind.
if ':' in usb_id:
usb_ports.append(usb_id.split(':')[0])
return usb_ports
def RestartMagewellDevices(ref_video_device_path, test_video_device_path):
"""Reset the USB ports where Magewell capture devices are connected to.
Performs a soft reset by using USB unbind and bind.
This is due to Magewell capture devices have proven to be unstable after the
first recording attempt.
Args :
ref_video_device(string): reference recording device path.
test_video_device(string): test recording device path
Args:
ref_video_device_path(string): reference recording device path.
test_video_device_path(string): test recording device path
Raises:
MagewellError: If no magewell devices are found.
"""
# Get the dev/videoN device name from the command line arguments.
ref_magewell = ref_video_device.split('/')[2]
test_magewell = test_video_device.split('/')[2]
# Find the device location including USB and USB Bus ID's.
device_string = '/sys/bus/usb/devices/usb*/**/**/video4linux/'
ref_magewell_device = glob.glob('%s%s' % (device_string, ref_magewell))
test_magewell_device = glob.glob('%s%s' % (device_string, test_magewell))
magewell_usb_ports = []
# Figure out the USB bus and port ID for each device.
ref_magewell_path = str(ref_magewell_device).split('/')
for directory in ref_magewell_path:
# Find the folder with pattern "N-N", e.g. "4-3" or \
# "[USB bus ID]-[USB port]"
if re.match(r'^\d-\d$', directory):
magewell_usb_ports.append(directory)
test_magewell_path = str(test_magewell_device).split('/')
for directory in test_magewell_path:
# Find the folder with pattern "N-N", e.g. "4-3" or \
# "[USB bus ID]-[USB port]"
if re.match(r'^\d-\d$', directory):
magewell_usb_ports.append(directory)
ref_magewell_path = ref_video_device_path.split('/')[2]
test_magewell_path = test_video_device_path.split('/')[2]
magewell_usb_ports = FindUsbPortForV4lDevices(ref_magewell_path,
test_magewell_path)
# Abort early if no devices are found.
if len(magewell_usb_ports) == 0:
@ -413,7 +433,6 @@ def CompareVideos(options, cropped_ref_file, cropped_test_file):
crop_height = param.split(':')[1]
compare_cmd = [
sys.executable,
compare_videos_script,
'--ref_video', cropped_ref_file,
'--test_video', cropped_test_file,

View file

@ -0,0 +1,60 @@
#!/usr/bin/env python
# Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
#
# Use of this source code is governed by a BSD-style license
# that can be found in the LICENSE file in the root of the source
# tree. An additional intellectual property rights grant can be found
# in the file PATENTS. All contributing project authors may
# be found in the AUTHORS file in the root of the source tree.
import glob
import unittest
from video_analysis import FindUsbPortForV4lDevices
class RunVideoAnalysisTest(unittest.TestCase):
def setGlobPath(self, path1, path2):
self.path1 = path1
self.path2 = path2
def setUp(self):
self.path1 = ''
self.path2 = ''
self.requestNbr = 1
def glob_mock(string):
# Eat incoming string.
del string
if self.requestNbr == 1:
self.requestNbr += 1
return self.path1
else:
self.requestNbr = 1
return self.path2
# Override the glob function with our own that returns a string set by the
# test.
glob.glob = glob_mock
# Verifies that the correct USB id is returned.
def testFindUSBPortForV4lDevices(self):
short_path1 = ('/sys/bus/usb/devices/usb1/1-1/driver/4-4/4-4:1.0/'
'video4linux/video0')
short_path2 = ('/sys/bus/usb/devices/usb1/1-1/driver/4-3/4-3:1.0/'
'video4linux/video1')
self.setGlobPath(short_path1, short_path2)
short_usb_ids = ['4-4', '4-3']
self.assertEqual(FindUsbPortForV4lDevices('video0', 'video1'),
short_usb_ids)
long_path1 = ('/sys/bus/usb/devices/usb1/1-1/driver/3-3/3-3.1:1.0/'
'video4linux/video0')
long_path2 = ('/sys/bus/usb/devices/usb1/1-1/driver/3-2/3-2.1:1.0/'
'video4linux/video1')
self.setGlobPath(long_path1, long_path2)
long_usb_ids = ['3-3.1', '3-2.1']
self.assertEqual(FindUsbPortForV4lDevices('video0', 'video1'), long_usb_ids)
if __name__ == "__main__":
unittest.main()