通过 Kafka 插件实时计算 K 线
admin
2024-01-30 21:46:59

3. 通过 Kafka 插件实时计算 K 线

3.1 环境准备

  • 部署 DolphinDB 集群,版本为v2.00.7
    • 快速体验,参考 单节点部署 启动 DolphinDB 单节点。
    • 生产环境,参考 高可用集群部署 完成 DolphinDB 高可用集群搭建。

  • 部署 Kafka 集群,版本为 2.13-3.1.0
    • 快速体验,参考 Apache Kakfa Quickstart 启动 Kafka 节点。
    • 生产环境,参考 Running Kafka in Production 完成 Kafka 集群搭建。

3.2 生产数据

本节通过 DolphinDB 的 replay 历史数据回放工具和 Kafka 插件,把逐笔成交数据实时发送到 Kafka 中。

Kafka 创建 Topic :topic-message

使用 Kafka 集群自带的 kafka-topics.sh 终端命令创建 Topic:

./bin/kafka-topics.sh --create --topic topic-message --bootstrap-server 192.193.168.4:9092

控制台输出结果:

Created topic topic-message.

加载 Kafka 插件并创建 Kafka Producer

DolphinDB GUI 连接 DolphinDB 节点后执行以下脚本:

// 加载插件
path = "/DolphinDB/server/plugins/kafka"
loadPlugin(path + "/PluginKafka.txt")
loadPlugin("/DolphinDB/server/plugins/kafka/PluginEncoderDecoder.txt");// 定义创建 Kafka Producer 的函数
def initKafkaProducerFunc(metadataBrokerList){producerCfg = dict(STRING, ANY)producerCfg["metadata.broker.list"] = metadataBrokerListreturn kafka::producer(producerCfg)
}// 创建 Kafka Producer 并返回句柄
producer = initKafkaProducerFunc("192.193.168.5:8992")
  • 本例中插件的安装路径为 /DolphinDB/server/plugins/kafka,用户需要根据自己实际环境进行修改。
  • 推荐 Kafka server 和 DolphinDB Server 在同一网段中。

推送数据到 Kafka Topic

DolphinDB GUI 中执行以下脚本:

// 定义推送数据到 KafKa "topic-message" Topic 的函数
def sendMsgToKafkaFunc(dataType, producer, msg){startTime = now()try {for(i in msg){kafka::produce(producer, "topic-message", 1, i, true) }		cost = now() - startTimewriteLog("[Kafka Plugin] Successed to send " + dataType + " : " + msg.size() + " rows, " + cost + " ms.")} catch(ex) {writeLog("[Kafka Plugin] Failed to send msg to kafka with error: " +ex)}
}// 创建 DolphinDB 流数据表 tickStream
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStream)// 订阅 tickStream,处理函数是 sendMsgToKafkaFunc
subscribeTable(tableName="tickStream", actionName="sendMsgToKafka", offset=-1, handler=sendMsgToKafkaFunc{`tick, producer}, msgAsTable=true, reconnect=true, batchSize=10000,throttle=1)
getHomeDir()
// 控速回放 DolphinDB 分布式中的历史数据至 tickStream
dbName = "dfs://SH_TSDB_tick"
tbName = "tick"
replayDay =  2021.12.08
testData = select * from loadTable(dbName, tbName) where date(TradeTime)=replayDay, time(TradeTime)>=09:30:00.000, time(TradeTime)<=10:00:00.000 order by TradeTime, SecurityID
submitJob("replay", "replay", replay, testData, objByName("tickStream"), `TradeTime, `TradeTime, 2000, true, 4)

kafka::produce 函数会将任意表结构的 msg 以 json 格式发送至指定的 Kafka topic。此处的 writeLog 函数会在 DolphinDB 节点的运行日志中打印每批推送的情况,方便代码调试和运维观察。

可以使用 kafka-console-consumer 命令行工具消费 Topic 为 topic-message 中的数据,验证数据是否成功写入:

./bin/kafka-console-producer.sh --bootstrap-server 192.193.168.4:9092 --from-beginning --topic topic-message

3.3 消费数据

创建消费者,主题并进行订阅

DolphinDB GUI 中执行以下脚本:

// 创建 Kafka Consumer 并返回句柄
consumerCfg = dict(STRING, ANY)
consumerCfg["metadata.broker.list"] = "192.193.168.5:8992"
consumerCfg["group.id"] = "topic-message"
consumer = kafka::consumer(consumerCfg)// 订阅 Kafka 中的 "topic-message" 主题的数据
topics = ["topic-message"]
kafka::subscribe(consumer, topics);

DolphinDB 订阅 Kafka 消息队列中数据

DolphinDB GUI 中执行以下脚本:

// 订阅 Kafka 发布消息,写入流表 tickStream_kafka
colName = `SecurityID`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum`TradeIndex`ChannelNo`TradeBSFlag`BizIndex
colType = [SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT, INT, INT, SYMBOL, INT]
share(streamTable(35000000:0, colName, colType), `tickStreamkafka)
go// Kafka 消息解析函数
def parse(mutable dictVar, mutable tickStreamkafka){try{t = dictVart.replaceColumn!(`TradeTime, temporalParse(dictVar[`TradeTime],"yyyy.MM.ddTHH:mm:ss.SSS"))tickStreamkafka.append!(t);}catch(ex){print("kafka errors : " + ex)}
}colType[1] = STRING;
decoder = EncoderDecoder::jsonDecoder(colName, colType, parse{, tickStreamkafka}, 15, 100000, 0.5)// 创建 subjob 函数
conn =  kafka::createSubJob(consumer, , decoder, "topic-message")

3.4 流计算引擎实时计算 K 线

使用 DolphinDB 内置流计算引擎计算分钟 K 线,并将结果输出到名为 OHLCVwap的结果表中。

DolphinDB GUI 中执行以下脚本:

// 创建接收实时计算结果的流数据表
colName = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Vwap
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
share(streamTable(2000000:0, colName, colType), `OHLCStream)// K 线指标计算元表达式
aggrMetrics = <[ first(TradePrice), max(TradePrice), min(TradePrice), last(TradePrice), wavg(TradePrice, TradeQty) ]>// 创建引擎并将 kafka 中订阅的数据注入流计算引擎
createTimeSeriesEngine(name="OHLCVwap", windowSize=60000, step=60000, metrics=aggrMetrics, dummyTable=objByName("tickStreamkafka"), outputTable=objByName("OHLCStream"), useSystemTime=true, keyColumn=`SecurityID, useWindowStartTime=false)
subscribeTable(tableName="tickStreamkafka", actionName="OHLCVwap", offset=-1, handler=getStreamEngine("OHLCVw

相关内容

热门资讯

国泰海通:电力需求仍在上升趋势... 国泰海通发布研报称,11月份,规上工业发电量7792亿度,YOY+2.7%(10月+7.9%),YO...
神思电子(300479.SZ)... 格隆汇12月23日丨神思电子(300479.SZ)公布,近日,济南市公共资源交易中心发布《济南数字低...
金科环境:拟8480万元收购艾... 金科环境公告,公司拟向瑞能工业水基础设施亚洲有限公收购唐山艾瑞克环境科技有限公司(简称“艾瑞克”)1...
东阿阿胶(000423.SZ)... 格隆汇12月23日丨东阿阿胶(000423.SZ)公布,2025年12月23日,公司通过股票回购专用...
3000倍超购换来首日暴跌近5... 12月22日和12月23日,连续2个“18A大肉”上市首日破发,让不少投资者在年末感受到了一丝市场寒...