Working with resources
Per-resource layers
The getting started page demonstrated how to get access to per-resource client layers, for example pods and configmaps:
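import com.coralogix.zio.k8s.client.config.httpclient._ // provides k8sDefault, as set up on the getting started page
import com.coralogix.zio.k8s.client.v1.configmaps.ConfigMaps
import com.coralogix.zio.k8s.client.v1.pods.Pods
import com.coralogix.zio.k8s.client.K8sFailure
import zio._
val k8s: ZLayer[Any, Throwable, Pods with ConfigMaps] =
  k8sDefault >>> (Pods.live ++ ConfigMaps.live)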
With this approach we gain a clear understanding of exactly which parts of the Kubernetes API our application uses, and the type signature of each function documents which resources it works with.
As an example, the Pods with ConfigMaps layer created above can be provided to functions that work with a subset of these resource types:
def launchNewPods(count: Int): ZIO[Pods, K8sFailure, Unit] = ZIO.unit // ...
def getFromConfigMap(key: String): ZIO[ConfigMaps, K8sFailure, String] = ZIO.succeed("TODO") // ...
launchNewPods(5).provide(k8s)
getFromConfigMap("something").provide(k8s)
When using this style, use the accessor functions that are available for every resource in its package. This is explained in detail below, but as an example, see the following function from the com.coralogix.zio.k8s.client.v1.pods package:
def create(newResource: Pod, namespace: K8sNamespace, dryRun: Boolean = false): ZIO[Pods, K8sFailure, Pod]
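As a minimal sketch of this style, the create accessor shown above can be called directly inside a function whose environment is Pods. The names AccessorSketch, podNamed and launchNamedPods are hypothetical and only serve the illustration. The pod is built without a spec to keep the example short, so a real cluster would most likely reject it as written.

```scala
import com.coralogix.zio.k8s.client.K8sFailure
import com.coralogix.zio.k8s.client.model.K8sNamespace
import com.coralogix.zio.k8s.client.v1.pods
import com.coralogix.zio.k8s.client.v1.pods.Pods
import com.coralogix.zio.k8s.model.core.v1.Pod
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1.ObjectMeta
import zio._

object AccessorSketch {
  // Hypothetical helper: a pod that only carries a name.
  // The spec is omitted for brevity; a usable pod needs one.
  def podNamed(name: String): Pod =
    Pod(metadata = ObjectMeta(name = name))

  // The Pods requirement in the signature comes from the accessor function;
  // no client instance is passed around explicitly.
  def launchNamedPods(names: List[String], namespace: K8sNamespace): ZIO[Pods, K8sFailure, List[Pod]] =
    ZIO.foreach(names)(name => pods.create(podNamed(name), namespace))
}
```

The returned effect can then be given the k8s layer from the beginning of this section, in the same way as launchNewPods.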
Unified layer
An alternative style is supported by the library, recommended in cases when there are so many different resource types used by the application logic that creating and specifying the per-resource layers creates too much boilerplate.
In this case it is possible to create a single, unified Kubernetes API layer:
import com.coralogix.zio.k8s.client.kubernetes.Kubernetes
val api = k8sDefault >>> Kubernetes.live
This is a huge interface providing all operations for all Kubernetes resources. The initialization and the type signatures become much simpler, but on the other hand we lose the ability to see which parts of the API our functions use:
def launchNewPods2(count: Int): ZIO[Kubernetes, K8sFailure, Unit] =
  ZIO.service[Kubernetes.Service].flatMap { k8s =>
    // ...
    ZIO.unit
  }
def getFromConfigMap2(key: String): ZIO[Kubernetes, K8sFailure, String] =
  ZIO.service[Kubernetes.Service].flatMap { k8s =>
    // ...
    ZIO.succeed("TODO")
  }
launchNewPods2(5).provide(api)
getFromConfigMap2("something").provide(api)
Also, instead of the accessor functions like pods.create shown in the previous section, in this case pod creation is accessed through the Kubernetes.Service interface:
ZIO.service[Kubernetes.Service].flatMap { k8s =>
  k8s.v1.pods.create(...)
}
Narrowing the unified layer
The two styles - per-resource layers and the unified API layer - can be mixed together.
If we have initialized the single unified API layer as above, named api, we can still provide it to functions that have per-resource layer requirements:
launchNewPods(5).provide(api.project(_.v1.pods))
getFromConfigMap("something").provide(api.project(_.v1.configmaps))
Operations
Each resource client provides a set of operations and, depending on the resource, some additional capabilities related to subresources.
Let's see what the supported operations are by looking at an example, the StatefulSet resource.
The following functions are the basic operations the resource supports:
Get
def getAll(
  namespace: Option[K8sNamespace],
  chunkSize: Int = 10,
  fieldSelector: Option[FieldSelector] = None,
  labelSelector: Option[LabelSelector] = None,
  resourceVersion: ListResourceVersion = ListResourceVersion.MostRecent
): ZStream[StatefulSets, K8sFailure, StatefulSet]
def get(name: String, namespace: K8sNamespace): ZIO[StatefulSets, K8sFailure, StatefulSet]
- getAll returns a stream of all resources in the cluster, optionally filtered to a single namespace.
- get returns a single StatefulSet from a given namespace.
Field and label selector DSL
There is a small DSL for assembling field and label selector expressions.
The following examples demonstrate how to create label selectors:
import com.coralogix.zio.k8s.client.model._
label("release") === "my-release"
// res6: LabelSelector.LabelEquals = LabelEquals("release", "my-release")
label("version") in ("v1", "v2")
// res7: LabelSelector.LabelIn = LabelIn("version", Set("v1", "v2"))
(label("release") === "my-release") && (label("version") in ("v1", "v2"))
// res8: LabelSelector.And = And(
// Chunk.AnyRefArray(
// LabelEquals("release", "my-release"),
// LabelIn("version", Set("v1", "v2"))
// )
// )
For building field selectors, the resource data model's companion objects have field selectors that can be used to recursively point to the field to be used in the filter expression:
import com.coralogix.zio.k8s.model.core.v1.Pod
Pod.metadata.name === "something"
// res9: FieldSelector.FieldEquals = FieldEquals(
// Chunk.AppendN("metadata", "name"),
// "something"
// )
Pod.spec.securityContext.fsGroup !== "admin"
// res10: FieldSelector.FieldNotEquals = FieldNotEquals(
// Chunk.AppendN("spec", "securityContext", "fsGroup"),
// "admin"
// )
(Pod.metadata.name === "something") && (Pod.spec.securityContext.fsGroup !== "admin")
// res11: FieldSelector.And = And(
// Chunk.AnyRefArray(
// FieldEquals(Chunk.AppendN("metadata", "name"), "something"),
// FieldNotEquals(Chunk.AppendN("spec", "securityContext", "fsGroup"), "admin")
// )
// )
NOTE that Kubernetes does not support field selectors on all fields, and zio-k8s currently does not have any information about which fields are supported. The library therefore provides a Field value for all fields, and it is the library user's responsibility to know which fields are selectable for a given resource.
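The selectors built with this DSL are meant to be passed to operations such as getAll. The following sketch assumes that the pods package exposes a getAll accessor with the same parameters as the StatefulSet signature shown earlier. The names SelectorSketch, byRelease, byName and matchingPods are hypothetical.

```scala
import com.coralogix.zio.k8s.client.K8sFailure
import com.coralogix.zio.k8s.client.model._
import com.coralogix.zio.k8s.client.v1.pods
import com.coralogix.zio.k8s.client.v1.pods.Pods
import com.coralogix.zio.k8s.model.core.v1.Pod
import zio.stream.ZStream

object SelectorSketch {
  // Label selector combining an equality and a set membership check.
  val byRelease: LabelSelector =
    (label("release") === "my-release") && (label("version") in ("v1", "v2"))

  // Field selector on metadata.name, a field Kubernetes can select pods by.
  val byName: FieldSelector =
    Pod.metadata.name === "something"

  // Assumption: pods.getAll mirrors the getAll signature documented above.
  def matchingPods(namespace: K8sNamespace): ZStream[Pods, K8sFailure, Pod] =
    pods.getAll(
      namespace = Some(namespace),
      fieldSelector = Some(byName),
      labelSelector = Some(byRelease)
    )
}
```

Both selectors are plain values, so they can be defined once and reused across several calls.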
Watch
def watch(
  namespace: Option[K8sNamespace],
  resourceVersion: Option[String],
  fieldSelector: Option[FieldSelector] = None,
  labelSelector: Option[LabelSelector] = None
): ZStream[StatefulSets, K8sFailure, TypedWatchEvent[StatefulSet]]
def watchForever(
  namespace: Option[K8sNamespace],
  fieldSelector: Option[FieldSelector] = None,
  labelSelector: Option[LabelSelector] = None
): ZStream[StatefulSets, K8sFailure, TypedWatchEvent[StatefulSet]]
- watch starts a stream of watch events starting from a given resource version. The lifecycle of this stream corresponds to the underlying Kubernetes API request.
- watchForever is built on top of watch and handles reconnection when the underlying connection closes.
A watch event is one of the following:
- Reseted: emitted when the underlying watch stream got restarted. If the watch stream is processed in a stateful way, the state must be rebuilt from scratch when this event arrives.
- Added
- Modified
- Deleted
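The rule about rebuilding state on Reseted can be sketched as a pure state transition function. The example below has no dependency on zio-k8s: Event and its cases are a local, hypothetical stand-in for the library's watch event type, and Item is a made-up simplified resource.

```scala
object WatchStateSketch {
  // Hypothetical stand-in for the library's watch event type; only the four
  // event kinds listed above are modelled.
  sealed trait Event[+A]
  case object Reseted extends Event[Nothing]
  final case class Added[A](item: A) extends Event[A]
  final case class Modified[A](item: A) extends Event[A]
  final case class Deleted[A](item: A) extends Event[A]

  // Hypothetical simplified resource: a name plus a replica count.
  final case class Item(name: String, replicas: Int)

  // Applies a single event to the current view of the resources, keyed by name.
  def step(state: Map[String, Item], event: Event[Item]): Map[String, Item] =
    event match {
      case Reseted        => Map.empty // stream restarted: discard all state
      case Added(item)    => state + (item.name -> item)
      case Modified(item) => state + (item.name -> item)
      case Deleted(item)  => state - item.name
    }

  // Replays a sequence of events starting from an empty view.
  def replay(events: List[Event[Item]]): Map[String, Item] =
    events.foldLeft(Map.empty[String, Item])(step)
}
```

With the real stream, a function shaped like step could be used in a fold over the events returned by watchForever.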
Create
def create(
  newResource: StatefulSet,
  namespace: K8sNamespace,
  dryRun: Boolean = false
): ZIO[StatefulSets, K8sFailure, StatefulSet]
create creates a new Kubernetes resource in the given namespace. The model section below describes how to assemble the resource data.
Replace
def replace(
  name: String,
  updatedResource: StatefulSet,
  namespace: K8sNamespace,
  dryRun: Boolean = false
): ZIO[StatefulSets, K8sFailure, StatefulSet]
replace updates an existing Kubernetes resource, identified by its name in the given namespace, with the new value.
Delete
def delete(
  name: String,
  deleteOptions: DeleteOptions,
  namespace: K8sNamespace,
  dryRun: Boolean = false
): ZIO[StatefulSets, K8sFailure, Status]
def deleteAll(
  deleteOptions: DeleteOptions,
  namespace: K8sNamespace,
  dryRun: Boolean = false,
  gracePeriod: Option[Duration] = None,
  propagationPolicy: Option[PropagationPolicy] = None,
  fieldSelector: Option[FieldSelector] = None,
  labelSelector: Option[LabelSelector] = None
): ZIO[StatefulSets, K8sFailure, Status]
- delete deletes an existing Kubernetes resource identified by its name in the given namespace.
- deleteAll deletes multiple existing Kubernetes resources. Field and label selectors can be used to select the subset to be deleted. The default behavior is to delete all existing items.
Namespaced vs cluster resources
Some Kubernetes resources are cluster level, while others, like the example StatefulSet above, are split into namespaces. The zio-k8s library encodes this property in the resource interfaces, and for cluster resources the operations do not have a namespace parameter at all.
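To illustrate what encoding the scope in the interface means, here is a simplified, dependency-free model. None of the names below (ScopeSketch, Namespace, NamespacedStore, ClusterStore and the in-memory classes) come from zio-k8s. They only show how two separate interfaces make it impossible to pass a namespace to a cluster-level operation, or to forget it for a namespaced one.

```scala
object ScopeSketch {
  // Hypothetical stand-ins; none of these names come from zio-k8s.
  final case class Namespace(value: String)

  // Operations of a namespaced resource always take a namespace.
  trait NamespacedStore[T] {
    def get(name: String, namespace: Namespace): Option[T]
  }

  // Operations of a cluster-level resource have no namespace parameter at all.
  trait ClusterStore[T] {
    def get(name: String): Option[T]
  }

  // In-memory implementation keyed by (namespace, name).
  final class InMemoryNamespaced[T](items: Map[(Namespace, String), T]) extends NamespacedStore[T] {
    def get(name: String, namespace: Namespace): Option[T] =
      items.get((namespace, name))
  }

  // In-memory implementation keyed by name only.
  final class InMemoryCluster[T](items: Map[String, T]) extends ClusterStore[T] {
    def get(name: String): Option[T] =
      items.get(name)
  }
}
```

A call such as nodes.get("node-1", someNamespace) on a ClusterStore does not compile, which is the kind of guarantee the paragraph above describes.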
Status subresource
Most of the resources have a status subresource. This capability is provided by the following extra functions:
def getStatus(name: String, namespace: K8sNamespace): ZIO[StatefulSets, K8sFailure, StatefulSet]
def replaceStatus(
  of: StatefulSet,
  updatedStatus: StatefulSetStatus,
  namespace: K8sNamespace,
  dryRun: Boolean = false
): ZIO[StatefulSets, K8sFailure, StatefulSet]
Note that currently the Scala interface closely reflects the underlying HTTP API's behavior and for this reason the type of these functions can be a bit surprising:
- getStatus returns the whole resource, not just the status.
- replaceStatus requires the whole resource to be passed, but Kubernetes will only update the status part while using the metadata part for collision detection.
Other subresources
Some resources have additional subresources. Our example, the StatefulSet, has one: the Scale subresource.
For each subresource a set of additional operations is provided, in the example case a get/replace pair:
def getScale(
  name: String,
  namespace: K8sNamespace
): ZIO[StatefulSets, K8sFailure, Scale]
def replaceScale(
  name: String,
  updatedValue: Scale,
  namespace: K8sNamespace,
  dryRun: Boolean = false
): ZIO[StatefulSets, K8sFailure, autoscaling.v1.Scale]
Some important things to note:
- The subresource API does not share the weird properties of the status subresource API. The main resource is identified by its name (and namespace), and only the subresource data gets sent in the request.
- Not all subresources have a get and a replace operation. Some only have a create (for example Eviction), and some only have a get (for example Log).
Model
For all Kubernetes data types - the resources, subresources and inner data structures - there are corresponding Scala case classes defined in the zio-k8s-client library. Because a huge part of the model consists of optional fields and very deep structures, a couple of features were added to reduce the resulting boilerplate.
Let's take a look at the data model of the example StatefulSet resource:
case class StatefulSet(
  metadata: Optional[ObjectMeta] = Optional.Absent,
  spec: Optional[StatefulSetSpec] = Optional.Absent,
  status: Optional[StatefulSetStatus] = Optional.Absent
) {
  def getMetadata: IO[K8sFailure, ObjectMeta]
  def getSpec: IO[K8sFailure, StatefulSetSpec]
  def getStatus: IO[K8sFailure, StatefulSetStatus]
}
- Instead of the standard Option type, zio-k8s uses a custom Optional type.
- In addition to the case class fields, it has ZIO getter functions that fail if the value is absent.
Creating
The custom Optional[T] type used in the model classes provides implicit conversion from both T and Option[T]. This provides a boilerplate-free way to specify large Kubernetes resources, with a syntax that is not far from the usual YAML representation of Kubernetes resources.
The following example demonstrates this:
import com.coralogix.zio.k8s.client.model.K8sNamespace
import com.coralogix.zio.k8s.model.core.v1._
import com.coralogix.zio.k8s.model.pkg.apis.meta.v1._
import com.coralogix.zio.k8s.model.rbac.v1._
def clusterRoleBinding(name: String, namespace: K8sNamespace): ClusterRoleBinding =
  ClusterRoleBinding(
    metadata = ObjectMeta(
      name = "fluentd-coralogix-role-binding",
      namespace = namespace.value,
      labels = Map(
        "k8s-app" -> s"fluentd-coralogix-$name"
      )
    ),
    roleRef = RoleRef(
      apiGroup = "rbac.authorization.k8s.io",
      kind = "ClusterRole",
      name = "fluentd-coralogix-role"
    ),
    subjects = Vector(
      Subject(
        kind = "ServiceAccount",
        name = "fluentd-coralogix-service-account",
        namespace = namespace.value
      )
    )
  )
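To see why plain strings and maps can be written where an optional field is expected, the mechanism can be modelled in a few lines. The following is a simplified, dependency-free sketch and not the library's actual implementation: Opt, Meta and the value names are hypothetical, and the real Optional type offers more than is shown here.

```scala
import scala.language.implicitConversions

object OptionalSketch {
  // Simplified, hypothetical model of an optional-value type.
  sealed trait Opt[+A] {
    def toOption: Option[A] = this match {
      case Opt.Present(value) => Some(value)
      case Opt.Absent         => None
    }
  }

  object Opt {
    final case class Present[+A](value: A) extends Opt[A]
    case object Absent extends Opt[Nothing]

    // Conversions defined in the companion are found without extra imports
    // whenever an Opt[A] is expected.
    implicit def fromValue[A](value: A): Opt[A] = Present(value)
    implicit def fromOption[A](option: Option[A]): Opt[A] =
      option.fold[Opt[A]](Absent)(Present(_))
  }

  // Hypothetical model class with optional fields that default to Absent.
  final case class Meta(
    name: Opt[String] = Opt.Absent,
    labels: Opt[Map[String, String]] = Opt.Absent
  )

  // A plain String is lifted by fromValue.
  val fromPlainValue: Meta = Meta(name = "example")

  // An Option[String] is lifted by fromOption.
  val fromOptionValue: Meta = Meta(name = Option.empty[String])
}
```

Fields that are not mentioned stay absent, which is what keeps large resource definitions close to their YAML form.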
Accessing
The Optional fields support all the usual combinators Option has, and when needed they can be converted back with .toOption.
In many cases the application logic that works with the Kubernetes clients expects the optional fields to be specified. To support this, each case class has getter effects that fail the ZIO effect if the field is not present.
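The two access styles can be contrasted in a short sketch. It assumes that the StatefulSet model lives in com.coralogix.zio.k8s.model.apps.v1 (by analogy with the core.v1 and rbac.v1 imports used earlier), that ObjectMeta has a getName getter and that StatefulSetSpec has an optional replicas field. AccessingSketch and its function names are hypothetical, and the fallback value of 1 is only an example.

```scala
import com.coralogix.zio.k8s.client.K8sFailure
import com.coralogix.zio.k8s.model.apps.v1.StatefulSet
import zio._

object AccessingSketch {
  // Getter effects: the whole effect fails with a K8sFailure if the
  // metadata or the name is missing.
  def statefulSetName(sts: StatefulSet): IO[K8sFailure, String] =
    for {
      metadata <- sts.getMetadata
      name     <- metadata.getName
    } yield name

  // Combinators: absence is handled locally with a fallback value.
  def replicasOrFallback(sts: StatefulSet): Int =
    sts.spec.flatMap(_.replicas).getOrElse(1)
}
```

The getter style suits fields the logic cannot proceed without, while the combinators suit fields that have a sensible default.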