Skip to main content
Version: 0.8.0

Sink CRD configurations

This document lists CRD configurations available for Pulsar sink connectors. The sink CRD configurations consist of sink connector configurations and the common CRD configurations.

Sink configurations

This table lists sink configurations.

FieldDescription
nameThe connector name is a string of up to 43 characters.
classnameThe class name of a sink connector.
tenantThe tenant of a sink connector.
namespaceThe Pulsar namespace of a sink connector.
clusterNameThe Pulsar cluster of a sink connector.
replicasThe number of instances that you want to run for this sink connector. If no value is set, the system will set it to 1.
minReplicasThe minimum number of instances that you want to run for this sink connector. If no value is set, the system will set it to 1. When HPA auto-scaling is enabled, the HPA controller scales the Pods up / down based on the values of the minReplicas and maxReplicas options. The number of the Pods should be greater than the value of the minReplicas and be smaller than the value of the maxReplicas.
downloaderImageThe image for installing the init container that is used to download packages or functions from Pulsar if the download path is specified.
maxReplicasThe maximum number of instances that you want to run for this sink connector. When the value of the maxReplicas parameter is greater than the value of replicas, it indicates that the sink controller automatically scales the sink connector based on the CPU usage. By default, maxReplicas is set to 0, which indicates that auto-scaling is disabled.
sinkConfigThe sink connector configurations in YAML format.
timeoutThe message timeout in milliseconds.
negativeAckRedeliveryDelayMsThe number of redelivered messages due to negative acknowledgement.
autoAckWhether or not the framework acknowledges messages automatically. This field is required. You can set it to true or false.
maxMessageRetryHow many times to process a message before giving up.
processingGuaranteeThe processing guarantees (delivery semantics) applied to the sink connector. Available values: atleast_once, atmost_once, effectively_once.
retainOrderingThe sink connector consumes and processes messages in order.
deadLetterTopicThe topic where all messages that were not processed successfully are sent.
subscriptionNameThe subscription name of the sink connector if you want a specific subscription name for the input-topic consumer.
cleanupSubscriptionConfigure whether to clean up subscriptions.
subscriptionPositionThe subscription position.

Annotations

In Kubernetes, an annotation defines an unstructured Key Value Map (KVM) that can be set by external tools to store and retrieve metadata. annotations must be a map of string keys and string values. Annotation values must pass Kubernetes annotations validation. For details, see Kubernetes documentation on Annotations.

This example shows how to use an annotation to make an object unmanaged. Therefore, the Controller will skip reconciling unmanaged objects in reconciliation loop.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
annotations:
compute.functionmesh.io/managed: "false"

Images

This section describes image options available for Pulsar sink CRDs.

Base runner

The base runner is an image base for other runners. The base runner is located at ./pulsar-functions-base-runner. The base runner image contains basic tool-chains like /pulsar/bin, /pulsar/conf and /pulsar/lib to ensure that the pulsar-admin CLI tool works properly to support Apache Pulsar Packages.

Runner images

Function Mesh uses runner images as images of Pulsar connectors. Each runner image only contains necessary tool-chains and libraries for specified runtime.

Pulsar connectors support using the Java runner images as their images. The Java runner is based on the base runner and contains the Java function instance to run Java functions or connectors. The streamnative/pulsar-functions-java-runner Java runner is stored at the Docker Hub and is automatically updated to align with Apache Pulsar release.

Image pull policies

When the Function Mesh Operator creates a container, it uses the imagePullPolicy option to determine whether the image should be pulled prior to starting the container. There are three possible values for the imagePullPolicy option:

FieldDescription
AlwaysAlways pull the image.
NeverNever pull the image.
IfNotPresentOnly pull the image if the image does not already exist locally.

State storage

Function Mesh provides the following fields for stateful configurations in the CRD definition.

FieldDescription
statefulConfigThe state storage configuration for the sink connector.
statefulConfig.pulsar.serviceUrlThe service URL that points to the state storage service. By default, the state storage service is the BookKeeper table service.
statefulConfig.pulsar.javaProvider(Optional) If you want to overwrite the default configuration, you can use the state storage configuration for the Java runtime. For example, you can change it to other backend services other than the BookKeeper table service.
statefulConfig.pulsar.javaProvider.classNameThe Java class name of the state storage provider implementation. The class must implement the org.apache.pulsar.functions.instance.state.StateStoreProvider interface. If not, org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl will be used.
statefulConfig.pulsar.javaProvider.configThe configurations that are passed to the state storage provider.

Input

The input topics of a Pulsar Function. The following table lists options available for the Input.

FieldDescription
topicsThe configuration of the topic from which messages are fetched.
customSerdeSourcesThe map of input topics to SerDe class names (as a JSON string).
customSchemaSourcesThe map of input topics to Schema class names (as a JSON string).
sourceSpecsThe map of source specifications to consumer specifications. Consumer specifications include these options:
- SchemaType: the built-in schema type or custom schema class name to be used for messages fetched by the connector.
- SerdeClassName: the SerDe class to be used for messages fetched by the connector.
- IsRegexPattern: configure whether the input topic adopts a Regex pattern.
- SchemaProperties: the schema properties for messages fetched by the connector.
- ConsumerProperties: the consumer properties for messages fetched by the connector.
- ReceiverQueueSize: the size of the consumer receive queue.
- cryptoConfig: cryptography configurations of the consumer.

Resources

When you specify a function or connector, you can optionally specify how much of each resource they need. The resources available to specify are CPU and memory (RAM).

If the node where a Pod is running has enough of a resource available, it is possible (and allowed) for a Pod to use more resources than its request for that resource. However, a Pod is not allowed to use more than its resource limit.

Secrets

Function Mesh provides the secretsMap field for Function, Source, and Sink in the CRD definition. You can refer to the created secrets under the same namespace and the controller can include those referred secrets. The secrets are provide by EnvironmentBasedSecretsProvider, which can be used by context.getSecret() in Pulsar functions and connectors.

The secretsMap field is defined as a Map struct with String keys and SecretReference values. The key indicates the environment value in the container, and the SecretReference is defined as below.

FieldDescription
pathThe name of the secret in the Pod's namespace to select from.
keyThe key of the secret to select from. It must be a valid secret key.

Suppose that there is a Kubernetes Secret named credential-secret defined as below:

apiVersion: v1
data:
username: foo
password: bar
kind: Secret
metadata:
name: credential-secret
type: Opaque

To use it in Pulsar Functions in a secure way, you can define the secretsMap in the Custom Resource:

secretsMap:
username:
path: credential-secret
key: username
password:
path: credential-secret
key: password

Then, in the Pulsar Functions and Connectors, you can call context.getSecret("username") to get the secret value (foo).

Authentication

Function Mesh provides the tlsConfig, tlsSecret, authSecret, authConfig fields for Function, Source, and Sink in the CRD definition. You can configure TLS encryption, TLS authentication, and OAuth2 authentication using the following configurations.

Note

The TLS configurations and TLS Secret configurations are exclusive. If you configure TLS configurations, TLS Secret configurations will not take effect.

  • TLS configurations

    FieldDescription
    allowInsecureAllow insecure TLS connection.
    certSecretKeyThe Secret key.
    certSecretNameThe Secret name.
    enabledEnable TLS configurations.
    hostnameVerificationEnable hostname verification.
  • TLS Secret

    FieldDescription
    tlsAllowInsecureConnectionAllow insecure TLS connection. By default, it is set to false.
    tlsHostnameVerificationEnableEnable hostname verification. By default, it is set to true.
    tlsTrustCertsFilePathThe path of the TLS trust certificate file.
  • Authentication Secret

    FieldDescription
    clientAuthenticationPluginThe client authentication plugin.
    clientAuthenticationParametersThe client authentication parameters.
  • Authentication configurations

    Note

    Currently, only OAuth2 authentication configurations are supported. For other authentication methods, you can configure them using the Authentication Secret.

    FieldDescription
    OAuth2 authentication
    audienceThe OAuth2 "resource server" identifier for the Pulsar cluster.
    issuerUrlThe URL of the authentication provider that allows a Pulsar client to obtain an access token.
    scopeThe scope of an access request. For more information, see access token scope.
    keySecretNameThe name of the Kubernetes secret.
    keySecretKeyThe key of the Kubernetes secret, which contains the content of the OAuth2 private key.

Packages

Function Mesh supports running Pulsar connectors in Java.

FieldDescription
jarLocationThe path to the JAR file for the connector.
javaOptsIt specifies JVM options to better configure JVM behaviors, including exitOnOOMError, Garbage Collection logs, Garbage Collection tuning, and so on.
extraDependenciesDirIt specifies the dependent directory for the JAR package.

Cluster location

In Function Mesh, the Pulsar cluster is defined through a ConfigMap. Pods can consume ConfigMaps as environment variables in a volume. The Pulsar cluster ConfigMap defines the Pulsar cluster URLs.

FieldDescription
webServiceURLThe Web service URL of the Pulsar cluster.
brokerServiceURLThe broker service URL of the Pulsar cluster.

Pod specifications

Function Mesh supports customizing the Pod running Pulsar connectors. This table lists sub-fields available for the pod field.

FieldDescription
labelsSpecify labels attached to a Pod.
nodeSelectorSpecify a map of key-value pairs. For a Pod running on a node, the node must have each of the indicated key-value pairs as labels.
affinitySpecify the scheduling constraints of a Pod.
tolerationsSpecify the tolerations of a Pod.
annotationsSpecify the annotations attached to a Pod.
securityContextSpecify the security context for a Pod.
terminationGracePeriodSecondsThe amount of time that Kubernetes gives for a Pod before terminating it.
volumesA list of volumes that can be mounted by containers belonging to a Pod.
imagePullSecretsAn optional list of references to secrets in the same namespace for pulling any of the images used by a Pod.
serviceAccountNameSpecify the name of the service account that is used to run Pulsar Functions or connectors.
initContainersThe initialization containers belonging to a Pod. A typical use case could be using an initialization container to download a remote JAR to a local path.
sidecarsSidecar containers run together with the main function container in a Pod.
builtinAutoscalerSpecify the built-in autoscaling rules.
  • CPU-based autoscaling: auto-scale the number of Pods based on the CPU usage (80%, 50%, or 20%).
  • Memory-based autoscaling: auto-scale the number of Pods based on the memory usage (80%, 50%, or 20%).

If you configure the builtinAutoscaler field, you do not need to configure the autoScalingMetrics and autoScalingBehavior options and vice versa.

autoScalingMetricsSpecify how to scale based on customized metrics defined in connectors. For details, see MetricSpec v2 autoscaling.
autoScalingBehaviorConfigure the scaling behavior of the target in both up and down directions (scaleUp and scaleDown fields respectively). If not specified, the default Kubernetes scaling behaviors are adopted. For details, see HorizontalPodAutoscalerBehavior v2 autoscaling.
envSpecify the environment variables to expose on the containers. It is a key/value map. You can either use the value option to specify a particular value for the environment variable or use the valueFrom option to specify the source for the environment variable's value, as shown below.
```yaml
env:
- name: example1
value: simpleValue
- name: example2
valueFrom:
secretKeyRef:
name: secret-name
key: akey
```