A sample app[1] that uses the built-in Pulsar Functions support in Spring for Apache Pulsar to create a streaming pipeline that handles user signups.
The app consists of a RabbitMQ source, a Spring Cloud Stream (SCSt) function, and an HTTP sink, with the following details:
- Messages sent to the RabbitMQ user_signup_queue queue are sourced into the Pulsar user-signup-topic topic
- Messages from the user-signup-topic topic are fed into the SCSt signup function, in which:
  - ENTERPRISE tier signups result in a customer message in the customer-signup-topic Pulsar topic
- Messages from the customer-signup-topic topic are sunk into a Slack webhook via the Pulsar HTTP sink connector.
- Ability to run Docker containers locally, with Docker Compose installed
Note: All commands are expected to be executed from the directory this document lives in, /guides/pulsar-functions.
Build the application and function libs with the following command:
../../gradlew clean build
The connector libs are not included in the Docker image; download them once by executing the following script:
./download-connectors.sh
Start the Pulsar, RabbitMQ, and Cassandra services using Docker Compose with the following command:
docker-compose up -d
Verify that rabbitmq is ready by executing the following command:
docker logs rabbitmq | grep "Server startup complete"
2023-01-04 21:06:39.692935+00:00 [info] <0.721.0> Server startup complete; 4 plugins started.
Verify that pulsar is ready by executing the following command:
docker logs pulsar 2>&1 | grep "messaging service is ready"
2023-01-04T21:39:57 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap_seconds=4
2023-01-04T21:39:57 [main] INFO org.apache.pulsar.broker.PulsarService - messaging service is ready, bootstrap service port = 8080, broker url= pulsar://localhost:6650, cluster=standalone, configs=org.apache.pulsar.broker....
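Because the containers can take a while to start, the readiness checks above can be wrapped in a polling loop. The following is a minimal sketch, not part of the sample itself: the wait_for_log function name and its defaults (30 attempts, 2 seconds apart) are hypothetical choices, and it assumes the log messages shown above are what each service prints when ready.

```shell
# Hypothetical helper (not part of the sample): poll a container's logs
# until a pattern appears or the attempts run out.
# Usage: wait_for_log CONTAINER PATTERN [ATTEMPTS] [DELAY_SECONDS]
wait_for_log() {
  container="$1"
  pattern="$2"
  attempts="${3:-30}"   # assumed default: 30 attempts
  delay="${4:-2}"       # assumed default: 2 seconds between attempts
  i=0
  while [ "$i" -lt "$attempts" ]; do
    # docker logs may write to stderr, so merge both streams before grepping
    if docker logs "$container" 2>&1 | grep -q -- "$pattern"; then
      echo "$container is ready"
      return 0
    fi
    i=$((i + 1))
    sleep "$delay"
  done
  echo "$container did not log '$pattern' after $attempts attempts" >&2
  return 1
}
```

For example, `wait_for_log rabbitmq "Server startup complete" && wait_for_log pulsar "messaging service is ready"` would block until both services report ready, or fail once the attempts are exhausted.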
Verify that pulsar has the connectors installed by executing the following command:
curl -s http://localhost:8080/admin/v2/functions/connectors
[
{
"name": "http",
"description": "Writes data to an HTTP server (Webhook)",
"sinkClass": "org.apache.pulsar.io.http.HttpSink",
"sinkConfigClass": "org.apache.pulsar.io.http.HttpSinkConfig"
},
{
"name": "rabbitmq",
"description": "RabbitMQ source and sink connector",
"sourceClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSource",
"sinkClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSink",
"sourceConfigClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig",
"sinkConfigClass": "org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig"
}
]
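The connector check can also be scripted rather than read by eye. The sketch below is an illustration only: has_connector and require_connectors are hypothetical helper names, and the match is a plain grep on the "name" fields of the JSON shown above rather than a real JSON parse, so it assumes the response keeps that shape.

```shell
# Hypothetical helpers (not part of the sample).
# has_connector NAME: reads the connector list JSON on stdin and succeeds
# if an entry with "name": "NAME" is present (grep-based, not a JSON parser).
has_connector() {
  grep -Eq "\"name\"[[:space:]]*:[[:space:]]*\"$1\""
}

# require_connectors NAME...: reads the JSON once from stdin and checks
# that every named connector is listed.
require_connectors() {
  json="$(cat)"
  for name in "$@"; do
    if ! printf '%s\n' "$json" | has_connector "$name"; then
      echo "missing connector: $name" >&2
      return 1
    fi
  done
  echo "all connectors present: $*"
}
```

A possible invocation is `curl -s http://localhost:8080/admin/v2/functions/connectors | require_connectors http rabbitmq`, which prints a confirmation or names the first missing connector.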
At this point the following services are up and running:
- rabbitmq
  - management UI
- pulsar standalone
  - function support enabled
  - http and rabbitmq connectors installed
The sample app registers the Pulsar Functions which effectively create the streaming pipeline.
cd sample-signup-app && ../../../gradlew bootRun
[main] PulsarFunctionAdministration : Creating 'UserSignupFunction' function (using local archive: /Users/cbono/repos/spring-pulsar/spring-pulsar-sample-apps/sample-pulsar-functions/signup-function/target/signup-function-0.0.1-SNAPSHOT.jar)
[main] PulsarFunctionAdministration : Creating 'CustomerSignupCassandraSink' sink (using local archive: builtin://cassandra)
[main] PulsarFunctionAdministration : Creating 'UserSignupRabbitSource' source (using local archive: builtin://rabbitmq)
[main] SignupApplication : Started SignupApplication in 6.485 seconds (process running for 6.839)
docker exec -it pulsar bin/pulsar-admin sources list
docker exec -it pulsar bin/pulsar-admin functions list
docker exec -it pulsar bin/pulsar-admin sinks list
[ "UserSignupRabbitSource" ]
UserSignupFunction
[ "CustomerSignupHttpSink" ]
The app produces a random user signup record to the RabbitMQ user_signup_queue queue every 5 seconds.
It also logs all messages on the user-signup-topic and customer-signup-topic Pulsar topics, as well as the last 5 emails sent to the customer_signup Cassandra table.
To verify that the pipeline is working, watch the console log as the app runs. The output should look similar to the following:
TO RABBIT user_signup_queue => Signup[signupTier=ENTERPRISE, firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM PULSAR user-signup => Signup[signupTier=ENTERPRISE, firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM PULSAR customer-signup => Customer[firstName=Samuel, lastName=Weiss, [email protected], signupTimestamp=1673236049021]
FROM CASSANDRA => latest (5/18) emails: [email protected], [email protected], [email protected], [email protected], [email protected]...
TO RABBIT user_signup_queue => Signup[signupTier=BASIC, firstName=Arianna, lastName=Edwards, [email protected], signupTimestamp=1673236054031]
FROM PULSAR user-signup => Signup[signupTier=BASIC, firstName=Arianna, lastName=Edwards, [email protected], signupTimestamp=1673236054031]
TO RABBIT user_signup_queue => Signup[signupTier=STANDARD, firstName=Kylie, lastName=Raymond, [email protected], signupTimestamp=1673236059038]
FROM PULSAR user-signup => Signup[signupTier=STANDARD, firstName=Kylie, lastName=Raymond, [email protected], signupTimestamp=1673236059038]
TO RABBIT user_signup_queue => Signup[signupTier=ENTERPRISE, firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM PULSAR user-signup => Signup[signupTier=ENTERPRISE, firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM PULSAR customer-signup => Customer[firstName=Nolan, lastName=Floyd, [email protected], signupTimestamp=1673236064045]
FROM CASSANDRA => latest (5/19) emails: [email protected], [email protected], [email protected], [email protected], [email protected]... 202
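One way to check the console output mechanically is to compare how many ENTERPRISE signups were sent to RabbitMQ with how many customer messages came back from Pulsar, since only ENTERPRISE signups should produce a customer message. The sketch below is an illustration under assumptions: summarize_signups is a hypothetical helper, and it relies on the log lines containing the "TO RABBIT" and "FROM PULSAR customer-signup" markers exactly as in the sample output above.

```shell
# Hypothetical helper (not part of the sample): reads the app's console log
# on stdin and counts ENTERPRISE signups sent to RabbitMQ alongside customer
# messages received from Pulsar. Assumes the log format shown above.
summarize_signups() {
  awk '
    /TO RABBIT .*signupTier=ENTERPRISE/ { sent++ }
    /FROM PULSAR customer-signup /      { customers++ }
    END { printf "enterprise_sent=%d customers=%d\n", sent, customers }
  '
}
```

If the console log is saved to a file (for instance a hypothetical app.log), `summarize_signups < app.log` prints both counts. In a healthy pipeline the two numbers should match, apart from any messages still in flight.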
docker logs pulsar
Processing Signup(signupTier=ENTERPRISE, firstName=Gavin, lastName=Wilson, [email protected], signupTimestamp=1673196872351)
ENTERPRISE signup count: 1
Converting to Signup(signupTier=ENTERPRISE, firstName=Gavin, lastName=Wilson, [email protected], signupTimestamp=1673196872351)
Processing Signup(signupTier=FREE, firstName=Nevaeh, lastName=Sexton, [email protected], signupTimestamp=1673196877357)
FREE signup count: 1
Processing Signup(signupTier=ENTERPRISE, firstName=Charlotte, lastName=Beach, [email protected], signupTimestamp=1673196882364)
ENTERPRISE signup count: 2
Converting to Signup(signupTier=ENTERPRISE, firstName=Charlotte, lastName=Beach, [email protected], signupTimestamp=1673196882364)
Each ENTERPRISE signup should result in a record in the Cassandra table.
To inspect all customer signup records, you can query the Cassandra table.
Run the CQLSH utility on the cassandra container with the following command:
docker exec -it cassandra cqlsh cassandra
At the cqlsh> prompt, execute the following:
use sample_pulsar_functions_keyspace;
select * from customer_signup;
exit;
customer_email | customer_details
--------------------------------------+-----------------------------------------------------------------------------------------------------------------------------
[email protected] | {"firstName":"Molly","lastName":"Mckay","email":"[email protected]","signupTimestamp":1673196862339}
[email protected] | {"firstName":"Gavin","lastName":"Wilson","email":"[email protected]","signupTimestamp":1673196872351}
[email protected] | {"firstName":"Ryan","lastName":"Ramsey","email":"[email protected]","signupTimestamp":1673196892373}
docker exec -ti pulsar bin/pulsar-admin sources get --name UserSignupRabbitSource
docker exec -ti pulsar bin/pulsar-admin sinks get --name CustomerSignupCassandraSink
docker exec -ti pulsar bin/pulsar-admin functions get --name UserSignupFunction
docker exec -ti pulsar bin/pulsar-client consume customer_signup -s "co-sub1" -p "Earliest" -n 100