使用 Python 控制 reSpeaker Flex
介绍
在本节中,我们将介绍如何使用 Python SDK 来控制你的 reSpeaker Flex。对于希望构建自己应用的用户而言,这会让开发变得更加便捷。例如,你可以检测声音来自何处、执行语音活动检测(VAD)、控制 LED 等等。
Python SDK
我们提供了一个完整的 Python 指南,讲解如何使用 USB 固件与 Flex 通信。该 Python 脚本可以在你喜欢的 IDE 中运行,而无需 Flex XVF_Host。你可以从这个链接中找到更多命令。
你需要先安装 pyusb 库(pip install pyusb)。在 Windows 上,脚本还需要 libusb-package(pip install libusb-package),否则会在查找设备时报错。
如何获取 DOA 和 VAD
import sys
import struct
import argparse
import usb.core
import usb.util
import time
try:
import libusb_package
except ImportError:
libusb_package = None
# Default USB vendor ID used for device discovery when none is supplied.
DEFAULT_VID = 0x2886

# Table of control parameters, keyed by parameter name.
# Each value is a tuple: (resid, cmdid, length, access, type)
#   resid  - sent as wIndex of the control transfer
#   cmdid  - sent as wValue (read() additionally sets bit 7)
#   length - read() treats it as the payload byte count (a status byte is
#            added on top); write() treats it as the required value count
#   access - "ro" entries are rejected by write(); "wo" marks write-only ones
#   type   - selects how values are packed / unpacked
PARAMETERS = {
    "VERSION": (48, 0, 3, "ro", "uint8"),
    "AEC_AZIMUTH_VALUES": (33, 75, 16, "ro", "radians"),
    "DOA_VALUE": (20, 18, 4, "ro", "uint16"),
    "REBOOT": (48, 7, 1, "wo", "uint8"),
}
class ReSpeaker:
    """Read and write the control parameters listed in ``PARAMETERS``.

    All traffic goes through vendor control transfers on the wrapped USB
    device object (``self.dev``).
    """

    # Timeout handed to every control transfer. pyusb documents this value as
    # milliseconds, which makes it 100 seconds.
    TIMEOUT = 100000

    def __init__(self, dev):
        """Keep ``dev``, the USB device object used for all transfers."""
        self.dev = dev

    @staticmethod
    def _encode_payload(data_type, values):
        """Serialize ``values`` into a list of byte values for a write.

        'float'/'radians' become 4-byte floats, 'char'/'uint8' single bytes,
        and every other type a 4-byte signed integer. Byte order is pinned to
        little-endian so it matches what ``read`` decodes.
        """
        if data_type in ('float', 'radians'):
            chunks = (struct.pack('<f', float(value)) for value in values)
        elif data_type in ('char', 'uint8'):
            chunks = (value.to_bytes(1, byteorder='little') for value in values)
        else:
            chunks = (struct.pack('<i', value) for value in values)
        # A list of ints (not bytes) keeps the logged payload format unchanged.
        return list(b''.join(chunks))

    @staticmethod
    def _decode_response(data_type, count, response):
        """Turn a raw read response into the value returned by ``read``.

        ``count`` is the payload size in bytes, excluding the leading status
        byte. 'float'/'radians' yield a tuple of little-endian floats with the
        status byte dropped; every other type yields the raw byte list, status
        byte included.
        """
        if data_type in ('float', 'radians'):
            num_values = count // 4
            # Offset 1 skips the status byte at the start of the response.
            return struct.unpack_from('<{}f'.format(num_values), response.tobytes(), 1)
        # Previously an unlisted type left the result unbound; fall back to
        # the same raw byte list that 'uint8' and 'uint16' return.
        return response.tolist()

    def write(self, name, data_list):
        """Send ``data_list`` to the parameter called ``name``.

        Returns None without any transfer when ``name`` is not in
        ``PARAMETERS``.

        Raises:
            ValueError: if the parameter is read-only, or ``data_list`` does
                not hold exactly the number of values the table specifies.
        """
        try:
            entry = PARAMETERS[name]
        except KeyError:
            return
        resid, cmdid, count, access, data_type = entry[:5]

        if access == "ro":
            raise ValueError('{} is read-only'.format(name))
        if len(data_list) != count:
            raise ValueError('{} value count is not {}'.format(name, count))

        payload = self._encode_payload(data_type, data_list)
        print("WriteCMD: cmdid: {}, resid: {}, payload: {}".format(cmdid, resid, payload))
        self.dev.ctrl_transfer(
            usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
            0, cmdid, resid, payload, self.TIMEOUT)

    def read(self, name):
        """Read the parameter called ``name`` from the device.

        Returns None without any transfer when ``name`` is not in
        ``PARAMETERS``. Otherwise returns a tuple of floats for
        'float'/'radians' parameters, or the raw response bytes as a list
        (status byte first) for all other types.
        """
        try:
            entry = PARAMETERS[name]
        except KeyError:
            return
        resid, cmdid, count, _access, data_type = entry[:5]

        length = count + 1  # 1 extra byte for status
        response = self.dev.ctrl_transfer(
            usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
            0, 0x80 | cmdid, resid, length, self.TIMEOUT)  # bit 7 set on reads
        return self._decode_response(data_type, count, response)

    def close(self):
        """Release the USB resources held for the wrapped device."""
        usb.util.dispose_resources(self.dev)
def find(vid=DEFAULT_VID, pid=None):
    """Locate a device and wrap it in a ``ReSpeaker``.

    With ``pid`` given, looks up that exact vendor/product pair. Without it,
    enumerates every device for ``vid`` and picks the one with the lowest
    ``idProduct``. Returns None when nothing matches.

    Raises:
        RuntimeError: on Windows when ``libusb_package`` could not be imported.
    """
    on_windows = sys.platform.startswith('win')
    if on_windows and libusb_package is None:
        raise RuntimeError("Windows requires libusb-package. Install it with: pip install libusb-package")

    # Windows goes through libusb_package's finder; elsewhere pyusb's own.
    if on_windows:
        locate = libusb_package.find
    else:
        locate = usb.core.find

    if pid is None:
        candidates = list(locate(find_all=True, idVendor=vid) or [])
        if not candidates:
            return None
        # min() returns the first lowest-idProduct device, same as a stable sort.
        chosen = min(candidates, key=lambda device: getattr(device, 'idProduct', 0))
        return ReSpeaker(chosen)

    match = locate(idVendor=vid, idProduct=pid)
    return ReSpeaker(match) if match else None
def main():
    """Command-line entry point: print the version, then poll DOA_VALUE.

    Exits with status 1 when no matching device is found. Polling runs until
    interrupted with Ctrl-C; the device is closed on the way out regardless of
    how the loop ends.
    """
    parser = argparse.ArgumentParser(description='Read DoA values from ReSpeaker over USB control interface')
    parser.add_argument('--vid', type=lambda x: int(x, 0), default=DEFAULT_VID,
                        help='usb vendor ID (default: 0x2886)')
    parser.add_argument('--pid', type=lambda x: int(x, 0), default=None,
                        help='usb product ID; if omitted, auto-discover by VID for compatibility across firmware PIDs')
    parser.add_argument('--interval', type=float, default=1.0,
                        help='read interval in seconds (default: 1.0)')
    args = parser.parse_args()

    dev = find(vid=args.vid, pid=args.pid)
    if not dev:
        if args.pid is None:
            print(f'No device found for VID=0x{args.vid:04x}')
        else:
            print(f'No device found for VID=0x{args.vid:04x}, PID=0x{args.pid:04x}')
        sys.exit(1)

    # The close() call used to sit after the infinite loop and never ran;
    # try/finally releases the device on Ctrl-C and on errors alike.
    try:
        print(f'Connected device VID=0x{dev.dev.idVendor:04x}, PID=0x{dev.dev.idProduct:04x}')
        print('{}: {}'.format("VERSION", dev.read("VERSION")))
        while True:
            result = dev.read("DOA_VALUE")
            print(result)
            # result is the raw byte list: [0] status, [1] and [2] the low and
            # high bytes of the DOA value, [3] shown as the speech flag.
            print('{}: {}, {}: {}'.format("SPEECH_DETECTED", result[3], "DOA_VALUE", result[1] + result[2] * 256))
            time.sleep(args.interval)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop polling; exit without a traceback.
        pass
    finally:
        dev.close()


if __name__ == '__main__':
    main()
技术支持与产品讨论
感谢你选择我们的产品!我们将为你提供多种支持,以确保你在使用我们产品时的体验尽可能顺畅。我们提供了多种沟通渠道,以满足不同的偏好和需求。