Run Pulsar connectors
Pulsar IO connectors consist of source and sink connectors. Source connectors import data from external systems into Pulsar, while sink connectors export data from Pulsar to external systems. Function Mesh supports defining source and sink connectors through source or sink CRDs.
This document describes how to run a Pulsar connector. To run a Pulsar connector in Function Mesh, you need to package the connector and then submit it to a Pulsar cluster.
Pulsar built-in connectors and StreamNative-managed connectors
StreamNative provides ready-to-use Docker images for Pulsar built-in connectors and StreamNative-managed connectors. These images are public on Docker Hub, with image names in the format streamnative/pulsar-io-CONNECTOR-NAME:TAG, such as streamnative/pulsar-io-hbase:2.7.1. You can check all supported connectors on the StreamNative Hub.
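As a small illustration of the naming scheme above, the following shell sketch composes an image reference from a connector name and a tag. The function name connector_image is hypothetical and not part of any StreamNative tooling; check the StreamNative Hub for the connectors and tags that actually exist.

```shell
# Hypothetical helper: compose the Docker image reference for a connector.
# Follows the documented pattern streamnative/pulsar-io-CONNECTOR-NAME:TAG.
connector_image() {
  name="$1"   # connector name, for example "hbase"
  tag="$2"    # image tag, for example "2.7.1"
  if [ -z "$name" ] || [ -z "$tag" ]; then
    echo "usage: connector_image CONNECTOR-NAME TAG" >&2
    return 1
  fi
  printf 'streamnative/pulsar-io-%s:%s\n' "$name" "$tag"
}
```

For example, connector_image hbase 2.7.1 prints streamnative/pulsar-io-hbase:2.7.1, which is the value to place in the image field of a source or sink CRD.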
For Pulsar built-in connectors and StreamNative-managed connectors, you can create them by specifying the Docker image in the source or sink CRDs.
Define a sink connector named sink-sample by using a YAML file and save the YAML file as sink-sample.yaml.
This example shows how to publish an elastic-search sink to Function Mesh by using a Docker image.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: sink-sample
spec:
  image: streamnative/pulsar-io-elastic-search:2.7.1 # using connector image here
  className: org.apache.pulsar.io.elasticsearch.ElasticSearchSink
  replicas: 1
  maxReplicas: 1
  input:
    topics:
    - persistent://public/default/input
    typeClassName: "[B"
  sinkConfig:
    elasticSearchUrl: "http://quickstart-es-http.default.svc.cluster.local:9200"
    indexName: "my_index"
    typeName: "doc"
    username: "elastic"
    password: "wJ757TmoXEd941kXm07Z2GW3"
  pulsar:
    pulsarConfig: "test-sink"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-elastic-search-2.7.1.nar # the NAR location in image
    jarLocation: "" # leave empty since we will not download package from Pulsar Packages
  clusterName: test-pulsar
  autoAck: true
Apply the YAML file to create the sink.
kubectl apply -f /path/to/sink-sample.yaml
Check whether the sink is created successfully.
kubectl get all
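Because kubectl get all lists many resource kinds at once, it can help to query the sink's custom resource directly. The sketch below assumes the resource is served as sinks.compute.functionmesh.io (inferred from the apiVersion and kind in the manifest, so verify it against your installation) and that the sink lives in the default namespace unless a second argument is given. The helper name show_sink is hypothetical.

```shell
# Hypothetical helper: show one Sink custom resource by name.
# Assumes the resource is served as sinks.compute.functionmesh.io.
show_sink() {
  sink_name="$1"
  namespace="${2:-default}"   # fall back to the "default" namespace
  kubectl get sinks.compute.functionmesh.io "$sink_name" --namespace "$namespace"
}
```

Calling show_sink sink-sample queries only the sink defined above, which keeps the output short when the namespace holds many other resources.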
Self-built connectors
To run a self-built connector, you need to package it as an external package (a NAR package or an uber JAR package) or as a Docker image, and then submit the connector to a Pulsar cluster through source or sink CRDs.
Package self-built connectors
After developing and testing your connector, you need to package it so that it can be submitted to a Pulsar cluster. You can package a Pulsar connector to a NAR/JAR package or a Docker image.
NAR or uber JAR packages
This section describes how to package a Pulsar connector to a NAR or JAR package and upload it to the Pulsar package management service.
Prerequisites
- Apache Pulsar 2.8.0 or higher
- Function Mesh v0.1.3 or higher
Build NAR and uber JAR packages
NAR packages
This section describes how to package a Pulsar connector to a NAR package. NAR stands for NiFi Archive, a custom packaging mechanism used by Apache NiFi to provide a degree of Java ClassLoader isolation.
Build a NAR package for a connector by using the nifi-nar-maven-plugin.
Include the nifi-nar-maven-plugin in the Maven project for your connector.
<plugins>
  <plugin>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-nar-maven-plugin</artifactId>
    <version>1.2.0</version>
  </plugin>
</plugins>
Create a resources/META-INF/services/pulsar-io.yaml file with the following contents.
name: connector name
description: connector description
sourceClass: fully qualified class name (only if source connector)
sinkClass: fully qualified class name (only if sink connector)
(Optional) If you use the Gradle NiFi plugin, you need to create a directive to ensure that your pulsar-io.yaml is copied to the NAR file correctly.
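The descriptor step can be scripted. The following sketch writes a pulsar-io.yaml for a sink connector under a resources directory passed as the first argument. The function name write_sink_descriptor and the sample values are hypothetical, and a source connector would write the sourceClass key instead of sinkClass. Values are written unquoted, so this sketch assumes they contain no YAML-special characters such as colons.

```shell
# Hypothetical helper: write META-INF/services/pulsar-io.yaml for a sink connector.
# $1: resources directory, $2: connector name, $3: description, $4: sink class name
write_sink_descriptor() {
  resources_dir="$1"
  target_dir="$resources_dir/META-INF/services"
  mkdir -p "$target_dir" || return 1
  {
    printf 'name: %s\n' "$2"
    printf 'description: %s\n' "$3"
    printf 'sinkClass: %s\n' "$4"
  } > "$target_dir/pulsar-io.yaml"
}
```

Assuming the standard Maven layout, a call such as write_sink_descriptor src/main/resources my-sink "Example sink" org.example.MySink places the file where the build picks it up as a resource.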
Uber JAR packages
You can create an uber JAR that contains all of the connector's JAR files and other resource files.
You can use the maven-shade-plugin to create an uber JAR as below.
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.1.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <filters>
          <filter>
            <artifact>*:*</artifact>
          </filter>
        </filters>
      </configuration>
    </execution>
  </executions>
</plugin>
Upload NAR or JAR packages
Use the pulsar-admin CLI tool to upload the NAR or uber JAR package to the Pulsar package management service.
This example shows how to upload the NAR package of the my-sink connector to the Pulsar package management service.
bin/pulsar-admin packages upload sink://public/default/my-sink@1.0 --path "/path/to/package-file" --description PACKAGE_DESCRIPTION
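The first argument of the upload command is a package URL made up of the package type, tenant, namespace, package name, and version. As a sketch, the hypothetical helper package_url below assembles such a URL and rejects types other than function, source, and sink.

```shell
# Hypothetical helper: build a Pulsar package URL of the form
# TYPE://TENANT/NAMESPACE/NAME@VERSION
package_url() {
  case "$1" in
    function|source|sink) ;;   # accepted package types
    *) echo "unsupported package type: $1" >&2; return 1 ;;
  esac
  printf '%s://%s/%s/%s@%s\n' "$1" "$2" "$3" "$4" "$5"
}
```

For example, package_url sink public default my-sink 1.0 prints sink://public/default/my-sink@1.0, the same URL used in the upload command above and later in the jarLocation field of the CRD.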
Then, you can define source or sink CRDs by specifying the uploaded connector package.
Docker images
StreamNative provides ready-to-use Docker images for Pulsar built-in connectors and StreamNative-managed connectors. For connectors that are not built in, you can build Docker images yourself.
Prerequisites
- Apache Pulsar 2.7.0 or higher
- Function Mesh v0.1.3 or higher
Build Docker images
To build a Docker image, follow these steps.
Package your connector to a NAR package or JAR package.
Define a Dockerfile.
This example shows how to define a Dockerfile with a NAR package called pulsar-io-example.nar.
# Use pulsar-functions-java-runner since we pack Pulsar Connector written in Java
FROM streamnative/pulsar-functions-java-runner:2.7.1
# Copy NAR file into /pulsar/connectors/ directory
COPY pulsar-io-example.nar /pulsar/connectors/
Build your connector Docker image packaged with your connector NAR or JAR package.
Then, you can push the connector Docker image into an image registry (such as the Docker Hub, or any private registry) and use the connector Docker image to configure and submit the connector to Function Mesh.
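The build and push steps might look like the following sketch. It assumes the Dockerfile and the NAR file sit in the build context directory and that you are already logged in to the target registry. The helper name build_and_push is hypothetical, and the image name in the usage note is a placeholder.

```shell
# Hypothetical helper: build the connector image and push it to a registry.
# $1: image reference, $2: build context directory (defaults to ".")
build_and_push() {
  image="$1"
  context="${2:-.}"
  docker build -t "$image" "$context" || return 1   # stop if the build fails
  docker push "$image"
}
```

For example, build_and_push myorg/pulsar-io-my-sink:2.7.1 builds from the current directory and pushes the result, after which the image reference can be used in the image field of a sink CRD.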
Submit self-built connectors
After packaging your Pulsar connectors, you can submit them to a Pulsar cluster. This section describes how to submit a Pulsar connector through a source or sink CRD, which is how Function Mesh defines Pulsar connectors.
Prerequisites
- Create and connect to a Kubernetes cluster.
- Create a Pulsar cluster in the Kubernetes cluster.
- Install the Function Mesh Operator and CRD into the Kubernetes cluster.
- Set up the external source or sink system to communicate with the Pulsar connector.
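One way to confirm that the Function Mesh CRDs are installed is to ask the API server for them. This sketch assumes the CRDs are named sources.compute.functionmesh.io and sinks.compute.functionmesh.io, which follows the usual plural-plus-group naming but should be checked against your installation. The helper name check_connector_crds is hypothetical.

```shell
# Hypothetical helper: succeed only if both connector CRDs are registered.
# The CRD names below are assumptions based on the compute.functionmesh.io group.
check_connector_crds() {
  for crd in sources.compute.functionmesh.io sinks.compute.functionmesh.io; do
    if ! kubectl get crd "$crd" >/dev/null 2>&1; then
      echo "missing CRD: $crd" >&2
      return 1
    fi
  done
  echo "Function Mesh connector CRDs found"
}
```

Running check_connector_crds before applying a manifest gives an early, readable failure instead of an error from kubectl apply about an unknown resource kind.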
For self-built connectors, you can create them based on how you package them.
Define a sink connector by using a YAML file and save the YAML file.
This example shows how to publish a sink connector named my-sink-package-sample to Function Mesh by using a package.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: my-sink-package-sample
  namespace: default
spec:
  image: streamnative/pulsar-functions-java-runner:2.7.1 # using java function runner
  className: org.example.MySink
  forwardSourceMessageProperty: true
  maxPendingAsyncRequests: 1000
  replicas: 1
  maxReplicas: 1
  input:
    topics:
    - persistent://public/default/input
    typeClassName: java.lang.String
  sinkConfig:
    myconfig: "test-config"
  pulsar:
    pulsarConfig: "test-pulsar"
  java:
    jar: /pulsar/connectors/pulsar-io-my-sink.nar # the package will download as this filename.
    jarLocation: sink://public/default/my-sink@1.0 # connector package URL
This example shows how to publish a sink connector named my-sink-image-sample to Function Mesh by using a Docker image.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: my-sink-image-sample
  namespace: default
spec:
  image: myorg/pulsar-io-my-sink:2.7.1 # using self built image
  forwardSourceMessageProperty: true
  maxPendingAsyncRequests: 1000
  replicas: 1
  maxReplicas: 1
  input:
    topics:
    - persistent://public/default/input
    typeClassName: java.lang.String
  sinkConfig:
    myconfig: "test-config"
  pulsar:
    pulsarConfig: "test-pulsar"
  java:
    jar: /pulsar/connectors/pulsar-io-my-sink.nar # the NAR location in image.
    jarLocation: "" # leave empty since we will not download package from Pulsar Packages
Apply the YAML file to create the sink connector.
kubectl apply -f /path/to/sink-sample.yaml
Check whether the sink connector is created successfully.
kubectl get all