Appearance
示例策略
多股票 RSI 算法示例
python
import talib
def init(context):
context.s1 = "000001.XSHE"
context.s2 = "601988.XSHG"
context.s3 = "000068.XSHE"
context.stocks = [context.s1, context.s2, context.s3]
context.TIME_PERIOD = 14
context.HIGH_RSI = 85
context.LOW_RSI = 30
context.ORDER_PERCENT = 0.3
def handle_bar(context, bar_dict):
# 对我们选中的股票集合进行loop,运算每一只股票的RSI数值
for stock in context.stocks:
# 读取历史数据
prices = history_bars(stock,context.TIME_PERIOD+1, '1d', 'close')
# 用Talib计算RSI值
rsi_data = talib.RSI(prices, timeperiod=context.TIME_PERIOD)[-1]
cur_position = context.portfolio.positions[stock].quantity
# 用剩余现金的30%来购买新的股票
target_available_cash = context.portfolio.cash * context.ORDER_PERCENT
# 当RSI大于设置的上限阀值,清仓该股票
if rsi_data > context.HIGH_RSI and cur_position > 0:
order_target_value(stock, 0)
# 当RSI小于设置的下限阀值,用剩余cash的一定比例补仓该股
if rsi_data < context.LOW_RSI:
logger.info("target available cash caled: " + str(target_available_cash))
# 如果剩余的现金不够一手 - 100shares,那么会被ricequant 的order management system reject掉
order_value(stock, target_available_cash)商品期货跨品种配对交易
该策略为分钟级别回测。运用了简单的移动平均以及布林带(Bollinger Bands)作为交易信号产生源。有关对冲比率(HedgeRatio)的确定,您可以在我们的研究平台上面通过 import statsmodels.api as sm 引入 statsmodels 中的 OLS 方法进行线性回归估计。具体估计窗口,您可以根据自己策略需要自行选择。
策略中的移动窗口选择为 60 分钟,即在每天开盘 60 分钟内不做任何交易,积累数据计算移动平均值。当然,这一移动窗口也可以根据自身需要进行灵活选择。下面例子中使用了黄金与白银两种商品期货进行配对交易。简单起见,例子中期货的价格并未做对数差处理。
python
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import numpy as np
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
context.s1 = 'AG1612'
context.s2 = 'AU1612'
# 设置全局计数器
context.counter = 0
# 设置滚动窗口
context.window = 60
# 设置对冲手数,通过研究历史数据进行价格序列回归得到该值
context.ratio = 15
context.up_cross_up_limit = False
context.down_cross_down_limit = False
# 设置入场临界值
context.entry_score = 2
# 初始化时订阅合约行情。订阅之后的合约行情会在handle_bar中进行更新
subscribe([context.s1, context.s2])
# before_trading此函数会在每天交易开始前被调用,当天只会被调用一次
def before_trading(context):
# 样例商品期货在回测区间内有夜盘交易,所以在每日开盘前将计数器清零
context.counter = 0
# 你选择的期货数据更新将会触发此段逻辑,例如日线或分钟线更新
def handle_bar(context, bar_dict):
# 获取当前一对合约的仓位情况。如尚未有仓位,则对应持仓量都为0
position_a = context.portfolio.positions[context.s1]
position_b = context.portfolio.positions[context.s2]
context.counter += 1
# 当累积满一定数量的bar数据时候,进行交易逻辑的判断
if context.counter > context.window:
# 获取当天历史分钟线价格队列
price_array_a = history_bars(context.s1, context.window, '1m', 'close')
price_array_b = history_bars(context.s2, context.window, '1m', 'close')
# 计算价差序列、其标准差、均值、上限、下限
spread_array = price_array_a - context.ratio * price_array_b
std = np.std(spread_array)
mean = np.mean(spread_array)
up_limit = mean + context.entry_score * std
down_limit = mean - context.entry_score * std
# 获取当前bar对应合约的收盘价格并计算价差
price_a = bar_dict[context.s1].close
price_b = bar_dict[context.s2].close
spread = price_a - context.ratio * price_b
# 如果价差低于预先计算得到的下限,则为建仓信号,'买入'价差合约
if spread <= down_limit and not context.down_cross_down_limit:
# 可以通过logger打印日志
logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
logger.info('创建买入价差中...')
# 获取当前剩余的应建仓的数量
qty_a = 1 - position_a.buy_quantity
qty_b = context.ratio - position_b.sell_quantity
# 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
if qty_a > 0:
buy_open(context.s1, qty_a)
if qty_b > 0:
sell_open(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
# 已成功建立价差的'多仓'
context.down_cross_down_limit = True
logger.info('买入价差仓位创建成功!')
# 如果价差向上回归移动平均线,则为平仓信号
if spread >= mean and context.down_cross_down_limit:
logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
logger.info('对买入价差仓位进行平仓操作中...')
# 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
qty_a = position_a.buy_quantity
qty_b = position_b.sell_quantity
if qty_a > 0:
sell_close(context.s1, qty_a)
if qty_b > 0:
buy_close(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.down_cross_down_limit = False
logger.info('买入价差仓位平仓成功!')
# 如果价差高于预先计算得到的上限,则为建仓信号,'卖出'价差合约
if spread >= up_limit and not context.up_cross_up_limit:
logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
logger.info('创建卖出价差中...')
qty_a = 1 - position_a.sell_quantity
qty_b = context.ratio - position_b.buy_quantity
if qty_a > 0:
sell_open(context.s1, qty_a)
if qty_b > 0:
buy_open(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.up_cross_up_limit = True
logger.info('卖出价差仓位创建成功')
# 如果价差向下回归移动平均线,则为平仓信号
if spread < mean and context.up_cross_up_limit:
logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
logger.info('对卖出价差仓位进行平仓操作中...')
qty_a = position_a.sell_quantity
qty_b = position_b.buy_quantity
if qty_a > 0:
buy_close(context.s1, qty_a)
if qty_b > 0:
sell_close(context.s2, qty_b)
if qty_a == 0 and qty_b == 0:
context.up_cross_up_limit = False
logger.info('卖出价差仓位平仓成功!')期权回测样例
通过沪深 300 股指期权认购认沽评价构造指数的空头,结合股沪深 300 股指期货多头进行对冲买入并持有策略。
python
import rqalpha_plus
import rqalpha_mod_option
__config__ = {
"base": {
"start_date": "20200101",
"end_date": "20200221",
'frequency': '1d',
"accounts": {
# 股指期权使用 future 账户
"future": 1000000
}
},
"mod": {
"option": {
"enabled": True,
"exercise_slippage": 0
},
'sys_simulation': {
'enabled': True,
'matching_type': 'current_bar',
'volume_limit': False,
'volume_percent': 0,
},
'sys_analyser': {
'plot': True,
},
}
}
def init(context):
context.s1 = 'IO2002C3900'
context.s2 = 'IO2002P3900'
context.s3 = 'IF2002'
subscribe(context.s1)
subscribe(context.s2)
subscribe(context.s3)
context.counter = 0
print('******* INIT *******')
def before_trading(context):
pass
def handle_bar(context, bar_dict):
context.counter += 1
if context.counter == 1:
sell_open(context.s1, 3)
buy_open(context.s2, 3)
buy_open(context.s3, 1)
def after_trading(context):
pass转债平价溢价率作为信号的分钟回测
python
import numpy as np
__config__ = {
"base": {
"start_date": "20180601",
"end_date": "20180610",
'frequency': '1m',
"accounts": {
"stock": 1000000 # 可转债使用 stock 账号
}
},
"mod": {
'sys_simulation': {
'enabled': True,
'matching_type': 'current_bar',
# 是否允许涨跌停状态下买入、卖出
'price_limit': False,
# 是否开启成交量限制
'volume_limit': False,
},
"convertible": {
"enabled": True,
# 设置转债回测的佣金费率
"commission_rate": 0,
# 设置转债回测的最小佣金
"min_commission": 0,
},
"sys_analyser": {
"plot": True,
},
}
}
def init(context):
context.o = "110030.XSHG"
subscribe(context.o)
context.count = 0
context.exercise_flag = False
context.stock_id = instruments(context.o).stock_code
context.conversion_value = 0
def handle_bar(context, bar_dict):
context.count += 1
cb_price = bar_dict[context.o].close
stock_price = bar_dict[context.stock_id].close
# 转债的转股价值
context.conversion_value = 100/7.24 * stock_price
# 转债的平价溢价率
ratio = cb_price / context.conversion_value - 1
quantity = get_position(context.o, POSITION_DIRECTION.LONG).quantity
if ratio < 0.31 and quantity < 2000:
print('当前可转债平价溢价率为 {},买入转债'.format(ratio))
order_shares(context.o, 100)
if ratio > 0.36 and quantity > 0:
print('当前可转债平价溢价率为 {},卖出转债'.format(ratio))
order_shares(context.o, -1*quantity)公募基金回测简单样例
python
INIT_CASH = 100000
__config__ = {
"base": {
"start_date": "20190105",
"end_date": "20200809",
"accounts": {
"stock": INIT_CASH
}
},
"mod": {
"sys_progress": {
"enabled": True,
"show": True
}, "sys_analyser": {
"enabled": True,
"plot":True
}, "fund": {
# 基金申购前端费率
"fee_ratio": 0.015,
# 基金份额到账时间
"subscription_receiving_days": 1,
# 赎回金回款时间
"redemption_receiving_days": 3,
# 申购金额上下限检查限制
"subscription_limit": True,
# 申购状态检查限制
"status_limit": True,
},
'sys_simulation': {
'enabled': True,
'matching_type': 'current_bar',
},
}
}
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
logger.info("init")
context.s1 = "004241"
context.fired = False
def before_trading(context):
pass
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
if not context.fired:
subscribe_value(context.s1,INIT_CASH)
context.fired = True
if context.portfolio.total_returns > 0.4 or context.portfolio.total_returns < -0.2:
context.quantity = get_position(context.s1).quantity
if context.quantity > 0:
redeem_shares(context.s1,context.quantity)黄金现货回测样例
- 引入黄金现货 AUTD.SGEX 合约和黄金期货主力合约 AU2006 进行配对交易。
- 两个合约的合约乘数相同,都是 1000,所以价差数量比例为 1:1 。合约乘数可以通过 rqdatac.instruments 查询到,对应字段为 contract_multiplier
- 计算期货、现货历史价差的最大最小值,如果当前价差超过历史 10 日最大价差,认为价差即将收敛,做多现货做空期货。
python
__config__ = {
'base': {
'start_date': '20200101',
'end_date': '20200321',
'frequency': '1d',
# 保证金倍率。基于基础保证金水平进行调整
'margin_multiplier': 1,
# 商品现货回测这里使用 stock 账户
'accounts': {
'stock': 1000000,
'future': 1000000,
},
# 期货交易佣金设置
'future_info': {
# 期货品种,如不设置,则按照默认费用进行收取
'AU': {
# 平仓费率
'close_commission_ratio': 0.00005,
# 开仓费率
'open_commission_ratio': 0.00005,
# 平今费率
'close_commission_today_ratio': 0,
# BY_MONEY 为按照名义价值收取, BY_VOLUME 为根据成交合约张数收取
'commission_type': 'BY_MONEY',
},
},
},
'mod': {
'spot': {
'enabled': True,
'commission_multiplier': 0,
},
'sys_simulation': {
'enabled': True,
# 是否开启信号模式。如果开启,限价单将按照指定价格成交,并且不受撮合成交量限制
'signal': False,
'matching_type': 'current_bar',
'volume_limit': True,
'volume_percent': 0.001,
},
'sys_analyser': {
'plot': True,
},
}
}
def init(context):
context.s1 = 'AUTD.SGEX'
context.s2 = 'AU2006'
subscribe(context.s1)
context.counter = 0
def handle_bar(context, bar_dict):
# 通过 bar_dict 获得当日数据计算当日价差
current_spread = bar_dict[context.s2].close - bar_dict[context.s1].close
# 通过 history_bars 获得历史价格序列,计算移动窗口历史价差的最大、最小值
spot_price = history_bars(context.s1, 10, '1d', 'close')
future_price = history_bars(context.s2, 10, '1d', 'close')
max_spread = max(future_price - spot_price)
min_spread = min(future_price - spot_price)
if current_spread >= max_spread:
print('当前价差为 {} 大于过去10天历史最大价差 {}, 买入现货卖出期货'.format(current_spread, max_spread))
buy_open(context.s1, 5)
sell_open(context.s2, 5)
if current_spread < min_spread:
print('当前价差为 {} 小于过去10天历史最小价差 {}, 买入期货卖出现货'.format(current_spread, min_spread))
buy_open(context.s2, 5)
sell_open(context.s1, 5)优化器回测样例
对于回测中使用优化器的场景,rqalpha 做了简单封装,用户无需传入时间参数, 策略中的优化器 API 参数见:portfolio_optimize
python
__config__ = {
'base': {
'accounts': {
'stock': 10000000,
},
'start_date': "20170101",
'end_date': "20200101",
'frequency': '1d',
},
"mod": {
"optimizer2": {
"enabled": True,
},
'sys_analyser': {
'enabled': True,
'benchmark': '000300.XSHG',
},
}
}
def rebalance(context, bar_dict):
cons = [
WildcardIndustryConstraint(lower_limit=-0.01, upper_limit=0.1, relative=True,
classification=IndustryClassification.ZX, hard=False),
WildcardStyleConstraint(lower_limit=-0.3, upper_limit=0.3, relative=True, hard=False)
]
pool = [s for s in index_components('000906.XSHG') if not is_suspended(s)]
s = portfolio_optimize(pool, cons=cons, benchmark='000300.XSHG')
s = s[s > 0.0001]
for order_book_id, position in context.stock_account.positions.items():
if order_book_id not in s:
order_target_value(order_book_id, 0)
s = s.sort_values()
portfolio_value = context.portfolio.total_value
for order_book_id, weight in s.items():
order_target_value(order_book_id, portfolio_value * weight)
def init(context):
scheduler.run_monthly(rebalance, 1)根据本地持仓权重运行回测范例
这里的样例与前面的精简版相比考虑了更复杂的场景,例如若调仓当天因为风控等原因发单失败,第二个交易日会继续发单,仅供用户参考。
python
import pandas
import numpy
from rqalpha.apis import *
__config__ = {
"base": {
"start_date": "20191201",
"end_date": "20200930",
"accounts": {
"stock": 100000000,
},
},
}
def read_tables_df():
# need pandas version 0.21.0+
# need xlrd
d_type = {'NAME': numpy.str_, 'TARGET_WEIGHT': numpy.float64, 'TICKER': numpy.str_, 'TRADE_DT': numpy.int32}
columns_name = ["TRADE_DT", "TICKER", "NAME", "TARGET_WEIGHT"]
df = pandas.read_excel(r'调仓权重样例.xlsx', dtype=d_type)
if not df.columns.isin(d_type.keys()).all():
raise TypeError("xlsx文件格式必须有{}四列".format(list(d_type.keys())))
for date, weight_data in df.groupby("TRADE_DT"):
if round(weight_data["TARGET_WEIGHT"].sum(), 6) > 1:
raise ValueError("权重之和出错,请检查{}日的权重".format(date))
# 转换为米筐order_book_id
df['TICKER'] = df['TICKER'].apply(lambda x: rqdatac.id_convert(x) if ".OF" not in x else x)
return df
def on_order_failure(context, event):
# 拒单时,未成功下单的标的放入第二天下单队列中
order_book_id = event.order.order_book_id
context.next_target_queue.append(order_book_id)
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
import rqalpha
import rqalpha_mod_fund
df = read_tables_df() # 调仓权重文件
context.target_weight = df
context.adjust_days = set(context.target_weight.TRADE_DT.to_list()) # 需要调仓的日期
context.target_queue = [] # 当日需要调仓标的队列
context.next_target_queue = [] # 次日需要调仓标的队列
context.current_target_table = dict() # 当前持仓权重比例
subscribe_event(EVENT.ORDER_UNSOLICITED_UPDATE, on_order_failure)
# before_trading此函数会在每天策略交易开始前被调用,当天只会被调用一次
def before_trading(context):
def dt_2_int_dt(dt):
return dt.year * 10000 + dt.month * 100 + dt.day
dt = dt_2_int_dt(context.now)
if dt in context.adjust_days:
today_df = context.target_weight[context.target_weight.TRADE_DT == dt].set_index("TICKER").sort_values(
"TARGET_WEIGHT")
context.target_queue = today_df.index.to_list() # 更新需要调仓的队列
context.current_target_table = today_df["TARGET_WEIGHT"].to_dict()
context.next_target_queue.clear()
# 非目标持仓 需要清空
for i in context.portfolio.positions.keys():
if i not in context.target_queue:
# 非目标权重持仓 需要清空
context.target_queue.insert(0, i)
else:
# 当前持仓权重大于目标持仓权重 需要优先卖出获得资金
equity = context.portfolio.positions[i].long.equity + context.portfolio.positions[i].short.equity
total_value = context.portfolio.accounts[instruments(i).account_type].total_value
current_percent = equity / total_value
if current_percent > context.current_target_table[i]:
context.target_queue.remove(i)
context.target_queue.insert(0, i)
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
if context.target_queue:
for _ticker in context.target_queue:
_target_weight = context.current_target_table.get(_ticker, 0)
o = order_target_percent(_ticker, round(_target_weight, 6))
if o is None:
logger.info("[{}]下单失败,该标将于次日下单".format(_ticker))
context.next_target_queue.append(_ticker)
else:
logger.info("[{}]下单成功,现下占比{}%".format(_ticker, round(_target_weight, 6) * 100))
# 下单完成 下单失败的的在队列context.next_target_queue中
context.target_queue.clear()
# after_trading函数会在每天交易结束后被调用,当天只会被调用一次
def after_trading(context):
if context.next_target_queue:
context.target_queue += context.next_target_queue
context.next_target_queue.clear()
if context.target_queue:
logger.info("未完成调仓的标的:{}".format(context.target_queue))
if __name__ == '__main__':
from rqalpha_plus import run_func
run_func(init=init, before_trading=before_trading, after_trading=after_trading, handle_bar=handle_bar,
config=__config__)