Saltar al contenido principal

Control de reSpeaker Flex con Python

Introducción

En esta sección, vamos a hablar sobre cómo controlar tu reSpeaker Flex usando el SDK de Python. Esto hace que el desarrollo sea más conveniente para los usuarios que quieren crear sus propias aplicaciones. Por ejemplo, puedes detectar de dónde viene la voz, realizar detección de actividad de voz (VAD), controlar los LED y mucho más.

SDK de Python

Tenemos una guía completa de Python sobre cómo comunicarse con la Flex usando el firmware USB. Este script de Python se puede ejecutar en tu IDE favorito sin requerir el Flex XVF_Host. Puedes encontrar más comandos en este enlace.

Necesitas instalar la biblioteca pyusb.

Cómo obtener DOA y VAD


import sys
import struct
import argparse
import usb.core
import usb.util
import time

try:
import libusb_package
except ImportError:
libusb_package = None

DEFAULT_VID = 0x2886

# name, resid, cmdid, length, type
PARAMETERS = {
"VERSION": (48, 0, 3, "ro", "uint8"),
"AEC_AZIMUTH_VALUES": (33, 75, 16, "ro", "radians"),
"DOA_VALUE": (20, 18, 4, "ro", "uint16"),
"REBOOT": (48, 7, 1, "wo", "uint8"),
}

class ReSpeaker:
TIMEOUT = 100000

def __init__(self, dev):
self.dev = dev

def write(self, name, data_list):
try:
data = PARAMETERS[name]
except KeyError:
return

if data[3] == "ro":
raise ValueError('{} is read-only'.format(name))

if len(data_list) != data[2]:
raise ValueError('{} value count is not {}'.format(name, data[2]))

windex = data[0] # resid
wvalue = data[1] # cmdid
data_type = data[4] # type
data_cnt = data[2] # cnt
payload = []

if data_type == 'float' or data_type == 'radians':
for i in range(data_cnt):
payload += struct.pack(b'f', float(data_list[i]))
elif data_type == 'char' or data_type == 'uint8':
for i in range(data_cnt):
payload += data_list[i].to_bytes(1, byteorder='little')
else:
for i in range(data_cnt):
payload += struct.pack(b'i', data_list[i])

print("WriteCMD: cmdid: {}, resid: {}, payload: {}".format(wvalue, windex, payload))

self.dev.ctrl_transfer(
usb.util.CTRL_OUT | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
0, wvalue, windex, payload, self.TIMEOUT)


def read(self, name):
try:
data = PARAMETERS[name]
except KeyError:
return

resid = data[0]
cmdid = 0x80 | data[1]
length = data[2] + 1 # 1 byte for status

response = self.dev.ctrl_transfer(
usb.util.CTRL_IN | usb.util.CTRL_TYPE_VENDOR | usb.util.CTRL_RECIPIENT_DEVICE,
0, cmdid, resid, length, self.TIMEOUT)

if data[4] == 'uint8':
result = response.tolist()
elif data[4] == 'radians':
byte_data = response.tobytes()
num_values = ( length - 1 ) / 4
match_str = '<'
for i in range(int(num_values)):
match_str += 'f'
result = struct.unpack(match_str, byte_data[1:length])
elif data[4] == 'uint16':
result = response.tolist()

return result

def close(self):
"""
close the interface
"""
usb.util.dispose_resources(self.dev)


def find(vid=DEFAULT_VID, pid=None):
if sys.platform.startswith('win') and libusb_package is None:
raise RuntimeError("Windows requires libusb-package. Install it with: pip install libusb-package")

usb_find = libusb_package.find if sys.platform.startswith('win') else usb.core.find

if pid is not None:
dev = usb_find(idVendor=vid, idProduct=pid)
if not dev:
return None
return ReSpeaker(dev)

devices = list(usb_find(find_all=True, idVendor=vid) or [])
if not devices:
return None

devices.sort(key=lambda device: getattr(device, 'idProduct', 0))
return ReSpeaker(devices[0])

def main():
parser = argparse.ArgumentParser(description='Read DoA values from ReSpeaker over USB control interface')
parser.add_argument('--vid', type=lambda x: int(x, 0), default=DEFAULT_VID,
help='usb vendor ID (default: 0x2886)')
parser.add_argument('--pid', type=lambda x: int(x, 0), default=None,
help='usb product ID; if omitted, auto-discover by VID for compatibility across firmware PIDs')
parser.add_argument('--interval', type=float, default=1.0,
help='read interval in seconds (default: 1.0)')

args = parser.parse_args()

dev = find(vid=args.vid, pid=args.pid)
if not dev:
if args.pid is None:
print(f'No device found for VID=0x{args.vid:04x}')
else:
print(f'No device found for VID=0x{args.vid:04x}, PID=0x{args.pid:04x}')
sys.exit(1)

print(f'Connected device VID=0x{dev.dev.idVendor:04x}, PID=0x{dev.dev.idProduct:04x}')
print('{}: {}'.format("VERSION", dev.read("VERSION")))
while True:
result = dev.read("DOA_VALUE")
print(result)
print('{}: {}, {}: {}'.format("SPEECH_DETECTED", result[3], "DOA_VALUE", result[1] + result[2] * 256))
time.sleep(args.interval)

dev.close()

if __name__ == '__main__':
main()

Soporte técnico y debate sobre el producto

Gracias por elegir nuestros productos. Estamos aquí para ofrecerte distintos tipos de soporte y asegurarnos de que tu experiencia con nuestros productos sea lo más fluida posible. Ofrecemos varios canales de comunicación para adaptarnos a diferentes preferencias y necesidades.

Loading Comments...