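## Contents

1. Stream Table Overview
   - 1.1 Features of Stream Tables
   - 1.2 Stream Tables vs. Regular Tables
2. Creating Stream Tables
   - 2.1 Basic Creation
   - 2.2 Specifying Capacity
   - 2.3 Inserting Rows with Omitted Columns
   - 2.4 Inspecting a Stream Table
3. Subscribing to Stream Tables
   - 3.1 Basic Subscription
   - 3.2 Subscription Parameters in Detail
   - 3.3 Multiple Subscribers
   - 3.4 Cancelling Subscriptions
4. Handler Functions
   - 4.1 Simple Handler
   - 4.2 Filtering Handler
   - 4.3 Transforming Handler
   - 4.4 Aggregating Handler
   - 4.5 Multi-Step Handler
5. Batch Processing
   - 5.1 Configuring Batch Processing
   - 5.2 Benefits of Batch Processing
   - 5.3 Batch Processing Best Practices
6. Stream Table Persistence
   - 6.1 Enabling Persistence
   - 6.2 Persistence Modes
   - 6.3 Persistence Configuration
   - 6.4 Monitoring Persistence
7. Stream Table High Availability
   - 7.1 High-Availability Configuration
   - 7.2 Failure Recovery
8. Case Study
   - 8.1 Real-Time Data Collection System
9. Summary

## Abstract

This article explains how to create and subscribe to stream tables in DolphinDB. It covers the core operations on stream tables: creating tables and configuring subscriptions, writing handler functions and batch processing, and persistence and high availability. Code examples throughout help readers master stream table management.

## 1. Stream Table Overview

### 1.1 Features of Stream Tables

Characteristics:

- Real-time writes
- Publish/subscribe
- Multiple subscribers
- Real-time processing

Core capabilities:

- Data buffering
- Persistence
- High availability

### 1.2 Stream Tables vs. Regular Tables

| Feature | Regular in-memory table | Stream table |
|---|---|---|
| Writes | Ordinary writes | Real-time appends that are published to subscribers |
| Subscription | Not supported | Publish/subscribe via `subscribeTable` |
| Persistence | Manual (e.g. saving to disk or a DFS table) | Automatic once enabled with `enableTablePersistence` |
| Capacity | No limit other than memory | Configurable: rows kept in memory can be capped with `cacheSize` |

## 2. Creating Stream Tables

### 2.1 Basic Creation

```dolphindb
// Create a stream table and share it with all sessions
share streamTable(1:0, `device_id`timestamp`temperature`humidity, [INT, TIMESTAMP, DOUBLE, DOUBLE]) as sensor_stream

// Parameters:
// - 1:0: capacity:size, i.e. memory reserved for 1 row, 0 rows initially
// - `device_id`timestamp`temperature`humidity: column names
// - [INT, TIMESTAMP, DOUBLE, DOUBLE]: column types
// - share ... as: makes the table visible to all sessions (a stream table must be shared to be subscribed to)
// - sensor_stream: the shared table name
```

### 2.2 Specifying Capacity

```dolphindb
// Specify the initial capacity
share streamTable(100000:0, `device_id`timestamp`temperature`humidity, [INT, TIMESTAMP, DOUBLE, DOUBLE]) as sensor_stream

// Capacity notes:
// - 100000:0: reserve space for 100,000 rows; the table starts with 0 rows
// - Capacity is not a hard limit: the table still grows beyond it
// - Pre-allocation avoids repeated memory reallocation and can improve write performance
```

### 2.3 Inserting Rows with Omitted Columns

```dolphindb
// Stream table with an additional SYMBOL status column
share streamTable(10000:0, `device_id`timestamp`temperature`humidity`status, [INT, TIMESTAMP, DOUBLE, DOUBLE, SYMBOL]) as sensor_stream

// Some columns can be omitted when inserting
insert into sensor_stream(device_id, timestamp, temperature) values(1, now(), 25.5)
// humidity and status are filled with NULL (streamTable has no per-column default values)
```

### 2.4 Inspecting a Stream Table

```dolphindb
// Inspect the table schema
schema(sensor_stream)

// Inspect the data
select count(*) from sensor_stream
select top 100 * from sensor_stream
```

## 3. Subscribing to Stream Tables

### 3.1 Basic Subscription

```dolphindb
// Create the stream table
share streamTable(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as sensor_stream

// Create the result table
share table(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as result_table

// Subscribe to the stream table
subscribeTable(, "sensor_stream", "handler1", -1, def(msg){ result_table.append!(msg) }, true)

// Parameters:
// - first argument (server): empty for a stream table on the local node
// - "sensor_stream": name of the shared stream table
// - "handler1": actionName, the name of this subscription
// - -1: offset; -1 starts from the next new message
// - def(msg){...}: the handler function
// - true: msgAsTable; each message is passed to the handler as a table
```

### 3.2 Subscription Parameters in Detail

```dolphindb
// subscribeTable parameters in positional order
subscribeTable(
    ,                   // server: empty for the local node
    "sensor_stream",    // tableName: name of the stream table
    "handler1",         // actionName: name of the subscription
    -1,                 // offset: start position
    handler,            // handler: a processing function defined beforehand
    true,               // msgAsTable: pass messages as a table
    1000,               // batchSize: process once 1000 messages have accumulated
    1,                  // throttle: max wait in seconds before a partial batch is processed
    -1,                 // hash: subscription executor index; -1 lets the system assign one
    true                // reconnect: resubscribe automatically after a disconnection
)
// Further optional parameters follow reconnect, e.g. filter (values of the column
// registered with setStreamTableFilterColumn) and persistOffset.

// offset values:
// - -1: start from the next new message
// - -2: resume from the persisted offset (requires persistOffset=true)
// - 0: start from the first row
// - N: start from the row at offset N
```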
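The `filter` parameter from the list above only takes effect together with `setStreamTableFilterColumn`, which tells the publisher which column the filter values refer to. The sketch below is a minimal example under these assumptions: the three-column `sensor_stream` from 3.1 exists, the node is configured for streaming subscriptions, and your DolphinDB version accepts keyword arguments in built-in calls. The table `filtered_result` and the actionName `filter_demo` are hypothetical names used only for illustration.

```dolphindb
// Sketch: receive only rows whose device_id is in 1..10.
// Assumes the shared stream table sensor_stream from 3.1 exists;
// filtered_result and "filter_demo" are hypothetical names.
share table(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as filtered_result

// Register the column the filter values are matched against (call before subscribing)
setStreamTableFilterColumn(sensor_stream, `device_id)

// Keyword arguments avoid miscounting subscribeTable's positional parameters
subscribeTable(tableName="sensor_stream", actionName="filter_demo", offset=-1,
    handler=append!{filtered_result}, msgAsTable=true, filter=1..10)

// Publish two rows: only device_id 5 falls inside 1..10
insert into sensor_stream values(5, now(), 25.0)
insert into sensor_stream values(42, now(), 26.0)

// Delivery is asynchronous, so wait before checking (sleep takes milliseconds)
sleep(2000)
select * from filtered_result
```

With the filter applied, the row for device 42 is not delivered to this subscriber, although other subscribers to `sensor_stream` still receive it.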
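### 3.3 Multiple Subscribers

```dolphindb
// Several subscribers can consume the same stream table independently
// (realtime_table, alert_table and the DFS table dfs://db/sensor_data must already exist)

// Subscriber 1: real-time data
subscribeTable(, "sensor_stream", "realtime_handler", -1, def(msg){ realtime_table.append!(msg) }, true)

// Subscriber 2: alert detection
subscribeTable(, "sensor_stream", "alert_handler", -1, def(msg){
    alerts = select * from msg where temperature > 30
    alert_table.append!(alerts)
}, true)

// Subscriber 3: persist to a DFS table in batches of 10,000 rows, or every 5 seconds at most
subscribeTable(, "sensor_stream", "persist_handler", -1, def(msg){ loadTable("dfs://db", "sensor_data").append!(msg) }, true, 10000, 5)
```

### 3.4 Cancelling Subscriptions

```dolphindb
// View subscription status: subWorkers lists the subscriptions processed on this node
getStreamingStat().subWorkers

// Cancel a specific subscription
unsubscribeTable(, "sensor_stream", "handler1")

// Cancel all subscriptions to sensor_stream: actionName must be passed for every
// subscription that was created with one, so cancel them one by one
for (name in ["realtime_handler", "alert_handler", "persist_handler"]) {
    unsubscribeTable(, "sensor_stream", name)
}

// Cancel all subscriptions on the node; unsubscribeAll() may not be available in
// every DolphinDB version, in which case call unsubscribeTable for each subscription
unsubscribeAll()
```

## 4. Handler Functions

### 4.1 Simple Handler

```dolphindb
// Simple handler: write messages straight to the result table
def simpleHandler(msg){
    result_table.append!(msg)
}
subscribeTable(, "sensor_stream", "simple", -1, simpleHandler, true)
```

### 4.2 Filtering Handler

```dolphindb
// Filtering handler: keep only readings between 20 and 30 degrees
def filterHandler(msg){
    filtered = select * from msg where temperature between 20:30
    result_table.append!(filtered)
}
subscribeTable(, "sensor_stream", "filter", -1, filterHandler, true)
```

### 4.3 Transforming Handler

```dolphindb
// Transforming handler: add a Fahrenheit column
// (result_table must have the columns device_id, timestamp, temperature, fahrenheit, humidity)
def transformHandler(msg){
    transformed = select device_id, timestamp, temperature, temperature * 1.8 + 32 as fahrenheit, humidity from msg
    result_table.append!(transformed)
}
subscribeTable(, "sensor_stream", "transform", -1, transformHandler, true)
```

### 4.4 Aggregating Handler

```dolphindb
// Aggregating handler: per-device statistics for each incoming batch
// (aggregates within a single batch only, not across batches)
def aggHandler(msg){
    agg = select device_id, avg(temperature) as avg_temp, max(temperature) as max_temp, min(temperature) as min_temp, count(*) as cnt from msg group by device_id
    agg_result.append!(agg)
}
subscribeTable(, "sensor_stream", "agg", -1, aggHandler, true)
```

### 4.5 Multi-Step Handler

```dolphindb
// Multi-step handler
def complexHandler(msg){
    // Step 1: clean the data (drop NULL readings and values outside -40 to 100)
    cleaned = select * from msg where isValid(temperature) and temperature between -40:100

    // Step 2: classify each reading
    transformed = select device_id, timestamp, temperature, iif(temperature > 30, "high", iif(temperature < 10, "low", "normal")) as status from cleaned

    // Step 3: write the results
    result_table.append!(transformed)

    // Step 4: raise alerts for readings that are not normal
    alerts = select * from transformed where status != "normal"
    alert_table.append!(alerts)
}
subscribeTable(, "sensor_stream", "complex", -1, complexHandler, true)
```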
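The handlers in this section refer to shared tables such as `result_table` by name. DolphinDB's partial application syntax `f{a, b}` offers another option. You fix the leading parameters of a handler, and the resulting one-argument function receives only the message. The same function can then serve several subscriptions with different target tables.

The sketch below is minimal and assumes the three-column `sensor_stream` from 3.1 and keyword-argument support. `routeHandler`, `normal_out`, `alert_out`, and `route_demo` are hypothetical names. The table parameters are declared `mutable` because DolphinDB function arguments cannot otherwise be modified in place.

```dolphindb
// Sketch: a reusable handler whose target tables and threshold are bound by partial application.
// Assumes sensor_stream (3 columns, from 3.1) exists; all names below are hypothetical.
share table(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as normal_out
share table(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE]) as alert_out

// The message must be the LAST parameter; the earlier ones are fixed via f{...}
def routeHandler(mutable normalTb, mutable alertTb, threshold, msg){
    normalTb.append!(select * from msg where temperature <= threshold)
    alertTb.append!(select * from msg where temperature > threshold)
}

// routeHandler{normal_out, alert_out, 30.0} is a one-argument function of msg
subscribeTable(tableName="sensor_stream", actionName="route_demo", offset=-1,
    handler=routeHandler{normal_out, alert_out, 30.0}, msgAsTable=true)

// Usage: one normal reading and one reading above the threshold
insert into sensor_stream values(1, now(), 25.0)
insert into sensor_stream values(2, now(), 35.0)
sleep(2000)  // delivery is asynchronous
```

Binding the threshold as an argument also means a second subscription with a different limit needs no new function, only another `routeHandler{...}` binding.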
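## 5. Batch Processing

### 5.1 Configuring Batch Processing

```dolphindb
// Batch processing: invoke the handler once 1000 messages have accumulated
subscribeTable(, "sensor_stream", "batch_handler", -1, def(msg){ result_table.append!(msg) }, true, 1000)
// msgAsTable=true, batchSize=1000; with the default throttle of 1 second,
// a partial batch is still processed after at most 1 second

// Batch processing with an explicit throttle
subscribeTable(, "sensor_stream", "batch_throttle_handler", -1, def(msg){ result_table.append!(msg) },
    true,   // msgAsTable
    1000,   // batchSize: 1000 messages
    5)      // throttle: wait at most 5 seconds
```

### 5.2 Benefits of Batch Processing

| Benefit | Explanation |
|---|---|
| Less I/O | Writing in batches reduces the number of I/O operations |
| Higher throughput | Per-call overhead is spread over many messages |
| Bounded latency | Batching adds some delay, but `throttle` caps how long a partial batch waits |

### 5.3 Batch Processing Best Practices

```dolphindb
// Choose batchSize according to the workload

// High throughput: batchSize=10000
subscribeTable(, "sensor_stream", "high_throughput", -1, def(msg){ result_table.append!(msg) }, true, 10000)

// Low latency: batchSize=100 (a partial batch still waits up to throttle, default 1 second)
subscribeTable(, "sensor_stream", "low_latency", -1, def(msg){ result_table.append!(msg) }, true, 100)

// Balanced: batchSize=1000, throttle=1 second
subscribeTable(, "sensor_stream", "balanced", -1, def(msg){ result_table.append!(msg) }, true, 1000, 1)
```

## 6. Stream Table Persistence

### 6.1 Enabling Persistence

```dolphindb
// Enable persistence (requires persistenceDir in the node configuration;
// sensor_stream must be a shared stream table)
enableTablePersistence(sensor_stream, true, true, 1000000)

// Parameters:
// - asynWrite=true: persist asynchronously
// - compress=true: compress the persisted data
// - cacheSize=1000000: maximum number of rows kept in memory
```

### 6.2 Persistence Modes

| Setting | Description | Use case |
|---|---|---|
| `asynWrite=true` | Writes return immediately; a background thread persists the data | High throughput |
| `asynWrite=false` | Writes return only after the data has been written to the persistence files | Higher reliability |
| `asynWrite=false`, `flushMode=1` | Synchronous persistence plus a forced flush to disk | Highest reliability |

### 6.3 Persistence Configuration

```dolphindb
// Pick ONE of the following configurations per table

// High throughput: asynchronous persistence
enableTablePersistence(table=sensor_stream, asynWrite=true, compress=true, cacheSize=1000000)

// High reliability: synchronous persistence with a forced flush to disk
enableTablePersistence(table=sensor_stream, asynWrite=false, compress=true, cacheSize=1000000, flushMode=1)

// Balanced: synchronous persistence, flushing left to the OS (flushMode=0, the default)
enableTablePersistence(table=sensor_stream, asynWrite=false, compress=true, cacheSize=1000000)
```

### 6.4 Monitoring Persistence

```dolphindb
// Persistence metadata: sizeInMemory, sizeOnDisk, memoryOffset, diskOffset, etc.
getPersistenceMeta(sensor_stream)

// The location of the persisted files is reported in the persistenceDir field
getPersistenceMeta(sensor_stream)["persistenceDir"]
```

## 7. Stream Table High Availability

### 7.1 High-Availability Configuration

```dolphindb
// High-availability setup for a stream table

// 1. Enable persistence
enableTablePersistence(sensor_stream, true, true, 1000000)

// 2. Subscribe with automatic reconnection
subscribeTable(, "sensor_stream", "ha_handler", -1, def(msg){ result_table.append!(msg) }, true, 1000, 1, -1, true)
// msgAsTable=true, batchSize=1000, throttle=1 s, hash=-1, reconnect=true

// 3. Monitor subscription status
def monitorSubscription(){
    // subWorkers has one row per subscription topic, including a failed-message counter
    failedTopics = exec topic from getStreamingStat().subWorkers where failedMsgCount > 0
    for (t in failedTopics) {
        print("Subscription error: " + t)
    }
}
```

### 7.2 Failure Recovery

```dolphindb
// Recovery procedure (e.g. after a node restart)
def recoverStream(){
    // 1. Check whether the shared stream table still exists
    if (!("sensor_stream" in (exec name from objs(true)))) {
        // 2. Re-create and share it; with persistence enabled, persisted rows are
        //    reloaded into memory (the function form is used inside a function body)
        t = streamTable(1:0, `device_id`timestamp`temperature, [INT, TIMESTAMP, DOUBLE])
        enableTableShareAndPersistence(table=t, tableName="sensor_stream", asynWrite=true, compress=true, cacheSize=1000000)
    }

    // 3. Resubscribe (offset=-1 skips messages published while the subscription was down)
    subscribeTable(tableName="sensor_stream", actionName="handler", offset=-1, handler=append!{result_table}, msgAsTable=true)

    print("Stream table recovery completed")
}
```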
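In `recoverStream`, resubscribing with `offset=-1` skips any messages published while the subscription was down. If the original subscription is created with `persistOffset=true`, the offset of the last processed message is recorded. A later subscription with `offset=-2` can then resume from that point.

The following is a sketch, not a tested recovery procedure. It assumes `persistenceDir` is configured, `sensor_stream` and `result_table` already exist, and keyword arguments are supported. `resume_demo` and `resumeSubscription` are hypothetical names.

```dolphindb
// Sketch: resume a subscription after the last processed message instead of the newest row.
// Assumes persistenceDir is configured and sensor_stream/result_table exist;
// "resume_demo" and resumeSubscription are hypothetical names.

// First start: persistOffset=true records the offset of the last processed message
subscribeTable(tableName="sensor_stream", actionName="resume_demo", offset=-1,
    handler=append!{result_table}, msgAsTable=true, persistOffset=true)

// Hypothetical helper to run after the subscriber node restarts:
// offset=-2 continues right after the persisted offset, so no messages are skipped
def resumeSubscription(){
    subscribeTable(tableName="sensor_stream", actionName="resume_demo", offset=-2,
        handler=append!{result_table}, msgAsTable=true, persistOffset=true)
}
```

`resumeSubscription` would be called once the stream table itself has been restored, for example right after the table re-creation step in `recoverStream`.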
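## 8. Case Study

### 8.1 Real-Time Data Collection System

```dolphindb
// 1. Create the stream table
share streamTable(100000:0, `device_id`timestamp`temperature`humidity`pressure, [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE]) as sensor_stream

// 2. Enable persistence (requires persistenceDir)
enableTablePersistence(sensor_stream, true, true, 1000000)

// 3. Create the distributed table, VALUE-partitioned on device_id 1..100
db = database("dfs://realtime_db", VALUE, 1..100)
schemaTb = table(1:0, `device_id`timestamp`temperature`humidity`pressure, [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE])
db.createPartitionedTable(schemaTb, `sensor_data, `device_id)

// 4. Create the subscriptions

// Real-time data subscription
share table(1:0, `device_id`timestamp`temperature`humidity`pressure, [INT, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE]) as realtime_data
subscribeTable(, "sensor_stream", "realtime", -1, def(msg){ realtime_data.append!(msg) }, true)

// Persistence subscription: write to the DFS table in batches of 10,000 rows, or every 5 seconds at most
subscribeTable(, "sensor_stream", "persist", -1, def(msg){ loadTable("dfs://realtime_db", "sensor_data").append!(msg) }, true, 10000, 5)

// Alert subscription
share table(1:0, `device_id`timestamp`alert_type`value, [INT, TIMESTAMP, SYMBOL, DOUBLE]) as alert_table
subscribeTable(, "sensor_stream", "alert", -1, def(msg){
    alerts = select device_id, timestamp, "temperature_high" as alert_type, temperature as value from msg where temperature > 30
    alert_table.append!(alerts)
}, true)

// 5. Monitoring function
def monitorStream(){
    print("===== Stream table monitor =====")
    print("Stream table rows: " + string(exec count(*) from sensor_stream))
    print("Real-time data rows: " + string(exec count(*) from realtime_data))
    print("Alerts: " + string(exec count(*) from alert_table))
    print("Subscriptions: " + string(getStreamingStat().subWorkers.rows()))
}

monitorStream()
print("Real-time data collection system created")
```

## 9. Summary

This article covered creating and subscribing to DolphinDB stream tables:

- Creating stream tables: basic creation, specifying capacity, inspecting the schema
- Subscribing: basic subscriptions, parameter configuration, multiple subscribers
- Handler functions: simple, filtering, transforming, and aggregating handlers
- Batch processing: configuration, benefits, best practices
- Persistence: enabling persistence, choosing a mode, monitoring
- High availability: configuration and failure recovery

Questions to consider:

1. How do you choose an appropriate batch size?
2. What role does stream table persistence play?
3. How would you design a highly available stream processing system?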