【Python + 仮想通貨】ボリンジャーバンドを活用した売買アルゴリズムをOptunaで最適化する

仮想通貨

今回の記事では以下の内容に取り組みます。

  • Coincheckから取得したローソク足データを使う
  • bollinger bandを指標にした売買アルゴリズムで取引した場合の損益計算を関数化する
  • Optunaで売買アルゴリズムのパラメータを最適化
  • 最適化パラメータの可視化、考察

結論から言うと、Optunaで最適化したパラメータで自動売買すれば、ある程度現実的な原資でプラス収支になる可能性が示唆されました。

ボリンジャーバンドを活用した取引

ボリンジャーバンドとは?

ボリンジャーバンドは移動平均(MA: Moving Average)と移動平均の標準偏差(σ)から算出されるテクニカル指標のひとつです。(詳しくはこちら:テクニカル指標一覧 – Wikipedia

統計学的に現在価格がMA±σ, MA±2σ, MA±3σの範囲に入っている確率はそれぞれ約68%, 95%, 99.7%であることが推測されます。つまり現在価格がMA±2σ範囲外に推移することは確率約5%という非常に稀なことが起こっており、その後反発して高値に推移する可能性が高いということになります。

ボリンジャーバンド図解

一般的なボリンジャーバンドの活用方法としてはMA-2σを買いのタイミングと判断してエントリーするという方法です。

売買アルゴリズムと最適化するパラメータ

ボリンジャーバンドの一般的な活用方法を参考にして次のアルゴリズムと最適化したいパラメータを考えました。

  • C:エントリー判断に使うσの係数。MA-(C*σ)のタイミングでエントリーする。
  • sell:エントリー価格のsell倍の値で利益確定用の指値売り。
  • loss_cut:エントリー価格のloss_cut倍以下になったタイミングでロスカット判断して売り。

自分の雑な調査によるとσの係数を調整している人は少なそうなので、どんな結果になるのか楽しみです。

売買アルゴリズムの実装

今回はこちらのデータを使って分析していきます。

1分足ではデータが欠けているところがあるので、resampleメソッドで5分足にしてボリンジャーバンドを計算します。

import pandas as pd

# data準備
file = '1min_20230806.csv'
df = pd.read_csv(file)
df['date'] = pd.to_datetime(df['date'])
df.set_index('date', inplace=True)

# 5分足に変更
th = '5T'
df_resampled = pd.DataFrame()
df_resampled['open'] = df['open'].resample(th).first()
df_resampled['high'] = df['high'].resample(th).max()
df_resampled['low'] = df['low'].resample(th).min()
df_resampled['close'] = df['close'].resample(th).last()
df_resampled['volume'] = df['volume'].resample(th).sum()

df = df_resampled

ボリンジャーバンドのアルゴリズム

pandasのrollingメソッドを使って移動平均と標準偏差を簡単に計算できます。データポイント数は20にしました。

window = 20
df['MA'] = df['close'].rolling(window).mean()
df['std'] = df['close'].rolling(window).std()

続いてパラメータに適当な設定し、売買シグナルカラムをbool形式で追加していきます。パラメータは後に最適化しますが、まずは探索範囲を確認するために適当な値で結果を確認する必要があります。

# パラメータ設定
c = 2           # 買い判断に使う標準偏差の係数
sell = 1.015    # 買付価格の101.5%で利確売り
loss_cut = 0.99 # 買付価格の99%でロスカット

# 買い判断のデータインデックスを取得
df['buy_th'] = df['MA'] - df['std'] * c
df['diff'] = df['close'] - df['buy_th']
df['buy_signal'] = df['diff'].apply(lambda x: True if x < 0 else False)
buy_signal_idx = df[df['buy_signal']].index.tolist()

ここから先がやや複雑です。取得した買い判断インデックスでforループを回して買い判断以降の行を持つデータフレームを生成します。そして利確またはロスカットの判断カラムをbool形式で追加し、一番最初に出てくる売り判断で取引成立したことにして、損益をリストに格納していきます。

profits = []
loss_cuts = []
time_out_count = 0

for idx in buy_signal_idx:
    # 買付価格から利確、ロスカット価格を計算
    buy_price = df.loc[idx, 'close']
    sell_price = buy_price * sell
    loss_cut_price = buy_price * loss_cut
    
    # 買い判断以降の行を取得
    data = df.loc[idx:,]
    data = data.reset_index()
    close_idx = data.columns.get_loc('close')

    # 利確、ロスカット判断のbool形式カラムを追加
    data['profit'] = data['high'].apply(lambda x: True if x > sell_price else False)
    data['loss_cut'] = data['close'].apply(lambda x: True if x < loss_cut_price else False)

    # 利確、ロスカット判断がいずれかにTrueがある場合
    if any(data['loss_cut']) or any(data['profit']):

        # 最初に利確判断/ロスカット判断になるインデックス値を取得
        try:
            first_profit = data[data['profit']].index.tolist()[0]
        except IndexError: # すべてFalseだとIndexErrorになるので対策
            first_profit = len(data)
        try:
            first_loss_cut = data[data['loss_cut']].index.tolist()[0]
        except IndexError:
            first_loss_cut = len(data)

    else:
        time_out_count += 1
        continue
   
    # 利確/ロスカットの早い方で損益をリストに格納
    if first_profit <= first_loss_cut:
        profits.append(sell_price - buy_price)
        profit_after_buy.append(first_profit)
    
    elif first_loss_cut < first_profit:
        loss_cut_price = data.iloc[first_loss_cut, close_idx] # 終値でロスカット成立と仮定
        loss_cuts.append(loss_cut_price - buy_price)
        loss_cut_after_buy.append(first_loss_cut)

# 損益を通算
profit = sum(profits)
loss_cut = sum(loss_cuts)
total_profit = profit + loss_cut

利確/ロスカットのところでゴチャゴチャしてしまいました。もっと上手い実装方法がありそうですが、私のアイデアではこれが限界でした。結果を見ていきます。

# 結果
print(f"buy signal rate: {len(df[df['buy_signal']]) / len(df)}")
print(f"time outs: {time_out_count}/{len(df[df['buy_signal']])}")
print(f'profit: {profit}, {len(profits)}')
print(f'loss_cut: {loss_cut}, {len(loss_cuts)}')
print(f'total_profit: {total_profit}')

>> buy signal rate: 0.053835021707670044
>> time outs: 29/186
>> profit: 4025765.429999973, 65
>> loss_cut: -4175854.0, 92
>> total_profit: -150088.57000002684

結果はマイナスでしたが、タイムアウトの回数もそれほど多くないことから、ある程度まともパラメータだっと察します。ロスカット回数が多いのでロスカットの係数がもう少し下げてもよいかもしれません。

目的関数の実装

目的関数は以下になります。読み込むCSVは予め5分足の移動平均と標準偏差を計算することで、イテレーション時の処理時間を少しでも早くします。

def objective(trial):

    # data準備
    file = './data/5min_20230806.csv' # 5分足で移動平均、標準偏差の計算まで処理済み
    df = pd.read_csv(file)

    # best parameterで
    c = trial.suggest_uniform('c', 0.7, 3)
    sell = trial.suggest_uniform('sell', 1.00005, 1.01) 
    loss_cut = trial.suggest_uniform('loss_cut', 0.99, 1)

    # 結果の一時保存
    profits = []
    loss_cuts = []

    # 買いシグナルのindexを取得
    df['buy_th'] = df['MA'] - df['std'] * c
    df['diff'] = df['close'] - df['buy_th']
    df['buy_signal'] = df['diff'].apply(lambda x: True if x < 0 else False)
    buy_signal_idx = df[df['buy_signal']].index.tolist()

    for idx in buy_signal_idx:
        buy_price = df.loc[idx, 'close']
        sell_price = buy_price * sell
        loss_cut_price = buy_price * loss_cut
        
        data = df.loc[idx:,]
        data = data.reset_index()

        data['profit'] = data['high'].apply(lambda x: True if x > sell_price else False)
        data['loss_cut'] = data['close'].apply(lambda x: True if x < loss_cut_price else False)

        if any(data['loss_cut']) or any(data['profit']):
            try:
                first_profit = data[data['profit']].index.tolist()[0]
            except IndexError:
                first_profit = len(data)

            try:
                first_loss_cut = data[data['loss_cut']].index.tolist()[0]
            except IndexError:
                first_loss_cut = len(data)
        
        else:
            continue

        if first_profit <= first_loss_cut:
            profits.append(sell_price - buy_price)
        
        elif first_loss_cut < first_profit:
            loss_cut_price = data.loc[first_loss_cut, 'close']
            loss_cuts.append(loss_cut_price - buy_price)

    # 損益を通算
    total_profit = sum(profits) + sum(loss_cuts)

    return total_profit

最適化と結果の可視化

今回もoptunaライブラリのstudyインスタンスを使います。インスタンス生成時に引数diresctionに’maximize’を指定することで、目的関数を最大化するパラメータ探索を行えます。

import optuna

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=200)

print(study.best_params)
print(study.best_value)

>> {'c': 0.7447882343598757, 'sell': 1.0044986849434376, 'loss_cut': 0.9983094867526481}
>> 2250214.5933372173

200回のイテレーションで発見されたベストパラメータと最高収益が得られました。標準偏差の係数は1よりも小さい値なので軽い下ブレでエントリーし、利確とロスカットも買付価格の1%以内に収める小まめな取引アルゴリズムのようです。

次にイテレーション毎の各パラメータの推移を3次元プロットで描画してみます。3次元散布図にはplotlyライブライを使います。

import plotly.express as px

results = []
for each in study.trials:
    result_dict = each.params
    result_dict['profit'] = each.value
    results.append(result_dict)

result_df = pd.DataFrame(results)
result_df.to_csv('optimized_bollinger_5min_20230806.csv')

# 3D plot
fig = px.scatter_3d(result_df, x='c', y='sell', z='loss_cut', color='profit')
fig.show()

収益が高い領域を重点的にイテレーションが進んでいることが確認できます。

またエントリー後のホールド期間も確認します。このためには最適化パラメータで目的関数を計算し、その際にホールド期間のログを取る必要がります。詳細コードは割愛しますが、ボックスプロットで可視化すると以下の結果となりました。

ホールド期間は利確の方が長く、中央値で34(=170分)という結果でした。標準偏差の係数0.744でエントリーするので、15分に1回程度の頻度でエントリーします。つまり買付単価の11倍ぐらいの原資で運用する必要があります。

今回最適化したパラメータで仮に1BTCで取引を繰り返すと225万円の収益だったようですが、現在1BTC/400万円程度のレートなのであまり現実的ではありません。笑

Coincheckの最低取引単位0.005 BTCで取引すると、原資としては余裕をもって40万円ぐらいあれば約1万円の利益になったようです。

アルゴリズムに改良が必要な点

ボリンジャーバンド単体では急な下落相場などでも頻繁にエントリーとロスカットを繰り返します。移動平均などから下落相場を察知してエントリーしないようにするような工夫が必要です。

まとめ

今回はボリンジャーバンドを使ったビットコインの売買アルゴリズムのパラメータをOptunaで最適化してみました。改良は必要ですが使えないこともない結果だと思います。

今回は学習に使ったデータ量が少ないので、もう少しOHLCVデータがたまってからまた挑戦してみます!

コメント