海龟交易系统

海龟交易系统

嗯,用户让我用Python实现一个海龟交易系统。首先,我需要回顾一下海龟交易法则的核心要点,确保代码能准确反映这些规则。根据提供的链接内容,海龟交易系统主要包括趋势跟踪、风险控制、入场加仓、止损止盈等部分。

首先,趋势跟踪方面,入场信号是20日和55日的高点突破。我需要计算这两个时间段的最高价和最低价,然后判断当前价格是否突破这些通道。可能需要用到rolling函数来计算移动窗口的最高价和最低价。

然后是头寸管理,根据ATR(平均真实波幅)动态调整仓位。公式是头寸规模等于账户风险比例乘以总资金,然后除以ATR乘以合约乘数。这里需要注意的是,ATR的计算周期通常是14天,但海龟交易法则可能有所不同?不过用户提供的资料里提到的是用ATR来计算止损和加仓,所以可能需要确认ATR的周期。通常,ATR是14天的,但用户提到的止损是2倍ATR,所以可能需要计算不同周期的ATR吗?

接下来是风险控制,每笔交易的风险控制在总资金的1%-2%。这涉及到计算每个头寸的大小,确保损失不超过这个比例。例如,如果账户总资金是100万,风险比例是1%,那么每笔交易的最大风险是1万,然后根据止损幅度(2倍ATR)来计算可以买入的数量。

入场和加仓策略部分,价格突破20日高点做多,跌破20日低点做空。金字塔加仓是每上涨0.5倍ATR加仓一次,最多加3次。这里需要跟踪每次突破后的价格变动,并在满足条件时加仓。

止损止盈方面,硬性止损是亏损达到2倍ATR时平仓。动态止盈则是浮盈回撤50%时部分平仓,趋势逆转(跌破55日低点)时清仓。这需要在代码中设置止损点和止盈条件,可能需要跟踪持仓的成本价和当前价格,计算回撤情况。

另外,适用市场方面,用户提到可能需要调整参数,比如A股用15日突破加30日均线过滤,加密货币用18小时突破。但用户现在需要的是一个基础版本,可能用默认的20日和55日参数。

现在,我需要考虑如何将这些规则转化为Python代码。首先,数据获取部分,可能需要使用pandas从CSV或在线API获取历史数据。然后计算必要的指标,比如20日最高、55日最高,ATR等。

接下来是模拟交易逻辑。需要维护账户的资金、持仓、当前头寸等信息。每个交易日检查是否有入场信号,如果有,计算头寸规模,执行买入或卖出。同时,对于已有持仓,检查是否达到加仓、止损或止盈条件。

可能的难点在于处理多个持仓和加仓的情况,需要跟踪每次加仓的价格和数量。此外,动态止盈部分,需要记录最高浮盈,并在回撤50%时部分平仓。这部分可能需要详细的状态跟踪。

风险控制方面,头寸规模的计算是关键。确保每笔交易的风险不超过总资金的一定比例。例如,总资金是100万,风险比例是1%,ATR是某个值,合约乘数(比如期货中的每点价值)也需要考虑。但如果是股票或加密货币,合约乘数可能为1,所以公式可以简化为头寸规模 = (风险比例 * 总资金) / (2 * ATR),因为止损是2倍ATR。

在代码结构上,可能需要一个回测框架,循环遍历每个交易日,检查信号,执行交易,更新账户状态。最后输出绩效指标,比如年化收益率、最大回撤、胜率等。

需要注意的是,海龟交易系统的入场信号有两种:20日突破和55日突破,可能作为不同系统的入场点。但用户提供的资料中,可能主要使用20日突破作为入场,55日作为趋势过滤?或者作为另一个入场点?需要仔细确认。根据问题中的描述,入场信号是价格突破20日高点做多,跌破20日低点做空。同时,趋势过滤中提到仅交易50日波动率高于300日均值的市场,但可能在代码中暂时忽略这个过滤条件,先实现基础版本。

此外,止损是2倍ATR,即买入价减去2倍ATR作为止损价。例如,买入价是100,ATR是5,止损价是90。当价格跌到90时止损。这需要计算每个头寸的止损价,并在价格触发时平仓。

加仓策略是每上涨0.5倍ATR加仓一次,最多加3次。例如,初始买入后,价格每上涨0.5*ATR就加仓,但总加仓次数不超过3次。这需要跟踪当前加仓次数,并计算下一次加仓的价格点位。

总结步骤:

数据准备:获取历史数据,计算20日高点、低点,ATR(比如14日ATR)。

生成交易信号:当收盘价突破20日高点时产生买入信号,跌破20日低点时卖出。

计算头寸规模:根据当前资金、风险比例(1%)、ATR计算可买入的数量。

执行交易:包括初始入场和后续加仓。

止损止盈:监控价格是否触发止损或止盈条件。

模拟交易循环:逐日处理,更新持仓和账户状态。

输出结果:交易记录、绩效指标。

现在,考虑代码实现。使用pandas处理数据,用向量化操作或循环遍历每个交易日。由于需要跟踪状态(如持仓、加仓次数等),可能更适合用循环处理每个bar。

可能的代码结构:

读取数据,计算指标(20日高点、ATR等)。

初始化账户变量:现金、持仓数量、持仓成本、加仓次数等。

遍历每个交易日:

检查是否有入场信号。

如果有信号,计算头寸规模,执行交易。

检查现有持仓是否需要加仓(价格达到加仓条件,且加仓次数未满)。

检查止损和止盈条件,执行平仓。

记录每笔交易的详情,最后计算绩效。

需要注意的是,海龟交易法则中的加仓是在价格波动一定幅度后加仓,比如每上涨0.5倍ATR加仓一次。例如,初始买入价是100,ATR是2,那么加仓价位是101(100 + 0.5 * 2)。每次加仓后,新的加仓价位再增加0.5*ATR,直到加满3次。

止损价是每个头寸的入场价减去2倍ATR。如果有多次加仓,每个加仓的头寸可能有不同的止损价?或者统一使用最近的入场价?可能需要统一跟踪每个加仓的止损价,或者使用整体的平均成本价减去2倍ATR。根据海龟法则的描述,应该是每个头寸单独管理,但实际操作中可能使用统一止损点。例如,初始买入后,止损价是入场价-2ATR。如果价格继续上涨并加仓,新的止损价可能更新为最新的入场价-2ATR,或者维持初始的止损价?这点需要澄清。根据用户提供的例子,初始买入价100,ATR5,止损线90。如果加仓在101元,那么新的止损价是101-2 * 5=91?还是仍然以初始的100-2 * 5=90为止损?原问题中的例子显示,加仓后的止损可能单独管理,但海龟法则可能采用跟踪止损,比如使用最近的价格波动来调整止损。或者,可能使用初始止损价,但需要进一步确认。

根据海龟交易法则的止损规则,每个头寸的止损是2倍ATR,但ATR是动态计算的。所以每次价格变动导致ATR变化时,止损价可能也会变化。或者,入场时的ATR被固定用于计算该头寸的止损?需要明确。例如,在入场时计算ATR,然后止损价是入场价±2ATR(做多时止损价是入场价-2ATR,做空时是入场价+2*ATR)。这样,即使之后ATR变化,该头寸的止损价仍然基于入场时的ATR。或者,每次更新ATR,动态调整止损价。根据用户提供的例子,初始ATR是5元,止损线是90元,即100-2 * 5=90。所以可能使用的是入场时的ATR来计算止损价,而不是动态更新。因此,在代码中,每个头寸的止损价应该在入场时确定,基于当时的ATR值。

此外,关于ATR的计算周期,用户提到的是用当前的ATR,可能默认是14日,但需要确认。根据原问题中的描述,止损使用的是2倍ATR,但没有说明ATR的计算周期。通常,ATR的计算周期是14天,所以代码中可能需要计算14日ATR。

总结:

数据预处理步骤:

计算20日最高价(rolling(20).max())

计算20日最低价(rolling(20).min())

计算ATR(14日),需要计算真实波幅(TR),然后取14日平均。

入场信号:

当收盘价上穿20日最高价时,产生买入信号。

当收盘价下穿20日最低价时,产生卖出信号。

头寸规模计算:

风险比例(例如1%)乘以总资金,得到可承受的风险金额。

每笔交易的风险金额 = 总资金 * 风险比例

头寸规模 = 风险金额 / (2 * ATR * 合约乘数)

假设合约乘数为1(比如股票或加密货币),则简化为 风险金额 / (2 * ATR)

止损价:

买入时,止损价为入场价 - 2 * ATR(入场时的ATR值)

卖出时,止损价为入场价 + 2 * ATR

加仓逻辑:

初始买入后,价格每上涨0.5倍ATR(入场时的ATR值)则加仓一次,最多加3次。

例如,初始买入价为100,ATR为2,加仓价位为101、102、103,共加3次。

但需要注意,每次加仓后的止损价是否重新计算?根据原问题中的例子,加仓后的止损价可能基于该次加仓的入场价和当时的ATR。但用户提供的例子中,初始ATR是2元,加仓价是101元,ATR可能已经变化。但根据问题中的例子,加仓时的ATR可能没有重新计算,而是继续使用初始的ATR?或者每次加仓时重新计算ATR?

例如,问题中的例子说初始仓位100万,ATR为2元,涨到101元加仓。这里的ATR可能是指当时的ATR值,还是初始的?可能需要根据当时的ATR来计算加仓条件。这可能会影响代码的实现。例如,每次加仓时,需要重新计算当前的ATR,然后判断是否满足0.5倍ATR的上涨幅度。

或者,是否在初始入场时确定的ATR用于后续的加仓条件?这可能比较复杂。例如,初始入场时ATR是2,那么加仓条件是每上涨0.5 * 2=1元加仓,不管后续ATR如何变化。但根据问题中的例子,初始ATR是2元,之后价格涨到101元(即上涨1元,满足0.5 * 2=1的条件)加仓。这说明加仓条件是基于初始的ATR值,而不是动态更新的。因此,在代码中,每个头寸的加仓条件应该基于入场时的ATR值,而不是当前的ATR。

因此,在实现时,每个头寸需要记录入场时的ATR值,以便计算加仓的价格间隔(0.5*ATR)。

这样,每次加仓的价格间隔是0.5*初始ATR,每次加仓的止损价也是基于该次加仓的入场价和初始ATR。

这可能导致多次加仓,每个加仓的头寸都有自己的入场价和止损价。但海龟交易法则中的加仓是否独立管理每个头寸,还是整体管理?根据资料中的描述,金字塔加仓,每上涨0.5倍ATR加仓一次,最多加3次。例如,初始仓位100万,ATR为2元,价格到101元加仓,此时ATR可能还是2元,或者已经变化?根据例子中的情况,可能假设ATR保持不变,或者每次加仓时使用当前的ATR?

这可能需要进一步查证海龟交易法则的原始规则。根据《海龟交易法则》一书中的描述,加仓是在价格向有利方向波动N(即ATR)的0.5倍时加仓,这里的N是当前的ATR,还是初始的?根据书中的描述,N是动态计算的,所以每次加仓时的价格波动基于当前的N值。因此,每次加仓的间隔是0.5倍的当前N(ATR)。因此,在代码中,每次检查是否达到加仓条件时,需要用当前的ATR来计算需要的波动幅度。

这可能增加代码的复杂度,因为每次加仓的间隔可能不同,取决于当时的ATR值。

例如,初始买入后,ATR是2元,所以第一次加仓需要上涨1元(0.5 * 2)。如果之后ATR增加到3元,那么下一次加仓需要上涨1.5元(0.5 * 3)。

这似乎更符合动态调整的逻辑,但会增加实现的难度,因为每次加仓都需要重新计算ATR。

或者,是否在每次加仓时,重新计算所需的波动幅度?这可能需要更复杂的状态管理。

不过,根据用户提供的例子中的描述:“每上涨0.5倍ATR加仓一次”,这里的ATR可能是指入场时的ATR。例如,问题中的例子,初始ATR是2元,加仓价位是101元(100 + 0.5 * 2 = 101)。所以,可能加仓条件是基于入场时的ATR,而不是动态的。

因此,在代码中,每个头寸在入场时会记录当前的ATR值,后续的加仓条件基于该ATR值。例如,初始ATR是2,加仓间隔是1元(0.5 * 2),不管后续的ATR如何变化。

这可能简化代码的实现,因为每个头寸的加仓条件固定。但需要确认原始规则是否如此。

假设采用入场时的ATR来计算加仓间隔,那么代码中每个头寸需要保存入场时的ATR值。例如:

当入场信号触发时,记录当前的ATR值(比如ATR_entry),然后加仓条件是价格比上次入场价上涨0.5*ATR_entry。每次加仓后,新的加仓条件依然是基于初始的ATR_entry,因此加仓间隔固定。

这样,即使后续ATR变大或变小,加仓间隔保持不变。这可能更符合例子中的情况。

因此,代码中的每个头寸需要保存以下信息:

方向(做多或做空)

入场价

止损价

加仓次数(已加几次)

下次加仓的价格

使用的ATR_entry

每次价格达到加仓条件时,执行加仓,并更新下次加仓的价格(当前价 + 0.5*ATR_entry)。

这样,即使价格波动导致ATR变化,加仓间隔仍然固定。

这可能更容易实现,并且符合用户例子中的情况。

接下来,代码中需要处理多个头寸的情况,因为可能有多个不同的入场点,或者同一标的的不同加仓头寸。但海龟交易法则中的加仓是金字塔式,即在同一方向上加仓,因此可能将所有加仓视为同一头寸的不同单位,统一管理止损和止盈?或者每个加仓单位独立管理?

根据海龟法则,止损是每个单位单独管理吗?例如,初始头寸的止损价是入场价-2ATR_entry,第一次加仓的头寸止损价是加仓价-2ATR_entry_new(如果是用当时的ATR),或者统一使用初始的?

这个问题可能需要参考原始规则。根据海龟交易法则,止损是每个头寸的2ATR,但具体是每个加仓单位独立止损,还是整体头寸的止损?

可能,每个加仓的单位被视为独立的头寸,每个都有自己的止损价。例如,初始买入价100,ATR_entry=2,止损价90;加仓价101,当时的ATR_entry可能还是2(如果是使用入场时的),则止损价101-2 * 2=97。这样,每个加仓单位都有自己的止损价。当价格下跌触发任何一个止损价时,对应的单位被平仓。

但这样管理起来比较复杂,需要跟踪每个加仓单位。或者,是否采用统一止损,即所有单位共享一个止损价,比如基于最新的价格波动?

根据用户提供的例子中的止损线是90元,初始价100,ATR5,所以止损价是100-2 * 5=90。如果加仓在101元,当时的ATR可能还是5,那么该加仓单位的止损价是101-2 * 5=91。这样,每个加仓单位都有自己的止损价。当价格下跌到91时,该单位被止损,但初始单位可能在90止损。

但这样处理的话,代码需要维护每个加仓单位的信息,包括入场价、ATR_entry、止损价等。这可能比较复杂,尤其是在多次加仓的情况下。

另一种可能是,所有加仓单位共享一个动态止损价,例如基于最新的ATR值。但根据原问题中的例子,这似乎不是情况。

因此,在代码实现中,可能需要将每个加仓视为独立的头寸,每个都有自己的入场价、ATR_entry、止损价,以及加仓次数限制。

不过,这会增加代码的复杂度,尤其是当允许最多加仓3次时,会有4个头寸单位(初始+3次加仓)。每个单位都需要独立跟踪。

或者,可能采用平均成本价,并计算整体止损价。例如,初始买入100元,加仓101元,平均成本价=(100+101)/2=100.5,ATR_entry=2,止损价为100.5 - 2 * 2=96.5。但这样可能不符合海龟法则的原始规则,因为每个单位的止损应该独立。

因此,正确的做法可能是每个加仓单位独立管理,各自有止损价。当价格触发任何一个止损价时,平仓对应的单位。

但这样,代码需要处理多个持仓单位,这在实现时会比较麻烦,特别是用Python处理时,可能需要用列表或字典来跟踪每个单位的信息。

这可能会影响代码的效率和复杂度,但为了准确实现海龟法则,可能需要这么做。

例如,持仓可以是一个列表,每个元素是一个字典,包含:

方向(long/short)

入场价

止损价

加仓次数(用于限制最多3次加仓)

下次加仓价

该单位的ATR_entry

每次价格更新时,检查是否有加仓信号(当前价格 >= 上次加仓价 + 0.5*ATR_entry),并且加仓次数未满3次。如果是,则执行加仓,添加新的单位到持仓列表。

同时,每个交易日检查每个持仓单位的止损价是否被触发,如果是,平仓该单位。

此外,止盈条件中的动态止盈,当浮盈回撤50%时部分平仓,或者趋势逆转(跌破55日低点)时清仓。这部分也需要处理。

例如,动态止盈:当价格从最高点回撤50%时,平仓部分或全部。这需要跟踪持仓以来的最高价,计算回撤幅度。

趋势逆转:当价格跌破55日低点时,清仓所有多头头寸。

因此,代码中需要为每个持仓单位记录最高价,或者整体持仓的最高价,以计算回撤。

这进一步增加了复杂性。

考虑到用户可能希望一个简化的实现,可以先实现核心部分,如入场、加仓、止损,而将动态止盈和趋势逆转作为进阶功能后续添加。

因此,分步骤实现:

实现基础版本:入场信号(20日突破),头寸计算,止损,加仓(基于初始ATR_entry)。

处理多单位持仓,每个单位独立止损。

添加动态止盈和趋势逆转条件。

由于复杂度较高,可能先实现一个简化版本,假设每次加仓后,所有单位共享一个止损价,或者只允许一次加仓,以简化代码。

但为了符合海龟法则的精髓,可能需要处理多次加仓和独立止损。

现在,编写代码的大致步骤:

数据准备:

计算20日最高价(rolling_max_20)

计算20日最低价(rolling_min_20)

计算ATR(14日)

交易信号生成:

当收盘价 > rolling_max_20,产生买入信号

当收盘价 < rolling_min_20,产生卖出信号

需要处理多空信号,但海龟法则通常用于做多趋势,或者同时支持多空?

根据用户提供的资料,海龟交易法则可以做多和做空。例如,突破20日高点做多,突破20日低点做空。因此,代码需要支持多空双向交易。

但为了简化,可能先实现做多部分,后续再扩展做空。

或者,同时处理多空,但需要更复杂的逻辑。

假设用户希望实现完整的海龟系统,支持多空,但可能需要分别处理。

头寸管理部分,做多和做空的止损计算不同。例如,做多的止损价是入场价 - 2ATR_entry,做空的止损价是入场价 + 2ATR_entry。

因此,代码中需要区分多空方向。

入场时,如果是做多信号,且当前没有持仓或允许加仓,则买入。做空同理。

但海龟法则中,同一时间可能只持有一个方向的头寸,因此当有买入信号时,如果当前持有多头,可能只加仓;如果持有空头,可能需要先平仓再开仓?或者根据突破信号反向操作?

根据海龟法则,当价格突破20日高点时,做多;如果之前有空头头寸,可能需要先平仓。或者允许同时持有多头和空头?这似乎不太可能,所以应该平掉反向头寸后再开仓。

因此,在代码中,当产生买入信号时,如果当前持有空头头寸,先平仓,再开多头。反之亦然。

或者,可能同时持有多头和空头,但通常海龟法则可能在同一时间只持有一个方向的头寸。因此,入场信号会覆盖反向持仓。

因此,在代码中,当新的信号与现有持仓方向相反时,需要先平仓再开新仓。

这需要处理持仓的方向,并在信号变化时平仓。

例如,当前持有多头,当出现卖出信号(跌破20日低点)时,平掉多头,开空头。

因此,在代码中,每次信号出现时,需要检查当前持仓方向,如果不同,则平仓,并开新仓。

但海龟法则是否允许同时持有多头和空头?这可能需要查阅原始资料。根据常见做法,趋势跟踪系统通常在同一时间只持有一个方向的头寸,因此代码中应处理这种情况。

因此,在代码中,当新的信号与当前持仓方向相反时,平仓现有头寸,并按照新信号开仓。

同时,如果信号方向与当前持仓相同,则可能加仓(如果允许)。

但根据海龟法则的金字塔加仓规则,加仓是在同一方向上进行,因此只有当信号方向与持仓相同时,才可能加仓。

因此,在代码中,需要跟踪当前持仓方向,并在信号同向时执行加仓,反向时平仓并开新仓。

这增加了代码的复杂度,因为需要管理持仓方向和多个加仓单位。

现在,假设我们只处理多头头寸,忽略空头部分,以简化初始实现。后续再扩展。

或者,同时处理多空,但需要更复杂的状态管理。

现在,回到代码结构:

首先,读取数据,计算指标:

import pandas as pd

假设data是DataFrame,包含日期、开盘价、最高价、最低价、收盘价等列

data = pd.read_csv('data.csv', parse_dates=['date'])

data['rolling_max_20'] = data['high'].rolling(20).max()

data['rolling_min_20'] = data['low'].rolling(20).min()

计算ATR(14日)

def calculate_atr(df, window=14):

df['prev_close'] = df['close'].shift(1)

df['tr1'] = df['high'] - df['low']

df['tr2'] = abs(df['high'] - df['prev_close'])

df['tr3'] = abs(df['low'] - df['prev_close'])

df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)

df['atr'] = df['tr'].rolling(window).mean()

return df

data = calculate_atr(data)

接下来,生成交易信号:

data['signal'] = 0 # 1表示买入,-1表示卖出

data.loc[data['close'] > data['rolling_max_20'].shift(1), 'signal'] = 1

data.loc[data['close'] < data['rolling_min_20'].shift(1), 'signal'] = -1

这里使用.shift(1)以避免未来数据问题,因为当天的最高价在收盘时才能确定,因此信号应在下一个交易日执行。

例如,在t日的收盘价突破t-1日的20日最高价,则在t+1日开盘时买入。

但实际回测中,通常使用当天的收盘价作为信号,并在下一个交易日开盘执行。这里需要明确执行价格。假设在产生信号的当天收盘时执行,以收盘价成交。

因此,可能需要调整.shift()的位置。

例如,当日的收盘价突破当日的20日最高价,则产生信号,并在当日收盘价执行交易。

但计算rolling_max_20时,使用的是过去20天的数据,包括当日。例如,rolling(20).max()会计算到当日,因此当日的最高价是否在计算中?

是的,rolling_max_20是当日的20日最高价,包括当日的数据。因此,当日的收盘价是否突破当日的rolling_max_high,这可能存在未来数据的问题,因为最高价在当日交易结束后才确定,无法在盘中知道当日最高价是否被突破。

因此,正确的做法是使用前一天的rolling_max_20来判断当日的收盘价是否突破。例如,在t日收盘时,比较收盘价与t-1日的20日最高价(即前20天,到t-1日为止的最高价),如果收盘价超过该值,则在t日收盘时产生买入信号,并在下一个交易日(t+1日)的开盘价执行。

因此,代码中的rolling_max_20应该使用.shift(1)来避免未来数据。

修改信号生成部分:

data['signal'] = 0

当天的收盘价超过前一天的20日最高价,买入信号

data.loc[data['close'] > data['rolling_max_20'].shift(1), 'signal'] = 1

当天的收盘价低于前一天的20日最低价,卖出信号

data.loc[data['close'] < data['rolling_min_20'].shift(1), 'signal'] = -1

这样,信号在当日收盘时产生,但实际交易执行在下一个交易日开盘时。这里假设以收盘价执行,但实际中可能存在滑点,这里简化处理,假设以收盘价成交。

接下来,管理交易逻辑:

初始化账户变量:

cash = 1_000_000 # 初始资金

position = 0 # 持仓数量

position_direction = 0 # 持仓方向,0表示无持仓,1表示多,-1表示空

entries = [] # 保存每个头寸的信息,如入场价、ATR_entry、止损价、加仓次数等

trade_log = [] # 记录交易记录

遍历每个交易日:

for i in range(1, len(data)):

current_row = data.iloc[i]

prev_row = data.iloc[i-1]

检查是否有信号

signal = prev_row['signal'] # 因为信号是基于前一日计算的

处理止损

for entry in entries:

if entry['direction'] == 1:  # 多头
    if current_row['low'] <= entry['stop_loss']:
        # 触发止损,平仓
        cash += entry['units'] * current_row['close']
        position -= entry['units']
        trade_log.append({
            'date': current_row['date'],
            'type': '止损平仓',
            'price': current_row['close'],
            'units': entry['units'],
            'profit': (current_row['close'] - entry['entry_price']) * entry['units']
        })
        entry['active'] = False  # 标记为已平仓
elif entry['direction'] == -1:  # 空头
    if current_row['high'] >= entry['stop_loss']:
        cash += entry['units'] * (2 * entry['entry_price'] - current_row['close'])  # 假设平仓空头
        position += entry['units']
        trade_log.append({
            'date': current_row['date'],
            'type': '止损平仓',
            'price': current_row['close'],
            'units': entry['units'],
            'profit': (entry['entry_price'] - current_row['close']) * entry['units']
        })
        entry['active'] = False

移除已平仓的entry

entries = [entry for entry in entries if entry['active']]

检查加仓条件

if position_direction != 0:

for entry in entries:
    if entry['direction'] == position_direction:
        # 检查是否满足加仓条件
        if position_direction == 1 and current_row['close'] >= entry['next_add_price'] and entry['add_count'] < 3:
            # 计算加仓头寸
            atr_entry = entry['atr_entry']
            risk_amount = cash * 0.01  # 1%风险
            units = risk_amount / (2 * atr_entry)
            # 执行加仓
            entry_cost = units * current_row['close']
            if cash >= entry_cost:
                cash -= entry_cost
                position += units
                new_entry = {
                    'entry_price': current_row['close'],
                    'atr_entry': atr_entry,
                    'stop_loss': current_row['close'] - 2 * atr_entry,
                    'add_count': entry['add_count'] + 1,
                    'next_add_price': current_row['close'] + 0.5 * atr_entry,
                    'direction': 1,
                    'units': units,
                    'active': True
                }
                entries.append(new_entry)
                trade_log.append({
                    'date': current_row['date'],
                    'type': '加仓买入',
                    'price': current_row['close'],
                    'units': units
                })
        # 类似处理空头加仓...

处理交易信号

if signal != 0:

if (signal == 1 and position_direction != 1) or (signal == -1 and position_direction != -1):
    # 平仓现有反向持仓
    if position != 0:
        # 平仓所有持仓
        for entry in entries:
            if entry['direction'] == position_direction:
                cash += entry['units'] * current_row['close'] if position_direction == 1 else entry['units'] * (2 * entry['entry_price'] - current_row['close'])
                position -= entry['units'] if position_direction == 1 else -entry['units']
                trade_log.append({
                    'date': current_row['date'],
                    'type': '平仓',
                    'price': current_row['close'],
                    'units': entry['units'],
                    'profit': (current_row['close'] - entry['entry_price']) * entry['units'] if position_direction == 1 else (entry['entry_price'] - current_row['close']) * entry['units']
                })
        entries = []
        position = 0
        position_direction = 0

    # 开新仓
    atr_entry = prev_row['atr']
    risk_amount = cash * 0.01  # 1%风险
    units = risk_amount / (2 * atr_entry)
    entry_cost = units * current_row['close']
    if cash >= entry_cost:
        cash -= entry_cost
        position += units if signal == 1 else -units
        position_direction = signal
        new_entry = {
            'entry_price': current_row['close'],
            'atr_entry': atr_entry,
            'stop_loss': current_row['close'] - 2 * atr_entry if signal == 1 else current_row['close'] + 2 * atr_entry,
            'add_count': 0,
            'next_add_price': current_row['close'] + (0.5 * atr_entry if signal == 1 else -0.5 * atr_entry),
            'direction': signal,
            'units': units,
            'active': True
        }
        entries.append(new_entry)
        trade_log.append({
            'date': current_row['date'],
            'type': '买入' if signal == 1 else '卖出',
            'price': current_row['close'],
            'units': units
        })

这部分代码可能有很多错误,比如:

信号的处理是否正确?

加仓条件的检查是否在正确的位置?

平仓逻辑是否正确,尤其是在处理多空反向时?

如何管理多个entry的止损和加仓?

资金和头寸的计算是否正确?

此外,上述代码仅处理了多头加仓的情况,空头部分需要类似处理,但为了简化,可能暂时忽略。

此外,海龟交易法则中的头寸规模计算还需要考虑合约乘数和市场波动性。例如,在期货中,一个合约代表一定数量的标的物,但这里假设合约乘数为1,如股票或加密货币,因此可以忽略。

另一个问题是,加仓时使用的风险比例是否重新计算?例如,初始风险是1%,每次加仓是否也使用当前总资金的1%?或者继续使用初始的1%风险?

根据海龟法则,头寸规模是基于当前资金和波动性(ATR)动态调整的。因此,每次加仓时的头寸规模应基于当前的可用资金和当时的ATR值。但根据用户提供的例子,似乎每次加仓的规模与初始仓位相同,或者根据当时的资金情况调整。

例如,初始头寸是100万,加仓时可能账户资金已经变化,因此需要重新计算可投入的风险金额。

在代码中,每次加仓时,风险金额是当前现金的1%吗?或者总资金的1%?总资金包括持仓的价值。

这可能更复杂,因为总资金是现金加上持仓的当前市值。因此,每次加仓时,需要计算当前的总资金,然后取1%作为风险金额。

因此,头寸规模的计算应该是:

总资金 = cash + position * current_price

风险金额 = 总资金 * 风险比例(1%)

units = 风险金额 / (2 * ATR_entry)

这可能需要动态计算总资金,但在回测中,当前价格是已知的,可以计算。

因此,在加仓时,需要重新计算总资金,并据此计算风险金额和可加仓的units。

这增加了代码的复杂度,因为每次加仓时需要知道当前的总资金,而总资金取决于持仓的当前市值。

这可能导致每次加仓的规模逐渐减小,因为持仓部分的价值会随价格波动而变化。

例如,初始资金100万,买入100万的头寸(假设全仓,但根据风险控制,这里可能不是全仓)。之后价格上涨,持仓价值增加,总资金增加,因此加仓的规模可能更大。

这需要更精确的资金管理逻辑。

因此,在代码中,每次计算头寸规模时,需要:

总资金 = cash + sum(entry['units'] * current_price for entry in entries)

风险金额 = 总资金 * risk_percent

units = risk_amount / (2 * atr_entry)

这可能更符合海龟法则的动态调整逻辑。

但这也意味着,每次加仓时的头寸规模可能不同,取决于当前的总资金和ATR。

这在代码中需要仔细处理,尤其是在计算可用现金和头寸时。

综上所述,实现一个完整的海龟交易系统需要处理大量细节,代码的复杂度较高。为了简化,可能需要做出一些假设,比如:

只处理多头头寸,忽略空头。

每次加仓使用固定的风险比例,基于初始资金而非当前总资金。

忽略动态止盈和趋势逆转条件,只处理止损和加仓。

基于这些简化,可以编写一个较为基础的实现。

例如,以下是一个简化的Python代码框架:

import pandas as pd

def calculate_atr(df, window=14):

df['prev_close'] = df['close'].shift(1)

df['tr1'] = df['high'] - df['low']

df['tr2'] = abs(df['high'] - df['prev_close'])

df['tr3'] = abs(df['low'] - df['prev_close'])

df['tr'] = df[['tr1', 'tr2', 'tr3']].max(axis=1)

df['atr'] = df['tr'].rolling(window).mean()

return df

def turtle_trading_system(df, initial_capital=1e6, risk_percent=0.01):

df = df.copy()

计算指标

df['rolling_max_20'] = df['high'].rolling(20).max().shift(1)

df['rolling_min_20'] = df['low'].rolling(20).min().shift(1)

df = calculate_atr(df)

初始化交易信号

df['signal'] = 0

df.loc[df['close'] > df['rolling_max_20'], 'signal'] = 1

df.loc[df['close'] < df['rolling_min_20'], 'signal'] = -1

初始化持仓和资金

cash = initial_capital

position = 0

active_trades = []

trade_log = []

for i in range(len(df)):

current = df.iloc[i]
date = current.name  # 假设索引是日期

# 检查止损
for trade in active_trades:
    if trade['direction'] == 'long':
        if current['low'] <= trade['stop_loss']:
            # 止损平仓
            cash += trade['units'] * current['close']
            position -= trade['units']
            trade_log.append({
                'date': date,
                'action': '止损',
                'price': current['close'],
                'units': trade['units'],
                'profit': (current['close'] - trade['entry_price']) * trade['units']
            })
            trade['active'] = False
    # 类似处理空头止损

# 移除已平仓的交易
active_trades = [t for t in active_trades if t['active']]

# 检查加仓条件
for trade in active_trades:
    if trade['direction'] == 'long' and current['close'] >= trade['next_add_price'] and trade['add_count'] < 3:
        # 计算加仓头寸
        atr = trade['atr_entry']
        total_equity = cash + position * current['close']
        risk_amount = total_equity * risk_percent
        units = risk_amount / (2 * atr)
        if cash >= units * current['close']:
            cash -= units * current['close']
            position += units
            new_trade = {
                'entry_price': current['close'],
                'atr_entry': atr,
                'stop_loss': current['close'] - 2 * atr,
                'add_count': trade['add_count'] + 1,
                'next_add_price': current['close'] + 0.5 * atr,
                'direction': 'long',
                'units': units,
                'active': True
            }
            active_trades.append(new_trade)
            trade_log.append({
                'date': date,
                'action': '加仓',
                'price': current['close'],
                'units': units
            })

# 处理交易信号
if current['signal'] != 0:
    # 平仓现有反向持仓
    current_direction = 'long' if current['signal'] == 1 else 'short'
    if active_trades and active_trades[0]['direction'] != current_direction:
        for trade in active_trades:
            cash += trade['units'] * current['close'] if trade['direction'] == 'long' else trade['units'] * (2 * trade['entry_price'] - current['close'])
            position -= trade['units'] if trade['direction'] == 'long' else -trade['units']
            trade_log.append({
                'date': date,
                'action': '平仓',
                'price': current['close'],
                'units': trade['units'],
                'profit': (current['close'] - trade['entry_price']) * trade['units'] if trade['direction'] == 'long' else (trade['entry_price'] - current['close']) * trade['units']
            })
        active_trades = []
        position = 0
    
    # 开新仓
    atr = current['atr']
    total_equity = cash + position * current['close']
    risk_amount = total_equity * risk_percent
    units = risk_amount / (2 * atr)
    if cash >= units * current['close']:
        cash -= units * current['close']
        position += units if current['signal'] == 1 else -units
        new_trade = {
            'entry_price': current['close'],
            'atr_entry': atr,
            'stop_loss': current['close'] - 2 * atr if current['signal'] == 1 else current['close'] + 2 * atr,
            'add_count': 0,
            'next_add_price': current['close'] + (0.5 * atr if current['signal'] == 1 else -0.5 * atr),
            'direction': 'long' if current['signal'] == 1 else 'short',
            'units': units,
            'active': True
        }
        active_trades.append(new_trade)
        trade_log.append({
            'date': date,
            'action': '买入' if current['signal'] == 1 else '卖出',
            'price': current['close'],
            'units': units
        })

最终平仓

if position != 0:

for trade in active_trades:
    cash += trade['units'] * df.iloc[-1]['close']
    position -= trade['units']
    trade_log.append({
        'date': df.iloc[-1].name,
        'action': '平仓',
        'price': df.iloc[-1]['close'],
        'units': trade['units'],
        'profit': (df.iloc[-1]['close'] - trade['entry_price']) * trade['units']
    })

计算最终净值

final_value = cash + position * df.iloc[-1]['close']

return trade_log, final_value

使用示例

假设data是包含必要列的DataFrame

trade_log, final_value = turtle_trading_system(data)

print(f"最终资金:{final_value}")

这段代码是一个简化版本,可能还存在一些错误和未处理的情况,比如:

空头头寸的处理未完全实现。

加仓时未考虑多个加仓单位的不同止损价。

信号可能在同一天多次触发,导致重复交易。

ATR在入场时可能为NaN,导致计算错误。

未处理数据开始的NaN值(如rolling_max_20前19天为NaN)。

资金管理中的总资金计算可能不正确,尤其是在持仓存在浮盈或浮亏时。

此外,代码中的交易执行价格假设为当天的收盘价,而实际中信号是在收盘后产生,交易在下一个交易日执行。因此,可能需要调整信号和执行的对应关系,比如使用.shift()来避免未来函数。

尽管如此,这段代码提供了一个基本的框架,用户可以根据实际需求进行调整和优化,例如添加更多风险管理规则、参数调整、绩效评估等。

Python实现海龟交易系统

Python实现海龟交易系统

下面我将用Python实现一个简化版的海龟交易系统,包含核心的交易逻辑和风险管理功能。

  
import numpy as np
  
import pandas as pd
  
import matplotlib.pyplot as plt
  

  
class TurtleTradingSystem:
  
    def __init__(self, initial_capital=1000000, risk_per_trade=0.01, atr_period=20, 
  
                 entry_period=20, exit_period=55, max_position_units=4):
  
        """
  
        初始化海龟交易系统
  
        
  
        参数:
  
            initial_capital: 初始资金
  
            risk_per_trade: 单笔交易风险比例(1-2%)
  
            atr_period: ATR计算周期
  
            entry_period: 入场突破周期(20日)
  
            exit_period: 出场突破周期(55日)
  
            max_position_units: 最大加仓次数(金字塔加仓)
  
        """
  
        self.initial_capital = initial_capital
  
        self.capital = initial_capital
  
        self.risk_per_trade = risk_per_trade
  
        self.atr_period = atr_period
  
        self.entry_period = entry_period
  
        self.exit_period = exit_period
  
        self.max_position_units = max_position_units
  
        self.positions = []
  
        self.trades = []
  
        self.equity = []
  
        
  
    def calculate_atr(self, data):
  
        """计算真实波幅(ATR)"""
  
        high_low = data['high'] - data['low']
  
        high_close = np.abs(data['high'] - data['close'].shift())
  
        low_close = np.abs(data['low'] - data['close'].shift())
  
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
  
        atr = tr.rolling(self.atr_period).mean()
  
        return atr
  
    
  
    def calculate_breakouts(self, data):
  
        """计算突破点"""
  
        data['20_day_high'] = data['high'].rolling(self.entry_period).max()
  
        data['20_day_low'] = data['low'].rolling(self.entry_period).min()
  
        data['55_day_low'] = data['low'].rolling(self.exit_period).min()
  
        data['55_day_high'] = data['high'].rolling(self.exit_period).max()
  
        return data
  
    
  
    def calculate_position_size(self, price, atr, contract_size=1):
  
        """
  
        计算头寸规模
  
        公式: 头寸规模 = (账户风险比例 × 总资金) / (ATR × 合约乘数)
  
        """
  
        if atr == 0:
  
            return 0
  
        position_size = (self.risk_per_trade * self.capital) / (atr * contract_size)
  
        return int(position_size)
  
    
  
    def run_backtest(self, data, contract_size=1):
  
        """运行回测"""
  
        data = data.copy()
  
        data = self.calculate_breakouts(data)
  
        data['atr'] = self.calculate_atr(data)
  
        
  
        position = None
  
        units = 0
  
        entry_price = 0
  
        stop_loss = 0
  
        add_positions = []
  
        
  
        for i in range(len(data)):
  
            current_price = data['close'].iloc[i]
  
            current_atr = data['atr'].iloc[i]
  
            current_20_high = data['20_day_high'].iloc[i]
  
            current_20_low = data['20_day_low'].iloc[i]
  
            current_55_low = data['55_day_low'].iloc[i]
  
            current_55_high = data['55_day_high'].iloc[i]
  
            
  
            # 记录当前权益
  
            self.equity.append(self.capital)
  
            
  
            # 止损检查
  
            if position == 'long' and current_price <= stop_loss:
  
                profit = (current_price - entry_price) * units * contract_size
  
                self.capital += profit
  
                self.trades.append({
  
                    'type': 'long',
  
                    'entry_price': entry_price,
  
                    'exit_price': current_price,
  
                    'units': units,
  
                    'profit': profit,
  
                    'exit_reason': 'stop_loss'
  
                })
  
                position = None
  
                units = 0
  
                add_positions = []
  
            
  
            elif position == 'short' and current_price >= stop_loss:
  
                profit = (entry_price - current_price) * units * contract_size
  
                self.capital += profit
  
                self.trades.append({
  
                    'type': 'short',
  
                    'entry_price': entry_price,
  
                    'exit_price': current_price,
  
                    'units': units,
  
                    'profit': profit,
  
                    'exit_reason': 'stop_loss'
  
                })
  
                position = None
  
                units = 0
  
                add_positions = []
  
            
  
            # 趋势反转退出
  
            elif position == 'long' and current_price <= current_55_low:
  
                profit = (current_price - entry_price) * units * contract_size
  
                self.capital += profit
  
                self.trades.append({
  
                    'type': 'long',
  
                    'entry_price': entry_price,
  
                    'exit_price': current_price,
  
                    'units': units,
  
                    'profit': profit,
  
                    'exit_reason': 'trend_reversal'
  
                })
  
                position = None
  
                units = 0
  
                add_positions = []
  
            
  
            elif position == 'short' and current_price >= current_55_high:
  
                profit = (entry_price - current_price) * units * contract_size
  
                self.capital += profit
  
                self.trades.append({
  
                    'type': 'short',
  
                    'entry_price': entry_price,
  
                    'exit_price': current_price,
  
                    'units': units,
  
                    'profit': profit,
  
                    'exit_reason': 'trend_reversal'
  
                })
  
                position = None
  
                units = 0
  
                add_positions = []
  
            
  
            # 入场信号
  
            if position is None:
  
                if current_price >= current_20_high:
  
                    # 做多信号
  
                    position = 'long'
  
                    entry_price = current_price
  
                    units = self.calculate_position_size(entry_price, current_atr, contract_size)
  
                    stop_loss = entry_price - 2 * current_atr
  
                    self.trades.append({
  
                        'type': 'long',
  
                        'entry_price': entry_price,
  
                        'units': units,
  
                        'atr': current_atr,
  
                        'stop_loss': stop_loss
  
                    })
  
                
  
                elif current_price <= current_20_low:
  
                    # 做空信号
  
                    position = 'short'
  
                    entry_price = current_price
  
                    units = self.calculate_position_size(entry_price, current_atr, contract_size)
  
                    stop_loss = entry_price + 2 * current_atr
  
                    self.trades.append({
  
                        'type': 'short',
  
                        'entry_price': entry_price,
  
                        'units': units,
  
                        'atr': current_atr,
  
                        'stop_loss': stop_loss
  
                    })
  
            
  
            # 金字塔加仓
  
            elif position == 'long' and len(add_positions) < self.max_position_units - 1:
  
                if current_price >= entry_price + 0.5 * current_atr:
  
                    add_units = self.calculate_position_size(current_price, current_atr, contract_size)
  
                    units += add_units
  
                    add_positions.append(current_price)
  
                    stop_loss = current_price - 2 * current_atr
  
                    self.trades.append({
  
                        'type': 'long_add',
  
                        'entry_price': current_price,
  
                        'add_units': add_units,
  
                        'total_units': units,
  
                        'stop_loss': stop_loss
  
                    })
  
            
  
            elif position == 'short' and len(add_positions) < self.max_position_units - 1:
  
                if current_price <= entry_price - 0.5 * current_atr:
  
                    add_units = self.calculate_position_size(current_price, current_atr, contract_size)
  
                    units += add_units
  
                    add_positions.append(current_price)
  
                    stop_loss = current_price + 2 * current_atr
  
                    self.trades.append({
  
                        'type': 'short_add',
  
                        'entry_price': current_price,
  
                        'add_units': add_units,
  
                        'total_units': units,
  
                        'stop_loss': stop_loss
  
                    })
  
        
  
        return data
  
    
  
    def get_performance(self):
  
        """获取回测绩效"""
  
        if not self.trades:
  
            return None
  
            
  
        df_trades = pd.DataFrame(self.trades)
  
        df_trades['cum_profit'] = df_trades['profit'].cumsum().fillna(0)
  
        
  
        winning_trades = df_trades[df_trades['profit'] > 0]
  
        losing_trades = df_trades[df_trades['profit'] < 0]
  
        
  
        total_return = (self.capital - self.initial_capital) / self.initial_capital * 100
  
        win_rate = len(winning_trades) / len(df_trades) * 100 if len(df_trades) > 0 else 0
  
        avg_win = winning_trades['profit'].mean() if len(winning_trades) > 0 else 0
  
        avg_loss = losing_trades['profit'].mean() if len(losing_trades) > 0 else 0
  
        profit_factor = -avg_win / avg_loss if avg_loss != 0 else np.inf
  
        
  
        performance = {
  
            'initial_capital': self.initial_capital,
  
            'final_capital': self.capital,
  
            'total_return(%)': total_return,
  
            'total_trades': len(df_trades),
  
            'winning_trades': len(winning_trades),
  
            'losing_trades': len(losing_trades),
  
            'win_rate(%)': win_rate,
  
            'avg_win': avg_win,
  
            'avg_loss': avg_loss,
  
            'profit_factor': profit_factor,
  
            'max_drawdown(%)': self.calculate_max_drawdown()
  
        }
  
        
  
        return performance
  
    
  
    def calculate_max_drawdown(self):
  
        """计算最大回撤"""
  
        equity = pd.Series(self.equity)
  
        rolling_max = equity.cummax()
  
        drawdown = (equity - rolling_max) / rolling_max * 100
  
        max_drawdown = drawdown.min()
  
        return max_drawdown
  
    
  
    def plot_equity(self):
  
        """绘制资金曲线"""
  
        plt.figure(figsize=(12, 6))
  
        plt.plot(self.equity)
  
        plt.title('Equity Curve')
  
        plt.xlabel('Period')
  
        plt.ylabel('Capital')
  
        plt.grid(True)
  
        plt.show()
  

使用示例

  
# 示例数据加载和回测
  
if __name__ == "__main__":
  
    # 假设我们有一些历史数据
  
    np.random.seed(42)
  
    dates = pd.date_range(start='2020-01-01', end='2022-12-31')
  
    prices = np.cumprod(1 + np.random.normal(0, 0.01, len(dates)))
  
    
  
    data = pd.DataFrame({
  
        'date': dates,
  
        'open': prices,
  
        'high': prices * 1.02,
  
        'low': prices * 0.98,
  
        'close': prices
  
    }).set_index('date')
  
    
  
    # 初始化海龟交易系统
  
    turtle = TurtleTradingSystem(initial_capital=1000000, risk_per_trade=0.01)
  
    
  
    # 运行回测
  
    data = turtle.run_backtest(data)
  
    
  
    # 获取绩效报告
  
    performance = turtle.get_performance()
  
    print("Performance Metrics:")
  
    for key, value in performance.items():
  
        print(f"{key}: {value}")
  
    
  
    # 绘制资金曲线
  
    turtle.plot_equity()
  

代码说明

  1. 核心功能:

    • 入场信号: 20日突破(高点和低点)

    • 出场信号: 55日突破或2倍ATR止损

    • 金字塔加仓: 每0.5倍ATR加仓一次

    • 风险管理: 基于ATR的头寸规模计算

  2. 关键参数:

    risk_per_trade: 单笔交易风险比例(1-2%)

    atr_period: ATR计算周期(默认20)

    entry_period: 入场突破周期(默认20)

    exit_period: 出场突破周期(默认55)

    max_position_units: 最大加仓次数(默认4次)

  3. 绩效指标:

    • 总收益率

    • 胜率

    • 平均盈利/亏损

    • 盈亏比

    • 最大回撤

注意事项

  1. 这是一个简化版本,实际应用中可能需要:

    • 添加滑点和手续费计算

    • 考虑不同市场的合约乘数

    • 添加过滤条件(如50日波动率过滤)

    • 多品种组合测试

  2. 海龟交易系统的成功关键:

    • 严格执行规则

    • 忍受高亏损率(60%交易可能亏损)

    • 长期坚持(1年可能只有3-5笔大趋势盈利)

  3. 参数优化:

    • 不同市场可能需要调整突破周期

    • ATR倍数和加仓间隔可根据市场特性调整

这个实现包含了海龟交易法则的核心逻辑,你可以根据实际需求进一步扩展和完善。