Flink

    xiaoxiao2021-04-15  421

    flink HA部署

    flink搭建,采用分布式部署方式,分别为A,B,C三个节点。其中A为master;A,B,C为worker。 本文使用的用户是hadoop用户(自己新建)

    先决条件 Java 1.8.x or higherscala 自己使用的是2.11.4节点之间做ssh免密搭建hadoop集群及启动hdfs及yarn(2.8.4)zk集群是在搭建hadoop集群时已经搭建好。 下载flink,本文采用的是flink1.6.4-hadoop2.8版本,下载地址 解压并软链flink-1.6.4-bin-hadoop28-scala_2.11.tgz tar -zxvf flink-1.6.4-bin-hadoop28-scala_2.11.tgz ln -s flink-1.6.4/ flink 修改flink-conf.yaml文件。具体如下所示: //此项修改为每台节点各自的hostname jobmanager.rpc.address: bigdata-01 jobmanager.rpc.port: 6123 # The heap size for the JobManager JVM jobmanager.heap.size: 2048m # The heap size for the TaskManager JVM taskmanager.heap.size: 2048m # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline. taskmanager.numberOfTaskSlots: 1 # The parallelism used for programs that did not specify and other parallelism. parallelism.default: 1 #============================================================================== # High Availability 高可用必须按如下所示配置 #high-availability: 高可用模式,必须为zookeeper #high-availability.storageDir: JobManager的元数据持久化保存的位置 #high-availability.zookeeper.quorum: zk集群地址 #============================================================================== # The high-availability mode. Possible options are 'NONE' or 'zookeeper'. # high-availability: zookeeper # The path where metadata for master recovery is persisted. While ZooKeeper stores # the small ground truth for checkpoint and leader election, this location stores # the larger objects, like persisted dataflow graphs. # # Must be a durable file system that is accessible from all nodes # (like HDFS, S3, Ceph, nfs, ...) # high-availability.storageDir: hdfs://A:9000/user/flink/ha/ # # The list of ZooKeeper quorum peers that coordinate the high-availability # setup. This must be a list of the form: # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181) # high-availability.zookeeper.quorum: A:2181,B:2181,C:2181 # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE) # The default value is "open" and it can be changed to "creator" if ZK security is enabled # # high-availability.zookeeper.client.acl: open #============================================================================== # Fault tolerance and checkpointing #============================================================================== # The backend that will be used to store operator state checkpoints if # checkpointing is enabled. # # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the # <class-name-of-factory>. # state.backend: filesystem # Directory for checkpoints filesystem, when using any of the default bundled # state backends. # state.checkpoints.dir: hdfs://A:9000/user/flink/flink-checkpoints # Default target directory for savepoints, optional. # state.savepoints.dir: hdfs://A:9000/user/flink/flink-savepoints # Flag to enable/disable incremental checkpoints for backends that # support incremental checkpoints (like the RocksDB state backend). # # state.backend.incremental: false 然后修改后masters文件将A写进去。同时修改slaves文件,将ABC三个都写进去。将修改后的flink-1.6.4 scp到B和C机器上。在master节点的bin目录下执行./start-cluster.sh即可,然后就可以在master上看到两个jvm 进程:StandaloneSessionClusterEntrypoint和TaskManagerRunner在浏览其上输入http://A:8081即可出现fink管理界面

    ⚠️注意:

    在flink-conf.yaml文件中obmanager.rpc.address这个值按照所在节点的hostname改。

    flink-conf.yaml文件中没有写出来的我选择的默认,其中涉及hdfs路径的需要提前建好。

    flink踩坑

    Container is running beyond virtual memory limits. Current usage: 119.5 MB of 1 GB physical memory used; 2.2 GB of 2.1 GB virtual memory used. Killing container

    解决办法: 主要参考博文,这篇文章解释的十分清楚。

    flink练习demo

    《Flink官方文档》Batch Examples


    最新回复(0)