Apache Kafka Sink
This page shows how to install and configure Apache Kafka Sink.
Prerequisites
You have installed Knative Eventing.
Installation
1. Install the Kafka controller:

    kubectl apply --filename https://storage.googleapis.com/knative-sandbox-nightly/eventing-kafka-broker/latest/eventing-kafka-controller.yaml

2. Install the Kafka Sink data plane:

    kubectl apply --filename https://storage.googleapis.com/knative-sandbox-nightly/eventing-kafka-broker/latest/eventing-kafka-sink.yaml

3. Verify that kafka-controller and kafka-sink-receiver are running:

    kubectl get deployments.apps -n knative-eventing

    Example output:

    NAME                  READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller   1/1     1            1           10s
    eventing-webhook      1/1     1            1           9s
    kafka-controller      1/1     1            1           3s
    kafka-sink-receiver   1/1     1            1           5s
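Optionally, instead of polling the output above, you can wait until the deployments report the Available condition before continuing. This is a minimal sketch; the 120s timeout is only an example value:

```bash
# Wait for all deployments in the knative-eventing namespace to become Available.
# Adjust the timeout to suit your cluster.
kubectl wait deployment --all \
  --for=condition=Available \
  --timeout=120s \
  -n knative-eventing
```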
Kafka Sink
A KafkaSink object looks like this:

    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: my-kafka-sink
      namespace: default
    spec:
      topic: mytopic
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092
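Once created, the KafkaSink is an addressable event sink, so other Knative Eventing resources can deliver CloudEvents to it, and those events are written to the configured topic. As a sketch, the following Trigger routes events from a Broker to the KafkaSink above; the Trigger name my-trigger and the Broker name default are example values and are not part of this page:

```yaml
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-trigger        # example name
  namespace: default
spec:
  broker: default          # assumes a Broker named "default" already exists
  subscriber:
    ref:
      apiVersion: eventing.knative.dev/v1alpha1
      kind: KafkaSink
      name: my-kafka-sink  # the KafkaSink defined above
```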
Security
Apache Kafka supports several security features. Knative supports the following combinations:

- Authentication using SASL without encryption
- Authentication using SASL and encryption using SSL
- Authentication and encryption using SSL
- Encryption using SSL without client authentication

To enable security features, reference a Secret in the KafkaSink spec:
    apiVersion: eventing.knative.dev/v1alpha1
    kind: KafkaSink
    metadata:
      name: my-kafka-sink
      namespace: default
    spec:
      topic: mytopic
      bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092
      auth.secret.ref.name: my_secret
The Secret my_secret must exist in the same namespace as the KafkaSink, in this case: default.

Note: Certificates and keys must be in PEM format.
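If you are unsure whether a certificate is PEM-encoded, one way to check is with openssl; the file name ca.crt below is just an example:

```bash
# A PEM certificate is Base64 text delimited by "-----BEGIN CERTIFICATE-----" markers.
# This command prints the certificate details if ca.crt is a valid PEM certificate.
openssl x509 -in ca.crt -text -noout
```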
Authentication using SASL
Knative supports the following SASL mechanisms:

- PLAIN
- SCRAM-SHA-256
- SCRAM-SHA-512

To use a specific SASL mechanism, replace <sasl_mechanism> in the commands below with the mechanism of your choice.
Authentication using SASL without encryption
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
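For example, with the SCRAM-SHA-512 mechanism, a user named my-user, and a Secret named my_secret in the default namespace (all example values), the command becomes:

```bash
kubectl create secret --namespace default generic my_secret \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=SCRAM-SHA-512 \
  --from-literal=user=my-user \
  --from-literal=password=my-password
```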
Authentication using SASL and encryption using SSL
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
Encryption using SSL without client authentication
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
Authentication and encryption using SSL
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
NOTE: ca.crt can be omitted to fall back to using the system's root CA set.
Kafka Producer configurations
A Kafka Producer is the component responsible for sending events to the Apache Kafka cluster. Knative exposes all available Kafka Producer configurations that can be modified to suit your workloads.
You can change these configurations by modifying the config-kafka-sink-data-plane config map in the knative-eventing namespace.
Documentation for the settings available in this config map is available on the Apache Kafka website, in particular, Producer configurations.
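As a sketch of what such a change can look like: the exact data keys in the config map depend on the version you installed, so inspect it first, but standard Kafka producer properties such as compression.type or max.request.size are the kind of settings you might tune there:

```bash
# Inspect the current data-plane configuration used by the Kafka Sink.
kubectl get configmap config-kafka-sink-data-plane -n knative-eventing -o yaml

# Edit the config map and adjust producer properties (for example,
# compression.type=snappy or max.request.size) under the producer-related
# key shown in the output above.
kubectl edit configmap config-kafka-sink-data-plane -n knative-eventing
```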
Enable debug logging for data plane components
To enable debug logging for data plane components, change the logging level to DEBUG in the kafka-config-logging config map.
1. Apply the following kafka-config-logging config map:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>

2. Restart the kafka-sink-receiver deployment:

    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
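After the restart, you can follow the receiver logs to confirm that DEBUG entries are being emitted:

```bash
# Stream logs from the kafka-sink-receiver deployment.
kubectl logs -n knative-eventing deployment/kafka-sink-receiver -f
```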
Additional information
- To report a bug or request a feature, open an issue in the eventing-kafka-broker repository.