Ricequant Docs
  • RQSDK快速上手
  • RQData - 金融数据 API

    • Python API文档
    • Http API文档
  • RQAlpha Plus - 回测框架

    • 使用教程
    • API使用手册 (opens new window)
  • RQFactor - 因子投研工具

    • 使用教程
    • API使用手册
  • RQOptimizer - 股票组合优化器

    • 使用教程
    • API使用手册 (opens new window)
  • RQPAttr - 绩效归因工具

    • 使用教程
  • RQAMS 用户说明文档
  • RQAMSC 说明文档
量化平台文档
米筐帮助中心
返回官网
  • RQSDK快速上手
  • RQData - 金融数据 API

    • Python API文档
    • Http API文档
  • RQAlpha Plus - 回测框架

    • 使用教程
    • API使用手册 (opens new window)
  • RQFactor - 因子投研工具

    • 使用教程
    • API使用手册
  • RQOptimizer - 股票组合优化器

    • 使用教程
    • API使用手册 (opens new window)
  • RQPAttr - 绩效归因工具

    • 使用教程
  • RQAMS 用户说明文档
  • RQAMSC 说明文档
量化平台文档
米筐帮助中心
返回官网
  • 量化平台文档

    • 简介
    • 投资研究
    • 因子研究
    • 回测
    • 模拟交易
    • 策略协作
    • 技术分析
    • 常见问题
    • 策略开发API
    • 策略实例
      • 第一个策略-买入&持有
      • Golden Cross 算法示例
      • 单股票 MACD 算法示例
      • 多股票 RSI 算法示例
      • 财务数据策略
      • 根据收益和市盈率定期调仓策略
      • 海龟交易系统
      • 股指期货 MACD 日回测
      • 商品期货跨品种配对交易
      • Alpha 策略----多因子对冲
    • 外部数据和 Python 模块

# 策略实例

在下面部分我们列举了一些常用的算法范例,您也可以通过右上角的 clone 按钮很方便的把范例算法复制到自己的算法列表中进行调试、更改甚至模拟、真实交易。

# 第一个策略-买入&持有

万事开头难,这是一个最简单的策略:在回测开始的第一天买入资金量的 100%的平安银行并且一直持有。可以通过右上角的 clone 按钮很方便的把范例算法复制到自己的算法列表中, 您可以更改一下股票代码以及回测时间、金钱等就可以很快的知道某个时期您测试某个股票的表现了,各种风险数值也可以很快地计算出来。

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    context.s1 = "000001.XSHE"
    # 是否已发送了order
    context.fired = False
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    if not context.fired:
        # order_percent并且传入1代表买入该股票并且使其占有投资组合的100%
        order_percent(context.s1, 1)
        context.fired = True

# Golden Cross 算法示例

以下是一个我们使用 TALib 在我们的平台上编写的 golden cross 算法的示例,使用了 simple moving average 方法:

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    context.s1 = "000001.XSHE"
    # 设置这个策略当中会用到的参数,在策略中可以随时调用,这个策略使用长短均线,我们在这里设定长线和短线的区间,在调试寻找最佳区间的时候只需要在这里进行数值改动
    context.SHORTPERIOD = 20
    context.LONGPERIOD = 120
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    # 因为策略需要用到均线,所以需要读取历史数据
    prices = history_bars(context.s1,context.LONGPERIOD+1, '1d', 'close')
    # 使用talib计算长短两根均线,均线以array的格式表达
    short_avg = talib.SMA(prices, context.SHORTPERIOD)
    long_avg = talib.SMA(prices, context.LONGPERIOD)
    plot("short avg", short_avg[-1])
    plot("long avg", long_avg[-1])
    # 计算现在portfolio中股票的仓位
    cur_position = context.portfolio.positions[context.s1].quantity
    # 计算现在portfolio中的现金可以购买多少股票
    shares = context.portfolio.cash/bar_dict[context.s1].close
    # 如果短均线从上往下跌破长均线,也就是在目前的bar短线平均值低于长线平均值,而上一个bar的短线平均值高于长线平均值
    if short_avg[-1] - long_avg[-1] < 0 and short_avg[-2] - long_avg[-2] > 0 and cur_position > 0:
        # 进行清仓
        order_target_value(context.s1, 0)
    # 如果短均线从下往上突破长均线,为入场信号
    if short_avg[-1] - long_avg[-1] > 0 and short_avg[-2] - long_avg[-2] < 0:
        # 满仓入股
        order_shares(context.s1, shares)

# 单股票 MACD 算法示例

以下是一个我们使用 TALib 在我们的平台上编写的单股票 MACD 算法示例,使用了 TALib 的 MACD 方法:

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    context.s1 = "000001.XSHE"
    # 使用MACD需要设置长短均线和macd平均线的参数
    context.SHORTPERIOD = 12
    context.LONGPERIOD = 26
    context.SMOOTHPERIOD = 9
    context.OBSERVATION = 100
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    # 读取历史数据,使用sma方式计算均线准确度和数据长度无关,但是在使用ema方式计算均线时建议将历史数据窗口适当放大,结果会更加准确
    prices = history_bars(context.s1,context.OBSERVATION,'1d','close')
    # 用Talib计算MACD取值,得到三个时间序列数组,分别为macd, signal 和 hist
    macd, signal, hist = talib.MACD(prices, context.SHORTPERIOD,
                                    context.LONGPERIOD, context.SMOOTHPERIOD)
    plot("macd", macd[-1])
    plot("macd signal", signal[-1])
    # macd 是长短均线的差值,signal是macd的均线,使用macd策略有几种不同的方法,我们这里采用macd线突破signal线的判断方法
    # 如果macd从上往下跌破macd_signal
    if macd[-1] - signal[-1] < 0 and macd[-2] - signal[-2] > 0:
        # 计算现在portfolio中股票的仓位
        curPosition = context.portfolio.positions[context.s1].quantity
        #进行清仓
        if curPosition > 0:
            order_target_value(context.s1, 0)
    # 如果短均线从下往上突破长均线,为入场信号
    if macd[-1] - signal[-1] > 0 and macd[-2] - signal[-2] < 0:
        # 满仓入股
        order_target_percent(context.s1, 1)

# 多股票 RSI 算法示例

以下是一个我们使用 TALib 在我们的平台上编写的多股票 RSI 算法示例,使用了 TALib 的 RSI 方法:

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    # 选择我们感兴趣的股票
    context.s1 = "000001.XSHE"
    context.s2 = "601988.XSHG"
    context.s3 = "000068.XSHE"
    context.stocks = [context.s1, context.s2, context.s3]
    context.TIME_PERIOD = 14
    context.HIGH_RSI = 85
    context.LOW_RSI = 30
    context.ORDER_PERCENT = 0.3
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    # 对我们选中的股票集合进行loop,运算每一只股票的RSI数值
    for stock in context.stocks:
        # 读取历史数据
        prices = history_bars(stock,context.TIME_PERIOD+1, '1d', 'close')
        # 用Talib计算RSI值
        rsi_data = talib.RSI(prices, timeperiod=context.TIME_PERIOD)[-1]
        cur_position = context.portfolio.positions[stock].quantity
        # 用剩余现金的30%来购买新的股票
        target_available_cash = context.portfolio.cash * context.ORDER_PERCENT
        # 当RSI大于设置的上限阀值,清仓该股票
        if rsi_data > context.HIGH_RSI and cur_position > 0:
            order_target_value(stock, 0)
        # 当RSI小于设置的下限阀值,用剩余cash的一定比例补仓该股
        if rsi_data < context.LOW_RSI:
            logger.info("target available cash caled: " + str(target_available_cash))
            # 如果剩余的现金不够一手 - 100shares,那么会被ricequant 的order management system reject掉
            order_value(stock, target_available_cash)

# 财务数据策略

在回测开始前,通过查询回测开始当天的财务数据,获得市盈率大于 55 且小于 60,营业总收入前 10 的股票,然后将所有资金平摊到这 10 个股票 Buy & Hold 的策略。您可以 clone 之后自行修改,通过查询回测开始当天的其它财务数据指标来筛选股票。但是注意需要调整资金量到¥ 300,000 以上才会有比较好的落单效果(否则资金量不足以满足买入如此多股票组成的一篮子投资组合)

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    # 查询revenue前十名的公司的股票并且他们的pe_ratio在55和60之间。打fundamentals的时候会有auto-complete方便写查询代码。
    stocks = all_instruments('CS').order_book_id
    fundamental_df=get_factor(stocks,['pe_ratio','revenue'])
    fundamental_df = fundamental_df[(fundamental_df['pe_ratio'] > 55) & (fundamental_df['pe_ratio'] < 60)]
    fundamental_df = fundamental_df.sort_values(by='revenue', ascending=False).head(10).reset_index(1,drop=True)
    # 将查询结果dataframe的fundamental_df存放在context里面以备后面只需:
    context.fundamental_df = fundamental_df
    # 实时打印日志看下查询结果,会有我们精心处理的数据表格显示:
    logger.info(context.fundamental_df)
    update_universe(context.fundamental_df.index.values)
    # 对于每一个股票按照平均现金买入:
    context.stocks = context.fundamental_df.index.values
    stocks_number = len(context.stocks)
    context.average_percent = 0.99 / stocks_number
    logger.info("Calculated average percent for each stock is: %f" % context.average_percent)
    context.fired = False
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    # 对于选择出来的股票按照平均比例买入:
    if not context.fired:
        for stock in context.stocks:
            order_target_percent(stock, context.average_percent)
            logger.info("Bought: " + str(context.average_percent) + " % for stock: " + str(stock))
        context.fired = True

# 根据收益和市盈率定期调仓策略

这是一个利用财务数据定期选股并且调仓的策略:选取市盈率在 53 到 67 之间,盈利前 10 的股票,然后按照等比例分配资金买入一个 10 个股票的投资组合:

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
def query_fundamental(context, bar_dict):
    # 查询revenue前十名的公司的股票并且他们的pe_ratio在53和67之间。
    stocks = all_instruments('CS').order_book_id
    fundamental_df = get_factor(stocks, ['pe_ratio', 'revenue'])
    fundamental_df = fundamental_df[(fundamental_df['pe_ratio'] > 53) & (fundamental_df['pe_ratio'] < 67)]
    fundamental_df = fundamental_df.sort_values(by='revenue', ascending=False).head(10).reset_index(1, drop=True)
    # 将查询结果dataframe的fundamental_df存放在context里面以备后面只需:
    context.fundamental_df = fundamental_df
    # 实时打印日志看下查询结果,会有我们精心处理的数据表格显示:
    logger.info(context.fundamental_df)
    # 对于每一个股票按照平均现金买入:
    context.stocks = context.fundamental_df.index.values
    update_universe(context.stocks)
    stocksNumber = len(context.stocks)
    context.average_percent = 0.99 / stocksNumber
    logger.info("Calculated average percent for each stock is: %f" % context.average_percent)
    # 先查一下选出来的股票是否在已有的portfolio里面:
    # 这样做并不是最好的,只是代码比较简单
    # 先清仓然后再买入这一个月新的符合条件的股票
    logger.info("Clearing all the current positions.")
    for holding_stock in context.portfolio.positions.keys():
        if context.portfolio.positions[holding_stock].quantity != 0:
            order_target_percent(holding_stock, 0)
    logger.info("Building new positions for portfolio.")
    for stock in context.stocks:
        order_target_percent(stock, context.average_percent)
        logger.info("Buying: " + str(context.average_percent) + " % for stock: " + str(stock))
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    scheduler.run_monthly(query_fundamental, monthday=1)
# 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # bar_dict[order_book_id] 可以拿到某个证券的bar信息
    # context.portfolio 可以拿到现在的投资组合状态信息
    # 使用order_shares(id_or_ins, amount)方法进行落单
    # TODO: 开始编写你的算法吧!
    pass

# 海龟交易系统

海龟交易系统也是非常经典的一种策略,我们也放出了范例代码如下。

clone
#导入必须的模块
import numpy as np
import talib
import math
def get_extreme(array_high_price_result, array_low_price_result):
    #抛开最新价格的价格序列
    np_array_high_price_result = np.array(array_high_price_result[:-1])
    np_array_low_price_result = np.array(array_low_price_result[:-1])
    #序列最大值
    max_result = np_array_high_price_result.max()
    #最小值
    min_result = np_array_low_price_result.min()
    #返回一个两个元素的list
    return [max_result, min_result]
#拿到真实震幅与头寸
def get_atr_and_unit( atr_array_result,  atr_length_result, total_value_result):
    #atr为真实平均振幅的最新值
    atr =  atr_array_result[ atr_length_result-1]
    #头寸
    unit = math.floor(total_value_result * .01 / atr)
    return [atr, unit]
#得到止损价格
def get_stop_price(first_open_price_result, units_hold_result, atr_result):
    stop_price = first_open_price_result - 2 * atr_result \
                 + (units_hold_result - 1) * 0.5 * atr_result
    return stop_price
def init(context):
    context.trade_day_num = 0
    context.unit = 0
    context.atr = 0
    context.trading_signal = 'start'
    context.pre_trading_signal = ''
    context.units_hold_max = 4
    context.units_hold = 0
    context.quantity = 0
    context.max_add = 0
    context.first_open_price = 0
    context.s = '000300.XSHG'
    context.open_observe_time = 55
    context.close_observe_time = 20
    context.atr_time = 20
def handle_bar(context, bar_dict):
    #当前合约的价值
    total_value = context.portfolio.total_value
    #context.open_observe_time+1个bar的每日最高价
    high_price = history_bars(context.s,context.open_observe_time+1, '1d', 'high')
    low_price_for_atr = history_bars(context.s,context.open_observe_time+1, '1d', 'low')
    low_price_for_extreme = history_bars(context.s,context.close_observe_time+1, '1d', 'low')
    close_price = history_bars(context.s,context.open_observe_time+2, '1d', 'close')
    close_price_for_atr = close_price[:-1]
    #talib.ATR平均真实振幅
    atr_array = talib.ATR(high_price, low_price_for_atr, close_price_for_atr, timeperiod=context.atr_time)
    #得到max_result
    maxx = get_extreme(high_price, low_price_for_extreme)[0]
    #得到min_result
    minn = get_extreme(high_price, low_price_for_extreme)[1]
    #前两日的平均真实振幅
    atr = atr_array[-2]
    if context.trading_signal != 'start':
        if context.units_hold != 0:
            context.max_add += 0.5 * get_atr_and_unit(atr_array, atr_array.size, total_value)[0]
    else:
        context.max_add = bar_dict[context.s].last
    #当前context.s持仓股数
    cur_position = context.portfolio.positions[context.s].quantity
    #当前仓位的现金
    available_cash = context.portfolio.cash
    #当前仓位的市场价值
    market_value = context.portfolio.market_value
    #当前仓位大于0并且当前价格小于止损价,产生信号"stop"
    if (cur_position > 0 and
            bar_dict[context.s].last < get_stop_price(context.first_open_price, context.units_hold, atr)):
        context.trading_signal = 'stop'
    else:
        #仓位大于0,并且当前价格小于观察时期的low的最小值
        if cur_position > 0 and bar_dict[context.s].last < minn:
            context.trading_signal = 'exit'
        else:
            #仓位大于0但小于策略设计仓位,并且当前价格大于观察时期的high的最大值,产生追加持仓的信号
            if (bar_dict[context.s].last > context.max_add and context.units_hold != 0 and
                    context.units_hold < context.units_hold_max and
                    available_cash > bar_dict[context.s].last*context.unit):
                context.trading_signal = 'entry_add'
            else:
                #价格大于maxx并且持仓为0,产生信号"entry"
                if bar_dict[context.s].last > maxx and context.units_hold == 0:
                    context.max_add = bar_dict[context.s].last
                    context.trading_signal = 'entry'
    atr = get_atr_and_unit(atr_array, atr_array.size, total_value)[0]
    if context.trade_day_num % 5 == 0:
        #每5个bar计算一次context.unit寸头
        context.unit = get_atr_and_unit(atr_array, atr_array.size, total_value)[1]
    context.trade_day_num += 1
    context.quantity = context.unit
    #不同信号下的操作
    if (context.trading_signal != context.pre_trading_signal or
            (context.units_hold < context.units_hold_max and context.units_hold > 1) or
            context.trading_signal == 'stop'):
        if context.trading_signal == 'entry':
            context.quantity = context.unit
            if available_cash > bar_dict[context.s].last*context.quantity:
                order_shares(context.s, context.quantity)
                context.first_open_price = bar_dict[context.s].last
                context.units_hold = 1
        if context.trading_signal == 'entry_add':
            context.quantity = context.unit
            order_shares(context.s, context.quantity)
            context.units_hold += 1
        if context.trading_signal == 'stop':
            if context.units_hold > 0:
                order_shares(context.s, -context.quantity)
                context.units_hold -= 1
        if context.trading_signal == 'exit':
            if cur_position > 0:
                order_shares(context.s, -cur_position)
                context.units_hold = 0
    context.pre_trading_signal = context.trading_signal

# 股指期货 MACD 日回测

以下是一个使用 TALib 进行股指期货主力合约日级别回测 MACD 算法示例:

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等
import talib
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递
def init(context):
    # context内引入全局变量s1,存储目标合约信息
    context.s1 = 'IF1606'
    # 使用MACD需要设置长短均线和macd平均线的参数
    context.SHORTPERIOD = 12
    context.LONGPERIOD = 26
    context.SMOOTHPERIOD = 9
    context.OBSERVATION = 50
    #初始化时订阅合约行情。订阅之后的合约行情会在handle_bar中进行更新
    subscribe(context.s1)
# 你选择的期货数据更新将会触发此段逻辑,例如日线或分钟线更新
def handle_bar(context, bar_dict):
    # 开始编写你的主要的算法逻辑
    # 获取历史收盘价序列,history_bars函数直接返回ndarray,方便之后的有关指标计算
    prices = history_bars(context.s1, context.OBSERVATION, '1d', 'close')
    # 用Talib计算MACD取值,得到三个时间序列数组,分别为macd,signal 和 hist
    macd, signal, hist = talib.MACD(prices, context.SHORTPERIOD,
                                    context.LONGPERIOD, context.SMOOTHPERIOD)
    # macd 是长短均线的差值,signal是macd的均线,如果短均线从下往上突破长均线,为入场信号,进行买入开仓操作
    if macd[-1] - signal[-1] > 0 and macd[-2] - signal[-2] < 0:
        sell_qty = context.portfolio.positions[context.s1].sell_quantity
        # 先判断当前卖方仓位,如果有,则进行平仓操作
        if sell_qty > 0:
            buy_close(context.s1, 1)
        # 买入开仓
        buy_open(context.s1, 1)
    if macd[-1] - signal[-1] < 0 and macd[-2] - signal[-2] > 0:
        buy_qty = context.portfolio.positions[context.s1].buy_quantity
        # 先判断当前买方仓位,如果有,则进行平仓操作
        if buy_qty > 0:
            sell_close(context.s1, 1)
        # 卖出开仓
        sell_open(context.s1, 1)

# 商品期货跨品种配对交易

该策略为分钟级别回测。运用了简单的移动平均以及布林带(Bollinger Bands)作为交易信号产生源。有关对冲比率(HedgeRatio)的确定,您可以在我们的研究平台上面通过import statsmodels.api as sm引入statsmodels中的 OLS 方法进行线性回归估计。具体估计窗口,您可以根据自己策略需要自行选择。

策略中的移动窗口选择为 60 分钟,即在每天开盘 60 分钟内不做任何交易,积累数据计算移动平均值。当然,这一移动窗口也可以根据自身需要进行灵活选择。下面例子中使用了黄金与白银两种商品期货进行配对交易。简单起见,例子中期货的价格并未做对数差处理。

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import numpy as np
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
    context.s1 = 'AG1612'
    context.s2 = 'AU1612'
    # 设置全局计数器
    context.counter = 0
    # 设置滚动窗口
    context.window = 60
    # 设置对冲手数,通过研究历史数据进行价格序列回归得到该值
    context.ratio = 15
    context.up_cross_up_limit = False
    context.down_cross_down_limit = False
    # 设置入场临界值
    context.entry_score = 2
    # 初始化时订阅合约行情。订阅之后的合约行情会在handle_bar中进行更新
    subscribe([context.s1, context.s2])
# before_trading此函数会在每天交易开始前被调用,当天只会被调用一次
def before_trading(context):
    # 样例商品期货在回测区间内有夜盘交易,所以在每日开盘前将计数器清零
    context.counter = 0
# 你选择的期货数据更新将会触发此段逻辑,例如日线或分钟线更新
def handle_bar(context, bar_dict):
    # 获取当前一对合约的仓位情况。如尚未有仓位,则对应持仓量都为0
    position_a = context.portfolio.positions[context.s1]
    position_b = context.portfolio.positions[context.s2]
    context.counter += 1
    # 当累积满一定数量的bar数据时候,进行交易逻辑的判断
    if context.counter > context.window:
        # 获取当天历史分钟线价格队列
        price_array_a = history_bars(context.s1, context.window, '1m', 'close')
        price_array_b = history_bars(context.s2, context.window, '1m', 'close')
        # 计算价差序列、其标准差、均值、上限、下限
        spread_array = price_array_a - context.ratio * price_array_b
        std = np.std(spread_array)
        mean = np.mean(spread_array)
        up_limit = mean + context.entry_score * std
        down_limit = mean - context.entry_score * std
        # 获取当前bar对应合约的收盘价格并计算价差
        price_a = bar_dict[context.s1].close
        price_b = bar_dict[context.s2].close
        spread = price_a - context.ratio * price_b
        # 如果价差低于预先计算得到的下限,则为建仓信号,'买入'价差合约
        if spread <= down_limit and not context.down_cross_down_limit:
            # 可以通过logger打印日志
            logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
            logger.info('创建买入价差中...')
            # 获取当前剩余的应建仓的数量
            qty_a = 1 - position_a.buy_quantity
            qty_b = context.ratio - position_b.sell_quantity
            # 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
            if qty_a > 0:
                buy_open(context.s1, qty_a)
            if qty_b > 0:
                sell_open(context.s2, qty_b)
            if qty_a == 0 and qty_b == 0:
                # 已成功建立价差的'多仓'
                context.down_cross_down_limit = True
                logger.info('买入价差仓位创建成功!')
        # 如果价差向上回归移动平均线,则为平仓信号
        if spread >= mean and context.down_cross_down_limit:
            logger.info('spread: {}, mean: {}, down_limit: {}'.format(spread, mean, down_limit))
            logger.info('对买入价差仓位进行平仓操作中...')
            # 由于存在成交不超过下一bar成交量25%的限制,所以可能要通过多次发单成交才能够成功建仓
            qty_a = position_a.buy_quantity
            qty_b = position_b.sell_quantity
            if qty_a > 0:
                sell_close(context.s1, qty_a)
            if qty_b > 0:
                buy_close(context.s2, qty_b)
            if qty_a == 0 and qty_b == 0:
                context.down_cross_down_limit = False
                logger.info('买入价差仓位平仓成功!')
        # 如果价差高于预先计算得到的上限,则为建仓信号,'卖出'价差合约
        if spread >= up_limit and not context.up_cross_up_limit:
            logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
            logger.info('创建卖出价差中...')
            qty_a = 1 - position_a.sell_quantity
            qty_b = context.ratio - position_b.buy_quantity
            if qty_a > 0:
                sell_open(context.s1, qty_a)
            if qty_b > 0:
                buy_open(context.s2, qty_b)
            if qty_a == 0 and qty_b == 0:
                context.up_cross_up_limit = True
                logger.info('卖出价差仓位创建成功')
        # 如果价差向下回归移动平均线,则为平仓信号
        if spread < mean and context.up_cross_up_limit:
            logger.info('spread: {}, mean: {}, up_limit: {}'.format(spread, mean, up_limit))
            logger.info('对卖出价差仓位进行平仓操作中...')
            qty_a = position_a.sell_quantity
            qty_b = position_b.buy_quantity
            if qty_a > 0:
                buy_close(context.s1, qty_a)
            if qty_b > 0:
                sell_close(context.s2, qty_b)
            if qty_a == 0 and qty_b == 0:
                context.up_cross_up_limit = False
                logger.info('卖出价差仓位平仓成功!')

# Alpha 策略----多因子对冲

多因子模型是应用最广泛的一种选股模型,基本原理是采用一系列的因子作为选股标准,满足这些因子的股票则被买入,不满足的则卖出。各种多因子模型核心的区别第一是在因子的选取上,第二是在如何用多因子综合得到一个最终的判断。股票端我们按照多因子的模型买入,期货端则一手空单对冲风险。

clone
# 可以自己import我们平台支持的第三方python模块,比如pandas、numpy等。
import numpy as np
import math
# 在这个方法中编写任何的初始化逻辑。context对象将会在你的算法策略的任何方法之间做传递。
def init(context):
#选取板块
    context.stks=[]
    context.stks.append(sector("consumer discretionary"))
    context.stks.append(sector("consumer staples"))
    context.stks.append(sector("health care"))
    context.stks.append(sector("telecommunication services"))
    context.stks.append(sector("utilities"))
    context.stks.append(sector("materials"))
    context.flag=True
    # 确定运行频率
    scheduler.run_daily(rebalance)
    # 手动添加银行板块
    context.stocks = ['000001.XSHE','002142.XSHE','600000.XSHG','600015.XSHG','600016.XSHG','600036.XSHG','601009.XSHG','601166.XSHG','601169.XSHG','601288.XSHG','601328.XSHG','601398.XSHG','601818.XSHG','601939.XSHG','601988.XSHG','601998.XSHG']
    # 你选择的证券的数据更新将会触发此段逻辑,例如日或分钟历史数据切片或者是实时数据切片更新
def get_stocks(context, bar_dict):
    stocks=set([])
    for i in range(0,6):
        # 在这个循环里,首先获取每个板块的财务数据
        fundamental_df=get_factor(context.stks[i],['adjusted_return_on_equity_weighted_average','pe_ratio','pb_ratio']).reset_index(1,drop=True)
        fundamental_df=fundamental_df.loc[context.stks[i]]
        fundamental_df=fundamental_df[fundamental_df.pe_ratio<999]
        # 使用pandas对财务数据进行排名并打分
        fundamental_df=fundamental_df.sort_values('pb_ratio')
        fundamental_df['pb_score']=list(range(1,len(fundamental_df)+1))
        fundamental_df=fundamental_df.sort_values('pe_ratio')
        fundamental_df['pe_score']=list(range(1,len(fundamental_df)+1))
        scores=[]
        for stock in fundamental_df.T.columns.values:
            scores.append(fundamental_df.loc[stock,'pe_score']+fundamental_df.loc[stock,'pb_score'])
        fundamental_df['scores']=list(scores)
        fundamental_df=fundamental_df.sort_values('scores')
        #取得分最低的三个股票
        fundamental_df=fundamental_df.head(3)
        #logger.info(df)
        stocks = stocks | set(fundamental_df.index.values)
        #logger.info(i)
    # 银行板块单独按照市净率取市净率最低的两个
    fundamental_df=get_factor(context.stocks,['adjusted_return_on_equity_weighted_average','pb_ratio']).reset_index(1,drop=True)
    fundamental_df = fundamental_df.loc[context.stocks]
    fundamental_df=fundamental_df.sort_values('pb_ratio').head(2)
    # 买入的股票,进行调仓操作
    stocks =stocks | set(fundamental_df.index.values)
    return stocks
def rebalance(context, bar_dict):
    stocks =  get_stocks(context, bar_dict)
    holdings = set(get_holdings(context))
    to_buy = stocks - holdings
    to_sell = holdings - stocks
    to_buy2= stocks - holdings
    for stock in to_sell:
        if bar_dict[stock].suspended == False:
            order_target_percent(stock , 0)
    if len(to_buy) == 0:
        return
    to_buy = get_trading_stocks(to_buy, context, bar_dict)
    cash = context.portfolio.cash
    total_value=context.portfolio.total_value
    if len(to_buy) >0:
        average_value = total_value *0.025
        if average_value > total_value/len(to_buy):
            average_value = total_value/len(to_buy)
    for stock in to_buy:
        if (bar_dict[stock].suspended == False)and(context.portfolio.cash>average_value):
            order_target_value(stock, average_value)
    if context.flag==True :
        sell_open('IF88', 1)
        context.flag=False
# 得到交易的股票
def get_trading_stocks(to_buy, context, bar_dict):
    trading_stocks = []
    for stock in to_buy:
        if bar_dict[stock].suspended == False:
            trading_stocks.append(stock)
    return trading_stocks
# 持仓的股票
def get_holdings(context):
    positions = context.portfolio.stock_account.positions
    holdings = []
    for position in positions:
        if positions[position].quantity > 0:
            holdings.append(position)
    return holdings
def handle_bar(context, bar_dict):
    # TODO: 开始编写你的算法吧!
    pass

Last Updated: 1/22/2021, 5:50:32 PM

← 策略开发API 外部数据和 Python 模块 →