Bitcraze官方提供了名为cflib的Python库来对无人机进行操作,下面介绍一些诸如飞行控制、获取log信息等基本操作的实现方法。

连接Crazyflie

我们通过URI*(Uniform Resource Identifier)*来标识Crazyflie无人机,其中URI的格式为:InterfaceType://InterfaceId/InterfaceChannel/InterfaceSpeed

例如:URI为radio://0/3/2M,其中:

  1. radio标识通过无线信号的方式(PA)来连接;
  2. 0标识是在电脑上标识为0号的无线设备;
  3. 3标识该设备在该连接设备下的频道;
  4. 2M标识无线信号的频率。

想要连接无人机的话,首先要初始化驱动(USB或者PA)

cflib.crtp.init_drivers()

驱动初始化后便可以对每个接口进行扫描

available = cflib.crtp.scan_interfaces()

扫描操作会使用当前存在的连接设备(USB或者PA)来对可以通过该设备访问到的无人机进行扫描并以List的形式返回。

下面的操作用来连接第一个扫描到的无人机:

import cflib.crtp
from cflib.crazyflie import Crazyflie


def connected_callback(link_uri):
    print("连接到了无人机")


if __name__ == '__main__':
    # 初始化驱动
    cflib.crtp.init_drivers()
    available = cflib.crtp.scan_interfaces()
    FIRST_LINK_URL = ""
    if len(available) > 0:
        FIRST_LINK_URL = available[0][0]
    else:
        print("没有扫描到无人机")
    cf = Crazyflie()
    cf.open_link(FIRST_LINK_URL)
    cf.connected.add_callback(connected_callback)

上面的示例中,我们为无人机对象设置了连接时触发的回调函数。不止是连接,cflib库还提供了多种情况下的回调函数设置,覆盖了无人机通讯接口(USB或PA)生命周期内的方方面面,比如disconnectedconnection_lostcf.connection_requested

cflib库还提供了SyncCrazyflie包装类,该类包装*(Wrapped)*了普通的Crazyflie类,提供了同步的Crazyflie无人机对象用以操控。其对象在__enter____exit__时默认调用了open_linkclose_link函数,因此无需手动控制连接与释放连接。下面是一个示例:

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie


def connected_callback(link_uri):
    print("连接到了无人机")


if __name__ == '__main__':
    # 初始化驱动
    cflib.crtp.init_drivers()
    # 扫描无人机
    available = cflib.crtp.scan_interfaces()
    # 获取并初始化第一个扫描到的无人机对象
    FIRST_LINK_URL = ""
    if len(available) > 0:
        FIRST_LINK_URL = available[0][0]
    else:
        print("没有扫描到无人机")

    cf = Crazyflie(rw_cache="./cache")
    cf.connected.add_callback(connected_callback)

    # 使用SyncCrazyflie来包装上面初始化的Crazyflie对象
    with SyncCrazyflie(FIRST_LINK_URL, cf=cf) as scf:
        print("do something......")

控制Crazyflie

Crazyflie的控制一般通过Motion Commander**来实现。该控制类提供了一些基本的操作,诸如前进、后退、旋转等。通过下面的例子及对应的注释可了解其大致用法:

import logging
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.positioning.motion_commander import MotionCommander

# 无人机的地址,根据需求自行更改
URI = 'radio://0/3/2M'

# 设置只输出log框架本身的信息
logging.basicConfig(level=logging.ERROR)


if __name__ == '__main__':
    # 初始化底层驱动
    cflib.crtp.init_drivers(enable_debug_driver=False)

    with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:
        # 当MotionCommander对象被创建后,其类内部的__enter__函数被触发。
        # 该函数调用了takeoff()函数,因此无需手动起飞,降落时也同理,其触发了__exit__函数(内部调用了land()降落函数)。
        with MotionCommander(scf) as mc:
            time.sleep(1)

            # MotionCommand封装了一些函数来供我们以米为单位来控制无人机的移动
            # 我们可以控制无人机朝着前后、上下、左右等方向来移动
            mc.forward(0.5)
            mc.back(0.5)
            time.sleep(1)

            mc.up(0.5)
            mc.down(0.5)
            time.sleep(1)

            # 移动的同时,我们也可以设置速度大小
            mc.right(0.5, velocity=0.8)
            time.sleep(1)
            mc.left(0.5, velocity=0.4)
            time.sleep(1)

            # 我们同样可以控制无人机进行旋转
            mc.circle_right(0.5, velocity=0.5, angle_degrees=180)

            # 向左掉头,本质上是封装了向左旋转的函数
            mc.turn_left(90)
            time.sleep(1)

            # 在三维空间中沿着一条线来移动
            mc.move_distance(-1, 0.0, 0.5, velocity=0.6)
            time.sleep(1)

            # MotionCommand也提供了持续移动的函数供使用,比如说开始向左保持0.5m的速度移动直到收到下个指令
            mc.start_left(velocity=0.5)
            # 该动作开始后我们可以做其他事情
            for _ in range(5):
                print('做一些其他事情')
                time.sleep(0.2)

            # 当然了,我们也可以发送命令来停止移动
            mc.stop()

            # With是Python的语法糖,在 `With` 这个 `Scope` 结束的时候,我们新建的MotionCommander对象的生命周期结束了。
            # 这个时候会触发__exit__(会调用land()函数),无人机会自行降落。

获取与设置Log参数

日志参数可以分为两种,一种是无人机本身自带的参数,另一种是我们在固件中自行设置的参数。这些参数在设置和获取时遵循一个基本格式,即group.name的形式。

在连接无人机的时候,TOC(Table Of Content)信息会被下载到本地,每次我们获取TOC里面的参数,往往通过group.name来找到对应参数并对无人机配置相应的LogConfig来实现。

下面是官方提供的获取Log使用的示例:

"""
Simple example that connects to the first Crazyflie found, logs the Stabilizer
and prints it to the console. After 10s the application disconnects and exits.
"""
import logging
import time
from threading import Timer

import cflib.crtp  # noqa
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig

# Only output errors from the logging framework
logging.basicConfig(level=logging.ERROR)


class LoggingExample:
    """
    Simple logging example class that logs the Stabilizer from a supplied
    link uri and disconnects after 5s.
    """

    def __init__(self, link_uri):
        """ Initialize and run the example with the specified link_uri """

        self._cf = Crazyflie(rw_cache='./cache')

        # Connect some callbacks from the Crazyflie API
        self._cf.connected.add_callback(self._connected)
        self._cf.disconnected.add_callback(self._disconnected)
        self._cf.connection_failed.add_callback(self._connection_failed)
        self._cf.connection_lost.add_callback(self._connection_lost)

        print('Connecting to %s' % link_uri)

        # Try to connect to the Crazyflie
        self._cf.open_link(link_uri)

        # Variable used to keep main loop occupied until disconnect
        self.is_connected = True

    def _connected(self, link_uri):
        """ This callback is called form the Crazyflie API when a Crazyflie
        has been connected and the TOCs have been downloaded."""
        print('Connected to %s' % link_uri)

        # The definition of the logconfig can be made before connecting
        self._lg_stab = LogConfig(name='Stabilizer', period_in_ms=10)
        self._lg_stab.add_variable('stabilizer.roll', 'float')
        self._lg_stab.add_variable('stabilizer.pitch', 'float')
        self._lg_stab.add_variable('stabilizer.yaw', 'float')

        # Adding the configuration cannot be done until a Crazyflie is
        # connected, since we need to check that the variables we
        # would like to log are in the TOC.
        try:
            self._cf.log.add_config(self._lg_stab)
            # This callback will receive the data
            self._lg_stab.data_received_cb.add_callback(self._stab_log_data)
            # This callback will be called on errors
            self._lg_stab.error_cb.add_callback(self._stab_log_error)
            # Start the logging
            self._lg_stab.start()
        except KeyError as e:
            print('Could not start log configuration,'
                  '{} not found in TOC'.format(str(e)))
        except AttributeError:
            print('Could not add Stabilizer log config, bad configuration.')

        # Start a timer to disconnect in 10s
        t = Timer(5, self._cf.close_link)
        t.start()

    def _stab_log_error(self, logconf, msg):
        """Callback from the log API when an error occurs"""
        print('Error when logging %s: %s' % (logconf.name, msg))

    def _stab_log_data(self, timestamp, data, logconf):
        """Callback from a the log API when data arrives"""
        print('[%d][%s]: %s' % (timestamp, logconf.name, data))

    def _connection_failed(self, link_uri, msg):
        """Callback when connection initial connection fails (i.e no Crazyflie
        at the specified address)"""
        print('Connection to %s failed: %s' % (link_uri, msg))
        self.is_connected = False

    def _connection_lost(self, link_uri, msg):
        """Callback when disconnected after a connection has been made (i.e
        Crazyflie moves out of range)"""
        print('Connection to %s lost: %s' % (link_uri, msg))

    def _disconnected(self, link_uri):
        """Callback when the Crazyflie is disconnected (called in all cases)"""
        print('Disconnected from %s' % link_uri)
        self.is_connected = False


if __name__ == '__main__':
    cflib.crtp.init_drivers(enable_debug_driver=False)

    print('Scanning interfaces for Crazyflies...')
    available = cflib.crtp.scan_interfaces()
    print('Crazyflies found:')
    for i in available:
        print(i[0])

    if len(available) > 0:
        le = LoggingExample(available[0][0])
    else:
        print('No Crazyflies found, cannot run example')

    while le.is_connected:
        time.sleep(1)

cflib还提供了SyncLogger用于同步读取log信息:

import logging
import time

import cflib.crtp
from cflib.crazyflie import Crazyflie
from cflib.crazyflie.log import LogConfig
from cflib.crazyflie.syncCrazyflie import SyncCrazyflie
from cflib.crazyflie.syncLogger import SyncLogger


logging.basicConfig(level=logging.ERROR)


if __name__ == '__main__':
    cflib.crtp.init_drivers(enable_debug_driver=False)
    
    print('Scanning interfaces for Crazyflies...')
    available = cflib.crtp.scan_interfaces()
    print('Crazyflies found:')
    for i in available:
        print(i[0])

    if len(available) == 0:
        print('No Crazyflies found, cannot run example')
    else:
        lg_stab = LogConfig(name='Stabilizer', period_in_ms=10)
        lg_stab.add_variable('stabilizer.roll', 'float')
        lg_stab.add_variable('stabilizer.pitch', 'float')
        lg_stab.add_variable('stabilizer.yaw', 'float')

        cf = Crazyflie(rw_cache='./cache')
        with SyncCrazyflie(available[0][0], cf=cf) as scf:
            # Note: it is possible to add more than one log config using an
            # array.
            # with SyncLogger(scf, [lg_stab, other_conf]) as logger:
            with SyncLogger(scf, lg_stab) as logger:
                endTime = time.time() + 10

                for log_entry in logger:
                    timestamp = log_entry[0]
                    data = log_entry[1]
                    logconf_name = log_entry[2]

                    print('[%d][%s]: %s' % (timestamp, logconf_name, data))

                    if time.time() > endTime:
                        break

如果想要自定义Log参数的话,可以通过引入官方提供的C语言库(log.h)文件,调用相关函数来实现,具体细节可参考log.h源代码。下面是一个简单的自定义Log示例:

LOG_GROUP_START(MyLog)
        LOG_ADD(LOG_INT16, logVar1, logVar1Addr)
        LOG_ADD(LOG_INT16, logVar2, logVar2Addr)
        LOG_ADD(LOG_INT16, logVar3, logVar3Addr)
LOG_GROUP_STOP(MyLog)