Deploy Apache Flink on Kubernetes

Posted on June 22, 2021
Tags: flink, kubernetes

In this post we are going to take a look at deploying Apache Flink on Kubernetes, specifically on GKE.

Once we are done we will be looking at two folders with the following content:

> tree .
.
├── prod-flink
│   ├── flink-configuration-configmap.yaml
│   ├── flink-storage.yaml
│   ├── jobmanager-service.yaml
│   ├── jobmanager-session-deployment.yaml
│   ├── namespace.yaml
│   └── taskmanager-session-deployment.yaml
└── prod-nfs
    ├── namespace.yaml
    ├── nfs-server-service.yaml
    └── nfs-server.yaml

Flink supports different kinds of deployment modes; we are going to focus on Session Mode.

In the end we will have 3 components running:

  1. A jobmanager
  2. A taskmanager
  3. A Service exposing the UI

The jobmanager & taskmanager need access to the same volume so they can write checkpoints & savepoints and clean them up. Since GCEPersistentDisk does not support ReadWriteMany natively, we will add that via NFS.

Let’s create a new directory prod-nfs:

mkdir prod-nfs

Then create the 3 files with the following content:

> cat namespace.yaml

---
apiVersion: v1
kind: Namespace
metadata:
  name: prod-nfs

> cat nfs-server.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nfs-server
  namespace: prod-nfs
spec:
  replicas: 1
  selector:
    matchLabels:
      role: nfs-server
  template:
    metadata:
      labels:
        role: nfs-server
    spec:
      containers:
      - name: nfs-server
        image: gcr.io/google_containers/volume-nfs:0.8
        ports:
          - name: nfs
            containerPort: 2049
          - name: mountd
            containerPort: 20048
          - name: rpcbind
            containerPort: 111
        securityContext:
          privileged: true
        volumeMounts:
          - mountPath: /exports
            name: mypvc
      volumes:
        - name: mypvc
          gcePersistentDisk:
            pdName: disk-1
            fsType: ext4
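
The deployment above mounts a pre-existing GCE persistent disk called disk-1. If you have not created that disk yet, something along these lines should do it (the size and zone are assumptions on my part; the zone has to match the one your GKE nodes run in):

gcloud compute disks create disk-1 --size=100GB --zone=europe-west1-b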

> cat nfs-server-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: nfs-server
  namespace: prod-nfs
spec:
  ports:
    - name: nfs
      port: 2049
    - name: mountd
      port: 20048
    - name: rpcbind
      port: 111
  selector:
    role: nfs-server

Then deploy these files via kubectl apply -f, or via fluxctl if you use that.
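
With plain kubectl that boils down to applying the whole directory and then checking that the NFS server comes up:

kubectl apply -f prod-nfs/
kubectl -n prod-nfs get pods,svc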

Now, let's start deploying Flink:

mkdir prod-flink

And then create the following files:

> cat namespace.yaml

---
apiVersion: v1
kind: Namespace
metadata:
  name: prod-flink

> cat flink-storage.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: flink-storage
  namespace: prod-flink
spec:
  capacity:
    storage: 100Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: nfs-server.prod-nfs.svc.cluster.local
    path: "/"

---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: flink-storage
  namespace: prod-flink
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: ""
  resources:
    requests:
      storage: 100Gi

> cat flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: prod-flink
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 15
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 4g
    parallelism.default: 2    
    state.backend: rocksdb
    state.savepoints.dir: file:///cache/savepoints
    state.checkpoints.dir: file:///cache/checkpoints
    state.backend.incremental: true   
    metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
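
One thing to keep in mind: Flink only reads flink-conf.yaml at startup, so after changing the ConfigMap you need to restart the deployments defined below before the new values take effect:

kubectl -n prod-flink rollout restart deployment/flink-jobmanager
kubectl -n prod-flink rollout restart deployment/flink-taskmanager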

> cat jobmanager-session-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: prod-flink
  labels:
    app: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      securityContext:
        fsGroup: 2000
      # Weird permission requirements due to NFS being mounted as root
      # only, so change the permissions to the flink user group.
      initContainers:
      - name: nfs-fixer
        image: alpine
        securityContext:
          runAsUser: 0
        volumeMounts:
        - name: cache
          mountPath: /cache/
        command:
        - sh
        - -c
        - (chmod 0775 /cache/; chgrp 2000 /cache/)
      containers:
      - name: jobmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        - name: cache
          mountPath: /cache/
        securityContext:
          runAsUser: 9999  
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: cache
        persistentVolumeClaim:
          claimName: flink-storage

> cat taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: prod-flink
  labels:
    app: flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      securityContext:
        fsGroup: 2000
      # Weird permission requirements due to NFS being mounted as root
      # only, so change the permissions to the flink user group.
      initContainers:
      - name: nfs-fixer
        image: alpine
        securityContext:
          runAsUser: 0
        volumeMounts:
        - name: cache
          mountPath: /cache/
        command:
        - sh
        - -c
        - (chmod 0775 /cache/; chgrp 2000 /cache/)
      containers:
      - name: taskmanager
        image: apache/flink:1.13.0-scala_2.11
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: cache
          mountPath: /cache/
        securityContext:
          runAsUser: 9999 
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: cache
        persistentVolumeClaim:
          claimName: flink-storage
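
With taskmanager.numberOfTaskSlots set to 15, the 2 replicas above give you 30 task slots in total. If you need more capacity later you can simply scale the deployment:

kubectl -n prod-flink scale deployment/flink-taskmanager --replicas=3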

> cat jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: prod-flink
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
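
As with the NFS manifests, applying the whole directory is enough if you are not using a GitOps tool:

kubectl apply -f prod-flink/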

Once everything is applied, you should be able to run:

> k get po -n prod-flink

NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-679fd56977-mftpj    1/1     Running   0          10m
flink-taskmanager-7dc46c8475-ff6gs   1/1     Running   0          10m
flink-taskmanager-7dc46c8475-q5kbh   1/1     Running   0          10m
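
If the pods are stuck in Pending or ContainerCreating instead, the NFS-backed PersistentVolumeClaim is a good first thing to check:

kubectl -n prod-flink describe pvc flink-storage
kubectl -n prod-flink get events --sort-by=.lastTimestamp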

Then simply pick the jobmanager pod and port-forward it:

k -n prod-flink port-forward flink-jobmanager-679fd56977-mftpj 8081:8081

Navigate to localhost:8081 and you should be able to deploy your jars.
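
If you prefer the command line over the web UI, the Flink CLI can submit jobs through the same port-forward (the jar path is just a placeholder):

flink run -m localhost:8081 path/to/your-job.jar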

There is also a k8s operator maintained by Lyft at https://github.com/lyft/flinkk8soperator. I have not tested that out yet.