Apache Kafka Sink

This page shows how to install and configure Apache Kafka Sink.

Prerequisites

Knative Eventing installation.

Installation

  1. Install the Kafka controller:

    kubectl apply --filename https://storage.googleapis.com/knative-sandbox-nightly/eventing-kafka-broker/latest/eventing-kafka-controller.yaml
    
  2. Install the Kafka Sink data plane:

    kubectl apply --filename https://storage.googleapis.com/knative-sandbox-nightly/eventing-kafka-broker/latest/eventing-kafka-sink.yaml
    
  3. Verify that kafka-controller and kafka-sink-receiver are running:

    kubectl get deployments.apps -n knative-eventing
    

    Example output:

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-sink-receiver            1/1     1            1           5s
    

Kafka Sink

A KafkaSink object looks like this:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

Security

Apache Kafka supports different security features, Knative supports the followings:

To enable security features, in the KafkaSink spec, we can reference a Secret:

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
   name: my-kafka-sink
   namespace: default
spec:
   topic: mytopic
   bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
   auth.secret.ref.name: my_secret

The Secret my_secret must exist in the same namespace of the KafkaSink, in this case: default.

Note: Certificates and keys must be in PEM format.

Authentication using SASL

Knative supports the following SASL mechanisms:

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

To use a specific SASL mechanism replace <sasl_mechanism> with the mechanism of your choice.

Authentication using SASL without encryption

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

Authentication using SASL and encryption using SSL

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

Encryption using SSL without client authentication

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

Authentication and encryption using SSL

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

NOTE: ca.crt can be omitted to fallback to use system’s root CA set.

Kafka Producer configurations

A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. Knative exposes all available Kafka Producer configurations that can be modified to suit your workloads.

You can change these configurations by modifying the config-kafka-sink-data-plane config map in the knative-eventing namespace.

Documentation for the settings available in this config map is available on the Apache Kafka website, in particular, Producer configurations.

Enable debug logging for data plane components

To enable debug logging for data plane components change the logging level to DEBUG in the kafka-config-logging config map.

  1. Apply the following kafka-config-logging config map:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. Restart the kafka-sink-receiver:

    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
    

Additional information