
Pulsar Function CRD configurations

This document lists CRD configurations available for Pulsar Functions. The CRD configurations for Pulsar Functions consist of Function configurations and common CRD configurations.

Function configurations

This table lists Pulsar Function configurations.

| Field | Description |
| --- | --- |
| name | The function name, a string of up to 43 characters. |
| className | The class name of a Pulsar Function. |
| tenant | The tenant of a Pulsar Function. |
| namespace | The Pulsar namespace of a Pulsar Function. |
| clusterName | The Pulsar cluster of a Pulsar Function. |
| replicas | The number of instances that you want to run for a function. Setting it to 0 stops the function. When HPA is enabled, you cannot set the replicas parameter to 0 or a negative number. |
| ShowPreciseParallelism | Configure whether to show the precise parallelism. If it is set to true, the parallelism is equal to the value of the replicas parameter. In this case, updating the value of the replicas parameter causes all Pods to be recreated. By default, it is set to false. |
| minReplicas | The minimum number of instances that you want to run for a function. Setting it to 0 stops the function. By default, it is set to 1. When HPA auto-scaling is enabled, the HPA controller scales the Pods up or down based on the values of the minReplicas and maxReplicas options. The number of Pods is never lower than the value of minReplicas or higher than the value of maxReplicas. |
| downloaderImage | The image of the init container that is used to download a package from Pulsar if the download path is specified. By default, the downloaderImage is an official pulsarctl image. |
| cleanupImage | The image that is used to remove the subscriptions created or used by a function when the function is deleted. If no clean-up image is set, the runner image is used. |
| maxReplicas | The maximum number of instances that you want to run for this Pulsar function. When the value of the maxReplicas parameter is greater than the value of replicas, the Functions controller automatically scales the Pulsar Functions based on the CPU usage. By default, maxReplicas is set to 0, which indicates that auto-scaling is disabled. |
| timeout | The message timeout in milliseconds. |
| deadLetterTopic | The topic where all messages that were not processed successfully are sent. This parameter is not supported in Python Functions. |
| funcConfig | Pulsar Functions configurations in YAML format. |
| logTopic | If it is configured, Function Mesh produces logs of the function to this log topic. Otherwise, you can only view the printed logs for the Pod. |
| logTopicAgent | The log agent that reads the logs and sends them to the Pulsar log topic (logTopic). Available options are runtime and sidecar. <br /> - runtime: when you set the logTopic option, Function Mesh sends the predefined logs of the function to the Pulsar log topic (logTopic). <br /> - sidecar: when you set the logTopic option, Function Mesh sends all logs of the Pod to the Pulsar log topic (logTopic) through the Pulsar Beat output plugin. |
| filebeatImage | The Docker image that is used to run Filebeat, which sends logs of the Pod to the Pulsar log topic when you use a sidecar log agent. |
| autoAck (Deprecated) | Whether or not the framework acknowledges messages automatically. This field is required. You can set it to true or false. |
| maxMessageRetry | How many times to process a message before giving up. |
| processingGuarantee | The processing guarantees (delivery semantics) applied to the function. Available values: atleast_once, atmost_once, effectively_once, and manual. When you set processingGuarantee to effectively_once, the runtime sets the subscription type to FAILOVER. By default, the subscription type is set to SHARED. The manual option is only available for the runner image v2.11.0 or above. |
| forwardSourceMessageProperty | Configure whether to pass message properties to a target topic. |
| retainOrdering | Configure whether the function consumes and processes messages in order. When you set retainOrdering, the runtime sets the subscription type to FAILOVER. By default, the subscription type is set to SHARED. |
| retainKeyOrdering | Configure whether to retain the key order of messages. When you set retainKeyOrdering, the runtime sets the subscription type to KEY_SHARED. By default, the subscription type is set to SHARED. |
| subscriptionName | The subscription name of the Pulsar Function, if you want a specific subscription name for the input-topic consumer. |
| cleanupSubscription | Configure whether to clean up subscriptions that are created or used by a function when the function is deleted. |
| subscriptionPosition | The subscription position. |
| pulsar | The configurations of the Pulsar cluster. For details, see messaging. |
| VolumeClaimTemplates | A list of claims that a Pod is allowed to reference. It provides stable storage using PersistentVolumes provisioned by a PersistentVolume provisioner. This property is specified when you first create the function and cannot be modified when you update the resource. |
| persistentVolumeClaimRetentionPolicy | Configure whether and how PVCs are deleted during the lifecycle of a StatefulSet. Available options are whenDeleted and whenScaled. <br /> - whenDeleted: configure the volume retention behavior that applies when the StatefulSet is deleted. <br /> - whenScaled: configure the volume retention behavior that applies when the replica count of the StatefulSet is reduced. |
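As a rough sketch of how several of these fields fit together, the manifest below declares a Java function with auto-scaling bounds and a dead letter topic. The resource name, topic names, numeric values, and the ConfigMap name test-pulsar are placeholders chosen for illustration, and the package location fields are omitted, so treat it as a starting point, not a complete manifest.

```yaml
apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
  name: function-sample              # placeholder; up to 43 characters
  namespace: default
spec:
  className: org.apache.pulsar.functions.api.examples.ExclamationFunction
  image: streamnative/pulsar-functions-java-sample:2.9.2.23
  tenant: public
  namespace: default
  replicas: 1
  maxReplicas: 5                     # greater than replicas, so auto-scaling applies
  timeout: 30000                     # message timeout in milliseconds (placeholder value)
  maxMessageRetry: 3
  deadLetterTopic: persistent://public/default/function-sample-dlq
  processingGuarantee: atleast_once
  cleanupSubscription: true
  input:
    topics:
    - persistent://public/default/input-topic
  pulsar:
    pulsarConfig: "test-pulsar"      # placeholder ConfigMap name
```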

Annotations

In Kubernetes, an annotation defines an unstructured Key Value Map (KVM) that can be set by external tools to store and retrieve metadata. Annotations must be a map of string keys and string values, and annotation values must pass Kubernetes annotations validation. For details, see Kubernetes documentation on Annotations.

This example shows how to use an annotation to mark an object as unmanaged. The controller skips unmanaged objects in its reconciliation loop.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
  annotations:
    compute.functionmesh.io/managed: "false"

Images

This section describes image options available for Pulsar Function, source, sink and Function Mesh CRDs.

Runner images

Function Mesh uses runner images as images of Pulsar functions and connectors. Each runner image contains only the toolchains and libraries required by its runtime.

This table lists available Function runtime runner images.

| Type | Description |
| --- | --- |
| Java runner | The Java runner is based on the base runner and contains the Java function instance to run Java functions or connectors. The streamnative/pulsar-functions-pulsarctl-java-runner image (streamnative/pulsar-functions-java-runner will be deprecated) is hosted on Docker Hub and is automatically updated to align with Apache Pulsar releases. |
| Python runner | The Python runner is based on the base runner and contains the Python function instance to run Python functions. You can build your own Python runner to customize Python dependencies. The streamnative/pulsar-functions-pulsarctl-python-runner image (streamnative/pulsar-functions-python-runner will be deprecated) is hosted on Docker Hub and is automatically updated to align with Apache Pulsar releases. |
| Golang runner | The Golang runner provides all the toolchains and dependencies required to run Golang functions. The streamnative/pulsar-functions-pulsarctl-go-runner image (streamnative/pulsar-functions-go-runner will be deprecated) is hosted on Docker Hub and is automatically updated to align with Apache Pulsar releases. |

Image pull policies

When the Function Mesh Operator creates a container, it uses the imagePullPolicy option to determine whether the image should be pulled prior to starting the container. There are three possible values for the imagePullPolicy option:

| Value | Description |
| --- | --- |
| Always | Always pull the image. |
| Never | Never pull the image. |
| IfNotPresent | Pull the image only if it does not already exist locally. |
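For illustration, a pull policy could be set next to the image in the resource spec. This fragment is a sketch that assumes imagePullPolicy sits at the same level as image; the image tag is taken from the health check sample in this document.

```yaml
spec:
  image: streamnative/pulsar-functions-java-sample:2.9.2.23
  imagePullPolicy: IfNotPresent      # reuse a locally cached image when one exists
```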

Messaging

Function Mesh provides Pulsar cluster configurations in the Function, Source, and Sink CRDs. You can configure TLS encryption, TLS authentication, and OAuth2 authentication using the following configurations.

Note

The tlsConfig and tlsSecret fields are mutually exclusive. If you configure tlsConfig, tlsSecret does not take effect.

| Field | Description |
| --- | --- |
| authConfig | The authentication configurations of the Pulsar cluster. Currently, you can only configure generic authentication or OAuth2 authentication through this field. For other authentication methods, use the authSecret field. <br /> Generic authentication: <br /> - clientAuthenticationParameters: specify the client authentication parameters. <br /> - clientAuthenticationPlugin: specify the client authentication plugin. <br /> OAuth2 authentication: <br /> - audience: specify the OAuth2 resource server identifier for the Pulsar cluster. <br /> - issuerUrl: specify the URL of the OAuth2 identity provider that allows a Pulsar client to obtain an access token. <br /> - scope: specify the scope of an access request. For more information, see access token scope. <br /> - keySecretName: specify the name of the Kubernetes Secret. <br /> - keySecretKey: specify the key of the Kubernetes Secret that contains the content of the OAuth2 private key. |
| authSecret | The name of the authentication Secret that stores authentication configurations of the Pulsar cluster. <br /> - clientAuthenticationPlugin: specify the client authentication plugin. <br /> - clientAuthenticationParameters: specify the client authentication parameters. |
| cleanupAuthConfig | The authentication configurations for removing subscriptions and intermediate topics. You can configure generic authentication or OAuth2 authentication through this field. If not provided, the `authConfig` is used. <br /> Generic authentication: <br /> - clientAuthenticationParameters: specify the client authentication parameters. <br /> - clientAuthenticationPlugin: specify the client authentication plugin. <br /> OAuth2 authentication: <br /> - audience: specify the OAuth2 resource server identifier. <br /> - issuerUrl: specify the URL of the OAuth2 identity provider that allows a Pulsar client to obtain an access token. <br /> - scope: specify the scope of an access request. For more information, see access token scope. <br /> - keySecretName: specify the name of the Kubernetes Secret. <br /> - keySecretKey: specify the key of the Kubernetes Secret that contains the content of the OAuth2 private key. |
| pulsarConfig | The name of the ConfigMap that stores Pulsar cluster configurations. <br /> - webServiceURL: specify the web service URL for managing the Pulsar cluster. This URL should be a standard DNS name. <br /> - brokerServiceURL: specify the Pulsar protocol URL for interaction with the brokers in the Pulsar cluster. This URL should use the same DNS name as the web service URL but with the pulsar scheme. |
| tlsConfig | The TLS configurations of the Pulsar cluster. <br /> - allowInsecure: allow insecure TLS connection. <br /> - certSecretKey: specify the TLS Secret key. <br /> - certSecretName: specify the TLS Secret name. <br /> - enabled: enable TLS configurations. <br /> - hostnameVerification: enable hostname verification. |
| tlsSecret | The name of the TLS Secret that stores TLS configurations of the Pulsar cluster. <br /> - tlsAllowInsecureConnection: allow insecure TLS connection. By default, it is set to false. <br /> - tlsHostnameVerificationEnable: enable hostname verification. By default, it is set to true. <br /> - tlsTrustCertsFilePath: specify the path of the TLS trust certificate file. |
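The fields above might be combined as in the following sketch of a pulsar block with TLS and OAuth2 enabled. It uses tlsConfig and leaves tlsSecret unset. All Secret names, keys, and URLs are placeholders, and the oauth2Config key that wraps the OAuth2 fields is an assumption about the CRD layout that this table does not spell out.

```yaml
pulsar:
  pulsarConfig: "test-pulsar"            # placeholder ConfigMap name
  tlsConfig:
    enabled: true
    allowInsecure: false
    hostnameVerification: true
    certSecretName: pulsar-tls-secret    # placeholder Secret name
    certSecretKey: ca.crt                # placeholder key inside the Secret
  authConfig:
    oauth2Config:                        # assumed wrapper key for the OAuth2 fields
      audience: urn:example:pulsar       # placeholder resource server identifier
      issuerUrl: https://auth.example.com/
      keySecretName: oauth2-private-key  # placeholder Secret name
      keySecretKey: auth.json            # placeholder key holding the private key
```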

State storage

Function Mesh provides the following fields for Stateful functions in the CRD definition.

| Field | Description |
| --- | --- |
| statefulConfig | The state storage configuration for the Stateful Functions. |
| statefulConfig.pulsar.serviceUrl | The service URL that points to the state storage service. By default, the state storage service is the BookKeeper table service. |
| statefulConfig.pulsar.javaProvider | (Optional) The state storage configuration for the Java runtime. Use it to override the default configuration, for example, to switch to a backend service other than the BookKeeper table service. |
| statefulConfig.pulsar.javaProvider.className | The Java class name of the state storage provider implementation. The class must implement the org.apache.pulsar.functions.instance.state.StateStoreProvider interface. If it is not specified, org.apache.pulsar.functions.instance.state.BKStateStoreProviderImpl is used. |
| statefulConfig.pulsar.javaProvider.config | The configurations that are passed to the state storage provider. |
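A minimal sketch of enabling state storage with the default provider could look like this. The service URL, including its bk scheme, host, and port, is a placeholder that depends on how the BookKeeper table service is exposed in your cluster.

```yaml
statefulConfig:
  pulsar:
    # Placeholder address of the state storage service
    serviceUrl: "bk://bookkeeper.example.svc.cluster.local:4181"
```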

Window function configurations

Function Mesh provides the following fields for window functions in the CRD definition.

| Field | Description |
| --- | --- |
| actualWindowFunctionClassName | Optional. The runner class name of the implemented window function. By default, the value is the same as the spec.className. |
| lateDataTopic | Optional. The late data topic for the late tuple messages. The late data topic must be defined when specifying a timestamp extractor class (timestampExtractorClassName). |
| maxLagMs | Optional. The maximum lag duration (in milliseconds) of the window function. By default, it is set to 0. |
| slidingIntervalCount | Optional. The number of messages before the window slides. |
| slidingIntervalDurationMs | Optional. The time duration (in milliseconds) after which the window slides. |
| timestampExtractorClassName | Optional. The timestamp extractor class name. The class should implement the org.apache.pulsar.functions.windowing.TimestampExtractor interface. |
| watermarkEmitIntervalMs | Optional. The watermark interval (in milliseconds) of the window function. By default, it is set to 1000 ms. |
| windowLengthCount | Optional. The number of messages per window. |
| windowLengthDurationMs | Optional. The time duration (in milliseconds) of the window. |
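As an illustration, a count-based sliding window might be declared as follows. The windowConfig parent key is an assumption about where these fields live in the spec, and the counts are arbitrary.

```yaml
windowConfig:                  # assumed parent key for the window fields
  windowLengthCount: 10        # each window holds 10 messages
  slidingIntervalCount: 5      # the window slides after every 5 messages
```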

Input

The input topics of a Pulsar Function. The following table lists options available for the Input.

| Field | Description |
| --- | --- |
| topics | The configuration of the topic from which messages are fetched. |
| customSerdeSources | The map of input topics to SerDe class names (as a JSON string). |
| customSchemaSources | The map of input topics to Schema class names (as a JSON string). |
| sourceSpecs | The map of source specifications to consumer specifications. Consumer specifications include these options: <br /> - SchemaType: the built-in schema type or custom schema class name to be used for messages fetched by the function. <br /> - SerdeClassName: the SerDe class to be used for messages fetched by the function. <br /> - IsRegexPattern: configure whether the input topic adopts a Regex pattern. <br /> - SchemaProperties: the schema properties for messages fetched by the function. <br /> - ConsumerProperties: the consumer properties for messages fetched by the function. <br /> - ReceiverQueueSize: the size of the consumer receive queue. <br /> - cryptoConfig: cryptography configurations of the consumer. |
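The sketch below shows one way an input block with a per-topic consumer specification could be written. The topic name and queue size are placeholders, and the camelCase spelling of the sourceSpecs keys is an assumption about how the CRD serializes the options listed above.

```yaml
input:
  topics:
  - persistent://public/default/input-topic
  sourceSpecs:
    "persistent://public/default/input-topic":
      receiverQueueSize: 1000      # assumed key spelling; placeholder value
      isRegexPattern: false        # assumed key spelling
```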

Output

The output topics of a Pulsar Function. This table lists options available for the Output.

| Name | Description |
| --- | --- |
| topics | The output topic of a Pulsar Function (if none is specified, no output is written). |
| sinkSerdeClassName | The map of output topics to SerDe class names (as a JSON string). |
| sinkSchemaType | The built-in schema type or custom schema class name to be used for messages sent by the function. |
| producerConf | The producer specifications. Available options: <br /> - batchBuilder: the type of batch construction method. The key-based batcher is supported. <br /> - compressionType: the message data compression type used by a producer. Available options are LZ4, NONE, ZLIB, ZSTD, and SNAPPY. By default, it is set to LZ4. This option is only available for the runner image v3.0.0 or above. <br /> - cryptoConfig: the cryptography configurations of the producer. <br /> - maxPendingMessages: the maximum number of pending messages. <br /> - maxPendingMessagesAcrossPartitions: the maximum number of pending messages across all partitions. <br /> - useThreadLocalProducers: configure whether the producer uses thread-local producers. |
| customSchemaSinks | The map of output topics to Schema class names (as a JSON string). |
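For example, producer tuning options could be grouped under producerConf as sketched here. The numeric values are illustrative only and are not recommended defaults.

```yaml
output:
  producerConf:
    maxPendingMessages: 1000
    maxPendingMessagesAcrossPartitions: 50000
    useThreadLocalProducers: true
    compressionType: ZSTD          # requires the runner image v3.0.0 or above
```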

Resources

When you specify a function or connector, you can optionally specify how much of each resource it needs. The resources available to specify are CPU and memory (RAM).

If the node where a Pod is running has enough of a resource available, it's possible (and allowed) for a pod to use more resources than its request for that resource specifies. However, a pod is not allowed to use more than its resource limit.
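A resources block follows the usual Kubernetes requests and limits layout. The sketch below uses small illustrative values: the Pod is scheduled based on the requests and may use more when the node has spare capacity, but never more than the limits.

```yaml
resources:
  requests:
    cpu: "0.1"
    memory: 1G
  limits:
    cpu: "0.2"
    memory: 1.1G
```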

Secrets

Function Mesh provides the secretsMap field for Function, Source, and Sink in the CRD definition. You can refer to secrets created under the same namespace, and the controller includes those referenced secrets. The secrets are provided by EnvironmentBasedSecretsProvider and can be read through context.getSecret() in Pulsar functions and connectors.

The secretsMap field is defined as a Map struct with String keys and SecretReference values. The key indicates the environment value in the container, and the SecretReference is defined as below.

| Field | Description |
| --- | --- |
| path | The name of the secret in the Pod's namespace to select from. |
| key | The key of the secret to select from. It must be a valid secret key. |

Suppose that there is a Kubernetes Secret named credential-secret defined as below:

apiVersion: v1
# stringData accepts plain-text values; values under data must be base64-encoded
stringData:
  username: foo
  password: bar
kind: Secret
metadata:
  name: credential-secret
type: Opaque

To use it in Pulsar Functions in a secure way, you can define the secretsMap in the Custom Resource:

secretsMap:
  username:
    path: credential-secret
    key: username
  password:
    path: credential-secret
    key: password

Then, in the Pulsar Functions and Connectors, you can call context.getSecret("username") to get the secret value (foo).

Packages

Function Mesh supports running Pulsar Functions in Java, Python and Go. This table lists fields available for running Pulsar Functions in different languages.

| Field | Description |
| --- | --- |
| jarLocation | The path to the JAR file for the function. It is only available for Pulsar functions written in Java. |
| javaOpts | It specifies JVM options to better configure JVM behaviors, including exitOnOOMError, Garbage Collection logs, Garbage Collection tuning, and so on. |
| goLocation | The path to the Go package for the function. It is only available for Pulsar functions written in Go. |
| pyLocation | The path to the Python package for the function. It is only available for Pulsar functions written in Python. |
| extraDependenciesDir | It specifies the dependent directory for the JAR package. |

Runtime logs

Runtime logs include all logs generated by your pods. These logs provide information about the output of your functions. Function Mesh provides the following fields for the runtime logs in the CRD definition.

| Field | Description |
| --- | --- |
| level | The log level. Available options are off, trace, debug, info, warn, error, fatal, all, and panic. For details about each log level, see log levels. |
| rotatePolicy | The log rotation policy. Available options are TimedPolicyWithDaily, TimedPolicyWithWeekly, TimedPolicyWithMonthly, SizedPolicyWith10MB, SizedPolicyWith50MB, and SizedPolicyWith100MB. For details, see log rotation policies. |
| format | The log format that defines how the content of a log file should be interpreted. Available options are json and text. The log format configurations are only available for the Java and Python runtimes. |
| logConfig | A key-name format option used to reference a custom log configuration file. |
| javaLog4JConfigFileType | The Log4j configuration file type. Available options are yaml and xml. By default, it is set to xml. This option is only available for the Java runtime. |
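As a sketch, the log options for a Java function might be set as shown below. Nesting them under a java.log block is an assumption about the CRD layout; the values come from the options listed in the table.

```yaml
java:
  log:                               # assumed location of the runtime log fields
    level: debug
    rotatePolicy: TimedPolicyWithDaily
    format: json
```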

Log levels

By default, the log level for Pulsar functions is info. Function Mesh supports setting multiple log levels for Pulsar functions.

Notes

The log levels are only available for the Go runtime 2.11 or higher.

| Level | Description | Java runtime | Python runtime | Go runtime |
| --- | --- | --- | --- | --- |
| off | Nothing will be logged. | | | |
| trace | The logs that contain the most detailed messages. | | | |
| debug | The logs that are used for interactive investigation during development. These logs primarily contain information useful for debugging and have no long-term value. | | | |
| warn | The logs that highlight an abnormal or unexpected event in the function, but do not cause the function to stop. | | | |
| error | The logs that highlight when the function is stopped due to a failure. These indicate a failure in the current activity, not an application-wide failure. | | | |
| fatal | The logs that contain fatal errors. It indicates that the function is unusable. | | | |
| all | All events are logged. | | | |
| panic | It indicates the function is in panic. | | | |

For details about how to set log levels and produce logs for Pulsar functions, see produce function logs.

Log rotation policies

With more and more logs being written to the log file, the log file grows in size. Therefore, Function Mesh supports log rotation to avoid large files that could create issues when opening them. You can set the log rotation policies based on the time or the log file size.

| Policy | Description |
| --- | --- |
| TimedPolicyWithDaily | Rotate the log file daily. |
| TimedPolicyWithWeekly | Rotate the log file weekly. |
| TimedPolicyWithMonthly | Rotate the log file monthly. |
| SizedPolicyWith10MB | Rotate the log file at every 10 MB. |
| SizedPolicyWith50MB | Rotate the log file at every 50 MB. |
| SizedPolicyWith100MB | Rotate the log file at every 100 MB. |

For details about how to set a log rotation policy, see set log rotation policies.

Cluster location

In Function Mesh, the Pulsar cluster is defined through a ConfigMap. Pods can consume ConfigMaps as environment variables or as configuration files in a volume. The Pulsar cluster ConfigMap defines the Pulsar cluster URLs.

| Field | Description |
| --- | --- |
| webServiceURL | The Web service URL of the Pulsar cluster. |
| brokerServiceURL | The broker service URL of the Pulsar cluster. |
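A Pulsar cluster ConfigMap could look like the following sketch. The ConfigMap name, host names, and ports are placeholders for your own cluster endpoints.

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: test-pulsar                  # placeholder; referenced from the pulsar configuration
data:
  webServiceURL: http://pulsar-broker.example.svc.cluster.local:8080
  brokerServiceURL: pulsar://pulsar-broker.example.svc.cluster.local:6650
```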

Health checks

Note

To enable health checks, you need to create a PVC and a PV, and bind the PVC to the PV.

With the Kubernetes liveness probe, Function Mesh supports monitoring and acting on the state of Pods (Containers) to ensure that only healthy Pods serve traffic. Implementing health checks using probes provides Function Mesh a solid foundation, better reliability, and higher uptime.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Function
metadata:
  name: health-check-sample
  namespace: default
spec:
  image: streamnative/pulsar-functions-java-sample:2.9.2.23
  className: org.apache.pulsar.functions.api.examples.ExclamationFunction
  forwardSourceMessageProperty: true
  maxPendingAsyncRequests: 1000
  replicas: 1
  maxReplicas: 5
  logTopic: persistent://public/default/logging-function-logs
  pod:
    liveness:
      failureThreshold: 3 # --- [1]
      initialDelaySeconds: 10 # --- [2]
      periodSeconds: 10 # --- [3]
      successThreshold: 1 # --- [4]

  # ...
  # Other configs

  • [1] failureThreshold: specify the number of consecutive probe failures after which the probe is considered failed and the container is restarted. By default, it is set to 3.
  • [2] initialDelaySeconds: specify how long to wait (in seconds) before performing the first liveness probe.
  • [3] periodSeconds: specify how often (in seconds) to perform a liveness probe.
  • [4] successThreshold: specify the minimum consecutive successes for the probe to be considered successful after having failed. By default, it is set to 1.

For more information about probe types, probe check mechanisms, and probe parameters, see Kubernetes documentation on Pod lifecycle and configure probes.

Security context

A security context defines privilege and access control settings for a Pod. By default, Function Mesh applies the following PodSecurityContext to every function's Pod.

podSecurityContext:
  fsGroup: 10001
  runAsGroup: 10001
  runAsNonRoot: true
  runAsUser: 10000
  seccompProfile:
    type: "RuntimeDefault"

Apart from the PodSecurityContext, Function Mesh also applies the following SecurityContext to the Function's container so that the Pod complies with the restricted Pod Security Standard.

SecurityContext:
  capabilities:
    drop:
    - ALL
  allowPrivilegeEscalation: false

| Field | Description |
| --- | --- |
| fsGroup | A special supplemental group that applies to all containers in a Pod. |
| fsGroupChangePolicy | Define the behavior of changing ownership and permission of the volume before being exposed inside a Pod. This field only applies to volume types that support fsGroup-based ownership and permissions. |
| runAsGroup | The Group ID (GID) that is used to run the entry point of the container process. If it is unset, the runtime default is used. |
| runAsNonRoot | Indicate that the container must run as a non-root user. If it is set to true, the system validates the image at runtime to ensure that it does not run as a root user (User ID 0) and fails to start the container if it does. If it is unset or is set to false, no such validation is performed. |
| runAsUser | The User ID (UID) that is used to run the entry point of the container process. |
| seLinuxOptions | The SELinux context that is applied to a container. |
| seccompProfile | The seccomp options that are used by a container. |
| supplementalGroups | A list of groups that are applied to the first process running in each container, in addition to the container's primary GID, the fsGroup (if specified), and group memberships defined in the container image for the UID of the container process. |
| sysctls | A list of namespaced sysctls used for the Pod. |
| windowsOptions | The Windows-specific settings that are applied to all containers. |
| allowPrivilegeEscalation | Control whether a process can gain more privileges than its parent process. |
| capabilities | The capabilities to add or drop when running a container. |
| privileged | Run the container in privileged or unprivileged mode. |
| procMount | The type of proc mount that is used by a container. |
| readOnlyRootFilesystem | Mount the container's root filesystem as read-only. |

Pod specifications

Function Mesh supports customizing the Pod that runs a function instance. This table lists sub-fields available for the pod field.

| Field | Description |
| --- | --- |
| labels | Specify labels attached to a Pod. |
| liveness | Specify the liveness probe properties for a Pod. <br /> - initialDelaySeconds: specify how long to wait before performing the first liveness probe. <br /> - periodSeconds: specify how often to perform a liveness probe. <br /> For details, see health checks. |
| nodeSelector | Specify a map of key-value pairs. For a Pod to run on a node, the node must have each of the indicated key-value pairs as labels. |
| affinity | Specify the scheduling constraints of a Pod. |
| tolerations | Specify the tolerations of a Pod. |
| annotations | Specify the annotations attached to a Pod. |
| securityContext | Specify the security context for a Pod. For details, see [security context](#security-context). |
| terminationGracePeriodSeconds | The amount of time that Kubernetes gives a Pod to shut down gracefully before forcibly terminating it. |
| volumes | A list of volumes that can be mounted by containers belonging to a Pod. |
| imagePullSecrets | An optional list of references to secrets in the same namespace for pulling any of the images used by a Pod. |
| serviceAccountName | Specify the name of the service account that is used to run Pulsar Functions or connectors. |
| initContainers | The initialization containers belonging to a Pod. A typical use case is using an initialization container to download a remote JAR to a local path. |
| sidecars | Sidecar containers run together with the main function container in a Pod. |
| builtinAutoscaler | Specify the built-in autoscaling rules. <br /> - CPU-based autoscaling: auto-scale the number of Pods based on the CPU usage (80%, 50%, or 20%). <br /> - Memory-based autoscaling: auto-scale the number of Pods based on the memory usage (80%, 50%, or 20%). <br /> If you configure the builtinAutoscaler field, you do not need to configure the autoScalingMetrics and autoScalingBehavior options, and vice versa. |
| autoScalingMetrics | Specify how to scale based on customized metrics defined in connectors. For details, see MetricSpec v2 autoscaling. |
| autoScalingBehavior | Configure the scaling behavior of the target in both up and down directions (scaleUp and scaleDown fields respectively). If not specified, the default Kubernetes scaling behaviors are adopted. For details, see HorizontalPodAutoscalerBehavior v2 autoscaling. |
| vpa | Configure the behavior of the Vertical Pod Autoscaling (VPA). It contains two fields: |
| env | Specify the environment variables to expose on the containers. It is a key/value map. You can either use the value option to specify a particular value for the environment variable or use the valueFrom option to specify the source for the environment variable's value, as shown below. |

```yaml
env:
- name: example1
  value: simpleValue
- name: example2
  valueFrom:
    secretKeyRef:
      name: secret-name
      key: akey
```
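Several of the scheduling-related sub-fields can be combined in one pod block, as in this sketch. Every label, key, and name below is a placeholder, and the nested structures follow the standard Kubernetes Pod specification.

```yaml
pod:
  labels:
    team: data-platform              # placeholder label
  nodeSelector:
    disktype: ssd                    # the node must carry this label
  tolerations:
  - key: dedicated                   # placeholder taint key
    operator: Equal
    value: functions
    effect: NoSchedule
  terminationGracePeriodSeconds: 30
  imagePullSecrets:
  - name: registry-credentials       # placeholder Secret name
  serviceAccountName: function-runner  # placeholder service account
```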