如何将Flink Kubernetes Application部署转化为一个长尾词?

2026-04-18 01:511阅读0评论SEO问题
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计817个文字,预计阅读时间需要4分钟。

如何将Flink Kubernetes Application部署转化为一个长尾词?

Flink on Kubernetes 应用部署环境配置:- 使用 Flink 1.12.2- Docker 版本 20.10.7- Kubernetes 版本 1.20.2- Java 版本 1.8- Kubernetes 集成配置

Flink Kubernetes Application部署 环境

Flink 1.12.2
Docker 20.10.7
Kubernetes 1.20.2
JDK 1.8

K8s 配置

#创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间) kubectl create serviceaccount flinkaccount -n flink #对创建的用户赋予编辑权限(flink:flinkaccount是指想flink命名空间下的flinkaccount账户,可以使用default默认的命名空间) kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flinkaccount Docker 镜像配置

FROM flink:1.12.2 RUN mkdir -p $FLINK_HOME/usrlib #默认存放flink任务的jar在$FLINK_HOME/usrlib,k8s启动后会在这个文件夹寻找启动的jar COPY ./application.jar $FLINK_HOME/usrlib/startup.jar

#打包Docker镜像,并推送镜像到远端仓库 docker build -t build:v1.0 . docker tag build:v1.0 192.168.1.11/xxx/build:v1.0 docker push 192.168.1.11/xxx/build:v1.0 Java 代码

import io.fabric8.kubernetes.client.osgi.ManagedKubernetesClient; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.configuration.*; import org.apache.flink.kubernetes.KubernetesClusterDescriptor; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; import java.util.*; import java.util.concurrent.Executors; @Slf4j @Data public final class StartupFlinkKubernetes { /** * K8s配置信息 */ private Configuration configuration; /** * K8s的客户端 */ private Fabric8FlinkKubeClient client; /** * K8s客户端 */ private ManagedKubernetesClient managedKubernetesClient; /** * 加载K8s的配置信息 */ private void loadConfig(){ //加载一个默认K8s的配置 configuration = GlobalConfiguration.loadConfiguration(); //设置k8s的部署模式 configuration.set(DeploymentOptions.ATTACHED,false); configuration.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); //设置应用的名称 configuration.set(KubernetesConfigOptions.CLUSTER_ID, "applicationName"); //设置Docker的远端镜像地址 configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE, "192.168.1.11/xxx/build:v1.0"); //设置K8s的命名空间(部署应用的命名空间)(可选) configuration.set(KubernetesConfigOptions.NAMESPACE, "flink"); //设置k8s使用的K8s账号(账号名称可自定义) configuration.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, "flinkaccount"); //设置K8s对外暴露的端口方式,这里直接使用宿主机端口,可以在外面访问Flink的后台管理界面 configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,KubernetesConfigOptions.ServiceExposedType.NodePort); //设置K8s拉取Docker镜像的方式:Always 每次都拉取,Never 不拉取,IfNotPresent 本地没有再拉取 configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always); //设置镜像内启动Flink的jar包文件路径 //仅支持file://开头的协议,且此jar包已经再Docker镜像中 //Docker镜像的基础是Flink:1.12.0 configuration.set(PipelineOptions.JARS, Collections.singletonList("file://")); //设置Flink的占用内存大小 configuration.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024 mb")); //如果启动不起来,报错说是缺少配置可以在这里添加配置 //具体的配置根据:flink/conf/flink-conf.yaml中的配置决定 configuration.setString("taskmanager.memory.flink.size","1 gb"); //千万别自己设置FLINK_CONF_DIR选项 //这个选项会在启动K8s应用的时候读取宿主机上这个路径/conf的文件作为Docker镜像里面的Flink的conf //而且启动后执行Flink的环境是/opt/flink的不会使用这里配置的FLINK_CONF_DIR,从而导致启动应用失败 //所以建议不要更改这个默认的选项,但可以在宿主机的这个路径上配置Flink的相关配置,到时候会被读取到部署的应用中 //configuration.set(KubernetesConfigOptions.FLINK_CONF_DIR, flinkHome); } /** * 加载K8s客户端 */ private void loadClient(){ //K8s客户端 managedKubernetesClient = new ManagedKubernetesClient(); //将之前的配置 Map<String,String> toMap = configuration.toMap(); //将之前设置的配置转换 Map<String,Object> config = new HashMap<>(toMap.size()); for (String key : toMap.keySet()){ config.put(key,toMap.get(key)); } //覆盖一下客户端的配置 managedKubernetesClient.activate(config); //创建客户端 client = new Fabric8FlinkKubeClient(configuration,managedKubernetesClient,() -> Executors.newFixedThreadPool(16)); } /** * 启动任务 * @param arg 启动参数 * @throws Exception 启动异常 */ private void startApplication(String[] arg) throws Exception { //设置要运行的jar的class以及启动jar的参数 ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(arg, "com.xxx.xxx"); //创建k8s的描述器 KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration,client); //创建k8s信息 ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); //部署应用启动到k8s ClusterClientProvider<String> provider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification,applicationConfiguration); //应用id String applicationId = provider.getClusterClient().getClusterId(); //访问Flink Web的地址 String webAddress = provider.getClusterClient().getWebInterfaceURL(); } }

如何将Flink Kubernetes Application部署转化为一个长尾词?

本文共计817个文字,预计阅读时间需要4分钟。

如何将Flink Kubernetes Application部署转化为一个长尾词?

Flink on Kubernetes 应用部署环境配置:- 使用 Flink 1.12.2- Docker 版本 20.10.7- Kubernetes 版本 1.20.2- Java 版本 1.8- Kubernetes 集成配置

Flink Kubernetes Application部署 环境

Flink 1.12.2
Docker 20.10.7
Kubernetes 1.20.2
JDK 1.8

K8s 配置

#创建flink的使用账户(账户名可以自定义,-n flink可以省略使用k8s默认的命名空间) kubectl create serviceaccount flinkaccount -n flink #对创建的用户赋予编辑权限(flink:flinkaccount是指想flink命名空间下的flinkaccount账户,可以使用default默认的命名空间) kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flinkaccount Docker 镜像配置

FROM flink:1.12.2 RUN mkdir -p $FLINK_HOME/usrlib #默认存放flink任务的jar在$FLINK_HOME/usrlib,k8s启动后会在这个文件夹寻找启动的jar COPY ./application.jar $FLINK_HOME/usrlib/startup.jar

#打包Docker镜像,并推送镜像到远端仓库 docker build -t build:v1.0 . docker tag build:v1.0 192.168.1.11/xxx/build:v1.0 docker push 192.168.1.11/xxx/build:v1.0 Java 代码

import io.fabric8.kubernetes.client.osgi.ManagedKubernetesClient; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.flink.client.deployment.ClusterSpecification; import org.apache.flink.client.deployment.application.ApplicationConfiguration; import org.apache.flink.client.program.ClusterClientProvider; import org.apache.flink.configuration.*; import org.apache.flink.kubernetes.KubernetesClusterDescriptor; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; import org.apache.flink.kubernetes.kubeclient.Fabric8FlinkKubeClient; import java.util.*; import java.util.concurrent.Executors; @Slf4j @Data public final class StartupFlinkKubernetes { /** * K8s配置信息 */ private Configuration configuration; /** * K8s的客户端 */ private Fabric8FlinkKubeClient client; /** * K8s客户端 */ private ManagedKubernetesClient managedKubernetesClient; /** * 加载K8s的配置信息 */ private void loadConfig(){ //加载一个默认K8s的配置 configuration = GlobalConfiguration.loadConfiguration(); //设置k8s的部署模式 configuration.set(DeploymentOptions.ATTACHED,false); configuration.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName()); //设置应用的名称 configuration.set(KubernetesConfigOptions.CLUSTER_ID, "applicationName"); //设置Docker的远端镜像地址 configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE, "192.168.1.11/xxx/build:v1.0"); //设置K8s的命名空间(部署应用的命名空间)(可选) configuration.set(KubernetesConfigOptions.NAMESPACE, "flink"); //设置k8s使用的K8s账号(账号名称可自定义) configuration.set(KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT, "flinkaccount"); //设置K8s对外暴露的端口方式,这里直接使用宿主机端口,可以在外面访问Flink的后台管理界面 configuration.set(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,KubernetesConfigOptions.ServiceExposedType.NodePort); //设置K8s拉取Docker镜像的方式:Always 每次都拉取,Never 不拉取,IfNotPresent 本地没有再拉取 configuration.set(KubernetesConfigOptions.CONTAINER_IMAGE_PULL_POLICY, KubernetesConfigOptions.ImagePullPolicy.Always); //设置镜像内启动Flink的jar包文件路径 //仅支持file://开头的协议,且此jar包已经再Docker镜像中 //Docker镜像的基础是Flink:1.12.0 configuration.set(PipelineOptions.JARS, Collections.singletonList("file://")); //设置Flink的占用内存大小 configuration.set(JobManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1024 mb")); //如果启动不起来,报错说是缺少配置可以在这里添加配置 //具体的配置根据:flink/conf/flink-conf.yaml中的配置决定 configuration.setString("taskmanager.memory.flink.size","1 gb"); //千万别自己设置FLINK_CONF_DIR选项 //这个选项会在启动K8s应用的时候读取宿主机上这个路径/conf的文件作为Docker镜像里面的Flink的conf //而且启动后执行Flink的环境是/opt/flink的不会使用这里配置的FLINK_CONF_DIR,从而导致启动应用失败 //所以建议不要更改这个默认的选项,但可以在宿主机的这个路径上配置Flink的相关配置,到时候会被读取到部署的应用中 //configuration.set(KubernetesConfigOptions.FLINK_CONF_DIR, flinkHome); } /** * 加载K8s客户端 */ private void loadClient(){ //K8s客户端 managedKubernetesClient = new ManagedKubernetesClient(); //将之前的配置 Map<String,String> toMap = configuration.toMap(); //将之前设置的配置转换 Map<String,Object> config = new HashMap<>(toMap.size()); for (String key : toMap.keySet()){ config.put(key,toMap.get(key)); } //覆盖一下客户端的配置 managedKubernetesClient.activate(config); //创建客户端 client = new Fabric8FlinkKubeClient(configuration,managedKubernetesClient,() -> Executors.newFixedThreadPool(16)); } /** * 启动任务 * @param arg 启动参数 * @throws Exception 启动异常 */ private void startApplication(String[] arg) throws Exception { //设置要运行的jar的class以及启动jar的参数 ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration(arg, "com.xxx.xxx"); //创建k8s的描述器 KubernetesClusterDescriptor kubernetesClusterDescriptor = new KubernetesClusterDescriptor(configuration,client); //创建k8s信息 ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder().createClusterSpecification(); //部署应用启动到k8s ClusterClientProvider<String> provider = kubernetesClusterDescriptor.deployApplicationCluster(clusterSpecification,applicationConfiguration); //应用id String applicationId = provider.getClusterClient().getClusterId(); //访问Flink Web的地址 String webAddress = provider.getClusterClient().getWebInterfaceURL(); } }

如何将Flink Kubernetes Application部署转化为一个长尾词?