实时计算<2>最大回撤

最大回撤计算

描述:计算一批股票在一天的实时行情最大回撤值(绝对额)。
名词解释:最大回撤率,在选定周期内任一历史时点往后推,产品净值走到最低点时的收益率回撤幅度的最大值。
最大回撤用来描述买入产品后可能出现的最糟糕的情况。最大回撤是一个重要的风险指标,对于对冲基金和数量化策略交易,该指标比波动率还重要。

要求

实时计算,每只股票处理延迟平均不超过50ms
计算结果持久化,可查询每只股票某天的最大回撤值。

数据准备

模拟5000只股票信息
模拟5000只股票一天的实时行情记录,每只股票暂定3秒抓取一次实时行情,那么每天应该有60/3×60×4=4800条行情记录。总记录数=4800×5000=2400万条行情记录。

场景实现分析

实时计算股票最大回撤值,拟用Redis保存行情记录和股票信息,Storm计算股票某天的最大回撤值。计算完毕保存计算结果到Redis,提供查询。

Redis 数据结构


Storm计算拓扑

1
2
3
4
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("drpc-input", drpcSpout,25);
builder.setBolt("drawdown", new DrawdownBolt(), 50).noneGrouping("drpc-input");
builder.setBolt("return", new ReturnResults(),50).noneGrouping("drawdown");

算法公式

定义:对于序列$(x_1,x_2,\cdots,x_n)x_1,x_2,\cdots,x_n$ ,定义最大回撤 $d$为

$$ d = \min_{i\leq j} (x_j - x_i) = \min_j (x_j - \max_{i\leq j} x_i) $$

考虑到优化时间复杂度方面,很容易想到一个O(n)的时间复杂度 算法。因为对于每个j,j之前的最大值与j-1之前的最大值有关系,即maxj=max{xj,maxj-1},因此可以对于每次遍历只要将当前值与之前的最大值做比较,如果当前值比i之前的最大值还大,则当前最大值等于当前值,反之则等于之前最大值。对于d的维护也一样,如果当前值xj-当前最大值比之前的d还小,则当前最大回撤等于当前xj-当前最大值,反之等于之前的d。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 获取最大回撤值
* @param 按时间排序的价格列表
* @return
*/
public static double getMaxDrawdown(List<String> list){
double maxDrawdown = 0;//最大回撤值
double price = 0;//初始化比较价格
for(String v : list){
double realtimePrice = Double.parseDouble(v);//取出实时价格
double val = price - realtimePrice ;
if(val < 0)//如果实时价格大于比较价格
price = realtimePrice;
else
if(maxDrawdown < val) maxDrawdown = val;//取最大回撤值
}
return maxDrawdown;
}

软硬件环境

机器名 IP 硬件配置 操作系统 软件配置
nimbus 192.168.55.173 4cpu 2.93GHz 4G内存 Linux x86_64 redis2.8、 storm0.9.1 nimbus/UI/DRPC、zookeeper3.4.6
supervisor1 192.168.55.174 4cpu 2.93GHz 4G内存 Linux x86_64 storm0.9.1 supervisor、zookeeper3.4.6
supervisor2 192.168.55.175 4cpu 2.93GHz 4G内存 Linux x86_64 storm0.9.1 supervisor、zookeeper3.4.6
  • Nimbus 安装Redis、Storm nimbus、Storm UI和 Storm DRPC服务
  • Supervisor1和Supervisor2作为Storm supervisor计算节点

Redis状态

1
2
3
4
5
6
7
8
9
# Memory
used_memory:308482824
used_memory_human:294.19M
used_memory_rss:322035712
used_memory_peak:332559416
used_memory_peak_human:317.15M
used_memory_lua:35840
mem_fragmentation_ratio:1.04
mem_allocator:jemalloc-3.6.0

Storm UI

场景测试结果

  • 并行调用对耗时的影响

    上图可以看出Storm在并发实时计算上的优势,当调用线程数越多,平均计算耗时越少。但是提高Storm的并行节点数并没能提高计算耗时,这可能是因为并行节点在20个左右的情况已经达到并行处理最优,再多的节点只会更多耗时在节点之间的消息传递上。

  • 并行调用对CPU和内存占用率影响

    上图可以看出随着并发调用线程数提高,瞬时cpu占用率会逐渐提高。当线程数很少,Supervisor2计算节点没有分配计算请求,cpu占用率不会变化。对于内存占用率,一般情况当计算需要的内存没有超出Storm预先开辟的内存空间时,内存占用率基本不会变化。

场景测试结论

  1. 对某只股票,某天的行情最大回撤值计算平均耗时最低2.21ms最高50ms,计算结果可查询满足场景计算要求。
  2. Storm系统的处理延迟为毫秒级。
  3. Storm 在场景测试中,tuple全部ack没有fail出现。
  4. Redis 基于内存的数据库在读取和写入配合storm并行计算可以把延迟控制在毫秒级
  5. Redis 由于单机部署在 storm nimbus机器上,没有做性能测试。作为内存数据库redis提供了丰富的数据结构和处理方式。

场景应用中的一些问题

  1. supervisor2 由于未知原因当机后,DRPC服务不可用。supervisor2重启后DRPC服务依然不可用,执行rebalance依然不能分配计算任务到supervisor2。重启nimbus和重新部署Topology后正常。
  2. 当计算处理延迟在毫秒级时,对redis的访问要及时释放连接池连接,不然会在导致连接池满,无法连接redis。