Run Pulsar connectors
Pulsar IO connectors consist of source and sink connectors. Source connectors pass through data from external systems into Pulsar while sink connectors output data from Pulsar into external systems. Function Mesh supports defining sources and sink connectors through source or sink CRDs.
This document describes how to run a Pulsar connector. To run a Pulsar connector in Function Mesh, you need to package the connector and then submit it to a Pulsar cluster.
Pulsar built-in connectors and StreamNative-managed connectors
StreamNative provides ready-to-use Docker images for Pulsar built-in connectors and StreamNative-managed connectors. These images are public at the Docker Hub, with the image name in a format of streamnative/pulsar-io-CONNECTOR-NAME:TAG, such as streamnative/pulsar-io-hbase:2.7.1. You can check all supported connectors on the StreamNative Hub.
For Pulsar built-in connectors and StreamNative-managed connectors, you can create them by specifying the Docker image in the source or sink CRDs.
- Define a sink connector named - sink-sampleby using a YAML file and save the YAML file- sink-sample.yaml.- This example shows how to publish a - elastic-searchsink to Function Mesh by using a docker image.- apiVersion: compute.functionmesh.io/v1alpha1
 kind: Sink
 metadata:
 name: sink-sample
 spec:
 image: streamnative/pulsar-io-elastic-search:2.7.1 # using connector image here
 className: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
 replicas: 1
 maxReplicas: 1
 input:
 topics:
 - persistent://public/default/input
 typeClassName: "[B"
 sinkConfig:
 elasticSearchUrl: "http://quickstart-es-http.default.svc.cluster.local:9200"
 indexName: "my_index"
 typeName: "doc"
 username: "elastic"
 password: "wJ757TmoXEd941kXm07Z2GW3"
 pulsar:
 pulsarConfig: "test-sink"
 resources:
 limits:
 cpu: "0.2"
 memory: 1.1G
 requests:
 cpu: "0.1"
 memory: 1G
 java:
 extraDependenciesDir: random-dir/
 jar: connectors/pulsar-io-elastic-search-2.7.1.nar # the NAR location in image
 jarLocation: "" # leave empty since we will not download package from Pulsar Packages
 clusterName: test-pulsar
 autoAck: true
- Apply the YAML file to create the sink. - kubectl apply -f /path/to/sink-sample.yaml
- Check whether the sink is created successfully. - kubectl get all
Self-built connectors
To run a self-built connector, you need to package it to an external package (NAR package or uber JAR package) or a Docker image and then submit the connector to a Pulsar cluster through source or sink CRDs.
Package self-built connectors
After developing and testing your connector, you need to package it so that it can be submitted to a Pulsar cluster. You can package a Pulsar connector to a NAR/JAR package or a Docker image.
NAR or uber JAR packages
This section describes how to package a Pulsar connector to a NAR or JAR package and upload it to the Pulsar package management service.
Prerequisites
- Apache Pulsar 2.8.0 or higher
- Function Mesh v0.1.3 or higher
Build NAR and uber JAR packages
- NAR packages - This section describes how to package a Pulsar connector to a NAR package. NAR stands for NiFi Archive, which is a custom packaging mechanism used by Apache NiFi, to provide a bit of Java - ClassLoaderisolation.- Build a NAR package for a connector by using the nifi-nar-maven-plugin. 
- Include this nifi-nar-maven-plugin in the Maven project for your connector. - <plugins>
 <plugin>
 <groupId>org.apache.nifi</groupId>
 <artifactId>nifi-nar-maven-plugin</artifactId>
 <version>1.2.0</version>
 </plugin>
 </plugins>
- Create a - resources/META-INF/services/pulsar-io.yamlfile with the following contents.- name: connector name
 description: connector description
 sourceClass: fully qualified class name (only if source connector)
 sinkClass: fully qualified class name (only if sink connector)
- (Optional) if you use the Gradle NiFi plugin, you need to create a directive to ensure that your - pulsar-io.yamlis copied to the NAR file correctly.
 
- uber JAR packages - You can create an uber JAR that contains all the connector's JAR files and other resource files. - You can use the maven-shade-plugin to create an uber JAR as below. - <plugin>
 <groupId>org.apache.maven.plugins</groupId>
 <artifactId>maven-shade-plugin</artifactId>
 <version>3.1.1</version>
 <executions>
 <execution>
 <phase>package</phase>
 <goals>
 <goal>shade</goal>
 </goals>
 <configuration>
 <filters>
 <filter>
 <artifact>*:*</artifact>
 </filter>
 </filters>
 </configuration>
 </execution>
 </executions>
 </plugin>
Upload NAR or JAR packages
Use the pulsar-admin CLI tool to upload the NAR or uber JAR package to the Pulsar package management service.
This example shows how to upload the NAR package of the my-sink connector to the Pulsar package management service.
bin/pulsar-admin packages upload sink://public/default/my-sink@1.0 --path "/path/to/package-file" --description PACKAGE_DESCRIPTION
Then, you can define source or sink CRDs by specifying the uploaded connector package.
Docker images
StreamNative provides ready-to-use Docker images for Pulsar built-in connectors and StreamNative-managed connectors. For non built-in connectors, you can build Docker images for them.
Prerequisites
- Apache Pulsar 2.7.0 or higher
- Function Mesh v0.1.3 or higher
Build Docker images
To build a Docker image, follow these steps.
- Package your connector to a NAR package or JAR package. 
- Define a - Dockerfile.- This example shows how to define a - Dockerfilewith a NAR package called- pulsar-io-example.nar.- # Use pulsar-functions-java-runner since we pack Pulsar Connector written in Java
 FROM streamnative/pulsar-functions-java-runner:2.7.1
 # Copy NAR file into /pulsar/connectors/ directory
 COPY pulsar-io-example.nar /pulsar/connectors/
- Build your connector Docker image packaged with your connector NAR or JAR package. 
Then, you can push the connector Docker image into an image registry (such as the Docker Hub, or any private registry) and use the connector Docker image to configure and submit the connector to Function Mesh.
Submit self-built connectors
After packaging your Pulsar connectors, you can submit your Pulsar connectors to a Pulsar cluster. This section describes how to submit a Pulsar connector through a source or sink CRD. Function Mesh supports using the source or sink CRD to define Pulsar connectors.
Prerequisites
- Create and connect to a Kubernetes cluster.
- Create a Pulsar cluster in the Kubernetes cluster.
- Install the Function Mesh Operator and CRD into Kubernetes cluster.
- Set up the external source or sink system to communicate with the Pulsar connector.
For self-built connectors, you can create them based on how you package them.
- Define a sink connector by using a YAML file and save the YAML file. - This example shows how to publish a sink connector named - my-sink-package-sampleconnector to Function Mesh by using a package.- apiVersion: compute.functionmesh.io/v1alpha1
 kind: Sink
 metadata:
 name: my-sink-package-sample
 namespace: default
 spec:
 image: streamnative/pulsar-functions-java-runner:2.7.1 # using java function runner
 className: org.example.MySink
 forwardSourceMessageProperty: true
 maxPendingAsyncRequests: 1000
 replicas: 1
 maxReplicas: 1
 input:
 topics:
 - persistent://public/default/input
 typeClassName: java.lang.String
 sinkConfig:
 myconfig: "test-config"
 pulsar:
 pulsarConfig: "test-pulsar"
 java:
 extraDependenciesDir: random-dir/
 jar: /pulsar/connectors/pulsar-io-my-sink.nar # the package will download as this filename.
 jarLocation: sink://public/default/my-sink@1.0 # connector package URL
- This example shows how to publish a sink connector named - my-sink-image-sampleconnector to Function Mesh by using a Docker image.- apiVersion: compute.functionmesh.io/v1alpha1
 kind: Sink
 metadata:
 name: my-sink-image-sample
 namespace: default
 spec:
 image: myorg/pulsar-io-my-sink:2.7.1 # using self built image
 forwardSourceMessageProperty: true
 maxPendingAsyncRequests: 1000
 replicas: 1
 maxReplicas: 1
 input:
 topics:
 - persistent://public/default/input
 typeClassName: java.lang.String
 sinkConfig:
 myconfig: "test-config"
 pulsar:
 pulsarConfig: "test-pulsar"
 java:
 extraDependenciesDir: random-dir/
 jar: /pulsar/connectors/pulsar-io-my-sink.nar # the NAR location in image.
 jarLocation: "" # leave empty since we will not download package from Pulsar Packages
 
- Apply the YAML file to create the sink connector. - kubectl apply -f /path/to/sink-sample.yaml
- Check whether the sink connector is created successfully. - kubectl get all