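Contents
1. Environment initialization
2. Data source
3. Map transformation
4. Filter transformation
5. FlatMap transformation
6. KeyBy transformation
7. Simple aggregations
8. Reduce transformation
9. Rich function test
10. Executing the job
Code extensions

```scala
package transform

import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Event(user: String, url: String, timestamp: Long)

/**
 *
 * PROJECT_NAME: flink1.13
 * PACKAGE_NAME: transform
 * author: 赵嘉盟-HONOR
 * date: 2025-05-19 2:45
 * DESCRIPTION
 *
 */
object 单流转换算子 { // "single-stream transformation operators"
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val data = env.fromElements(
      Event("Mary", "./home", 100L),
      Event("Sum", "./cart", 500L),
      Event("King", "./prod", 1000L),
      Event("King", "./root", 200L)
    )

    //TODO Map
    val map = data.map(_.user).print("map")
    val mapFunction = data.map(new MapFunction[Event, String] {
      override def map(t: Event): String = t.user
    }).print("mapFunction")

    //TODO Filter
    val filter = data.filter(_.user == "Sum").print("filter")
    val filterFunction = data.filter(new FilterFunction[Event] {
      override def filter(t: Event): Boolean = t.user.contains("m") // contains a lowercase "m"
    }).print("filterFunction")

    //TODO FlatMap
    val flatMap = data.flatMap(new FlatMapFunction[Event, String] {
      override def flatMap(t: Event, collector: Collector[String]): Unit =
        if (t.user == "Sum") collector.collect(t.url)
    }).print("flatMapFunction")

    //TODO KeyBy
    val keyBy = data.keyBy(_.user).print("keyBy")
    val keyByFunction = data.keyBy(new KeySelector[Event, String] {
      override def getKey(in: Event): String = in.user
    })

    //TODO Simple aggregations: sum, min, max (only the aggregated field changes; the other fields
    //     come from the key's first record), minBy, maxBy (emit the whole record holding the extreme value)
    // Largest timestamp so far for each user
    keyByFunction.max("timestamp").print("max")
    keyByFunction.maxBy(2).print("maxBy") // 0-based field position 2 = timestamp
    //keyByFunction.maxBy("_2").print("maxByT") // for tuples, select the field by position name

    //TODO Reduce (rolling aggregation)
    // Find the currently most active user
    data.map(data => (data.user, 1)).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = (t._1, t._2 + t1._2)
      })
      // Send every record to the same group
      .keyBy(data => true)
      .reduce((state, data) => if (state._2 > data._2) state else data)
      .print("reduceFunction")

    //TODO Rich function test
    data.map(new RichMapFunction[Event, Long] {
      override def open(parameters: Configuration): Unit =
        println("Task with subtask index " + getRuntimeContext.getIndexOfThisSubtask + " started")
      override def close(): Unit =
        println("Task with subtask index " + getRuntimeContext.getIndexOfThisSubtask + " finished")
      override def map(in: Event): Long = in.timestamp
    })

    env.execute("单流转换算子")
  }
}
```

This code demonstrates the single-stream transformation operators most commonly used in Apache Flink. Each part is explained below.

1. Environment initialization

```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
```

This creates the Flink stream execution environment. A parallelism of 1 runs every operator as a single parallel instance (subtask index 0), which makes the printed output easy to follow.

2. Data source

```scala
val data = env.fromElements(
  Event("Mary", "./home", 100L),
  Event("Sum", "./cart", 500L),
  Event("King", "./prod", 1000L),
  Event("King", "./root", 200L)
)
```

`fromElements` creates a stream that contains four `Event` objects. `Event` is a case class with three fields: `user`, `url`, and `timestamp`.

3. Map transformation

```scala
val map = data.map(_.user).print("map")
val mapFunction = data.map(new MapFunction[Event, String] {
  override def map(t: Event): String = t.user
}).print("mapFunction")
```

`map` uses a lambda to extract the `user` field of each `Event`. `mapFunction` does the same thing with an anonymous `MapFunction` implementation. In both cases every `Event` becomes a `String` (the user name), which is then printed.

4. Filter transformation

```scala
val filter = data.filter(_.user == "Sum").print("filter")
val filterFunction = data.filter(new FilterFunction[Event] {
  override def filter(t: Event): Boolean = t.user.contains("m")
}).print("filterFunction")
```

`filter` keeps the events whose `user` is `"Sum"`. `filterFunction` keeps the events whose `user` contains a lowercase `"m"`. With this data only `"Sum"` qualifies, because `"Mary"` starts with an uppercase `M`. Both print the events that pass.

5. FlatMap transformation

```scala
val flatMap = data.flatMap(new FlatMapFunction[Event, String] {
  override def flatMap(t: Event, collector: Collector[String]): Unit =
    if (t.user == "Sum") collector.collect(t.url)
}).print("flatMapFunction")
```

`flatMap` can emit zero, one, or several records for each input. Here it emits the `url` downstream only when `user` is `"Sum"` and emits nothing otherwise. The matching URLs are printed.

6. KeyBy transformation

```scala
val keyBy = data.keyBy(_.user).print("keyBy")
val keyByFunction = data.keyBy(new KeySelector[Event, String] {
  override def getKey(in: Event): String = in.user
})
```

`keyBy` groups the stream by the `user` field, and `keyByFunction` does the same with a `KeySelector` implementation. `keyBy` does not modify the records. It logically partitions the stream by key and returns a `KeyedStream`, which is what keyed operations such as aggregations and `reduce` require.

7. Simple aggregations

```scala
keyByFunction.max("timestamp").print("max")
keyByFunction.maxBy(2).print("maxBy")
```

`max("timestamp")` keeps a running maximum of `timestamp` for each user. Only the `timestamp` field of the emitted record is updated; `url` and the other fields stay as they were in that user's first record. `maxBy(2)` selects by the field at 0-based position 2, which is also `timestamp`, and emits the whole record that holds the largest value. Both operators are rolling and emit one result per input record. With this sample data they produce the same records, because King's larger timestamp (1000) arrives before the smaller one.

8. Reduce transformation

```scala
data.map(data => (data.user, 1)).keyBy(_._1)
  .reduce(new ReduceFunction[(String, Int)] {
    override def reduce(t: (String, Int), t1: (String, Int)): (String, Int) = (t._1, t._2 + t1._2)
  })
  .keyBy(data => true)
  .reduce((state, data) => if (state._2 > data._2) state else data)
  .print("reduceFunction")
```

Step 1 maps each `Event` to `(user, 1)`, groups by `user`, and adds up each user's click count. Step 2 puts every record into a single group with the constant key `true` and keeps whichever record has the larger count. Both reduces are rolling, so the job prints the most active user so far after every input; with this data the last line is `(King,2)`.

9. Rich function test

```scala
data.map(new RichMapFunction[Event, Long] {
  override def open(parameters: Configuration): Unit =
    println("Task with subtask index " + getRuntimeContext.getIndexOfThisSubtask + " started")
  override def close(): Unit =
    println("Task with subtask index " + getRuntimeContext.getIndexOfThisSubtask + " finished")
  override def map(in: Event): Long = in.timestamp
})
```

`RichMapFunction` is the rich-function version of the map transformation. `open` runs once per parallel instance before the first `map` call and prints the subtask index. `close` runs once when the task finishes. With parallelism 1, both lines report index 0. `map` extracts the `timestamp` field of each `Event`.

10. Executing the job

```scala
env.execute("单流转换算子")
```

This call starts the Flink job. The transformations above only build the dataflow graph, and nothing runs until `execute` is called.

Code extensions

Add more event types: extend the `Event` class with fields such as `eventType` and `ip` to support more complex business logic.

```scala
case class Event(user: String, url: String, eventType: String, ip: String, timestamp: Long)
```

Dynamic data source: read from Kafka, a socket, or a file instead of the hard-coded `fromElements`. The Kafka consumer below requires the `flink-connector-kafka` dependency and `import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer`.

```scala
// EventSchema: a user-defined DeserializationSchema[Event]; properties: java.util.Properties with bootstrap.servers etc.
val data = env.addSource(new FlinkKafkaConsumer[Event]("topic", new EventSchema, properties))
```

Complex aggregation: aggregate over time windows, for example to count each user's clicks in every 5 seconds. Event-time windows fire only as watermarks advance, so timestamps and watermarks must be assigned first (for example with `assignTimestampsAndWatermarks`).

```scala
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
// Assumes timestamps/watermarks are assigned upstream; CountAggregator is a user-defined AggregateFunction (not shown)
data.keyBy(_.user)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .aggregate(new CountAggregator)
```

Custom aggregate function: implement your own aggregation, for example the average interval between a user's clicks. The average interval equals (latest timestamp - earliest timestamp) / (count - 1), so the accumulator only needs the minimum timestamp, the maximum timestamp, and the count.

```scala
import org.apache.flink.api.common.functions.AggregateFunction
// Accumulator = (min timestamp, max timestamp, count); result = (max - min) / (count - 1)
class AvgIntervalAggregator extends AggregateFunction[Event, (Long, Long, Long), Double] {
  override def createAccumulator(): (Long, Long, Long) = (Long.MaxValue, Long.MinValue, 0L)
  override def add(in: Event, acc: (Long, Long, Long)): (Long, Long, Long) =
    (math.min(acc._1, in.timestamp), math.max(acc._2, in.timestamp), acc._3 + 1)
  override def getResult(acc: (Long, Long, Long)): Double =
    if (acc._3 < 2) 0.0 else (acc._2 - acc._1).toDouble / (acc._3 - 1) // no interval with fewer than 2 clicks
  override def merge(a: (Long, Long, Long), b: (Long, Long, Long)): (Long, Long, Long) =
    (math.min(a._1, b._1), math.max(a._2, b._2), a._3 + b._3)
}
```

Multi-stream merging: combine several streams with `union` or `connect` for joint analysis. `union` requires all streams to have the same element type. `connect` joins two streams that may have different types.

```scala
val stream1 = env.fromElements(Event("Mary", "./home", 100L))
val stream2 = env.fromElements(Event("Sum", "./cart", 500L))
val mergedStream = stream1.union(stream2)
```

Exception handling: catch exceptions inside a transformation so that a malformed record does not fail the whole job.

```scala
data.map(event => {
  try {
    // business logic that may throw, e.g. parsing a field (placeholder)
    event.url
  } catch {
    case e: Exception =>
      // exception handling, e.g. log the error and emit a fallback value
      "invalid"
  }
})
```

Performance optimization: Async I/O queries an external database without blocking the operator. Broadcast state sends low-volume data such as configuration to every parallel instance, so it does not have to be fetched for each record. A broadcast-state sketch:

```scala
import org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction
// Config and Result are hypothetical user types; configStream is a hypothetical DataStream[Config]
val configDescriptor = new MapStateDescriptor[String, Config]("config", classOf[String], classOf[Config])
val broadcastStream = configStream.broadcast(configDescriptor)
val resultStream = data.connect(broadcastStream).process(new BroadcastProcessFunction[Event, Config, Result] {
  override def processElement(value: Event, ctx: BroadcastProcessFunction[Event, Config, Result]#ReadOnlyContext, out: Collector[Result]): Unit = {
    val config = ctx.getBroadcastState(configDescriptor).get("key") // read-only on the data side
    // business logic
  }
  override def processBroadcastElement(value: Config, ctx: BroadcastProcessFunction[Event, Config, Result]#Context, out: Collector[Result]): Unit = {
    ctx.getBroadcastState(configDescriptor).put("key", value)
  }
})
```

These extensions make the code more flexible and robust, and adapt it to more complex business scenarios.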
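Section 7 contrasts `max` with `maxBy`. In the sample data, King's larger timestamp arrives first, so the two operators print the same records. The plain-Scala model below is not Flink code, and the function names are illustrative. It replays the rolling per-key behavior on two hypothetical King events whose larger timestamp arrives second, which is where the operators diverge. Modeling `maxBy` ties as keeping the earlier record is an assumption about Flink's default.

```scala
import scala.collection.mutable

// Plain-Scala model (not the Flink API) of rolling max("timestamp") vs maxBy("timestamp")
// on a stream keyed by user.
object MaxVsMaxBy {
  case class Event(user: String, url: String, timestamp: Long)

  // max: keep the key's first record and overwrite only its timestamp field
  def rollingMax(events: Seq[Event]): Seq[Event] = {
    val state = mutable.Map.empty[String, Event]
    events.map { e =>
      val updated = state.get(e.user) match {
        case Some(prev) => prev.copy(timestamp = math.max(prev.timestamp, e.timestamp))
        case None       => e
      }
      state(e.user) = updated
      updated
    }
  }

  // maxBy: keep the whole record with the largest timestamp (ties keep the earlier record)
  def rollingMaxBy(events: Seq[Event]): Seq[Event] = {
    val state = mutable.Map.empty[String, Event]
    events.map { e =>
      val updated = state.get(e.user) match {
        case Some(prev) if prev.timestamp >= e.timestamp => prev
        case _                                            => e
      }
      state(e.user) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical input: King's second event carries the larger timestamp
    val events = Seq(Event("King", "./prod", 200L), Event("King", "./root", 1000L))
    println(rollingMax(events).last)   // Event(King,./prod,1000)
    println(rollingMaxBy(events).last) // Event(King,./root,1000)
  }
}
```

`max` reports the correct maximum timestamp but keeps the stale `url` from King's first record, while `maxBy` returns the record that actually holds that timestamp.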
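Both reduce steps in section 8 are rolling. Flink emits the updated state for a key after every input record rather than one final answer. The sketch below imitates this in plain Scala with a mutable per-key map. `rollingReduce` is a hypothetical helper, not a Flink API. The sketch replays the two steps on the users of the four sample events.

```scala
import scala.collection.mutable

// Plain-Scala model of a rolling keyed reduce (not the Flink API): after every input
// record, the updated state for that record's key is emitted downstream.
object RollingReduceDemo {
  def rollingReduce[T, K](in: Seq[T])(key: T => K)(f: (T, T) => T): Seq[T] = {
    val state = mutable.Map.empty[K, T]
    in.map { x =>
      val k = key(x)
      // The first record of a key becomes its initial state; later records are folded in with f
      val updated = state.get(k).map(f(_, x)).getOrElse(x)
      state(k) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val users = Seq("Mary", "Sum", "King", "King") // users of the four sample events
    // Step 1: map to (user, 1), key by user, add the counts
    val counts = rollingReduce(users.map(u => (u, 1)))(_._1)((a, b) => (a._1, a._2 + b._2))
    // Step 2: one group for everything (key = true), keep the record with the larger count
    val mostActive = rollingReduce(counts)(_ => true)((state, data) => if (state._2 > data._2) state else data)
    println(counts)          // List((Mary,1), (Sum,1), (King,1), (King,2))
    println(mostActive.last) // (King,2)
  }
}
```

Because step 2 uses `state._2 > data._2`, a tie replaces the current leader with the newer record. That is why `(Sum,1)` and `(King,1)` each take the lead in turn before `(King,2)` does.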
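In the 5-second tumbling window from the extension list, each record belongs to exactly one window. With the default offset of 0, tumbling windows are aligned to the epoch. A window therefore starts at the record's timestamp rounded down to a multiple of the window size and covers [start, start + size). The plain-Scala model below is not the Flink API, and `countPerWindow` is a hypothetical name. It counts events per user and window and ignores watermarks and late data.

```scala
// Plain-Scala model of 5-second tumbling event-time windows (not the Flink API).
object TumblingWindowCount {
  case class Event(user: String, url: String, timestamp: Long)

  // Start of the window containing `timestamp`: round down to a multiple of `size` (offset 0)
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - java.lang.Math.floorMod(timestamp, size)

  // Count events per (user, window start); each window covers [start, start + size)
  def countPerWindow(events: Seq[Event], size: Long): Map[(String, Long), Int] =
    events
      .groupBy(e => (e.user, windowStart(e.timestamp, size)))
      .map { case (key, group) => key -> group.size }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Event("Mary", "./home", 100L),
      Event("Sum", "./cart", 500L),
      Event("King", "./prod", 1000L),
      Event("King", "./root", 200L),
      Event("King", "./cart", 5200L) // hypothetical extra event that falls in the next window
    )
    countPerWindow(events, 5000L).toSeq.sortBy(_._1).foreach(println)
  }
}
```

All four sample events (timestamps 100 to 1000 ms) fall into the window [0, 5000). The hypothetical event at 5200 ms opens the window [5000, 10000).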
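The average click interval can be computed without storing every timestamp. For sorted timestamps t1 <= ... <= tn, the consecutive gaps telescope: ((t2 - t1) + ... + (tn - tn-1)) / (n - 1) = (tn - t1) / (n - 1). An accumulator that holds only the minimum, the maximum, and the count is therefore enough. Two partial accumulators merge by taking the min of the minimums, the max of the maximums, and the sum of the counts. The plain-Scala check below is not Flink code, and its names are illustrative. It compares that accumulator with a direct computation over the gaps.

```scala
// Plain-Scala check (not the Flink API) of a (min, max, count) accumulator for the
// average interval between one user's clicks.
object AvgIntervalCheck {
  final case class Acc(min: Long, max: Long, count: Long)

  val empty: Acc = Acc(Long.MaxValue, Long.MinValue, 0L)

  def add(acc: Acc, ts: Long): Acc =
    Acc(math.min(acc.min, ts), math.max(acc.max, ts), acc.count + 1)

  // Partial accumulators combine without losing information
  def merge(a: Acc, b: Acc): Acc =
    Acc(math.min(a.min, b.min), math.max(a.max, b.max), a.count + b.count)

  // Fewer than two clicks have no interval; returning 0.0 is an arbitrary choice
  def result(acc: Acc): Double =
    if (acc.count < 2) 0.0 else (acc.max - acc.min).toDouble / (acc.count - 1)

  // Reference: mean of the gaps between consecutive sorted timestamps
  def meanGap(ts: Seq[Long]): Double =
    if (ts.size < 2) 0.0
    else {
      val sorted = ts.sorted
      val gaps = sorted.zip(sorted.tail).map { case (a, b) => b - a }
      gaps.sum.toDouble / gaps.size
    }

  def main(args: Array[String]): Unit = {
    val ts = Seq(1000L, 200L, 3000L, 1800L) // hypothetical click timestamps of one user
    val whole = ts.foldLeft(empty)(add)
    val merged = merge(ts.take(2).foldLeft(empty)(add), ts.drop(2).foldLeft(empty)(add))
    println(s"accumulator: ${result(whole)}, direct: ${meanGap(ts)}, merged: ${result(merged)}")
  }
}
```

The accumulator is unaffected by arrival order, because `min` and `max` do not care which record comes first. By contrast, subtracting each timestamp from the previous *arriving* one would give wrong gaps whenever events arrive out of order.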