Introduction to Spring Cloud Function and Spring Cloud Stream

Ishan Soni
5 min read · Feb 8, 2024


What is Streaming and Stream processing?

Event Stream Processing (ESP) is the practice of taking action on a series of data points/events (stream) that originate from a system that continuously creates data.

Stream processing can typically be represented using a DAG:

Stream Processing DAG

This is a very high-level abstraction. Can we model it using something we already have in Java? Yes: the functional interfaces introduced in Java 8 (Supplier, Function and Consumer)!

Stream Processing DAG — Java Functional Toolkit
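
To make the mapping concrete, here is a minimal plain-Java sketch (no Spring involved) of a three-node DAG: a Supplier as the source, two chained Functions as processors, and a Consumer as the sink. The class name DagSketch and the exclaim step are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class DagSketch {

    // Source node: produces events and has no input.
    public static Supplier<String> source() {
        return () -> "ishan";
    }

    // Processor node: has both an input and an output.
    public static Function<String, String> toUpper() {
        return String::toUpperCase;
    }

    // A second, hypothetical processor chained after the first one.
    public static Function<String, String> exclaim() {
        return s -> s + "!";
    }

    // Pushes one event through source -> processors -> sink.
    public static void runOnce(Consumer<String> sink) {
        Function<String, String> pipeline = toUpper().andThen(exclaim());
        sink.accept(pipeline.apply(source().get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        runOnce(received::add);        // Sink node: input only.
        System.out.println(received);  // prints [ISHAN!]
    }
}
```

Each edge of the DAG is just a method call here. The frameworks discussed below replace those direct calls with HTTP requests or messaging destinations while the functions themselves stay unchanged.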

Spring Cloud Function

Spring Cloud Function is a framework that promotes implementing business logic as Java functions and then instruments those functions to execute in the context of different runtimes (HTTP, messaging, etc.).

Example: Let’s create a function that turns a string into uppercase:

public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}

If you are using Spring Initializr and simply add the Spring Cloud Function dependency, it will add the spring-cloud-function-context dependency. If you also have the Spring Web dependency in your project, it will add the spring-cloud-function-web dependency instead, which means we can expose the above function as a REST endpoint (i.e. the function is automatically bound to an HTTP context). Let’s see it in action!

// Returns a Function (a Processor node: it has both an input and an output)
@Bean
public Function<String, String> toUpper() {
    return (str) -> {
        System.out.println("Original: " + str);
        return str.toUpperCase();
    };
}

Start your application and invoke this function at:

localhost:<port>/toUpper/ishan

Output: ISHAN

With Spring Cloud Function, we were successfully able to bind this function to an HTTP context!

Spring Cloud Stream

Spring Cloud Stream is a framework that instruments your Spring Cloud Functions as event handlers bound to destinations, using binders specific to a particular messaging system. Also see: why do you need a messaging system?

For example, you may want your Spring Cloud Function to become a message handler for a RabbitMQ destination (queue).

Setup

Add the spring-cloud-stream-binder-rabbit dependency.

Bring up RabbitMQ using docker-compose:

---
version: "3.8"
services:
  rabbitmq:
    image: rabbitmq:3.11-management-alpine
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672
    environment:
      RABBITMQ_DEFAULT_USER: guest
      RABBITMQ_DEFAULT_PASS: guest
      RABBITMQ_DEFAULT_VHOST: /

Default RabbitMQ setup. Notice no custom exchanges!

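Connect RabbitMQ to your Spring application (application.properties/yaml). Spring Boot reads the connection settings from the spring.rabbitmq prefix; the values below match its defaults:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest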

When you bring the application up, you’ll see two new exchanges!

If you drop a message in toUpper-in-0 exchange, your toUpper function will get invoked and the output will be pushed to toUpper-out-0 exchange! Note — The REST access to this function will still work if you also have the spring-cloud-function-web dependency!

Note — A Function is a processor. That is why it created 2 exchanges! It listens to toUpper-in-0 and sends the output to toUpper-out-0
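
The naming convention behind these bindings can be sketched in plain Java. The helper below is hypothetical (it is not a Spring API) and only covers the simple case of a single input and a single output, where the index is always 0: a Function gets an input and an output binding, a Consumer only an input, and a Supplier only an output. The consumeUpper and createData names match beans defined later in this article.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class BindingNames {

    // Hypothetical helper: lists the default binding names that the
    // <functionName>-in-0 / <functionName>-out-0 convention yields.
    public static List<String> defaultBindings(String functionName, Object bean) {
        if (bean instanceof Function) {
            // Processor: listens on -in-0 and publishes to -out-0.
            return List.of(functionName + "-in-0", functionName + "-out-0");
        }
        if (bean instanceof Consumer) {
            // Sink: input only.
            return List.of(functionName + "-in-0");
        }
        if (bean instanceof Supplier) {
            // Source: output only.
            return List.of(functionName + "-out-0");
        }
        throw new IllegalArgumentException("Not a functional bean: " + bean);
    }

    public static void main(String[] args) {
        Function<String, String> toUpper = String::toUpperCase;
        Consumer<String> consumeUpper = s -> { };
        Supplier<String> createData = () -> "data";

        System.out.println(defaultBindings("toUpper", toUpper));           // [toUpper-in-0, toUpper-out-0]
        System.out.println(defaultBindings("consumeUpper", consumeUpper)); // [consumeUpper-in-0]
        System.out.println(defaultBindings("createData", createData));     // [createData-out-0]
    }
}
```

Unless you configure a destination explicitly, the binding name is also used as the destination name, which is why the exchanges in RabbitMQ carry these names.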

Let’s create a consumer for toUpper-out-0 exchange:

@Bean
public Consumer<String> consumeUpper() {
    return (upper) -> {
        System.out.println("Consumed: " + upper);
    };
}

# Spring Cloud Functions defined in your application
# Define multiple functions definitions by using ;
spring.cloud.function.definition=toUpper;consumeUpper

# Based on the function definitions, Spring Cloud Stream will create bindings
# that connect each function's input and/or output to destinations!
# The toUpper-in-0 (the toUpper function listens to this)
# and toUpper-out-0 (the toUpper function produces results to this) were
# automatically created

# If you do not do any configuration for the consumeUpper function,
# it will automatically create a consumeUpper-in-0 exchange and listen
# to that. But we want this function to listen to toUpper-out-0
# What we are saying below is ->
# Bind consumeUpper-in-0 binding to listen to the toUpper-out-0 destination
# Since this is a Consumer function, the binding that is created is consumeUpper-in-0
spring.cloud.stream.bindings.consumeUpper-in-0.destination=toUpper-out-0
spring.cloud.stream.bindings.consumeUpper-in-0.group=school-service

The concept of groups is similar to consumer groups in Kafka. But more on that in another article.

Let’s drop the message “Ishan” into the exchange “toUpper-in-0” using the RabbitMQ admin console. You’ll see the following in your application logs:

From the toUpper function: Original: Ishan
From the consumeUpper function: Consumed: ISHAN

Producer

You can directly produce data to a destination using the StreamBridge class:

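import org.springframework.cloud.stream.function.StreamBridge;

@Autowired
private StreamBridge streamBridge;

...

// First argument: the destination (binding) name; second argument: the payload
streamBridge.send(destination, data);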

Or you can use a Supplier.

A Supplier is a little different from a Function or a Consumer. Functions and Consumers are triggered whenever there is an input, but a Supplier has no trigger. Spring Cloud Stream therefore provides an automated polling mechanism that invokes the supplier every second by default (you can override the interval).

@Bean
public Supplier<String> createData() {
    return () -> {
        System.out.println("Creating some data");
        return "Ishan-" +
                ThreadLocalRandom.current().nextInt(0, 10000) +
                "-" +
                LocalDateTime.now();
    };
}

spring.cloud.function.definition=toUpper;consumeUpper;createData

...

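# Modify the poll interval of the supplier (value in milliseconds, so 10000 = 10 seconds).
# Note: newer Spring Cloud Stream versions deprecate this property in favor of
# spring.integration.poller.fixed-delay; check the documentation for your version.
spring.cloud.stream.poller.fixed-delay=10000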

# The binding destination of the supplier function.
# If we had provided no configuration, spring cloud stream would have
# automatically created a createData-out-0 exchange and the data this
# supplier creates will be pushed to that exchange. But we want this
# supplier to push data to the toUpper-in-0 exchange
spring.cloud.stream.bindings.createData-out-0.destination=toUpper-in-0
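
To illustrate what "polling a supplier with a fixed delay" means, here is a rough plain-Java approximation using a ScheduledExecutorService. It is only a sketch of the idea, not how Spring Cloud Stream implements its poller; the PollerSketch class and the collectFor method are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class PollerSketch {

    // Invokes supplier.get() repeatedly for roughly runMillis, waiting
    // delayMillis between the end of one call and the start of the next,
    // and returns everything that was produced.
    public static List<String> collectFor(Supplier<String> supplier,
                                          long delayMillis,
                                          long runMillis) {
        List<String> produced = new CopyOnWriteArrayList<>();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> produced.add(supplier.get()), 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(runMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            scheduler.shutdownNow();
        }
        return produced;
    }

    public static void main(String[] args) {
        // The supplier has no trigger of its own; the poller is what calls it.
        List<String> values = collectFor(() -> "Ishan-" + System.nanoTime(), 100, 350);
        System.out.println("Produced " + values.size() + " values");
    }
}
```

In the Spring setup above, the role of the produced list is played by the destination the supplier is bound to (toUpper-in-0), and the delay is the configured poller interval.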

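Restart your application. Every 10 seconds the createData supplier now produces a value, which is sent to toUpper-in-0, uppercased by toUpper, and finally printed by consumeUpper.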

Sending and Consuming data based on specific binding and routing keys

If you are unfamiliar with RabbitMQ routing and binding keys, see this article first.

Example: You are using StreamBridge to send messages to a students exchange. How do you set the routing key of the message?

spring.cloud.stream.rabbit.bindings.students.producer.routingKeyExpression=headers['type']

What you are saying is: use the value of the type header as the routing key.

import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

...

Message<StudentCreated> message = MessageBuilder.withPayload(studentCreated)
        .setHeader("type", "student.created")
        .build();
streamBridge.send("students", message);

Now, how do you bind the consumer below (and the queue it listens to) to a specific binding key on an exchange? In other words, this consumer only wants to receive StudentCreated events from the students exchange.

@Bean
public Consumer<StudentCreated> studentCreated() {
    ...
}

spring.cloud.function.definition=studentCreated
spring.cloud.stream.bindings.studentCreated-in-0.destination=students
spring.cloud.stream.bindings.studentCreated-in-0.group=school-service
# Important: the binding key(s) used to bind this consumer's queue to the students exchange
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKey=student.created
spring.cloud.stream.rabbit.bindings.studentCreated-in-0.consumer.bindingRoutingKeyDelimiter=,
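
The interplay between the producer's routing key and the consumer's binding key(s) can be sketched in plain Java. The sketch assumes the destination is a topic exchange (to my knowledge the RabbitMQ binder's default exchange type), where * matches exactly one word and # matches zero or more words. RoutingSketch and its methods are hypothetical names, and the real matching is done by the RabbitMQ broker, not by your application.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.regex.Pattern;

public class RoutingSketch {

    // Mirrors routingKeyExpression=headers['type']:
    // the routing key is read from the "type" header of the message.
    public static String routingKey(Map<String, String> headers) {
        return headers.get("type");
    }

    // Topic-style match of one binding key against a routing key.
    public static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more of the remaining words.
            for (int k = j; k <= words.length; k++) {
                if (matches(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        return (pattern[i].equals("*") || pattern[i].equals(words[j]))
                && matches(pattern, i + 1, words, j + 1);
    }

    // bindingRoutingKey may list several keys separated by the configured
    // delimiter; the message is delivered if any one of them matches.
    public static boolean delivered(String bindingRoutingKey, String delimiter, String routingKey) {
        return Arrays.stream(bindingRoutingKey.split(Pattern.quote(delimiter)))
                .anyMatch(key -> matches(key, routingKey));
    }

    public static void main(String[] args) {
        String key = routingKey(Map.of("type", "student.created"));
        System.out.println(delivered("student.created", ",", key));                 // true
        System.out.println(delivered("student.updated,student.deleted", ",", key)); // false
        System.out.println(delivered("student.*", ",", key));                       // true
    }
}
```

With the configuration above, only messages whose type header is student.created reach the studentCreated consumer. Listing more keys separated by the delimiter would let the same queue receive additional event types.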
