Blogress

機械学習関連ばっかり書きます

Pythonのクラス内におけるメソッドについてまとめてみた

目的

備忘録用です。普段Pythonでプログラミングするとき、めんどくさいのであまりクラスを使ってプログラミングしないことが多いです(jupyter notebookのベタ書き脳死コーディングのせい)。
ですが、最近はちょっとこのままでは良くないなと、あまり使う必要ないと思う場面でもわざわざクラスにしてPythonのコードを書いてます。

今回の記事は二番煎じ感満載かもしれませんが、ご了承ください。

メソッド

クラス内のメソッドには以下の3種類があります。

インスタンスメソッド

普通のメソッドがこれです。簡単に例題プログラムを書きます。

class Student:
    
    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id
        
    def reply_name(self):
        print("私は" + self.name + "と言います")
        
    def reply_id(self):
        print("私の学籍番号は" + str(self.s_id) + "です")

if __name__ == '__main__':
    st = Student('山田太郎', '20A999')
    st.reply_name()
    st.reply_id()
山田太郎と言います
学籍番号は20A999です

学生を表すStudentクラスを作りました。結果は上記のようになります。
クラス内のインスタンスメソッドであるreply_nameとreply_idはどちらもpublicなメソッドになります。
privateなメソッドも追加してみましょう。

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id
        # 成績は順番に「秀、優、良、可、不可」
        self.school_credit = {}

    def reply_name(self):
        print(self.name + "と言います")
        
    def reply_id(self):
        print("学籍番号は" + str(self.s_id) + "です")

    def reply_gpa(self):
        print("GPAは" + str(self.__calc_GPA()) + "です")

    def input_credit(self, credit_dict):
        self.school_credit = credit_dict

    def __calc_GPA(self):
        gpa = 0
        gpa_list = ['不可', '可', '良', '優', '秀']

        # 取得した単位はすべて1とする
        for sc in self.school_credit.values():
            if sc in gpa_list:
                gpa += gpa_list.index(sc)
               
        gpa /= len(self.school_credit)
        
        return gpa

if __name__ == '__main__':
    st = Student('山田太郎', '20A999')
    st.reply_name()
    st.reply_id()
    st.input_credit({'英語':'可', '実験':'優', 'プログラミング':'秀', 'DB':'良'})
    st.reply_gpa()
山田太郎と言います
学籍番号は20A999です
GPAは2.5です

GPAの計算式は雑ですがこんな感じで。計算式間違ってたらごめんなさい。
__calc_GPAメソッドがprivateなので、次のようにするとエラーを吐きます。

if __name__ == '__main__':
    st = Student('山田太郎', '20A999')
    st.reply_name()
    st.reply_id()
    st.input_credit({'英語':'可', '実験':'優', 'プログラミング':'秀', 'DB':'良'})
    print(st.__calc_GPA())
山田太郎と言います
学籍番号は20A999です
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
 in 
      4     st.reply_id()
      5     st.input_credit({'英語':'可', '実験':'優', 'プログラミング':'秀', 'DB':'良'})
----> 6     print(st.__calc_GPA())

AttributeError: 'Student' object has no attribute '__calc_GPA'

privateなので外からアクセスできません。
しかし、実は「インスタンス._クラス名__変数名」でアクセスできます。推奨されないので覚えなくてもいいですが……。ちなみに変数も同様です。

if __name__ == '__main__':
    st = Student('山田太郎', '20A999')
    st.reply_name()
    st.reply_id()
    st.input_credit({'英語':'可', '実験':'優', 'プログラミング':'秀', 'DB':'良'})
    print(st._Student__calc_GPA())
山田太郎と言います
学籍番号は20A999です
2.5

クラスメソッド

インスタンスメソッドとの違い

ここから個人的には本題です。
インスタンスメソッドとの違いを挙げるとすれば、まずはクラスから直接呼び出せることだと思います。メソッドの頭に@classmethodとデコレートし、慣例的に引数をclsにします。selfはインスタンスなのに対し、こちらはクラスを意味します。

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id

    def reply_name(self):
        print(self.name + "と言います")
        
    @classmethod
    def reply_greeting(cls):
        print('こんにちは')

if __name__ == '__main__':
    st = Student('山田太郎', '20A999')
    st.reply_greeting()
    Student.reply_greeting()
こんにちは
こんにちは

クラスメソッドの使い所

これだけの例だと、いまいち利点や使い所がわかりません。
例えば外部ファイル(JSONなど)から学生の情報を取得するとします(普通はDBでしょうが)。今までと同様に書けば以下のようになります。

import pandas as pd

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id

    def reply_name(self):
        print(self.name + "と言います")

if __name__ == '__main__':
    info_df = pd.read_json('student.json', encoding='UTF-8')
    st = Student(info_df['name'].values[0], info_df['id'].values[0])
    st.reply_name()

関数にするなら以下です。

import pandas as pd

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id

    def reply_name(self):
        print(self.name + "と言います")

def get_student():
    info_df = pd.read_json('student.json', encoding='UTF-8')
    return Student(info_df['name'].values[0], info_df['id'].values[0])

if __name__ == '__main__':
    st = get_student()
    st.reply_name()

では、続いてクラスメソッドで書きます。

import pandas as pd

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id

    def reply_name(self):
        print(self.name + "と言います")

    @classmethod
    def get_student(cls):
        info_df = pd.read_json('student.json', encoding='UTF-8')
        return cls(info_df['name'].values[0], info_df['id'].values[0])

if __name__ == '__main__':
    st = Student.get_student()
    st.reply_name()

違いはなんとなくわかるかもしれませんが、クラスメソッドの利点がわかりにくいので解説します。
最大のうまみはクラスの中にインスタンスを作るメソッドを書けることです。

関数にした場合とクラスメソッドにした場合を比較するとわかりやすいですが、get_studentをクラスメソッドではStudentクラス内でまとめて管理できます。
MainクラスからStudentクラスをimportするときなど、まとめられている方が使い勝手が良いです。
クラスに依存したメソッドはクラスメソッドで定義するほうが好ましいと思います。

また、実装方法によって一概には言えませんが、クラスメソッドを使わない場合、毎回

info_df = pd.read_json('student.json', encoding='UTF-8')

を書かなければなりません。非常に面倒です。

スタティックメソッド

最後にスタティックメソッドとクラスメソッドの違いをまとめます。
スタティックメソッドもクラスメソッド同様クラスからもインスタンスからも呼び出せます。
しかし、いくつか違いあり、1つ目は@staticmethodでデコレートすること。2つ目は引数になにも受け取らない実装が可能なことです。
インスタンスメソッドではself、クラスメソッドではclsを暗黙的に引数として書く必要がありますが、スタティックメソッドは書く必要はありません。そのため、クラスに依存しないメソッドと明示的に記述することができます。

class Student:

    def __init__(self, name, s_id):
        self.name = name
        self.s_id = s_id

    def reply_name(self):
        print(self.name + "と言います")

    @staticmethod
    def reply_greeting():
        print('こんにちは')

if __name__ == '__main__':
    Student.reply_greeting()
    st = Student('山田太郎', '20E999')
    st.reply_name()
こんにちは
山田太郎と言います

reply_greetingはこんにちはと出力するだけなため、まったくクラスに依存しません。このようなメソッドはクラスメソッドではなく、スタティックメソッドにすることが好ましいです。

しかし、クラスメソッドがあれば、スタティックメソッドがなくても実装上は問題ありません。
最初にクラスメソッドのプログラム例で書いたように、スタティックメソッドを使って書けるコードはクラスメソッドでも書けるからです(reply_greetingの話)。

Pythonにスタティックメソッドが必要なのかという記事もあるため、スタティックメソッドを使うのはある意味自己満足の世界な気もします。

参考文献

【Python】インスタンスメソッド、staticmethod、classmethodの違いと使い方 - Djangoの学習ができるチュートリアルサイトDjangoBrothers

[Python] クラスメソッド・スタティックメソッドの違い - Qiita

Pythonのクラスメソッド(@classmethod)とは?使いどころとメソッドとの違いを解説 - Python学習チャンネル by PyQ

Python クラスについて - Qiita

機械学習を使わずに時系列データの異常検知

はじめに

研究で時系列データの異常検知に関する研究を行っています。そのため、機械学習による異常検知のアルゴリズムについて普段から文献調査などしているわけですが、機械学習を使わずとも異常検知できるという記事を見つけました。

大変興味深かったので、記事の内容に少しだけアレンジを加えて実装してみたいと思います。

対象データ

自分で異常検知用のデータを作成しても良いのですが、それだとどうしても作業的になってしまい、面白くないのでこちらの心電図のデータを使わせていただきました。
ダウンロードしたデータをそのままグラフのさせると以下のようになります。ただし、そのまま出力させるとデータが多過ぎて潰れたグラフになって見にくいので、データの頭から32秒付近までの間のデータを抽出しています。

f:id:Noleff:20200707152820p:plain

異常検知

見つけたい異常

上のグラフからわかるように、17秒付近で他とは振動をしていることがわかると思います。今回はこの異常を検知したいと思います。

特徴量

1. 単純移動平均

一つ目の特徴量は移動平均です。移動平均とはある区間を決めてその区間をずらしていき、各区間内で平均値を求めていく計算になります。
平均は一般的にあまり有効な特徴量となり得ませんが、今回は二つ目の特徴量を際立たせるためにあえて平均を採用しています。(といっても今回の場合この特徴量を使っていないとうまくいってないところもありますが)
以下に移動平均のグラフを載せます。移動平均なため、データの頭は0で埋めていることに注意です。

f:id:Noleff:20200707143036p:plain

2. ローパスフィルタとの差分

フーリエ変換

ローパスフィルタを説明するために、まずフーリエ変換について簡単に触れます。

フーリエ変換とは実変数の複素または実数値関数を別の同種の関数に写す変換である。変換後の関数はもとの関数に含まれる周波数を記述し、しばしばもとの関数の周波数領域表現 (frequency domain representation) と呼ばれる。実質的に、フーリエ変換は関数を振動関数に分解する。(Wikipediaより) f:id:Noleff:20200707011937g:plain https://ja.wikipedia.org/wiki/%E3%83%95%E3%83%BC%E3%83%AA%E3%82%A8%E5%A4%89%E6%8F%9B

言葉にするとわかりにくいですが、同じくWikipediaの載ってあるGIF画像を見ると非常にわかりやすいです。
複雑な振動波形でも、複数の振動波形が合成されてできています。この一つ一つの振動波形を取り出し、その振動波形からどの周期にどのぐらいの振れ幅を持っているかわかります。この振動波形から周波数成分に分解する変換のことをフーリエ変換といいます。
詳しくはこちらのサイトが大変わかりやすかったです。

ローパスフィルタ

続いてローパスフィルタの説明をします。フーリエ変換をすることで、振動波形を周波数成分に変換しました。今回の心電図をデータをフーリエ変換すると以下のようになります。ただし、0から10Hzの範囲のみ出力させています。なお、縦軸は各周期数成分の大きさを表す振幅スペクトルになります。

f:id:Noleff:20200707140545p:plain

大きな振幅スペクトルが二つ、中くらいの振幅スペクトルが一つ、小さな振幅スペクトルが三つできているのがわかると思います。
ローパスフィルタはこれらの周期数の内、Lowな振幅スペクトルの周期数のみ抽出します。つまり、振幅スペクトルが小さい高周波を除去しているイメージです。 ローパスフィルタ後の心電図とそれをフーリエ変換したしたものを以下に載せます。

f:id:Noleff:20200707140640p:plain

f:id:Noleff:20200707140709p:plain

今回ローパスフィルタは2.5Hzよりも大きな値を除去したので、二つ目の振幅スペクトルがやや鋭角になってますが、今回はこれでいきたいと思います。

差分の計算

長くなりましたが、ここまでが前置きです。
2つ目の特徴量はローパスフィルタとの差分でした。つまり、ローパスフィルタと元のデータの差分をとり、それを異常度として算出します。上のグラフでいうと、橙がローパスフィルタ後の心電図、青が元のデータの心電図なため、橙から青を引いた絶対値を異常度とします。なお、以下のグラフは0から1の範囲に値が収まるように正規化してます。

f:id:Noleff:20200707135208p:plain

思ったよりきれいに異常度を算出できませんでした(汗)。
ですが、心電図に異常が起きている17秒付近で異常度が大きくなっていることは見てわかると思います。

プログラム

環境

  • OS:windows10
  • python:3.7.6
  • numpy:1.18.1
  • pandas:1.0.1
  • scikit-learn:0.22.1

コード

上の描画のコードなど一部省略します。ほとんど参考から流用したので、気になる方はそちらをご覧ください。

生データ

最初に読み込むecg.csvはこんな感じです。timeが200スタートだったり、signal1とsignal2の違いは詳しく調べてないのでわかりません。今回はsignal2の方を使用しました。データはtimeからわかるように0.004秒ごとに記録されています。

f:id:Noleff:20200707141231p:plain

関数

# データ取得
def read_df(path):
    df = pd.read_csv(path)
    return df

# Timeから時間の差を追加
def differ_time(df):
    # Timeから時間の差を割り当てる
    df['dif_sec'] = df['time'].diff().fillna(0)
    df['cum_sec'] = df['dif_sec'].cumsum()
    return df

# 正規化
def minxmax_scaler(df):
    scaler = MinMaxScaler()
    scaler.fit(df)
    # 正規化したデータを新規のデータフレームに
    df_mc = pd.DataFrame(scaler.transform(df), columns=df.columns) 
    # 正規化したデータをリストに
    # mc_list = scaler.fit_transform(df)
    return df_mc

以下べた書き

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler

# データ読み込み
df = read_df('./ecg.csv')
dif_df = differ_time(df) # 秒数を計算 
test_df = dif_df.iloc[0:8192, :] # 使うデータだけ抽出

N = len(test_df) # FFTのサンプル数 
dt = 0.004 # サンプリング周波数 
freq = np.fft.fftfreq(N, d=dt) # 周波数

# 高速フーリエ変換(FFT)
F = np.fft.fft(test_df['signal2'].values) 
F_abs = np.abs(F/(N/2))

# ローパスフィルタ
threshold_period = 0.4
threshold_freq = 1 / threshold_period
F_lowpass = np.where(abs(freq) > threshold_freq, 0, F)
lowpass = np.fft.ifftn(F_lowpass).real

# 特徴量
sma = test_df['signal2'].rolling(25).mean().fillna(0) # 0.1秒ごとの移動平均
diff = abs(lowpass - test_df['signal2']) # ローパスフィルタとの差分

# 結果をまとめるデータフレーム
result_df = pd.DataFrame(diff, columns=['sma', 'diff', 'anomaly', 'label'])
result_df['sma'] = sma
result_df['diff'] = diff

 # 異常度を正規化
result_df = minxmax_scaler(result_df)

# 移動平均とローパスフィルタとの差分を乗算
result_df['anomaly'] = result_df['sma'] * result_df['diff'] 

# ラベル振り
result_df.loc[result_df['sma'] == 0, 'label'] = 2
result_df.loc[result_df['anomaly'] > 0.6, 'label'] = 1
result_df['label'] = result_df['label'].fillna(0)

これで、結果をまとめたresult_dfができました。

結果

移動平均とローパスフィルタを乗算した結果を示します。

plt.figure(figsize=(16,4))
plt.plot(test_df['cum_sec'], minxmax_scaler(test_df)['signal2'], color='blue', label='row')
plt.plot(test_df['cum_sec'], result_df['anomaly'], color='red', label='anomaly')
plt.legend()
plt.xlabel('sec')
plt.ylabel('anomaly')
plt.show()

f:id:Noleff:20200707144458p:plain

異常度が0から1の範囲に収まるようにしているので、閾値を設定すれば異常検知できます。今回は異常度が0.6より大きい箇所を異常としています。
グラフを見ると、心電図の17秒付近のみが0.6を越えていることがわかります。

今度は散布図として出力させます。

c_list = ['blue', 'red', 'green']

for i, c in enumerate(c_list):
    plt.scatter(result_df.loc[result_df['label']==i, 'diff'], result_df.loc[result_df['label']==i, 'sma'], color=c, label=i)

plt.xlabel('diff')
plt.ylabel('sma')
plt.legend()
plt.show()

f:id:Noleff:20200707144934p:plain

青色が正常、赤色が異常、緑色が移動平均時に0を埋めた値です。 異常なデータのみだけをグラフの右上付近に寄せることができていると思います。

参考

【データ分析入門】機械学習未使用!Pythonでゼロから始める振動解析|はやぶさの技術ノート

FFT を使った時系列データ解析 - nykergoto’s blog

<NumPy> 高速フーリエ変換による周波数解析 - Helve’s Python memo

https://cpp-learning.com/hampel-filter/

https://qiita.com/hoto17296/items/d337fe0215907432d754

https://ja.wikipedia.org/wiki/%E3%83%95%E3%83%BC%E3%83%AA%E3%82%A8%E5%A4%89%E6%8F%9B

http://www.cs.ucr.edu/~eamonn/discords/

Telloのセンサデータ収集に関しての捕捉

前回

前回の記事でTelloのセンサデータ収集方法について書きました。今回はその記事の補足となります。良かったら、まず先にそちらを読んでいただければなと思います。

今回の記事内容

今回の記事は前回の内容データ収集部分に関する捕捉です。具体的にはtello.pyの中のget_tello_sensorメソッドの話になります。以下コードです。

import socket
import threading
import time
from datetime import datetime
import csv
from stats import Stats

class Tello:
    def __init__(self):
        self.INTERVAL = 0.1

        self.header = ['datetime', 'status',
                'mid', 'x', 'y', 'z',
                'mpry1', 'mpry2', 'mpry3',
                'pitch', 'roll', 'yaw', 
                'agx', 'agy', 'agz',
                'vgx', 'vgy', 'vgz', 
                'templ', 'temph', 'tof', 'h', 
                'bat', 'baro', 'time']
        self.flag = True
        self.name = str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
        self.status = None

        # ステータス受信用のUDPサーバの設定
        self.local_ip = ''
        self.local_port = 8890
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # socket for sending cmd
        self.socket.bind((self.local_ip, self.local_port))

       # コマンド送信用の設定
        self.receive_thread = threading.Thread(target=self.get_tello_sensor)
        # self.receive_thread = threading.Thread(target=self._receive_thread)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        self.tello_ip = '192.168.10.1'
        self.tello_port = 8889
        self.tello_address = (self.tello_ip, self.tello_port)
        self.log = []

        self.MAX_TIME_OUT = 15.0

    def send_command(self, command):
        """
        Send a command to the ip address. Will be blocked until
        the last command receives an 'OK'.
        If the command fails (either b/c time out or error),
        will try to resend the command
        :param command: (str) the command to send
        :param ip: (str) the ip of Tello
        :return: The latest command response
        """
        self.status = command
        self.log.append(Stats(command, len(self.log)))

        self.socket.sendto(command.encode('utf-8'), self.tello_address)
        print('sending command: %s to %s' % (command, self.tello_ip))

        start = time.time()
        while not self.log[-1].got_response():
            now = time.time()
            diff = now - start
            if diff > self.MAX_TIME_OUT:
                print ('Max timeout exceeded... command %s' % command)
                # TODO: is timeout considered failure or next command still get executed
                # now, next one got executed
                return
        print('Done!!! sent command: %s to %s' % (command, self.tello_ip))

    def _receive_thread(self):
        """Listen to responses from the Tello.

        Runs as a thread, sets self.response to whatever the Tello last returned.

        """
        while True:
            try:
                self.response, ip = self.socket.recvfrom(1024)
                print('from %s: %s' % (ip, self.response))
                self.log[-1].add_response(self.response)
            except socket.error as exc:
                print("Caught exception socket.error : %s" % exc)

    def get_tello_sensor(self):
        while True:
            index = 0                                
            try:
                index += 1
                time.sleep(self.INTERVAL) # 一定時間待つ
                now = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
                self.response, ip = self.socket.recvfrom(1024) # 受信は最大1024バイトまで
                self.response = str(self.response)
                
                # print('from %s: %s' % (ip, self.response))
                self.log[-1].add_response(self.response)
                
                if self.response == "b'ok'":
                    continue
                
                # 受信データに手を加える
                self.response = self.response.split(';')[:21] # センサ関係だけ抽出
                sensor_list = []
                for sensor in self.response:

                    s = sensor.split(':')
                    if s[0] == 'mpry':
                        mpry_list = s[1].split(',')
                    else:
                        sensor_list.append(s[1])
                    
                sensor_dict = {
                    'datetime':now, 'status':self.status,
                    'mid':sensor_list[0], 'x':sensor_list[1], 'y':sensor_list[2], 'z':sensor_list[3],
                    'mpry1':mpry_list[0], 'mpry2':mpry_list[1], 'mpry3':mpry_list[2],
                    'pitch':sensor_list[4], 'roll':sensor_list[5], 'yaw':sensor_list[6],
                    'agx':sensor_list[17], 'agy':sensor_list[18], 'agz':sensor_list[19],
                    'vgx':sensor_list[7], 'vgy':sensor_list[8], 'vgz':sensor_list[9],
                    'templ':sensor_list[10], 'temph':sensor_list[11], 'tof':sensor_list[12], 'h':sensor_list[13],
                    'bat':sensor_list[14], 'baro':sensor_list[15], 'time':sensor_list[16]
                }
                
                self.write_csv('../../../data/raw/'+ self.name + '.csv', self.header, sensor_dict, self.flag)

                if self.flag:
                    self.flag = False
                
            except socket.error as exc:
                print("Caught exception socket.error : %s" % exc)

    def write_csv(self, filename, header, value, flag):
        with open(filename, mode='a', newline="") as f:
            writer = csv.DictWriter(f, fieldnames=header)
            if flag:
                writer.writeheader()
            writer.writerow(value)

    def on_close(self):
        # for ip in self.tello_ip_list:
        self.socket.sendto('land'.encode('utf-8'), self.tello_address)
        self.socket.close()

    def get_log(self):
        return self.log

ちょっと長いですが見るのはget_tello_sensorメソッドだけで良いです。このメソッド内の5行目で、time.sleep(self.INTERVAL)と記述しています。これは取得したセンサデータをCSVに出力するタイミングを操作しています。今回のコードではイニシャライザでself.INTERVAL = 0.1としていますので、0.1秒ごとにセンサデータが書き込まれます。

問題点

さて、何が問題なのか説明します。このself.INTERVALは、このコードを作る上で参考にしたサンプルプログラムがあります。 ここでは、self.INTERVALは0.2秒となっていますが、本プログラムでこれを0.2秒にすると、センサデータが間延びした形になるという問題が発生してしまいます。

github.com

検証方法

検証にはTelloとiPhone、二つのプログラムを同時に実行させ、同じ挙動をさせたときのセンサ値を見ます。今回はroll回転(Y軸)を180度回転しては戻す操作を繰り返しました。 動作のフローとしては、プログラム実行 ー> 10秒停止 ー> 約20秒間Roll回転 ー> 10秒放置 ー> プログラム停止の順番です。

検証結果

結果、Telloのグラフだけ、間延びしていることがわかると思います。iPhoneの方はself.INTERVALの秒数を増やすごとに、データ数は減っていますが、ほぼ同じグラフをプロットしています。 上から順に、self.INTERVALが0.1秒、0.2秒、0.3秒だったときのグラフになります。

0.1秒のとき、TelloとiPhoneが微妙にずれているのは、両手にTelloとiPhoneを持って回転させたため、逆位相になっているからです(ここは本当にミスりました)。
0.2秒のとき、TelloのセンサデータのRoll値の変化が現れる時間がiPhoneより10秒以上遅くなっています。最初はこのずれは、Telloのセンサデータを送るときのラグだと思っていましたが、検証したことからtime.sleepが悪さしていることがわかりました。
0.3秒のとき、もう明らかに間延びしています。間延びしてしまう原因は不明ですが、0.1秒以外は信用できないと言えるでしょう。

  • 0.1秒 f:id:Noleff:20200606141952p:plain

  • 0.2秒 f:id:Noleff:20200606142011p:plain

  • 0.3秒 f:id:Noleff:20200606142024p:plain

また、0.1秒が問題ない理由としては、Telloのセンサデータを送る仕組みが、PC側でUDPサーバを立てて、TelloがPCにひたすらセンサデータを送り続けてくるからだと思います。この秒数がおそらく0.1秒なため、0.1秒以上にすると内部的におかしなことが起きているのかもしれません。
実はself.INTERVALを入れなくてもプログラムは動きます。下のグラフが入れなかったときのグラフです。0.1秒のときと大差ないことがわかると思います。

  • 0秒 f:id:Noleff:20200606155353p:plain

まとめ

今回の記事ではTelloのセンサデータが間延びしていることを検証しました。検証方法としては、同じ挙動をさせたiPhoneのセンサデータと比較するというシンプルなものです。 結果、time.sleepを0.1秒より大きな値を入れると、間延びしていることがわかりました。原因はわかりませんが、TelloはiPhoneと異なり通信してデータをCSVに溜め込んでいることが原因な気がします。

Appnedix

iPhoneのプログラムはpythonista3という環境で作成しました。以下コードです。説明は省略します。

import ui, location, csv, datetime, time, motion, sound
from datetime import datetime
import os

def csv_writer(filename, header, value, flag):
    with open(filename, mode="a") as f:
        writer = csv.DictWriter(f, fieldnames=header)
        if flag:
            writer.writeheader()
        writer.writerow(value)

# motion header
header = ['datetime', 'latitude', 'longitude', 'altitude', 'timestamp', 'horizontal_accuracy', 'vertical_accuracy', 'speed', 'course', 'pitch', 'roll', 'yaw', 'agx', 'agy', 'agz', 'gra_x', 'gra_y', 'gra_z', 'uacc_x', 'uacc_y', 'uacc_z', 'com_x', 'com_y', 'com_z', 'accuracy']

flag = True
name = str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))

while True:    
    # get motion
    motion.start_updates()
    # get GPSdata
    location.start_updates() # updata GPSdata 
    time.sleep(0.1)
    gravity = motion.get_gravity()
    user_accele = motion.get_user_acceleration()
    attitude = motion.get_attitude()
    magnetic = motion.get_magnetic_field()
    gps = location.get_location() # get GPSdata
    motion.stop_updates()
    location.stop_updates() 
    
    # get realtime
    now = str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
    # user accele
    accele = [a + g for (a, g) in zip(user_accele, gravity)]
    
    # dict
    sensor_dict = {'datetime':now, 'latitude':gps['latitude'], 'longitude':gps['longitude'], 'altitude':gps['altitude'], 'timestamp':gps['timestamp'], 'horizontal_accuracy':gps['horizontal_accuracy'], 'vertical_accuracy':gps['vertical_accuracy'], 'speed':gps['speed'], 'course':gps['course'], 'pitch':attitude[1], 'roll':attitude[0], 'yaw':attitude[2], 'agx':accele[0], 'agy':accele[1], 'agz':accele[2], 'gra_x':gravity[0], 'gra_y':gravity[1], 'gra_z':gravity[2], 'uacc_x':user_accele[0], 'uacc_y':user_accele[1], 'uacc_z':user_accele[2], 'com_x':magnetic[0], 'com_y':magnetic[1], 'com_z':magnetic[2], 'accuracy':magnetic[3]}
    
    # Open csv file and write motion and GPS dictionary data
    csv_writer("/private/var/mobile/Library/Mobile Documents/iCloud~com~omz-software~Pythonista3/Documents/csvData/" + name + ".csv", header, sensor_dict, flag)
    
    if flag:
        flag = False

Telloのセンサデータ収集

はじめに

Ryze Telloでプログラミングしたときのセンサデータ収集方法についてまとめます。言語はPythonでやりました。SDKとして、DJIの公式SDK「Tello-Python」を使っています。

https://github.com/dji-sdk/Tello-Python

本ブログでは、上記のサンプルコードを理解、実装済みであることを想定しています。まだの方はgit cloneして動かしてみてください。 以下のサイトが参考になると思います。

https://qiita.com/hsgucci/items/3327cc29ddf10a321f3c

開発環境

  • macOS Catalina バージョン10.15.3(windows10でも動作確認済み)
  • Python3.7.6
  • Tello EDU (Tello SDK 2.0)

目的

今回やりたいことはプログラムでTelloを飛行させつつ、Tello内に内蔵されているセンサの情報を取得し、CSVファイルに保存することです。Telloには内部にデータを保存する機構がないため、取得してあるデータは操作しているPCに送る必要があります。

簡単に言うと、Telloとの通信には3種類あるようです。その中にTelloの機体ステータスを受信し続ける8890ポートがあります。PC側でポート8890を指定してUDPサーバーを立てると、Telloが自身の状態、つまりセンサデータを送信し続けてくれます。

データの取得はポート8890で良いですが、こちらからコマンドを送り、Telloに飛行命令を出すのはポート8889になります。この通信は双方向通信で、PCからコマンドを送ると、Tello側から応答が帰ってきます。

今回のプログラムではカメラは使わないため、ポート8890と8889を使います。ポート8890に関するサンプルプログラムはtello_state.py、ポート8889に関するサンプルプログラムがSingle_Tello_Testフォルダ内にある、tello_test.py、tello.py、stats.pyです。

上記のサイトが「Tello-Python」の中にあるtello_state.pyとSingle_Tello_Testのサンプルプログラムを詳しく解説してくださっています。 

前準備

今回のプログラムはPython3環境で実装していますが、「Tello-Python」はPython2環境で動かすプログラムとなっています。そのため、そのままPython3環境で実行すると、しょうもないエラーが出てしまいます。まずはそれをどうにかしましょう。

https://qiita.com/coffiego/items/54c8bb553394590787f9

と言ってもリンクを貼るだけですが。

上記のサイトではSingle_Tello_TestのサンプルプログラムをPython3で動くように修正してくださっています。tello_test.pyを手動でコマンドを手入力で実行できるように変更していますが、今回はSDKのサンプルプログラム通り、コマンドがあらかじめ書かれたテキストを読み込んで動くようにプログラムを実装しています。自分がやりたい方で実装してみてください。

センサデータ収集

Tellloのセンサデータを収集するのに使う主なプログラムは以下の4つです。

  • main.py
  • move_Tello.py
  • tello.py
  • stats.py

ディレクトリ構成は以下のようになっています。

data
  |--raw
  |    |--実行した日付.csv
notebooks
  |--visualize.ipynb
scripts
  |--data
  |    |--Tello
  |    |    |--FlightPlan
  |    |    |    |--command.txt
  |    |    |--log
  |    |    |    |--実行した日付.txt
  |    |    |--main.py
  |    |    |--move_Tello.py
  |    |    |--tello.py
  |    |    |--stats.py

main.py

move_Tello.pyを実行しているだけです。なくてもいいですが、ここで、手動でコマンドを打つか、飛行計画からコマンドを実行するかを分岐させると良いかもしれません。

from move_Tello import MoveTello

def move_tello():
    move = MoveTello()
    # 飛行計画から動かす
    move.auto_move()

if __name__ == "__main__":
    print('started')
    move_tello()

move_Tello.py

  • post_commandメソッド

    このコードでTelloに実行命令を出しています。 飛行計画にdelayが合った場合、何秒delayしたかを標準出力します。

  • read_FlightPlanメソッド

    command.txtに書いてあるコマンドを読み込んで、リストとして保存しています。command.txtの中身は以下のとおりです。飛行計画はお好みで設定してみてください。下記のコマンドは離陸 ー> 4m前進 ー> 180度時計回り ー> 4m前進 ー> 着陸の順に実行されます。

command
takeoff
delay 3
forward 400
delay 3
cw 180
delay 3
forward 400
delay 3
land
  • outputメソッド Single_Tello_Testのtello_test.pyのログ出力部分をメソッド化しているだけです。正直センサデータを記録しているので、今回このログはあまり重要ではありません。実行する度にログファイルが作られるため、筆者はずっとコメントアウトしてました。

  • auto_moveメソッド

    read_FlightPlanから入手したコマンドを順番にpost_commandに渡し、コマンドを制御しています。for文の中にsleepを入れているのは、Telloに一気にコマンドを渡さないためです。現状、Telloにコマンドを渡し、そのコマンドの挙動を実行している間に次のコマンドを送っても無視されるため、それをなくしたいというアプローチです。Telloが確実に前のコマンドの挙動を終えている5秒後に次のコマンドを送るようにしています。

from tello import Tello
import sys
from datetime import datetime
import time

class MoveTello:
    def __init__(self):
        # ログ記録用時刻
        self.name = str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
        # 強制着陸用時刻
        self.t1 = time.time()
        # インスタンス
        self.tello = Tello()
        
    # 飛行計画から動かす
    def auto_move(self):
        # 飛行計画読み込み
        commands = self.read_FlightPlan()
        # コマンドを判定して投げる
        for command in commands:
            self.post_command(command)
            time.sleep(5)
        # ログ出力
        self.output() #邪魔ならここをコメントアウト

    # logと標準出力
    def output(self):
        log = self.tello.get_log()
        out = open('log/' + self.name + '.txt', 'w')
        for stat in log:
            stat.print_stats()
            str = stat.return_stats()
            out.write(str)

    # 飛行計画読み込み
    def read_FlightPlan(self):
        with open('FlightPlan/command.txt', 'r') as f:
            commands = f.readlines()
        return commands
    
    # コマンド判定して投げる
    def post_command(self, command):
        # 空文字かつ改行
        if command != '' and command != '\n':
            # 末尾の空白を削除
            command = command.rstrip() 
            # commandの中にdelayがない
            if command.find('delay') != -1: 
                # 秒数抽出
                sec = float(command.partition('delay')[2])
                print('delay %s' % sec)
                time.sleep(sec)
            else:
                self.tello.send_command(command)

tello.py

  • イニシャライザに追記

    ほぼSingle_Tello_Testのtello.pyそのままなので、主な変更点だけ述べます。
    __init__.pyにいくつか変数を追記しています。CSVファイルに記録するために必要なヘッダー、フラグ、記録日、コマンドを変数として持ちます。Telloのステータスを受信するためにlocal_postは8890に変更し、threading.Threadの引数targetをget_tello_sensorに変更して、別スレッドとしてセンサデータの収集とCSV出力を行います。

  • get_tello_sensorメソッド

    このコードはtello_state.pyにいくつか変更を加えたプログラムです。主な変更分はデータの整形になるため、少々複雑です(くそコードとも言います)。ポート8890から得られるステータスの出力例は以下のようになっています。

    f:id:Noleff:20200501011644p:plain

    socket.recvfrom(1024)から返ってくるreseponseはbytes型なので、splitするためにstring型にしています。responseのセンサデータ部分だけをsplitで抽出し、sensor_listに保存します。mpryは同じsplit方法で抽出できないので別途splitしています。
    CSVファイルに保存するのはwrite_csvメソッドです。フラグはヘッダを最初の一回だけ保存するために用いています。

import socket
import threading
import time
from datetime import datetime
import csv
from stats import Stats

class Tello:
    def __init__(self):
        self.INTERVAL = 0.1

        self.header = ['datetime', 'status',
                'mid', 'x', 'y', 'z',
                'mpry1', 'mpry2', 'mpry3',
                'pitch', 'roll', 'yaw', 
                'agx', 'agy', 'agz',
                'vgx', 'vgy', 'vgz', 
                'templ', 'temph', 'tof', 'h', 
                'bat', 'baro', 'time']
        self.flag = True
        self.name = str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
        self.status = None

        # ステータス受信用のUDPサーバの設定
        self.local_ip = ''
        self.local_port = 8890
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # socket for sending cmd
        self.socket.bind((self.local_ip, self.local_port))

       # コマンド送信用の設定
        self.receive_thread = threading.Thread(target=self.get_tello_sensor)
        # self.receive_thread = threading.Thread(target=self._receive_thread)
        self.receive_thread.daemon = True
        self.receive_thread.start()

        self.tello_ip = '192.168.10.1'
        self.tello_port = 8889
        self.tello_address = (self.tello_ip, self.tello_port)
        self.log = []

        self.MAX_TIME_OUT = 15.0

    def send_command(self, command):
        """
        Send a command to the ip address. Will be blocked until
        the last command receives an 'OK'.
        If the command fails (either b/c time out or error),
        will try to resend the command
        :param command: (str) the command to send
        :param ip: (str) the ip of Tello
        :return: The latest command response
        """
        self.status = command
        self.log.append(Stats(command, len(self.log)))

        self.socket.sendto(command.encode('utf-8'), self.tello_address)
        print('sending command: %s to %s' % (command, self.tello_ip))

        start = time.time()
        while not self.log[-1].got_response():
            now = time.time()
            diff = now - start
            if diff > self.MAX_TIME_OUT:
                print ('Max timeout exceeded... command %s' % command)
                # TODO: is timeout considered failure or next command still get executed
                # now, next one got executed
                return
        print('Done!!! sent command: %s to %s' % (command, self.tello_ip))

    def _receive_thread(self):
        """Listen to responses from the Tello.

        Runs as a thread, sets self.response to whatever the Tello last returned.

        """
        while True:
            try:
                self.response, ip = self.socket.recvfrom(1024)
                print('from %s: %s' % (ip, self.response))
                self.log[-1].add_response(self.response)
            except socket.error as exc:
                print("Caught exception socket.error : %s" % exc)

    def get_tello_sensor(self):
        while True:
            index = 0                                
            try:
                index += 1
                time.sleep(self.INTERVAL) # 一定時間待つ
                now = str(datetime.now().strftime('%Y-%m-%d %H:%M:%S:%f'))
                self.response, ip = self.socket.recvfrom(1024) # 受信は最大1024バイトまで
                self.response = str(self.response)
                
                # print('from %s: %s' % (ip, self.response))
                self.log[-1].add_response(self.response)
                
                if self.response == "b'ok'":
                    continue
                
                # 受信データに手を加える
                self.response = self.response.split(';')[:21] # センサ関係だけ抽出
                sensor_list = []
                for sensor in self.response:

                    s = sensor.split(':')
                    if s[0] == 'mpry':
                        mpry_list = s[1].split(',')
                    else:
                        sensor_list.append(s[1])
                    
                sensor_dict = {
                    'datetime':now, 'status':self.status,
                    'mid':sensor_list[0], 'x':sensor_list[1], 'y':sensor_list[2], 'z':sensor_list[3],
                    'mpry1':mpry_list[0], 'mpry2':mpry_list[1], 'mpry3':mpry_list[2],
                    'pitch':sensor_list[4], 'roll':sensor_list[5], 'yaw':sensor_list[6],
                    'agx':sensor_list[17], 'agy':sensor_list[18], 'agz':sensor_list[19],
                    'vgx':sensor_list[7], 'vgy':sensor_list[8], 'vgz':sensor_list[9],
                    'templ':sensor_list[10], 'temph':sensor_list[11], 'tof':sensor_list[12], 'h':sensor_list[13],
                    'bat':sensor_list[14], 'baro':sensor_list[15], 'time':sensor_list[16]
                }
                
                self.write_csv('../../../data/raw/'+ self.name + '.csv', self.header, sensor_dict, self.flag)

                if self.flag:
                    self.flag = False
                
            except socket.error as exc:
                print("Caught exception socket.error : %s" % exc)

    def write_csv(self, filename, header, value, flag):
        with open(filename, mode='a', newline="") as f:
            writer = csv.DictWriter(f, fieldnames=header)
            if flag:
                writer.writeheader()
            writer.writerow(value)

    def on_close(self):
        # for ip in self.tello_ip_list:
        self.socket.sendto('land'.encode('utf-8'), self.tello_address)
        self.socket.close()

    def get_log(self):
        return self.log

stats.py

Single_Tello_Testのstats.pyをpython3環境で実行できるようにしただけです。説明は省略します。

from datetime import datetime

class Stats:
    def __init__(self, command, id):
        self.command = command
        self.response = None
        self.id = id

        self.start_time = datetime.now()
        self.end_time = None
        self.duration = None

    def add_response(self, response):
        self.response = response
        self.end_time = datetime.now()
        self.duration = self.get_duration()
        # self.print_stats()

    def get_duration(self):
        diff = self.end_time - self.start_time
        return diff.total_seconds()

    def print_stats(self):
        print('\nid: %s' % self.id)
        print('command: %s' % self.command)
        print('response: %s' % self.response)
        print('start time: %s' % self.start_time)
        print('end_time: %s' % self.end_time)
        print('duration: %s\n' % self.duration)

    def got_response(self):
        if self.response is None:
            return False
        else:
            return True

    def return_stats(self):
        str = ''
        str +=  '\nid: %s\n' % self.id
        str += 'command: %s\n' % self.command
        str += 'response: %s\n' % self.response
        str += 'start time: %s\n' % self.start_time
        str += 'end_time: %s\n' % self.end_time
        str += 'duration: %s\n' % self.duration
        return str

データの描画

コード

実際に取得したセンサをグラフ化しました。こちらはnotebook形式でコーディングしています。 'mid'、 'x'、 'y'、 'z'、'mpry1'、'mpry2'、'mpry3'に関しては、ミッションパッド使用時にしか値に変化が現れないとのことなのでグラフとして出力はしていません。

import pandas as pd
import matplotlib.pyplot as plt

# センサデータ取得
def get_TelloSensor_data(filename):
    df = pd.read_csv('../data/raw/'+ filename +'.csv')
    return df

# datetimeから時間の差を追加
def differ_datetime(df):
    # dateimeから時間の差を割り当てる
    df['datetime'] = pd.to_datetime(df['datetime'], format='%Y-%m-%d %H:%M:%S:%f')
    df['dif_sec'] = df['datetime'].diff().dt.total_seconds()
    df['dif_sec'] = df['dif_sec'].fillna(0)
    df['cum_sec'] = df['dif_sec'].cumsum()
    return df

# main
sensor_list = ['pitch', 'roll', 'yaw', 
            'agx', 'agy', 'agz',
            'vgx', 'vgy', 'vgz', 
            'templ', 'temph', 'tof', 'h', 
            'bat', 'baro']

for sensor in sensor_list:
    # tello
    tel_df = get_TelloSensor_data(test)
    tel_dif_df = differ_datetime(tel_df)
    # visualize
    fig = plt.figure(figsize=(16,4))
    plt.plot(tel_dif_df['cum_sec'], tel_dif_df[sensor], label='tello')
    plt.xlabel('cum_sec')
    plt.ylabel(sensor)
    plt.legend()
    plt.show()

グラフ

データとして挙動がわかる部分のセンサだけをピックアップしました。 ソースは見つかりませんでしたが、Telloの進行方向(カメラがついている方向)がy軸、横方向がx軸、垂直方向がz軸です。センサデータが収集できるスマートフォンと同じ挙動をさせたときに同じグラフになるかどうかで検証しました。

vgy

速度のvgy(y軸)では、Telloが前進したときに速度変化が現れているのがわかると思います。なぜ、往路の前進ではマイナス値が出ているのかは不明ですが……。

f:id:Noleff:20200501014416p:plain

pitch

ジャイロのpitch(x軸の回転)では、とてもわかりやすい値が出ました。前進するときTelloは少しだけ前側に傾きます。その後4m先で停止するために後ろ側に傾きます。これより、谷ができた後に山ができているというグラフが描画されたと考えられます。なお、SDK2.0公式ドキュメントには回転の命令は1~360の範囲となっていますが、なぜかジャイロセンサの出力値は-180~180の範囲で出力されます。

f:id:Noleff:20200501014437p:plain

yaw

ジャイロのyaw(z軸の回転)では、4m前進した後の時計回りに180度回転するときに値がきちんと変わりました。-125~50なので、ほぼ180度変化があったとみていいでしょう。

f:id:Noleff:20200501023146p:plain

tof

距離センサのtof(Time Of Flight)では、Telloの高さが出力されます。Telloををひっくり返すと赤く光っているやつだと思います。おおむね正しく高さが検出できていると思います。

f:id:Noleff:20200501014514p:plain

今後の課題

今回はTelloにテキストファイルから飛行計画を読み込んで命令を与えていました。しかし、現状の方法は一つのコマンド命令を実行 ー> 命令が終わるとホバリング(sleep中) ー> 次のコマンド命令を実行……、というふうにTelloの挙動はスムーズではありません。今後はスムーズな飛行できるように改良していこうと思います。