Implementing operators
Event processor
The library defines an Operator
by providing an EventProcessor
:
trait EventProcessor[-R, +E, T] {
def apply(context: OperatorContext, event: TypedWatchEvent[T]): ZIO[R, OperatorFailure[E], Unit]
}
To start an operator, use either the Operator.cluster
or Operator.namespaced
functions:
import com.coralogix.zio.k8s.client.NamespacedResource
import com.coralogix.zio.k8s.client.model._
import com.coralogix.zio.k8s.model.core.v1.Pod
import com.coralogix.zio.k8s.operator._
import com.coralogix.zio.k8s.operator.Operator._
import zio._
import zio.Clock
sealed trait CustomOperatorFailures
val eventProcessor: EventProcessor[Any, CustomOperatorFailures, Pod] =
(ctx, event) =>
event match {
case Reseted() =>
ZIO.unit
case Added(item) =>
ZIO.unit
case Modified(item) =>
ZIO.unit
case Deleted(item) =>
ZIO.unit
}
val operator =
Operator.namespaced(
eventProcessor
)(namespace = None, buffer = 1024)
Passing None
as namespace to Operator.namespaced
means watching all namespaces.
Aspects
The event processor logic can be modified by various aspects such as logging and monitoring. The library currently only provides a single logging aspect that can be applied with the @@
operator:
import com.coralogix.zio.k8s.operator.aspects._
val operator2 =
Operator.namespaced(
eventProcessor @@ logEvents
)(namespace = None, buffer = 1024)
// operator2: ZIO[NamespacedResource[Pod], Nothing, Operator[Any, CustomOperatorFailures, Pod]] = FlatMap(
// "com.coralogix.zio.k8s.operator.Operator.namespaced(Operator.scala:198)",
// Stateful(
// "com.coralogix.zio.k8s.operator.Operator.namespaced(Operator.scala:198)",
// zio.FiberRef$unsafe$$anon$2$$Lambda$20322/0x0000000804825040@492dc5c2
// ),
// zio.ZIO$$Lambda$20328/0x0000000804822040@2c8f106
// )
Defining an aspect
In this example we define another aspect for monitoring the event processing time and the number of different events processed with Prometheus metrics using the zio-metrics library.
Assuming we have a type OperatorMetrics
with the following fields:
import zio.metrics.prometheus.helpers._
import zio.metrics.prometheus.{ Counter, Histogram, Registry }
case class OperatorMetrics(
eventCounter: Counter,
eventProcessingTime: Histogram
)
object OperatorMetrics {
def labels(
event: TypedWatchEvent[_],
resourceName: String,
namespace: Option[K8sNamespace]
): Array[String] = {
val eventType = event match {
case Reseted() => "reseted"
case Added(_) => "added"
case Modified(_) => "modified"
case Deleted(_) => "deleted"
}
Array(eventType, resourceName, namespace.map(_.value).getOrElse(""))
}
}
we can define a metered
aspect:
import zio.Clock
def metered[T, E](operatorMetrics: OperatorMetrics): Aspect[Any, E, T] =
new Aspect[Any, E, T] {
override def apply[R1, E1 >: E](
f: EventProcessor[R1, E1, T]
): EventProcessor[R1, E1, T] =
(ctx, event) => {
// Using the operator context and the event to produce some Prometheus labels
val labels = OperatorMetrics.labels(
event,
ctx.resourceType.resourceType,
ctx.namespace.orElse(event.namespace)
)
// Increasing the event counter
operatorMetrics.eventCounter.inc(labels).ignore *>
// Running the event processor
f(ctx, event).timed.flatMap {
case (duration, result) =>
// Recording the event processing time
operatorMetrics.eventProcessingTime
.observe(
duration.toMillis.toDouble / 1000.0,
labels
)
.ignore
.as(result)
}
}
}
def operator3(metrics: OperatorMetrics) =
Operator.namespaced(
eventProcessor @@ logEvents @@ metered(metrics)
)(namespace = None, buffer = 1024)
To learn more about the general idea of aspects, check the presentation of Adam Fraser from ZIO Meetup London on September 24, 2020.