Currently, Go is a de facto monopoly among programming languages that people choose to create Kubernetes operators. Their preferences stem from objective reasons such as:
- There is a powerful framework for developing operators with Go — Operator SDK.
- Many Go-based applications, such as Docker and Kubernetes, have become game changers. Writing an operator in Go allows you to speak with the ecosystem in the same language.
- High performance of Go-based applications as well as simple mechanisms to use the concurrency right out of the box.
But what if lack of time or simply motivation prevents you from studying Go? In this article, we’ll show you how to create a solid operator using one of the most popular programming languages that almost every DevOps engineer is familiar with — Python.
Please welcome Copyrator — the copy operator!
To make things easy and practical, let’s create a simple operator designed to copy ConfigMap when a new namespace shows up or when one of the following two objects — ConfigMap or Secret — changes its state. From the practical side, our new operator can be used for bulk updates of the application’s configuration (by updating ConfigMap) or for resetting secrets, e.g. keys used for Docker Registry (when a Secret is added to the namespace).
So what features should a good Kubernetes operator have? Let’s name them:
- The interaction with the operator is made via Custom Resource Definitions (hereinafter CRD).
- The operator is configurable. We can use command line flags and environment variables to set it up.
- Docker image and Helm chart are created with simplicity in mind so that users can install it effortlessly (basically with just one command) into their Kubernetes clusters.
CRD
In order for the operator to know which resources and where to look for, we need to set some rules. Each rule will be represented as a specific CRD object. What fields should this CRD object have?
- Type of the resource that we are interested in (ConfigMap or Secret).
- List of namespaces that store resources.
- Selector which helps us in searching for resources in the particular namespace.
Let’s define our CRD:
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: copyrator.test.com
spec:
  group: test.com
  versions:
  - name: v1
    served: true
    storage: true
  scope: Namespaced
  names:
    plural: copyrators
    singular: copyrator
    kind: CopyratorRule
    shortNames:
    - copyr
  validation:
    openAPIV3Schema:
      type: object
      properties:
        ruleType:
          type: string
        namespaces:
          type: array
          items:
            type: string
        selector:
          type: string… and immediately add a simple rule to select ConfigMaps with labels matching copyrator: 'true' in the default namespace:
apiVersion: test.com/v1
kind: CopyratorRule
metadata:
  name: main-rule
  labels:
    module: copyrator
ruleType: configmap
selector:
  copyrator: "true"
namespace: defaultWell done! Now we have to obtain information about our rule somehow. It’s time to say we are not going to make our cluster API’s requests manually. For this purpose we will use a Python library called kubernetes-client:
import kubernetes
from contextlib import suppress
CRD_GROUP = 'test.com'
CRD_VERSION = 'v1'
CRD_PLURAL = 'copyrators'
def load_crd(namespace, name):
    client = kubernetes.client.ApiClient()
    custom_api = kubernetes.client.CustomObjectsApi(client)
    with suppress(kubernetes.client.api_client.ApiException):
        crd = custom_api.get_namespaced_custom_object(
            CRD_GROUP,
            CRD_VERSION,
            namespace,
            CRD_PLURAL,
            name,
        )
    return {x: crd[x] for x in ('ruleType', 'selector', 'namespace')}By executing the above code, we will get the following result:
{'ruleType': 'configmap', 'selector': {'copyrator': 'true'}, 'namespace': ['default']}Great! Now we have a specific rule for the operator. What’s important, we’ve been able to do it via the so-called Kubernetes way.
Environment variables or flags? Both!
Now it is time to proceed to the basic operator setup. There are two main approaches to configuring applications:
- via command line parameters,
- via environment variables.
You can retrieve settings via command line parameters with more flexibility and support/validation of data types. We will use an argparser module from the standard Python library. Details and examples of its use are available in the Python documentation.
Here is an example of configuring the retrieval of command line flags, adapted to our case:
parser = ArgumentParser(
        description='Copyrator - copy operator.',
        prog='copyrator'
    )
    parser.add_argument(
        '--namespace',
        type=str,
        default=getenv('NAMESPACE', 'default'),
        help='Operator Namespace'
    )
    parser.add_argument(
        '--rule-name',
        type=str,
        default=getenv('RULE_NAME', 'main-rule'),
        help='CRD Name'
    )
    args = parser.parse_args()On the other hand, you can easily pass service information about the pod into the container via environment variables in Kubernetes. For example, you can get information about the namespace where the pod is running via the following structure:
env:
- name: NAMESPACE
  valueFrom:
     fieldRef:
         fieldPath: metadata.namespaceThe operating logic of the operator
Let’s use special maps to divide methods for working with ConfigMap and Secret. They will allow us to figure out what methods we need for tracking and creating an object:
LIST_TYPES_MAP = {
    'configmap': 'list_namespaced_config_map',
    'secret': 'list_namespaced_secret',
}
CREATE_TYPES_MAP = {
    'configmap': 'create_namespaced_config_map',
    'secret': 'create_namespaced_secret',
}Then you have to receive events from the API server. We will implement that functionality in the following manner:
def handle(specs):
    kubernetes.config.load_incluster_config()
    v1 = kubernetes.client.CoreV1Api()
# Get the method for tracking objects
    method = getattr(v1, LIST_TYPES_MAP[specs['ruleType']])
    func = partial(method, specs['namespace'])
    w = kubernetes.watch.Watch()
    for event in w.stream(func, _request_timeout=60):
        handle_event(v1, specs, event)After the event is received, we proceed to the underlying logic of handling it:
# Types of events to which we will respond
ALLOWED_EVENT_TYPES = {'ADDED', 'UPDATED'}
def handle_event(v1, specs, event):
    if event['type'] not in ALLOWED_EVENT_TYPES:
        return
    object_ = event['object']
    labels = object_['metadata'].get('labels', {})
    # Look for the matches using selector
    for key, value in specs['selector'].items():
        if labels.get(key) != value:
            return
    # Get active namespaces
    namespaces = map(
        lambda x: x.metadata.name,
        filter(
            lambda x: x.status.phase == 'Active',
            v1.list_namespace().items
        )
    )
    for namespace in namespaces:
        # Clear the metadata, set the namespace
        object_['metadata'] = {
            'labels': object_['metadata']['labels'],
            'namespace': namespace,
            'name': object_['metadata']['name'],
        }
        # Call the method for creating/updating an object
        methodcaller(
            CREATE_TYPES_MAP[specs['ruleType']],
            namespace,
            object_
        )(v1)The basic logic is complete! Now we need to pack it into the single Python package. Let’s create setup.py and add metadata about the project to it:
from sys import version_infofrom sys import version_info
from setuptools import find_packages, setup
if version_info[:2] < (3, 5):
    raise RuntimeError(
        'Unsupported python version %s.' % '.'.join(version_info)
    )
_NAME = 'copyrator'
setup(
    name=_NAME,
    version='0.0.1',
    packages=find_packages(),
    classifiers=[
        'Development Status :: 3 - Alpha',
        'Programming Language :: Python',
        'Programming Language :: Python :: 3',
        'Programming Language :: Python :: 3.5',
        'Programming Language :: Python :: 3.6',
        'Programming Language :: Python :: 3.7',
    ],
    author='Somebody',
    author_email='somebody@test.com',
    include_package_data=True,
    install_requires=[
        'kubernetes==9.0.0',
    ],
    entry_points={
        'console_scripts': [
            '{0} = {0}.cli:main'.format(_NAME),
        ]
    }
)NB: Python client library for Kubernetes has its own versioning system. The compatibility of the client’s and Kubernetes’ versions is outlined in this matrix.
Currently, our project has the following structure:
copyrator
├── copyrator
│ ├── cli.py # Command line operating logic
│ ├── constant.py # Constants that we described above
│ ├── load_crd.py # CRD loading logic
│ └── operator.pyк # Basic logic of the operator
└── setup.py # Package descriptionDocker and Helm
The resulting Dockerfile will be ridiculously simple: we will take the basic python-alpine image and install our package (let’s postpone its optimization until better times):
FROM python:3.7.3-alpine3.9
ADD . /app
RUN pip3 install /app
ENTRYPOINT ["copyrator"]The deployment for Copyrator is also very simple:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Chart.Name }}
spec:
  selector:
    matchLabels:
      name: {{ .Chart.Name }}
  template:
    metadata:
      labels:
        name: {{ .Chart.Name }}
    spec:
      containers:
      - name: {{ .Chart.Name }}
        image: privaterepo.yourcompany.com/copyrator:latest
        imagePullPolicy: Always
        args: ["--rule-type", "main-rule"]
        env:
        - name: NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
      serviceAccountName: {{ .Chart.Name }}-accFinally, we have to create a relevant role for the operator with the necessary permissions:
apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ .Chart.Name }}-acc
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: {{ .Chart.Name }}
rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "watch", "list"]
  - apiGroups: [""]
    resources: ["secrets", "configmaps"]
    verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: {{ .Chart.Name }}
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: {{ .Chart.Name }}
subjects:
- kind: ServiceAccount
  name: {{ .Chart.Name }}-accConclusion
In this article, we showed you how to create your own Python-based operator for Kubernetes. Of course, it still has room to grow: you can enrich it with the abilities to process several rules, monitor changes in its CRDs on its own, benefit from concurrency capabilities…
If you’re interested in other examples of Python-based operators, we can recommend you to pay attention to two operators for deploying mongodb (here and here).
P.S. There is also an alternative way to use Python for writing K8s — via specific framework called kopf (Kubernetes Operator Pythonic Framework). It’s useful if you want to minimize your Python code for that. Check kopf docs here.
 
                    
Comments