Bitcraze 官方提供了名为cflib
的 Python 库来对无人机进行操作,下面介绍一些诸如飞行控制、获取 log 信息等基本操作的实现方法。
连接 Crazyflie
我们通过 URI*(Uniform Resource Identifier)*来标识 Crazyflie 无人机,其中 URI 的格式为:InterfaceType://InterfaceId/InterfaceChannel/InterfaceSpeed
例如:URI 为radio://0/3/2M
,其中:
radio
标识通过无线信号的方式(PA)来连接;0
标识是在电脑上标识为0
号的无线设备;3
标识该设备在该连接设备下的频道;2M
标识无线信号的频率。
想要连接无人机的话,首先要初始化驱动(USB 或者 PA)
cflib.crtp.init_drivers()
驱动初始化后便可以对每个接口进行扫描
available = cflib.crtp.scan_interfaces()
扫描操作会使用当前存在的连接设备(USB 或者 PA)来对可以通过该设备访问到的无人机进行扫描并以List
的形式返回。
下面的操作用来连接第一个扫描到的无人机:
import cflib.crtp
from cflib.crazyflie import Crazyflie
def connected_callback(link_uri):
print("连接到了无人机")
if __name__ == '__main__':
# 初始化驱动
cflib.crtp.init_drivers()
available = cflib.crtp.scan_interfaces()
FIRST_LINK_URL = ""
if len(available) > 0:
FIRST_LINK_URL = available[0][0]
else:
print("没有扫描到无人机")
cf = Crazyflie()
cf.open_link(FIRST_LINK_URL)
cf.connected.add_callback(connected_callback)
上面的示例中,我们为无人机对象设置了连接时触发的回调函数。不止是连接,cflib
库还提供了多种情况下的回调函数设置,覆盖了无人机通讯接口(USB 或 PA)生命周期内的方方面面,比如disconnected
、connection_lost
、cf.connection_requested
。
cflib
库还提供了SyncCrazyflie
包装类,该类包装*(Wrapped)*了普通的Crazyflie
类,提供了同步的 Crazyflie 无人机对象用以操控。其对象在__enter__
和__exit__
时默认调用了open_link
和close_link
函数,因此无需手动控制连接与释放连接。下面是一个示例:
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
def connected_callback(link_uri):
print("连接到了无人机")
if __name__ == '__main__':
# 初始化驱动
cflib.crtp.init_drivers()
# 扫描无人机
available = cflib.crtp.scan_interfaces()
# 获取并初始化第一个扫描到的无人机对象
FIRST_LINK_URL = ""
if len(available) > 0:
FIRST_LINK_URL = available[0][0]
else:
print("没有扫描到无人机")
cf = Crazyflie(rw_cache="./cache")
cf.connected.add_callback(connected_callback)
# 使用 SyncCrazyflie 来包装上面初始化的 Crazyflie 对象
with SyncCrazyflie(FIRST_LINK_URL, cf=cf) as scf:
print("do something......")
控制 Crazyflie
Crazyflie 的控制一般通过 Motion Commander **来实现。该控制类提供了一些基本的操作,诸如前进、后退、旋转等。通过下面的例子及对应的注释可了解其大致用法:
import logging
import time
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.positioning.motion_commander import MotionCommander
# 无人机的地址,根据需求自行更改
URI = 'radio://0/3/2M'
# 设置只输出 log 框架本身的信息
logging.basicConfig(level=logging.ERROR)
if __name__ == '__main__':
# 初始化底层驱动
cflib.crtp.init_drivers(enable_debug_driver=False)
with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:
# 当 MotionCommander 对象被创建后,其类内部的__enter__函数被触发。
# 该函数调用了 takeoff() 函数,因此无需手动起飞,降落时也同理,其触发了__exit__函数(内部调用了 land() 降落函数)。
with MotionCommander(scf) as mc:
time.sleep(1)
# MotionCommand 封装了一些函数来供我们以米为单位来控制无人机的移动
# 我们可以控制无人机朝着前后、上下、左右等方向来移动
mc.forward(0.5)
mc.back(0.5)
time.sleep(1)
mc.up(0.5)
mc.down(0.5)
time.sleep(1)
# 移动的同时,我们也可以设置速度大小
mc.right(0.5, velocity=0.8)
time.sleep(1)
mc.left(0.5, velocity=0.4)
time.sleep(1)
# 我们同样可以控制无人机进行旋转
mc.circle_right(0.5, velocity=0.5, angle_degrees=180)
# 向左掉头,本质上是封装了向左旋转的函数
mc.turn_left(90)
time.sleep(1)
# 在三维空间中沿着一条线来移动
mc.move_distance(-1, 0.0, 0.5, velocity=0.6)
time.sleep(1)
# MotionCommand 也提供了持续移动的函数供使用,比如说开始向左保持 0.5m 的速度移动直到收到下个指令
mc.start_left(velocity=0.5)
# 该动作开始后我们可以做其他事情
for _ in range(5):
print('做一些其他事情')
time.sleep(0.2)
# 当然了,我们也可以发送命令来停止移动
mc.stop()
# With 是 Python 的语法糖,在 `With` 这个 `Scope` 结束的时候,我们新建的 MotionCommander 对象的生命周期结束了。
# 这个时候会触发__exit__(会调用 land() 函数),无人机会自行降落。
获取与设置 Log 参数
日志参数可以分为两种,一种是无人机本身自带的参数,另一种是我们在固件中自行设置的参数。这些参数在设置和获取时遵循一个基本格式,即group.name
的形式。
在连接无人机的时候,TOC(Table Of Content)信息会被下载到本地,每次我们获取 TOC 里面的参数,往往通过group.name
来找到对应参数并对无人机配置相应的LogConfig
来实现。
下面是官方提供的获取 Log 使用的示例:
"""
Simple example that connects to the first Crazyflie found, logs the Stabilizer
and prints it to the console. After 10s the application disconnects and exits.
"""
import logging
import time
from threading import Timer
import cflib.crtp # noqa
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)
class LoggingExample:
"""
Simple logging example class that logs the Stabilizer from a supplied
link uri and disconnects after 5s.
"""
def __init__(self, link_uri):
""" Initialize and run the example with the specified link_uri """
self._cf = Crazyflie(rw_cache='./cache')
# Connect some callbacks from the Crazyflie API
self._cf.connected.add_callback(self._connected)
self._cf.disconnected.add_callback(self._disconnected)
self._cf.connection_failed.add_callback(self._connection_failed)
self._cf.connection_lost.add_callback(self._connection_lost)
print('Connecting to %s' % link_uri)
# Try to connect to the Crazyflie
self._cf.open_link(link_uri)
# Variable used to keep main loop occupied until disconnect
self.is_connected = True
def _connected(self, link_uri):
""" This callback is called form the Crazyflie API when a Crazyflie
has been connected and the TOCs have been downloaded."""
print('Connected to %s' % link_uri)
# The definition of the logconfig can be made before connecting
self._lg_stab = LogConfig(name='Stabilizer', period_in_ms=10)
self._lg_stab.add_variable('stabilizer.roll', 'float')
self._lg_stab.add_variable('stabilizer.pitch', 'float')
self._lg_stab.add_variable('stabilizer.yaw', 'float')
# Adding the configuration cannot be done until a Crazyflie is
# connected, since we need to check that the variables we
# would like to log are in the TOC.
try:
self._cf.log.add_config(self._lg_stab)
# This callback will receive the data
self._lg_stab.data_received_cb.add_callback(self._stab_log_data)
# This callback will be called on errors
self._lg_stab.error_cb.add_callback(self._stab_log_error)
# Start the logging
self._lg_stab.start()
except KeyError as e:
print('Could not start log configuration,'
'{} not found in TOC'.format(str(e)))
except AttributeError:
print('Could not add Stabilizer log config, bad configuration.')
# Start a timer to disconnect in 10s
t = Timer(5, self._cf.close_link)
t.start()
def _stab_log_error(self, logconf, msg):
"""Callback from the log API when an error occurs"""
print('Error when logging %s: %s' % (logconf.name, msg))
def _stab_log_data(self, timestamp, data, logconf):
"""Callback from a the log API when data arrives"""
print('[%d][%s]: %s' % (timestamp, logconf.name, data))
def _connection_failed(self, link_uri, msg):
"""Callback when connection initial connection fails (i.e no Crazyflie
at the specified address)"""
print('Connection to %s failed: %s' % (link_uri, msg))
self.is_connected = False
def _connection_lost(self, link_uri, msg):
"""Callback when disconnected after a connection has been made (i.e
Crazyflie moves out of range)"""
print('Connection to %s lost: %s' % (link_uri, msg))
def _disconnected(self, link_uri):
"""Callback when the Crazyflie is disconnected (called in all cases)"""
print('Disconnected from %s' % link_uri)
self.is_connected = False
if __name__ == '__main__':
cflib.crtp.init_drivers(enable_debug_driver=False)
print('Scanning interfaces for Crazyflies...')
available = cflib.crtp.scan_interfaces()
print('Crazyflies found:')
for i in available:
print(i[0])
if len(available) > 0:
le = LoggingExample(available[0][0])
else:
print('No Crazyflies found, cannot run example')
while le.is_connected:
time.sleep(1)
cflib
还提供了SyncLogger
用于同步读取 log 信息:
import logging
import time
import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger
logging.basicConfig(level=logging.ERROR)
if __name__ == '__main__':
cflib.crtp.init_drivers(enable_debug_driver=False)
print('Scanning interfaces for Crazyflies...')
available = cflib.crtp.scan_interfaces()
print('Crazyflies found:')
for i in available:
print(i[0])
if len(available) == 0:
print('No Crazyflies found, cannot run example')
else:
lg_stab = LogConfig(name='Stabilizer', period_in_ms=10)
lg_stab.add_variable('stabilizer.roll', 'float')
lg_stab.add_variable('stabilizer.pitch', 'float')
lg_stab.add_variable('stabilizer.yaw', 'float')
cf = Crazyflie(rw_cache='./cache')
with SyncCrazyflie(available[0][0], cf=cf) as scf:
# Note: it is possible to add more than one log config using an
# array.
# with SyncLogger(scf, [lg_stab, other_conf]) as logger:
with SyncLogger(scf, lg_stab) as logger:
endTime = time.time() + 10
for log_entry in logger:
timestamp = log_entry[0]
data = log_entry[1]
logconf_name = log_entry[2]
print('[%d][%s]: %s' % (timestamp, logconf_name, data))
if time.time() > endTime:
break
如果想要自定义 Log 参数的话,可以通过引入官方提供的 C 语言库(log.h)文件,调用相关函数来实现,具体细节可参考 log.h 源代码。下面是一个简单的自定义 Log 示例:
LOG_GROUP_START(MyLog)
LOG_ADD(LOG_INT16, logVar1, logVar1Addr)
LOG_ADD(LOG_INT16, logVar2, logVar2Addr)
LOG_ADD(LOG_INT16, logVar3, logVar3Addr)
LOG_GROUP_STOP(MyLog)