Reconciler Implementation and Design
Reconciler Functionality
General steps the reconciliation process needs to cover:
- Update the ObservedGeneration and initialize the Status conditions (as defined in samplesource_lifecycle.go and samplesource_types.go)
src.Status.InitializeConditions()
src.Status.ObservedGeneration = src.Generation
- Create/reconcile the Receive Adapter (detailed below)
- If successful, update the Status and MarkDeployed
src.Status.PropagateDeploymentAvailability(ra)
- Create/reconcile the SinkBinding for the Receive Adapter targeting the Sink (detailed below)
- MarkSink with the result
src.Status.MarkSink(sb.Status.SinkURI)
- Return a new reconciler event stating that the process is done
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)
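The steps above can be sketched end to end as follows. This is a minimal, self-contained illustration, not the sample source's actual code: Source, Status, reconcileReceiveAdapter, and reconcileSinkBinding are simplified stand-ins for the Knative types and helpers, conditions are reduced to booleans, and the sink URI is a made-up value.

```go
package main

import (
	"errors"
	"fmt"
)

// Status is a simplified stand-in for the source's status type.
type Status struct {
	ObservedGeneration int64
	Conditions         map[string]bool // condition name -> satisfied
	SinkURI            string
}

// InitializeConditions makes sure every tracked condition exists.
func (s *Status) InitializeConditions() {
	if s.Conditions == nil {
		s.Conditions = map[string]bool{}
	}
	for _, c := range []string{"Deployed", "SinkProvided"} {
		if _, ok := s.Conditions[c]; !ok {
			s.Conditions[c] = false
		}
	}
}

// Source is a simplified stand-in for the SampleSource object.
type Source struct {
	Namespace, Name string
	Generation      int64
	Sink            string
	Status          Status
}

// reconcileReceiveAdapter is a hypothetical helper; here it only reports
// that the deployment is available.
func reconcileReceiveAdapter(src *Source) (bool, error) {
	return true, nil
}

// reconcileSinkBinding is a hypothetical helper that resolves the sink
// to an illustrative URI.
func reconcileSinkBinding(src *Source) (string, error) {
	if src.Sink == "" {
		return "", errors.New("sink is not set")
	}
	return "http://" + src.Sink + "." + src.Namespace + ".svc.cluster.local", nil
}

// Reconcile walks through the steps in order and returns the final
// event message.
func Reconcile(src *Source) (string, error) {
	src.Status.InitializeConditions()
	src.Status.ObservedGeneration = src.Generation

	available, err := reconcileReceiveAdapter(src)
	if err != nil {
		return "", fmt.Errorf("reconciling receive adapter: %w", err)
	}
	src.Status.Conditions["Deployed"] = available

	uri, err := reconcileSinkBinding(src)
	if err != nil {
		return "", fmt.Errorf("reconciling sink binding: %w", err)
	}
	src.Status.SinkURI = uri
	src.Status.Conditions["SinkProvided"] = true

	return fmt.Sprintf("SampleSource reconciled: \"%s/%s\"", src.Namespace, src.Name), nil
}

func main() {
	src := &Source{Namespace: "default", Name: "sample", Generation: 3, Sink: "event-display"}
	msg, err := Reconcile(src)
	fmt.Println(msg, err)
}
```

Each step returns early on failure, so the status only records progress that was actually made before the error.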
Reconcile/Create The Receive Adapter
As part of the source reconciliation, we have to create and deploy (and update if necessary) the underlying receive adapter.
- Verify the specified Kubernetes resources are valid, and update the Status accordingly
- Assemble the ReceiveAdapterArgs
raArgs := resources.ReceiveAdapterArgs{
	EventSource:    src.Namespace + "/" + src.Name,
	Image:          r.ReceiveAdapterImage,
	Source:         src,
	Labels:         resources.Labels(src.Name),
	AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
}
NB: The exact arguments may change based on functional requirements.
- Create the underlying deployment from the arguments provided, matching pod templates, labels, owner references, etc. as needed to fill out the deployment. Example: pkg/reconciler/sample/resources/receive_adapter.go
- Fetch the existing receive adapter deployment
namespace := owner.GetObjectMeta().GetNamespace()
ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
- If it doesn't exist, create the deployment
ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
- Check if the expected vs existing spec is different, and update the deployment if required
} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
	ra.Spec.Template.Spec = expected.Spec.Template.Spec
	if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
		return ra, err
	}
- If updated, record the event
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
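The fetch, create, and update fragments above fit together into a single get-or-create-or-update flow. The sketch below shows that flow in a self-contained form. It rests on several assumptions: Deployment is a stand-in that tracks only container images, fakeClient is a hypothetical in-memory replacement for the Kubernetes client, and imagesDiffer is one plausible way to compare pod specs, not necessarily what podSpecImageSync does.

```go
package main

import (
	"errors"
	"fmt"
)

// Deployment is a simplified stand-in for appsv1.Deployment that tracks
// only the container images (container name -> image).
type Deployment struct {
	Name   string
	Images map[string]string
}

var errNotFound = errors.New("not found")

// fakeClient is a hypothetical in-memory replacement for the Kubernetes client.
type fakeClient struct{ store map[string]*Deployment }

func (c *fakeClient) Get(name string) (*Deployment, error) {
	d, ok := c.store[name]
	if !ok {
		return nil, errNotFound
	}
	return d, nil
}

func (c *fakeClient) Create(d *Deployment) (*Deployment, error) {
	c.store[d.Name] = d
	return d, nil
}

func (c *fakeClient) Update(d *Deployment) (*Deployment, error) {
	c.store[d.Name] = d
	return d, nil
}

// imagesDiffer reports whether any expected container is missing from the
// existing deployment or runs a different image.
func imagesDiffer(expected, existing *Deployment) bool {
	for name, image := range expected.Images {
		if existing.Images[name] != image {
			return true
		}
	}
	return false
}

// reconcileDeployment returns the resulting deployment together with the
// action taken: "created", "updated", or "unchanged".
func reconcileDeployment(c *fakeClient, expected *Deployment) (*Deployment, string, error) {
	existing, err := c.Get(expected.Name)
	switch {
	case errors.Is(err, errNotFound):
		created, err := c.Create(expected)
		return created, "created", err
	case err != nil:
		return nil, "", fmt.Errorf("getting deployment: %w", err)
	case imagesDiffer(expected, existing):
		existing.Images = expected.Images
		updated, err := c.Update(existing)
		return updated, "updated", err
	}
	return existing, "unchanged", nil
}

func main() {
	c := &fakeClient{store: map[string]*Deployment{}}
	for _, image := range []string{"adapter:v1", "adapter:v1", "adapter:v2"} {
		d := &Deployment{Name: "sample-adapter", Images: map[string]string{"receive-adapter": image}}
		_, action, err := reconcileDeployment(c, d)
		fmt.Println(image, action, err)
	}
}
```

Returning the action alongside the deployment lets the caller decide whether an update event needs to be recorded.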
Reconcile/Create The SinkBinding
Instead of directly giving the details of the sink to the receive adapter, use a SinkBinding to bind the receive adapter with the sink.
The steps are almost the same as for the Deployment reconciliation above, but they apply to a different resource, the SinkBinding.
- Create a Reference for the receive adapter deployment. This deployment will be SinkBinding's source:
tracker.Reference{
	APIVersion: appsv1.SchemeGroupVersion.String(),
	Kind:       "Deployment",
	Namespace:  ra.Namespace,
	Name:       ra.Name,
}
- Fetch the existing SinkBinding
namespace := owner.GetObjectMeta().GetNamespace()
sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
- If it doesn’t exist, create it
sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
- Check if the expected vs existing spec is different, and update the SinkBinding if required
} else if r.specChanged(sb.Spec, expected.Spec) {
	sb.Spec = expected.Spec
	if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
		return sb, err
	}
- If updated, record the event
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
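The update decision for the SinkBinding hinges on comparing the existing spec with the expected one. Below is a minimal sketch of such a comparison using reflect.DeepEqual from the standard library. The Reference and SinkBindingSpec types are simplified stand-ins (the real spec has more fields, and the sink is not a plain string), subjectFor is a hypothetical helper, and the actual specChanged helper may compare specs differently.

```go
package main

import (
	"fmt"
	"reflect"
)

// Reference mirrors the fields of the tracker.Reference shown above.
type Reference struct {
	APIVersion, Kind, Namespace, Name string
}

// SinkBindingSpec is a simplified stand-in for the real SinkBinding spec.
type SinkBindingSpec struct {
	Subject Reference // the receive adapter deployment
	Sink    string    // illustrative only; the real sink is a richer type
}

// subjectFor builds the reference to the receive adapter deployment.
// "apps/v1" is the value of appsv1.SchemeGroupVersion.String().
func subjectFor(namespace, name string) Reference {
	return Reference{APIVersion: "apps/v1", Kind: "Deployment", Namespace: namespace, Name: name}
}

// specChanged reports whether the existing spec differs from the expected one.
func specChanged(existing, expected SinkBindingSpec) bool {
	return !reflect.DeepEqual(existing, expected)
}

func main() {
	existing := SinkBindingSpec{Subject: subjectFor("default", "sample-adapter"), Sink: "event-display"}
	expected := existing
	fmt.Println(specChanged(existing, expected)) // identical specs

	expected.Sink = "other-sink"
	fmt.Println(specChanged(existing, expected)) // sink differs
}
```

A strict deep comparison flags any difference, including fields that the API server fills in with defaults, so a production reconciler may prefer a more lenient comparison to avoid needless updates.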