自動売買プログラムをカスタマイズして、高度な戦略に挑戦しよう【FXの自動売買プログラムをPythonで自作しよう⑧】

この記事には広告を含む場合があります。

記事内で紹介する商品を購入することで、当サイトに売り上げの一部が還元されることがあります。

これまでの記事ではシンプルな戦略を紹介してきましたが、徐々に高度な戦略に挑戦していきましょう。

自分で自動売買プログラムを作る大きなメリットは、自由度が高いところです。

この記事の内容を理解して、オリジナルの戦略を作れるようになりましょう。

2つのインジケーターを組み合わる

ひとつのインジケーターだけを使っていると、どうしてもダマシにあう確率が多くなってしまいます。

そこで複数のインジケーターを組み合わせることで、エントリーの精度を上げて行きたいと思います。

今回は、移動平均線とRSIを組み合わせます。そして、エントリーの条件は以下のようにします。

  • RSIが50以下 かつ 移動平均線のゴールデンクロス が発生した時に買いでエントリー
  • RSIが50以上 かつ 移動平均線のデッドクロス が発生した時に売りでエントリー

RSIは一般的に30以下で売られすぎ、70以上で買われすぎと言われますが、ゴールデンクロスやデッドクロスが発生したタイミングではその水準に満たない場合が多いです。この設定は、売られすぎの時に売らない、買われすぎの時に買わないようにするためのフィルターとして機能させます。

前回のプログラムにRSIを計算する関数の追加と複数の条件でシグナルを出すように変更していきましょう。

RSIを計算する関数

RSIは以下の計算式で求めることができます。

RSI = 直近N日間の上げ幅合計の絶対値 / (直近N日間の上げ幅合計の絶対値 + 下げ幅合計の絶対値) × 100

この計算式をプログラムで記述すると以下のようになります。

Python
# RSIを計算する関数
def calculate_rsi(df, period):

    df_close = df['close']
    df_close = df_close.astype(float)

    df_diff = df_close.diff()
    df_up = df_diff.copy()
    df_down = df_diff.copy()

    df_up[df_up < 0] = 0
    df_down[df_down > 0] = 0

    df_up_sum = df_up.rolling(window=period).mean()
    df_down_sum = df_down.abs().rolling(window=period).mean()

    df_rsi = df_up_sum / (df_up_sum + df_down_sum) * 100
    rsi = df_rsi.iat[-1]

    print('RSI:{}'.format(round(rsi, 2)))

    return rsi

引数は、価格データと期間です。戻り値は最新のRSIの値です。

複数の条件でのシグナル

前回のプログラムにRSIを計算する関数とエントリー条件の追加をしましょう。

Python
# 移動平均線のGC、DCとRSIを判定してエントリー注文を出す関数
def entry_signal(df):
    global flag

    df['MA-short'] = df['open'].rolling(MA_short_period).mean()
    df['MA-long'] = df['open'].rolling(MA_long_period).mean()
    rsi = calculate_rsi(df, 14)

    if df.iat[-2, 6] < df.iat[-2, 7] and df.iat[-1, 6] > df.iat[-1, 7] and rsi < 50:
        flag['order']['side'] = 'BUY'
        print('ゴールデンクロスが発生しました')

    elif df.iat[-2, 6] > df.iat[-2, 7] and df.iat[-1, 6] < df.iat[-1, 7] and rsi > 50:
        flag['order']['side'] = 'SELL'
        print('デッドクロスが発生しました')

    else:
        flag['order']['side'] = None

    return flag

RSIを計算する関数に価格のデータと計算する期間の14を渡します。

シグナルを判定するif文にandでRSIの条件を追加します。

オリジナルの戦略を運用する場合は、ここのif文でエントリーの条件を変えることができます。

損切りの価格に応じて発注する取引数量を調整する

底値から上昇してきたタイミングでゴールデンクロスが発生したとしましょう。

このゴールデンクロスがダマシだった場合は、底値をブレイクアウトして下落していきます。

この底値を損切り価格とした場合、リスク許容度の2%の損失となるように発注する取引数量の計算をしましょう。

ただし、資産残高の2%が損切り価格に満たなかった場合は、損失が2%になる価格を損切り価格とします。

Python
def calculate_lot(price_data):
    global flag

    balance = get_collateral()
    current_price = float(price_data.iat[-1, 3])

    # 買いエントリーの場合
    if flag['order']['side'] == 'BUY':
        stop = float(price.iloc[-12:, [4]]['low'].min())
        difference = current_price - stop
        limit = current_price + difference * risk_reward_ratio
        calc_lot = int(balance * trade_risk / difference)
        print('購入数量は{}です'.format(calc_lot))

        # 購入数量が10,000以下だった場合、購入数量を10,000に変更
        if calc_lot < 10000:
            stop = current_price - (balance * trade_risk / 10000)
            limit = current_price + (balance * trade_risk * risk_reward_ratio / 10000)
            calc_lot = 10000
            print('購入数量が10,000以下だったので、購入数量を10,000に変更しました')

    # 売りエントリーの場合
    elif flag['order']['side'] == 'SELL':
        stop = float(price.iloc[-12:, [3]]['high'].max())
        difference = stop - current_price
        limit = current_price - difference * risk_reward_ratio
        calc_lot = int(balance * trade_risk / difference)
        print('購入数は{}です'.format(calc_lot))

        # 購入数量が10,000以下だった場合、購入数量を10,000に変更
        if calc_lot < 10000:
            stop = current_price + (balance * trade_risk / 10000)
            limit = current_price - (balance * trade_risk * risk_reward_ratio / 10000)
            calc_lot = 10000
            print('購入数量が10,000以下だったので、購入数量を10,000に変更しました')

    max_lot = int((balance * 20) / current_price * 0.99)

    # 購入数量が購入可能数量を超えた場合は、購入数量を上限に変更
    if max_lot < calc_lot:
        calc_lot = max_lot
        print('購入可能数量を超えたので、購入数を{}に変更しました'.format(max_lot))

    return calc_lot, limit, stop

少しややこしいですが、次のような手順で計算しています。

  • 12時間前までで最も低い(高い)価格を抽出する。
  • その価格と現在の価格の差を計算する。
  • 価格差にリスクリワードレシオの係数を掛けて、現在の価格に足すことで利確価格とする。
  • 資産残高と許容リスクと価格差から取引数量を計算する。
  • 取引数量が最小注文数量の10,000以下だった場合は、損切り価格を変更する。
  • 取引数量が資産残高より多い場合は、上限に変更する。ただし、上限ギリギリだと価格の変動で購入できる数量を超えてしまう場合があるので、購入できる数量の99%とする。

サンプルコード

上記を踏まえて作成したプログラムは次の通りです。

Python
import requests
import json
import hmac
import hashlib
import time
from datetime import datetime
from datetime import timedelta
import pandas as pd

apiKey = 'YOUR_API_KEY'
secretKey = 'YOUR_SECRET_KEY'

# -----設定項目
interval = '1hour'  # 使用する時間軸
get_price_period = 10  # 四本値を取得する日数
MA_short_period = 5  # 移動平均線の短期の日数
MA_long_period = 15  # 移動平均線の長期の日数
trade_risk = 0.02  # 1トレードあたりで許容できる口座の損失(%)
risk_reward_ratio = 2  # リスクリワードレシオ


# 成行注文を出す関数
def market_order(size):
    global flag

    while True:
        try:
            timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
            method = 'POST'
            endPoint = 'https://forex-api.coin.z.com/private'
            path = '/v1/order'
            reqBody = {
                'symbol': 'USD_JPY',
                'side': flag['order']['side'],
                'size': str(size),
                'executionType': 'MARKET'
            }

            text = timestamp + method + path + json.dumps(reqBody)
            sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()

            headers = {
                'API-KEY': apiKey,
                'API-TIMESTAMP': timestamp,
                'API-SIGN': sign
            }

            res = requests.post(endPoint + path, headers=headers, data=json.dumps(reqBody))
            print(json.dumps(res.json(), indent=2))

            return

        except Exception as e:
            print(f'action=market_order error={e}')
            print('10秒後にリトライします')
            time.sleep(10)


# OCO注文を出す関数
def oco_order(size, limit, stop):
    global flag

    if flag['order']['side'] == 'BUY':
        side = 'SELL'

    elif flag['order']['side'] == 'SELL':
        side = 'BUY'

    while True:
        try:
            timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
            method = 'POST'
            endPoint = 'https://forex-api.coin.z.com/private'
            path = '/v1/closeOrder'
            reqBody = {
                'symbol': 'USD_JPY',
                'side': side,
                'size': str(size),
                'executionType': 'OCO',
                'limitPrice': str(limit),
                'stopPrice': str(stop)
            }

            text = timestamp + method + path + json.dumps(reqBody)
            sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()

            headers = {
                'API-KEY': apiKey,
                'API-TIMESTAMP': timestamp,
                'API-SIGN': sign
            }

            res = requests.post(endPoint + path, headers=headers, data=json.dumps(reqBody))
            print(json.dumps(res.json(), indent=2))

            return

        except Exception as e:
            print(f'action=market_order error={e}')
            print('10秒後にリトライします')
            time.sleep(10)


# ポジションを確認する関数
def check_positions():
    global flag

    timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
    method = 'GET'
    endPoint = 'https://forex-api.coin.z.com/private'
    path = '/v1/positionSummary'

    text = timestamp + method + path
    sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()
    parameters = {
        'symbol': 'USD_JPY'
    }

    headers = {
        'API-KEY': apiKey,
        'API-TIMESTAMP': timestamp,
        'API-SIGN': sign
    }

    res = requests.get(endPoint + path, headers=headers, params=parameters)
    positions = eval(json.dumps(res.json(), indent=2))
    if not positions['data']['list']:
        flag['position']['price'] = 0.0
        flag['position']['lot'] = 0
        flag['position']['side'] = None
        flag['position']['exist'] = False

        print('現在ポジションは存在しません')

        return

    flag['position']['price'] = float(positions['data']['list'][0]['averagePositionRate'])
    flag['position']['lot'] = float(positions['data']['list'][0]['sumPositionSize'])
    flag['position']['side'] = positions['data']['list'][0]['side']
    flag['position']['exist'] = True

    return


# 四本値を取得する関数
def get_price_data():
    while True:
        try:
            price_data = []
            response_data = []

            # 0時から4時に最新レートが取得できない仕様の対策
            if (datetime.today().hour == 0 or
                    datetime.today().hour == 1 or
                    datetime.today().hour == 2 or
                    datetime.today().hour == 3 or
                    datetime.today().hour == 4 or
                    datetime.today().hour == 5):
                today = datetime.today().date()

            else:
                today = datetime.today().date() + timedelta(days=1)

            for i in range(get_price_period):
                period = i - get_price_period
                date = (today + timedelta(days=period)).strftime('%Y%m%d')
                endPoint = 'https://forex-api.coin.z.com/public'
                path = '/v1/klines?symbol=USD_JPY&priceType=ASK&interval=' + interval + '&date=' + date

                response = requests.get(endPoint + path)
                response_data.append(response.json())

            for j in range(len(response_data)):
                for i in response_data[j]['data']:
                    price_data.append({'openTime': i['openTime'],
                                       'openTime_dt': datetime.fromtimestamp(int(i['openTime'][:-3]))
                                      .strftime('%Y/%m/%d %H:%M'),
                                       'open': i['open'],
                                       'high': i['high'],
                                       'low': i['low'],
                                       'close': i['close']})

            print('TIME: ' + str(price_data[-1]['openTime_dt']) + ' H: ' + str(price_data[-1]['high']) + ' L: ' +
                  str(price_data[-1]['low']) + ' C: ' + str(price_data[-1]['close']))

            df_price_data = pd.DataFrame(price_data)

            return df_price_data

        except Exception as e:
            print(f'action=get_price_data error={e}')
            print('10秒後にリトライします')
            time.sleep(10)


# 資産残高を取得する関数
def get_collateral():
    while True:
        try:
            timestamp = '{0}000'.format(int(time.mktime(datetime.now().timetuple())))
            method = 'GET'
            endPoint = 'https://forex-api.coin.z.com/private'
            path = '/v1/account/assets'

            text = timestamp + method + path
            sign = hmac.new(bytes(secretKey.encode('ascii')), bytes(text.encode('ascii')), hashlib.sha256).hexdigest()

            headers = {
                'API-KEY': apiKey,
                'API-TIMESTAMP': timestamp,
                'API-SIGN': sign
            }

            res = requests.get(endPoint + path, headers=headers)
            resp = eval(json.dumps(res.json(), indent=2))
            collateral = int(float(resp['data']['equity']))
            print('現在のアカウント残高は{}円です'.format(collateral))
            return collateral

        except Exception as e:
            print(f'action=get_collateral error={e}')
            print('10秒後にリトライします')
            time.sleep(10)


# RSIを計算する関数
def calculate_rsi(df, period):

    df_close = df['close']
    df_close = df_close.astype(float)

    df_diff = df_close.diff()
    df_up = df_diff.copy()
    df_down = df_diff.copy()

    df_up[df_up < 0] = 0
    df_down[df_down > 0] = 0

    df_up_sum = df_up.rolling(window=period).mean()
    df_down_sum = df_down.abs().rolling(window=period).mean()

    df_rsi = df_up_sum / (df_up_sum + df_down_sum) * 100
    rsi = df_rsi.iat[-1]

    print('RSI:{}'.format(round(rsi, 2)))

    return rsi


# 移動平均線のGC、DCとRSIを判定してエントリー注文を出す関数
def entry_signal(df):
    global flag

    df['MA-short'] = df['open'].rolling(MA_short_period).mean()
    df['MA-long'] = df['open'].rolling(MA_long_period).mean()
    rsi = calculate_rsi(df, 14)

    if df.iat[-2, 6] < df.iat[-2, 7] and df.iat[-1, 6] > df.iat[-1, 7] and rsi < 50:
        flag['order']['side'] = 'BUY'
        print('ゴールデンクロスが発生しました')

    elif df.iat[-2, 6] > df.iat[-2, 7] and df.iat[-1, 6] < df.iat[-1, 7] and rsi > 50:
        flag['order']['side'] = 'SELL'
        print('デッドクロスが発生しました')

    else:
        flag['order']['side'] = None

    return flag


# 利確と損切りの価格を計算する関数
def calculate_settlement_price():
    global flag

    profitable_price = 0
    stop_loss_price = 0
    collateral = get_collateral()

    # 買いエントリーの場合
    if flag['position']['side'] == 'BUY':
        profitable_price = round(float(flag['position']['price']) + (collateral * trade_risk * risk_reward_ratio / order_size), 3)
        stop_loss_price = round(float(flag['position']['price']) - (collateral * trade_risk / order_size), 3)

    # 売りエントリーの場合
    elif flag['position']['side'] == 'SELL':
        profitable_price = round(float(flag['position']['price']) - (collateral * trade_risk * risk_reward_ratio / order_size), 3)
        stop_loss_price = round(float(flag['position']['price']) + (collateral * trade_risk / order_size), 3)

    return profitable_price, stop_loss_price


# 取引数量と利確価格と損切り価格を計算する関数
def calculate_lot(price_data):
    global flag

    balance = get_collateral()
    current_price = float(price_data.iat[-1, 3])

    # 買いエントリーの場合
    if flag['order']['side'] == 'BUY':
        stop = float(price.iloc[-12:, [4]]['low'].min())
        difference = current_price - stop
        limit = current_price + difference * risk_reward_ratio
        calc_lot = int(balance * trade_risk / difference)
        print('購入数量は{}です'.format(calc_lot))

        # 購入数量が10,000以下だった場合、購入数量を10,000に変更
        if calc_lot < 10000:
            stop = current_price - (balance * trade_risk / 10000)
            limit = current_price + (balance * trade_risk * risk_reward_ratio / 10000)
            calc_lot = 10000
            print('購入数量が10,000以下だったので、購入数量を10,000に変更しました')

    # 売りエントリーの場合
    elif flag['order']['side'] == 'SELL':
        stop = float(price.iloc[-12:, [3]]['high'].max())
        difference = stop - current_price
        limit = current_price - difference * risk_reward_ratio
        calc_lot = int(balance * trade_risk / difference)
        print('購入数は{}です'.format(calc_lot))

        # 購入数量が10,000以下だった場合、購入数量を10,000に変更
        if calc_lot < 10000:
            stop = current_price + (balance * trade_risk / 10000)
            limit = current_price - (balance * trade_risk * risk_reward_ratio / 10000)
            calc_lot = 10000
            print('購入数量が10,000以下だったので、購入数量を10,000に変更しました')

    max_lot = int((balance * 20) / current_price * 0.99)

    # 購入数量が購入可能数量を超えた場合は、購入数量を上限に変更
    if max_lot < calc_lot:
        calc_lot = max_lot
        print('購入可能数量を超えたので、購入数を{}に変更しました'.format(max_lot))

    return calc_lot, limit, stop


# メイン処理
flag = {
    'order': {
        'side': None,
    },
    'position': {
        'exist': False,
        'side': None,
        'price': 0.0,
        'lot': 0,
    },
}

check_positions()

while True:

    # ポジションがある場合
    if flag['position']['exist'] is True:
        time.sleep(10)
        check_positions()
        continue

    # ポジションがない場合
    else:
        price = get_price_data()
        flag = entry_signal(price)

        # シグナルが発生した時
        if flag['order']['side'] is not None:
            order_size, limit_price, stop_price = calculate_lot(price)
            market_order(order_size)
            time.sleep(5)
            check_positions()
            oco_order(order_size, limit_price, stop_price)
            time.sleep(3600)

        else:
            time.sleep(10)

おわりに

お疲れさまです。

プログラムのカスタマイズについて、イメージはできたでしょうか。

エントリーのシグナルに活用できるインジケーターの種類はたくさんあります。

ご自身の戦略に合ったインジケーターを実装してみましょう。

次回は、考案した戦略の有効性を検証するためのバックテストを紹介します。