Wikidata Query Service/Flink On Kubernetes

This page describes the options we have to deploy a flink job on top of the WMF kubernetes cluster. It assumes a basic understanding of how flink schedules jobs and of what a jobmanager and a taskmanager are.

Flink job lifecycle

A flink streaming job is generally meant to run indefinitely, but it sometimes has to be stopped for various maintenance operations:

  • job upgrades
  • flink version upgrades
  • maintenance on its state (think of alter table for sql db)

A stateful flink job has to remember the location of the state it has written to its object storage (swift in our case) so that, when resuming operations after a failure or restart, it can continue where it left off.

There are two kinds of state backup formats in flink:

  • checkpoints: cheap, possibly incremental, generally taken at regular intervals (every 30 seconds for the wdqs streaming updater)
  • savepoints: self-contained, triggered on demand

In short, checkpoints are designed to let the pipeline recover from failures; savepoints cover everything else (upgrades, forking the stream, state inspection/manipulation...).
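For illustration, an on-demand savepoint can be triggered through the jobmanager's REST API. Below is a minimal sketch in Python; the endpoint and field names follow recent flink releases (check the REST API docs of the deployed version), and the jobmanager URL, job id and swift target directory are placeholders.

    import requests

    JOBMANAGER = "http://localhost:8081"          # placeholder: the jobmanager REST endpoint
    JOB_ID = "00000000000000000000000000000000"   # placeholder: the running job's id

    # Trigger a self-contained savepoint without cancelling the job.
    resp = requests.post(
        f"{JOBMANAGER}/jobs/{JOB_ID}/savepoints",
        json={"target-directory": "swift://example-container/savepoints", "cancel-job": False},
    )
    trigger_id = resp.json()["request-id"]

    # Poll this endpoint until flink reports where the savepoint was written.
    status = requests.get(f"{JOBMANAGER}/jobs/{JOB_ID}/savepoints/{trigger_id}").json()
    print(status)  # contains the savepoint location once the operation completes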

Retries and flink H/A

When a flink job starts, it can start from an existing savepoint or checkpoint.

While the job is running, flink can use its own retry mechanism; it is configurable and allows a flink job to die and restart a certain number of times. In these failure modes, as long as the jobmanager remains alive, it is its responsibility to track in memory the path of the job's latest successful checkpoint.

But when the jobmanager itself dies and restarts, it needs a way to retrieve the pointer to the latest successful checkpoint. With the flink kubernetes integration this is done via configmaps: with one such configmap per running job, the path to the latest successful checkpoint survives the jobmanager's death. This is what is called H/A (high availability) in the flink world.
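As an illustration, the H/A configmaps of a job can be inspected with the kubernetes Python client. This is a minimal sketch; the namespace and the label selector are assumptions (the exact labels flink puts on its H/A configmaps depend on the flink version), so adapt them to what is actually deployed.

    from kubernetes import client, config

    config.load_kube_config()  # or config.load_incluster_config() when running inside the cluster
    v1 = client.CoreV1Api()

    # Assumed label selector: flink's kubernetes H/A labels its configmaps, but the
    # exact label names/values depend on the flink version and the cluster-id.
    configmaps = v1.list_namespaced_config_map(
        namespace="rdf-streaming-updater",               # placeholder namespace
        label_selector="configmap-type=high-availability",
    )
    for cm in configmaps.items:
        leader = (cm.metadata.annotations or {}).get("control-plane.alpha.kubernetes.io/leader", "")
        print(cm.metadata.name, leader)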

So when starting with H/A enabled, flink has a choice to make:

  • should it start from the savepoint passed via an argument in the startup script?
  • should it start from the H/A state (k8s configmap)?

Flink simply takes the most recent of the two, which also implies that if the H/A state does not exist the savepoint passed as an argument is used. This last point is really important to take into consideration, because when the flink job is stopped with a savepoint the H/A configmap is emptied. So if the job restarts with the same startup arguments it might restart from the old savepoint and not from the one taken when the job was stopped.

Imagine the following scenario:

  1. the job startup script passes --fromSavepoint savepoint_T1
  2. the job is started
  3. 30 seconds later checkpoint_T2 is made
  4. the H/A state is updated with checkpoint_T2
  5. the job dies
  6. the job restarts with the same script and reads the H/A: it chooses checkpoint_T2 as it's newer than savepoint_T1
  7. the job is stopped with a savepoint stored to savepoint_T3
  8. the H/A state is emptied

At this point if the job is started with the same startup script it will start from savepoint_T1 instead of savepoint_T3.
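The rule can be restated with the following illustrative pseudocode (this is not flink's actual code; the timestamp comparison simply stands in for however flink orders the two restore candidates):

    def choose_restore_point(ha_checkpoint, savepoint_argument):
        """The most recent of the H/A-tracked checkpoint and the --fromSavepoint
        argument wins; if the H/A state is empty the savepoint argument is used;
        if both are missing the job starts with an empty state."""
        candidates = [p for p in (ha_checkpoint, savepoint_argument) if p is not None]
        if not candidates:
            return None  # nothing to restore from
        return max(candidates, key=lambda p: p.created_at)

    # In the scenario above, after step 8 the H/A state is empty, so
    # choose_restore_point(None, savepoint_T1) returns savepoint_T1, not savepoint_T3.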

Kubernetes deployment strategies

Using flink H/A is mandatory, as we must ensure that the job restarts properly if the jobmanager dies unexpectedly. This means that maintenance operations performed on the k8s cluster don't require manual intervention on the flink jobs themselves, as long as the flink configmaps are kept during the operation.

A flink job will be able to restart itself from where it left off without manual intervention when:

  • the k8s cluster is restarted
  • the jobmanager pod is lost and automatically restarted by k8s
  • one or all taskmanager pods are lost and automatically restarted by k8s

Some intervention might be required to depool dependent services during the operation; for wdqs this means routing all the external traffic to one cluster (moving all traffic to eqiad if k8s@codfw is downtimed for a long period).

This implies that whatever solution is chosen, the flink cluster must have the rights to create/edit/watch/get/list a set of configmaps in its k8s namespace.

Note that another feature of flink H/A is to make sure that a single jobmanager is active at a time. This is done via configmaps annotated with control-plane.alpha.kubernetes.io/leader. In other words, running multiple jobmanagers is not an issue, since only the elected leader is active.
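For reference, such permissions could be expressed as a namespaced Role built with the kubernetes Python client. This is only a sketch, not the actual deployment-charts definition: the role name and namespace are placeholders, and "edit" maps to the update/patch verbs in k8s RBAC.

    from kubernetes import client, config

    config.load_kube_config()

    role = client.V1Role(
        metadata=client.V1ObjectMeta(name="flink-ha-configmaps",         # placeholder role name
                                     namespace="rdf-streaming-updater"), # placeholder namespace
        rules=[
            client.V1PolicyRule(
                api_groups=[""],            # configmaps live in the core API group
                resources=["configmaps"],
                verbs=["create", "get", "list", "watch", "update", "patch"],
                # For an application cluster the configmap names are known in advance
                # and could be pinned with resource_names=[...]; a session cluster
                # cannot do that (see below).
            )
        ],
    )
    client.RbacAuthorizationV1Api().create_namespaced_role("rdf-streaming-updater", role)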

The following sections discuss the options we have to deploy a flink job over k8s and their impact on the following operations:

  • deploying a new version of the flink job
  • deploying a new version of flink

Flink session cluster

A session cluster is an empty box to which one can submit jobs; the kubernetes responsibility here is only to make sure that the flink components are alive and to restart them when a problem occurs. Responsibility for the job lifecycle is left to job owners, who will have to write custom deploy scripts to submit, stop and upgrade their jobs.

With the session cluster, the set of H/A configmaps created is not known in advance and the corresponding service account must be granted full read/write access to all configmaps, without resourceNames restrictions.

Because the same cluster is reused for multiple jobs, its related resources (mainly H/A configmaps) will remain and will have to be cleaned up manually (this needs kubectl delete configmap perms).

Pods will run code that is not part of the container image, because it is deployed outside the classic helm apply lifecycle. The job jar is stored in the object storage (swift).

Job upgrades are done via the REST API using custom scripts.

Flink-cluster upgrades are strongly coupled with the jobs the cluster is running: those jobs have to be stopped before upgrading the session-cluster.

Jobs are not isolated from each other, so it might make sense to run multiple session-clusters.

Upgrading to a new version of the flink Job:

  1. the job owner uses flink's REST API to stop the job with a savepoint
  2. the job owner uses flink's REST API to submit the updated jar and start the job from the savepoint just taken (both steps are sketched below)
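A minimal sketch of these two steps with Python and the flink REST API follows; the endpoints exist in recent flink releases, but the jobmanager URL, job id, jar file and swift directory are placeholders, and a real script would poll the trigger id instead of sleeping:

    import time
    import requests

    JOBMANAGER = "http://localhost:8081"          # placeholder REST endpoint
    JOB_ID = "00000000000000000000000000000000"   # placeholder job id

    # 1. Stop the job with a savepoint.
    stop = requests.post(
        f"{JOBMANAGER}/jobs/{JOB_ID}/stop",
        json={"targetDirectory": "swift://example-container/savepoints", "drain": False},
    )
    trigger_id = stop.json()["request-id"]
    time.sleep(30)  # a real script polls until the savepoint operation is COMPLETED
    result = requests.get(f"{JOBMANAGER}/jobs/{JOB_ID}/savepoints/{trigger_id}").json()
    savepoint_path = result["operation"]["location"]

    # 2. Upload the new jar and start it from the savepoint just taken.
    with open("streaming-updater-new.jar", "rb") as jar:   # placeholder jar file
        upload = requests.post(f"{JOBMANAGER}/jars/upload", files={"jarfile": jar})
    jar_id = upload.json()["filename"].split("/")[-1]
    requests.post(
        f"{JOBMANAGER}/jars/{jar_id}/run",
        json={"savepointPath": savepoint_path, "allowNonRestoredState": False},
    )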

Upgrading to a new version of Flink (requires coordination between the job owner and the k8s deployer):

  1. the job owner uses flink's REST API to stop the job with a savepoint
  2. the helm chart of the session-cluster is updated to use a new flink image and applied
  3. the job owner uses flink's REST API to submit the updated jar and start the job from the savepoint just taken

Flink application cluster

An application cluster embeds the job jar and its purpose is to run a single job; the lifecycle of the flink job is now tightly coupled with the lifecycle of the k8s resources.

Jobmanager as a k8s Deployment resource

A k8s Deployment will always restart its failed pods to match the expected number of replicas. This becomes problematic when the job owner wants to stop the job: we want the jobmanager pod to end, but k8s will want to restart it using the same startup script (which we know is problematic as it will start from an old savepoint).

A hack is necessary to prevent the pod from restarting: a do-not-restart flag can be shared via a configmap; the startup script inspects it and dies if it is set to true.
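A minimal sketch of such a startup wrapper is shown below, assuming the configmap is mounted as a file at /etc/flink-flags/do-not-restart (the mount path, flag name and flink entrypoint are made up for the illustration):

    import os
    import pathlib
    import sys

    FLAG_FILE = pathlib.Path("/etc/flink-flags/do-not-restart")  # assumed configmap mount

    def main():
        # If the operator set the flag, refuse to start so that the jobmanager does
        # not come back from an outdated savepoint.
        if FLAG_FILE.exists() and FLAG_FILE.read_text().strip().lower() == "true":
            print("do-not-restart flag is set, refusing to start the jobmanager")
            sys.exit(1)
        # Otherwise hand over to the regular flink entrypoint with the original arguments
        # (the exact entrypoint path depends on the image).
        os.execv("/opt/flink/bin/standalone-job.sh", ["standalone-job.sh", *sys.argv[1:]])

    if __name__ == "__main__":
        main()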

Upgrading to a new version of the flink Job:

  1. update the configmap to set the do-not-restart flag to true (via patch+helm apply, or via kubectl edit configmap if allowed); see the sketch after this list
  2. stop the job with a savepoint using the flink REST API
  3. the jobmanager will want to restart but will enter the CrashLoopBackOff state because of the do-not-restart flag
    1. possibly scale down the jobmanager deployment to 0 so that it stops complaining (via patch+helm apply or via kubectl scale deploy if allowed)
  4. update the chart: new container image, the path of the savepoint just taken, the do-not-restart flag set back to false (and replicas reset to 1)
  5. apply the chart
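Steps 1 and 3.1 could also be performed with the kubernetes Python client rather than kubectl, assuming the deploy user has the corresponding permissions; the namespace, configmap, key and deployment names below are placeholders:

    from kubernetes import client, config

    config.load_kube_config()
    namespace = "rdf-streaming-updater"  # placeholder namespace

    # Step 1: flip the (hypothetical) do-not-restart flag in the shared configmap.
    client.CoreV1Api().patch_namespaced_config_map(
        name="flink-app-flags",                      # placeholder configmap name
        namespace=namespace,
        body={"data": {"do-not-restart": "true"}},
    )

    # Step 3.1: scale the jobmanager deployment down to 0 so it stops crash-looping.
    client.AppsV1Api().patch_namespaced_deployment_scale(
        name="flink-jobmanager",                     # placeholder deployment name
        namespace=namespace,
        body={"spec": {"replicas": 0}},
    )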

(Upgrading to a new version of Flink is the same)

Jobmanager as a k8s Job resource

As suggested in the flink documentation, the jobmanager can be a k8s Job. Since k8s Jobs are by design meant to end, k8s will not automatically restart the pod if it ends successfully (completions=1, parallelism=1). With the OnFailure restart policy it should restart the jobmanager only when an error occurs, which is exactly what is needed here: do not retry if the job is told to stop cleanly. Unfortunately k8s Job resources have another drawback when it comes to deployments with helm: these resources are immutable, so they must be deleted before applying changes via helm.
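Below is a minimal sketch of such a Job built with the kubernetes Python client; the image, arguments and names are placeholders, the point is only the completions/parallelism/restartPolicy combination described above:

    from kubernetes import client

    # Placeholder image and startup arguments for the jobmanager container.
    jobmanager_container = client.V1Container(
        name="flink-jobmanager",
        image="docker-registry.example/flink-rdf-streaming-updater:latest",
        args=["standalone-job", "--fromSavepoint", "swift://example-container/savepoints/savepoint_T1"],
    )

    jobmanager_job = client.V1Job(
        api_version="batch/v1",
        kind="Job",
        metadata=client.V1ObjectMeta(name="flink-jobmanager"),
        spec=client.V1JobSpec(
            completions=1,    # the Job is "completed" once the jobmanager ends successfully
            parallelism=1,    # a single jobmanager pod at a time
            backoff_limit=6,  # how many failed restarts k8s tolerates before giving up
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(
                    restart_policy="OnFailure",  # restart only when the pod exits with an error
                    containers=[jobmanager_container],
                )
            ),
        ),
    )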

Upgrading to a new version of the flink Job:

  1. stop the job with a savepoint
  2. the pod should end successfully, marking the k8s Job as "completed"
  3. delete the k8s Job (the deployer needs kubectl delete job perms); see the sketch after this list
  4. update the chart: new container image, the path of the savepoint just taken
  5. apply the chart
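Step 3 amounts to something like the following with the kubernetes Python client (the Job name and namespace are placeholders; foreground propagation also removes the completed pod):

    from kubernetes import client, config

    config.load_kube_config()

    client.BatchV1Api().delete_namespaced_job(
        name="flink-jobmanager",            # placeholder Job name
        namespace="rdf-streaming-updater",  # placeholder namespace
        propagation_policy="Foreground",    # also delete the pods owned by the Job
    )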

(Upgrading to a new version of Flink is the same)

This approach is not battle-tested and possibly dangerous: if the job properly stops with a savepoint (emptying the H/A configmap) but then fails for some reason afterwards, returning a non-zero status code, the pod will restart. Introducing the do-not-restart flag hack might help to better guarantee that the Job's pod does not restart unexpectedly once the H/A state has been emptied.

Recreate state of Flink (application) from swift

We discussed on 2021-05-21 whether it would be possible to periodically create savepoints (via a k8s CronJob) and have them read by flink on startup. This would give us more automation in case of disaster and during k8s cluster upgrades, as k8s operators would not need to take special care of flink.

Unfortunately this only works in application mode: in session mode the jar file to load is not available, multiple jobs could be running, etc., which would make this much more complicated.

  • Write a k8s CronJob that triggers a savepoint every 5 minutes
  • The savepoint needs to be written to a static location in swift
    • if that's not possible, we would need some code that runs before flink (an initContainer) to figure out the most recent savepoint and build the startup parameters for flink (sketched after this list)
  • Use that static savepoint path in flink's startup parameters to always resume from there
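A rough sketch of that initContainer logic is shown below. It assumes the swift container is reachable through an S3-compatible API with boto3 and that savepoints live under a common prefix with the usual savepoint-*/_metadata layout; the endpoint, bucket, prefix, output path and resulting URI scheme are all placeholders and the actual swift access may look different:

    import boto3

    # Assumed S3-compatible access to swift; endpoint and bucket are placeholders.
    s3 = boto3.client("s3", endpoint_url="https://thanos-swift.example.org")
    BUCKET = "rdf-streaming-updater"
    PREFIX = "savepoints/"

    def latest_savepoint():
        """Return the most recently written savepoint directory, or None."""
        newest_key, newest_time = None, None
        for page in s3.get_paginator("list_objects_v2").paginate(Bucket=BUCKET, Prefix=PREFIX):
            for obj in page.get("Contents", []):
                # Each completed savepoint directory contains a _metadata object.
                if obj["Key"].endswith("/_metadata") and (newest_time is None or obj["LastModified"] > newest_time):
                    newest_key, newest_time = obj["Key"], obj["LastModified"]
        if newest_key is None:
            return None
        # The URI scheme/format is illustrative; it must match what flink expects.
        return "swift://%s/%s" % (BUCKET, newest_key.rsplit("/_metadata", 1)[0])

    if __name__ == "__main__":
        path = latest_savepoint()
        # The initContainer writes the result to a file consumed by the jobmanager startup args.
        with open("/flink-init/savepoint-path", "w") as out:
            out.write(path or "")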

Using a flink-operator

It seems clear that no default k8s resource provides the necessary control to allow a flink job to be upgraded automatically while applying the chart. The actions involved (stopping with a savepoint, restarting from this new savepoint) require several steps that may all fail. It is the perfect use case for a dedicated k8s operator.

Flinkk8soperator (lyft)

https://github.com/lyft/flinkk8soperator

See https://www.youtube.com/watch?v=ydb3TbK2r84

flink-on-k8s-operator (GCP)

https://github.com/GoogleCloudPlatform/flink-on-k8s-operator

Pros and cons

Here is a short list of pros & cons of the various approaches that may help to decide which strategy is best in the short term. The available k8s operators have not been investigated thoroughly, as it did not seem possible to have them available in the k8s cluster in the short term.

criterion | Session-cluster | App Cluster (Deployment) | App Cluster (Job)
service account | read/write to all configmaps | read/write to a set of configmaps | read/write to a set of configmaps
k8s resource cleanup | one new configmap created per deployment | none required | none required
artifacts immutability | no | yes (part of the image) | yes (part of the image)
job isolation | no, but can be controlled with dedicated session clusters | yes | yes
job upgrade procedure | outside k8s / not seen in helm chart | painful (several chart patches) | painful
k8s perms required by the deploy user | none | none (edit configmap, scale deployment might allow to save some patches) | job deletions
flink version upgrade | strong coupling with deployed jobs | same as job upgrade | same as job upgrade
long (>1h) k8s maintenance | depool dependent services | depool dependent services | depool dependent services
unexpected pod restarts | OK (no action required) | OK (no action required) | OK (no action required)