如何使用Flink高效处理Wikipedia实时消息?

2026-06-11 13:219阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1100个文字,预计阅读时间需要5分钟。

如何使用Flink高效处理Wikipedia实时消息?

欢迎访问我的GitHub,这里分类汇总了全部原创(含配套源码):https://github.com/zq2599/blog_demos关于Wikipedia Edit Stream,Wikipedia Edit Stream官方网站提供的经典demo,该应用应使用免费的消息。

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):github.com/zq2599/blog_demos

关于Wikipedia Edit Stream

  • Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,消息中包含了用户名对wiki的编辑情况,demo的官方资料地址:ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

消息来源

  • 消息的DataSource是个名为WikipediaEditsSource的类,这里面建立了到irc.wikimedia.org的Socker连接,再通过Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了;

  • IRC是应用层协议,更多细节请看:en.wikipedia.org/wiki/Internet_Relay_Chat

  • 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

实战简介

  • 本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网demo的不同之处

  • 和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

  • Flink:1.7;
  • 运行模式:单机(官网称之为Local Flink Cluster);
  • Flink所在机器的操作系统:CentOS Linux release 7.5.1804;
  • 开发环境JDK:1.8.0_181;
  • 开发环境Maven:3.5.0;

操作步骤简介

  • 今天的实战分为以下步骤:
  • 创建应用;
  • 编码;
  • 构建;
  • 部署运行;

创建应用

  • 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  • 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
Define value for property 'groupId': com.bolingcavalry Define value for property 'artifactId': wikipediaeditstreamdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.bolingcavalry: : Confirm properties configuration: groupId: com.bolingcavalry artifactId: wikipediaeditstreamdemo version: 1.0-SNAPSHOT package: com.bolingcavalry Y: :
  • 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注 项目主页 github.com/zq2599/blog_demos 该项目在GitHub上的主页 git仓库地址(github.com/zq2599/blog_demos.git 该项目源码的仓库地址,192.168.1.103:8081 ;
  • 选择刚刚生成的jar文件作为一个新的任务,如下图:
    • 点击下图红框中的"upload",将文件提交:

    • 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

    如何使用Flink高效处理Wikipedia实时消息?

    • 提交后的页面效果如下图所示,可见一个job已经在运行中了:

    • 接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

    • 至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

    欢迎关注51CTO博客:程序员欣宸

    学习路上,你不孤单,欣宸原创一路相伴...

    本文共计1100个文字,预计阅读时间需要5分钟。

    如何使用Flink高效处理Wikipedia实时消息?

    欢迎访问我的GitHub,这里分类汇总了全部原创(含配套源码):https://github.com/zq2599/blog_demos关于Wikipedia Edit Stream,Wikipedia Edit Stream官方网站提供的经典demo,该应用应使用免费的消息。

    欢迎访问我的GitHub

    这里分类和汇总了欣宸的全部原创(含配套源码):github.com/zq2599/blog_demos

    关于Wikipedia Edit Stream

    • Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,消息中包含了用户名对wiki的编辑情况,demo的官方资料地址:ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

    消息来源

    • 消息的DataSource是个名为WikipediaEditsSource的类,这里面建立了到irc.wikimedia.org的Socker连接,再通过Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了;

    • IRC是应用层协议,更多细节请看:en.wikipedia.org/wiki/Internet_Relay_Chat

    • 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

    实战简介

    • 本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

    和官网demo的不同之处

    • 和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

    环境信息

    • Flink:1.7;
    • 运行模式:单机(官网称之为Local Flink Cluster);
    • Flink所在机器的操作系统:CentOS Linux release 7.5.1804;
    • 开发环境JDK:1.8.0_181;
    • 开发环境Maven:3.5.0;

    操作步骤简介

    • 今天的实战分为以下步骤:
    • 创建应用;
    • 编码;
    • 构建;
    • 部署运行;

    创建应用

    • 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
    mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
    • 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
    Define value for property 'groupId': com.bolingcavalry Define value for property 'artifactId': wikipediaeditstreamdemo Define value for property 'version' 1.0-SNAPSHOT: : Define value for property 'package' com.bolingcavalry: : Confirm properties configuration: groupId: com.bolingcavalry artifactId: wikipediaeditstreamdemo version: 1.0-SNAPSHOT package: com.bolingcavalry Y: :
    • 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

    • 应用创建成功,接下来可以开始编码了;

    编码

    • 您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
    名称 链接 备注 项目主页 github.com/zq2599/blog_demos 该项目在GitHub上的主页 git仓库地址(github.com/zq2599/blog_demos.git 该项目源码的仓库地址,192.168.1.103:8081 ;
  • 选择刚刚生成的jar文件作为一个新的任务,如下图:
    • 点击下图红框中的"upload",将文件提交:

    • 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

    如何使用Flink高效处理Wikipedia实时消息?

    • 提交后的页面效果如下图所示,可见一个job已经在运行中了:

    • 接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

    • 至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

    欢迎关注51CTO博客:程序员欣宸

    学习路上,你不孤单,欣宸原创一路相伴...