How does Flink build a real-time wide table from payment data streams?

2026-05-19 12:46



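The main purpose of the payment wide table is this: the payment table does not go down to the order-detail level. Payment amounts are not broken down by product, so there is no way to report payment status at the product (SKU) level. The core of this wide table is therefore to join payment information with order details.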
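To make the change of granularity concrete, here is a minimal plain-Java sketch (no Flink) of what the join does to the data. One payment row, keyed by order_id, matches every order-detail row of that order, which produces one wide row per SKU. The classes FanOutSketch, Payment and Detail are simplified, hypothetical stand-ins for PaymentInfo and the order wide records.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;

final class FanOutSketch {
    // Simplified stand-in for PaymentInfo: one row per order.
    static final class Payment {
        final long paymentId;
        final long orderId;

        Payment(long paymentId, long orderId) {
            this.paymentId = paymentId;
            this.orderId = orderId;
        }
    }

    // Simplified stand-in for an order-detail row: one row per SKU in the order.
    static final class Detail {
        final long orderId;
        final long skuId;
        final BigDecimal splitTotalAmount; // amount already split down to this SKU

        Detail(long orderId, long skuId, BigDecimal splitTotalAmount) {
            this.orderId = orderId;
            this.skuId = skuId;
            this.splitTotalAmount = splitTotalAmount;
        }
    }

    // Joins one payment to every detail row with the same order_id,
    // producing one "wide" row per SKU.
    static List<String> join(Payment p, List<Detail> details) {
        List<String> wide = new ArrayList<>();
        for (Detail d : details) {
            if (d.orderId == p.orderId) {
                wide.add("payment=" + p.paymentId + " sku=" + d.skuId + " paid=" + d.splitTotalAmount);
            }
        }
        return wide;
    }
}
```

Suppose there is one payment for order 1001 and that order has two detail rows. In that case join returns two rows, one per SKU. This per-SKU granularity is what the payment table alone cannot provide.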


There are two possible solutions:

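One is to write the order detail table (or the order wide table) to HBase and query HBase while computing the payment wide table. This effectively manages order details as a dimension.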
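With the first option, each payment triggers a lookup by order_id, just like a dimension join. The sketch below hides the store behind a hypothetical OrderDetailStore interface and uses an in-memory map in place of HBase. It therefore shows only the lookup pattern, not any HBase client API. Note that a lookup can only return order details that have already been written to the store.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DimLookupSketch {
    // Hypothetical interface over the external store (HBase in the article).
    interface OrderDetailStore {
        List<String> skuNamesFor(long orderId);
    }

    // In-memory stand-in for the store; a real job would issue an HBase query here.
    static final class InMemoryStore implements OrderDetailStore {
        private final Map<Long, List<String>> rows = new HashMap<>();

        void put(long orderId, List<String> skuNames) {
            rows.put(orderId, skuNames);
        }

        @Override
        public List<String> skuNamesFor(long orderId) {
            return rows.getOrDefault(orderId, Collections.emptyList());
        }
    }

    // Enriches one payment by looking up its order details by order_id.
    // Only details already present in the store can be found.
    static List<String> enrich(long paymentId, long orderId, OrderDetailStore store) {
        List<String> out = new ArrayList<>();
        for (String sku : store.skuNamesFor(orderId)) {
            out.add(paymentId + ":" + sku);
        }
        return out;
    }
}
```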

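The other is to receive order details as a stream and merge the two streams with a dual-stream join. Orders and payments are produced at different times. An intervalJoin therefore has to be used to bound how long each stream's state is kept, so that the order details are still held in state when the payment arrives.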
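The job below calls intervalJoin with between(Time.seconds(-1800), Time.seconds(1800)), with payments on the left side and order wide records on the right. Flink's documentation describes the matching rule: a right element b joins a left element a when b.timestamp lies in [a.timestamp + lowerBound, a.timestamp + upperBound], and both bounds are inclusive by default. The plain-Java sketch below reproduces only that condition, plus a derived bound for how long an order record stays useful. It does not model Flink's actual state or timer handling. IntervalJoinSketch and its methods are illustrative names.

```java
import java.time.Duration;

final class IntervalJoinSketch {
    // Interval join condition (left = payment, right = order):
    //   paymentTs + lower <= orderTs <= paymentTs + upper   (inclusive bounds)
    static boolean matches(long paymentTs, long orderTs, Duration lower, Duration upper) {
        return orderTs >= paymentTs + lower.toMillis() && orderTs <= paymentTs + upper.toMillis();
    }

    // Rearranged, an order can only match payments with
    //   orderTs - upper <= paymentTs <= orderTs - lower,
    // so once event time passes orderTs - lower the order can no longer be joined.
    static long lastUsefulTime(long orderTs, Duration lower) {
        return orderTs - lower.toMillis();
    }
}
```

With lower = -1800 s, an order record stays joinable until 30 minutes of event time after its create_time. That is how the interval keeps the order details in state until the payment arrives.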

Payment-related entity classes

PaymentInfo.java: payment entity class

import lombok.Data;

import java.math.BigDecimal;

/**
 * @author zhangbaohpu
 * @date 2021/12/25 10:08
 * @desc Payment entity class
 */
@Data
public class PaymentInfo {
    Long id;
    Long order_id;
    Long user_id;
    BigDecimal total_amount;
    String subject;
    String payment_type;
    String create_time;
    String callback_time;
}
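PaymentInfo keeps the snake_case column names as field names. With Lombok's @Data the generated accessors are getOrder_id(), setOrder_id() and so on, and the main program does call getOrder_id() and getCallback_time(). JavaBeans naming turns the getter suffix "Order_id" into the property name "order_id". PaymentWide later relies on this name matching when it calls BeanUtils.copyProperties.

The sketch below writes those accessors by hand instead of using Lombok. It implements copyByName, a simplified, hypothetical stand-in for Commons BeanUtils.copyProperties; the real method also converts values between types.

```java
import java.beans.Introspector;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class CopyByNameSketch {
    // Hand-written versions of the accessors Lombok's @Data would generate.
    public static class Src {
        private Long order_id;
        private String callback_time;
        public Long getOrder_id() { return order_id; }
        public void setOrder_id(Long v) { order_id = v; }
        public String getCallback_time() { return callback_time; }
        public void setCallback_time(String v) { callback_time = v; }
    }

    public static class Dest {
        private Long order_id;
        private String callback_time;
        private String sku_name; // no counterpart in Src, so it is left untouched
        public Long getOrder_id() { return order_id; }
        public void setOrder_id(Long v) { order_id = v; }
        public String getCallback_time() { return callback_time; }
        public void setCallback_time(String v) { callback_time = v; }
        public String getSku_name() { return sku_name; }
        public void setSku_name(String v) { sku_name = v; }
    }

    // Simplified copy-by-name: for every public getXxx() on src, call dest.setXxx(..)
    // if a setter with exactly the same parameter type exists (no type conversion).
    // Returns the JavaBeans property names that were copied.
    static List<String> copyByName(Object dest, Object src) {
        List<String> copied = new ArrayList<>();
        for (Method getter : src.getClass().getMethods()) {
            String name = getter.getName();
            if (!name.startsWith("get") || name.equals("getClass") || getter.getParameterCount() != 0) {
                continue;
            }
            String suffix = name.substring(3); // e.g. "Order_id"
            try {
                Method setter = dest.getClass().getMethod("set" + suffix, getter.getReturnType());
                setter.invoke(dest, getter.invoke(src));
                // Introspector.decapitalize("Order_id") returns "order_id"
                copied.add(Introspector.decapitalize(suffix));
            } catch (NoSuchMethodException e) {
                // dest has no property with this name and type: skip it
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("copy failed for " + suffix, e);
            }
        }
        return copied;
    }
}
```

In PaymentWide, mergeOrderWide runs before mergePaymentInfo. So for any property name present in both inputs, the payment value is the one that remains.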

PaymentWide.java: payment wide table entity class

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.beanutils.BeanUtils;

import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;

/**
 * @author zhangbaohpu
 * @date 2021/12/25 10:10
 * @desc Payment wide table entity class
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentWide {
    Long payment_id;
    String subject;
    String payment_type;
    String payment_create_time;
    String callback_time;
    Long detail_id;
    Long order_id;
    Long sku_id;
    BigDecimal order_price;
    Long sku_num;
    String sku_name;
    Long province_id;
    String order_status;
    Long user_id;
    BigDecimal total_amount;
    BigDecimal activity_reduce_amount;
    BigDecimal coupon_reduce_amount;
    BigDecimal original_total_amount;
    BigDecimal feight_fee;
    BigDecimal split_feight_fee;
    BigDecimal split_activity_amount;
    BigDecimal split_coupon_amount;
    BigDecimal split_total_amount;
    String order_create_time;

    String province_name; // obtained by querying the dimension table
    String province_area_code;
    String province_iso_code;
    String province_3166_2_code;

    Integer user_age;
    String user_gender;

    Long spu_id; // dimension data, to be joined in
    Long tm_id;
    Long category3_id;
    String spu_name;
    String tm_name;
    String category3_name;

    public PaymentWide(PaymentInfo paymentInfo, OrderWide orderWide) {
        // Order fields are copied first; payment fields with the same name overwrite them
        mergeOrderWide(orderWide);
        mergePaymentInfo(paymentInfo);
    }

    public void mergePaymentInfo(PaymentInfo paymentInfo) {
        if (paymentInfo != null) {
            try {
                // copyProperties(dest, orig): copies properties with matching names
                BeanUtils.copyProperties(this, paymentInfo);
                payment_create_time = paymentInfo.create_time;
                payment_id = paymentInfo.id;
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }

    public void mergeOrderWide(OrderWide orderWide) {
        if (orderWide != null) {
            try {
                BeanUtils.copyProperties(this, orderWide);
                order_create_time = orderWide.create_time;
            } catch (IllegalAccessException | InvocationTargetException e) {
                e.printStackTrace();
            }
        }
    }
}

Payment wide table main program

The job does not aggregate anything yet, so it still belongs to the DWM layer.

Create a PaymentWideApp.java job class in the dwm package.

import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson.JSON;
import com.zhangbao.gmall.realtime.bean.OrderWide;
import com.zhangbao.gmall.realtime.bean.PaymentInfo;
import com.zhangbao.gmall.realtime.bean.PaymentWide;
import com.zhangbao.gmall.realtime.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @author zhangbaohpu
 * @date 2021/12/25 10:16
 * @desc Payment wide table
 */
public class PaymentWideApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set parallelism
        env.setParallelism(4);

        // Configure checkpointing
        // env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        // env.getCheckpointConfig().setCheckpointTimeout(60000);
        // env.setStateBackend(new FsStateBackend("hdfs://hadoop101:9000/gmall/flink/checkpoint/paymentWide"));
        // // User name used to access HDFS
        // System.setProperty("HADOOP_USER_NAME","zhangbao");

        // Kafka topics and consumer group
        String paymentInfoTopic = "dwd_payment_info";
        String orderWideTopic = "dwm_order_wide";
        String paymentWideTopic = "dwm_payment_wide";
        String paymentWideGroup = "paymentWideGroup";

        // Read payment info
        FlinkKafkaConsumer<String> paymentInfo = MyKafkaUtil.getKafkaSource(paymentInfoTopic, paymentWideGroup);
        DataStreamSource<String> paymentInfoJsonStrDs = env.addSource(paymentInfo);
        // Read order wide records
        FlinkKafkaConsumer<String> orderWide = MyKafkaUtil.getKafkaSource(orderWideTopic, paymentWideGroup);
        DataStreamSource<String> orderWideJsonStrDs = env.addSource(orderWide);

        // Parse JSON strings into POJOs
        SingleOutputStreamOperator<PaymentInfo> paymentJsonDs =
                paymentInfoJsonStrDs.map(paymentInfoStr -> JSON.parseObject(paymentInfoStr, PaymentInfo.class));
        SingleOutputStreamOperator<OrderWide> orderWideJsonDs =
                orderWideJsonStrDs.map(orderWideStr -> JSON.parseObject(orderWideStr, OrderWide.class));

        paymentJsonDs.print("payment info >>>");
        orderWideJsonDs.print("order wide >>>");

        // Assign event-time timestamps and watermarks (3 s of allowed out-of-orderness):
        // payments use callback_time, order wide records use create_time
        SingleOutputStreamOperator<PaymentInfo> paymentInfoWithWaterMarkDs = paymentJsonDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<PaymentInfo>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<PaymentInfo>() {
                            @Override
                            public long extractTimestamp(PaymentInfo paymentInfo, long l) {
                                return DateUtil.parse(paymentInfo.getCallback_time(), DatePattern.NORM_DATETIME_PATTERN).getTime();
                            }
                        })
        );
        SingleOutputStreamOperator<OrderWide> orderWideWithWaterMarkDs = orderWideJsonDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderWide>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide orderWide, long l) {
                                return DateUtil.parse(orderWide.getCreate_time(), DatePattern.NORM_DATETIME_PATTERN).getTime();
                            }
                        })
        );

        // Key both streams by order_id
        KeyedStream<PaymentInfo, Long> paymentInfoKeyedDs = paymentInfoWithWaterMarkDs.keyBy(payInfoObj -> payInfoObj.getOrder_id());
        KeyedStream<OrderWide, Long> orderWideKeyedDs = orderWideWithWaterMarkDs.keyBy(orderWideObj -> orderWideObj.getOrder_id());

        paymentInfoKeyedDs.print("paymentInfoKeyedDs >>>");
        orderWideKeyedDs.print("orderWideKeyedDs >>>");

        // Dual-stream interval join: join payments with order wide records.
        // An order matches if its create_time lies within 1800 s before or after the payment's callback_time.
        SingleOutputStreamOperator<PaymentWide> paymentWideObjDs = paymentInfoKeyedDs
                .intervalJoin(orderWideKeyedDs)
                .between(Time.seconds(-1800), Time.seconds(1800))
                .process(new ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>() {
                    @Override
                    public void processElement(PaymentInfo paymentInfo, OrderWide orderWide,
                                               ProcessJoinFunction<PaymentInfo, OrderWide, PaymentWide>.Context context,
                                               Collector<PaymentWide> collector) throws Exception {
                        // Debug output
                        System.out.println(paymentInfo);
                        System.out.println(orderWide);
                        collector.collect(new PaymentWide(paymentInfo, orderWide));
                    }
                });

        // Serialize the wide records to JSON
        SingleOutputStreamOperator<String> paymentWideDs = paymentWideObjDs.map(paymentWide -> JSON.toJSONString(paymentWide));
        paymentWideDs.print("payment wide json >>> ");

        // Write to Kafka
        FlinkKafkaProducer<String> kafkaSink = MyKafkaUtil.getKafkaSink(paymentWideTopic);
        paymentWideDs.addSink(kafkaSink);

        try {
            env.execute("payment wide task");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
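Both timestamp assigners parse a "yyyy-MM-dd HH:mm:ss" string (hutool's DatePattern.NORM_DATETIME_PATTERN) into epoch milliseconds. After that, forBoundedOutOfOrderness(Duration.ofSeconds(3)) makes the watermark trail the largest timestamp seen so far. Flink's BoundedOutOfOrdernessWatermarks emits maxTimestamp - outOfOrderness - 1 ms. The java.time sketch below reproduces that arithmetic outside Flink. EventTimeSketch is an illustrative name. The time zone is passed in explicitly, which is an assumption of the sketch, so the result does not depend on the JVM default.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

final class EventTimeSketch {
    // Same layout as hutool's DatePattern.NORM_DATETIME_PATTERN.
    static final DateTimeFormatter NORM = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses callback_time / create_time into epoch millis in an explicit zone.
    static long toEpochMillis(String s, ZoneId zone) {
        return LocalDateTime.parse(s, NORM).atZone(zone).toInstant().toEpochMilli();
    }

    // Periodic watermark of a bounded-out-of-orderness strategy:
    // largest timestamp seen minus the allowed out-of-orderness minus 1 ms.
    static long watermark(long maxTimestampSeen, Duration outOfOrderness) {
        return maxTimestampSeen - outOfOrderness.toMillis() - 1;
    }
}
```

For example, after a record stamped "1970-01-01 00:00:10" in UTC (10,000 ms), the watermark with 3 s of out-of-orderness would be 6,999 ms.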

This completes the payment wide table job.

Project repository: github.com/zhangbaohpu/gmall-flink-parent/tree/master/gmall-realtime

Summary

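The main job of the DWM-layer code is to turn one kind of detail record into another through computation, so that later statistics can be built on it. After this stage you should be able to: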

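  • Use state to deduplicate records (requirement: UV counting).

  • Use CEP to filter and evaluate a group of events (requirement: bounce-behavior calculation).

  • Use intervalJoin to join streams.

  • Handle dimension joins, and optimize their performance with caching and asynchronous lookups.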
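The caching part of the last point is commonly done as cache-aside. The job checks a local cache first and queries the dimension store only on a miss. Below is a minimal sketch of that idea with an LRU cache built on LinkedHashMap's access order. The loader function is a hypothetical stand-in for the real dimension query, and the capacity is arbitrary. The sketch does not cover the asynchronous-lookup part, and a real job would also need to invalidate entries when dimension rows change.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class DimCacheSketch<K, V> {
    private final Function<K, V> loader; // stand-in for the real dimension-table query
    private final Map<K, V> cache;
    private int loads = 0; // number of loader calls, for illustration

    DimCacheSketch(int capacity, Function<K, V> loader) {
        this.loader = loader;
        // accessOrder = true: the eldest entry is the least recently accessed one
        this.cache = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > capacity; // evict once the cache grows past capacity
            }
        };
    }

    // Cache-aside: return the cached value, or load it and remember it.
    V get(K key) {
        V v = cache.get(key);
        if (v == null) {
            v = loader.apply(key);
            loads++;
            if (v != null) {
                cache.put(key, v);
            }
        }
        return v;
    }

    int loadCount() {
        return loads;
    }
}
```

With capacity 2, the sequence get(1), get(1), get(2), get(3), get(1) calls the loader four times. The second get(1) is a hit, while the last one misses because key 1 was evicted as the least recently used entry.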

