Spring Supplier Example

Non-Spring Developers


Let us assume for a moment that you are not a Spring developer and are not familiar with Spring Integration, which already provides abstractions for ROME. In that case, you can certainly use ROME directly to produce feed records. For example, the following is a valid Supplier for this scenario.

public Supplier<SyndEntry> feedSupplier() {
  return () -> {
    // Use the ROME framework directly to produce syndicated entries.
  };
}

The benefit here is that we can develop the supplier without any knowledge of Spring, and it can be deployed to a serverless environment directly, using the abstractions provided by that environment or by relying on a framework like Spring Cloud Function.

This essentially means that if you are a Java developer without much Spring Framework experience, you can still write the functions by providing the business logic and using just the interfaces defined in the java.util.function package, such as Function, Supplier, and Consumer.
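As a minimal sketch of that idea, the following plain Java class defines a Supplier, a Function, and a Consumer using only java.util.function. The class name, the sample titles, and the business logic are hypothetical; they stand in for whatever a real function would do.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical example: none of these names come from the feed supplier project.
class PlainJavaFunctions {

    // A Supplier produces data; here it hands out hard-coded titles one at a time
    // and returns null once they are exhausted.
    static Supplier<String> titleSupplier() {
        Iterator<String> titles = List.of("first entry", "second entry").iterator();
        return () -> titles.hasNext() ? titles.next() : null;
    }

    // A Function transforms data.
    static Function<String, String> upperCase() {
        return String::toUpperCase;
    }

    // A Consumer accepts data; here it collects it into the given list.
    static Consumer<String> collectInto(List<String> sink) {
        return sink::add;
    }

    public static void main(String[] args) {
        Supplier<String> supplier = titleSupplier();
        List<String> sink = new ArrayList<>();
        Consumer<String> consumer = collectInto(sink);

        // Wire the three pieces together by hand, without any framework.
        consumer.accept(upperCase().apply(supplier.get()));
        System.out.println(sink);
    }
}
```

Nothing in this class depends on Spring, which is what allows the same functions to be picked up later by a framework or by a serverless runtime.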

Spring Developers


Add the following Spring Integration Feed adapter dependency to the project. This brings in the feed adapter from Spring Integration along with its transitive dependencies.

<dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-feed</artifactId>
</dependency>

Adding basic configuration properties

Now that we have our core dependency in, let’s start writing some code. Since the functions are expected to be used in a Spring Boot context, we need to create a ConfigurationProperties class to drive the configuration for the supplier function. Here is what it might look like.

package org.springframework.cloud.fn.supplier.feed;

import java.net.URL;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("feed.supplier")
public class FeedSupplierProperties {

  /**
   * Key used in metadata store to avoid duplicate read from the feed.
   */
  private String metadataKey;

  /**
   * Feed URL.
   */
  private URL feedUrl;

  // rest is omitted
}

As we can see, all the properties use the feed.supplier prefix.

Adding the Configuration class

Next, let’s create a Spring-based configuration class where we provide all the necessary components. We will build it incrementally. Below is the basic structure of the class.

package org.springframework.cloud.fn.supplier.feed;
...
@Configuration
@EnableConfigurationProperties(FeedSupplierProperties.class)
public class FeedSupplierConfiguration {

}

Add these fields to the class.

private final ConcurrentMetadataStore metadataStore;

private final Resource resource;

private final FeedSupplierProperties feedSuppplierProperties;

A quick note on these fields. The feed adapter in Spring Integration can avoid re-reading entries that we have already read from a feed. The metadataKey property we defined above is used for this purpose, and the adapter does this by using a metadata store. Various metadata store implementations are available for popular databases. Include the following dependency for a simple in-memory metadata store.

<dependency>
   <groupId>org.springframework.cloud.fn</groupId>
   <artifactId>metadata-store-common</artifactId>
   <version>${project.version}</version>
</dependency>

Note that this requirement is specific to this supplier; not all suppliers need it.
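To make the role of the metadata key more concrete, here is a simplified sketch in plain Java of one way a key in a store can be used to skip entries that were already read. It is not Spring Integration's implementation: the Entry class, the DedupingReader class, and the ConcurrentHashMap standing in for a ConcurrentMetadataStore are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified illustration only; all names here are hypothetical.
class DedupingReader {

    // Minimal stand-in for a feed entry: a title plus a publication time.
    static final class Entry {
        final String title;
        final long publishedAt;

        Entry(String title, long publishedAt) {
            this.title = title;
            this.publishedAt = publishedAt;
        }
    }

    private final Map<String, String> metadataStore; // stands in for a metadata store
    private final String metadataKey;

    DedupingReader(Map<String, String> metadataStore, String metadataKey) {
        this.metadataStore = metadataStore;
        this.metadataKey = metadataKey;
    }

    // Returns only the entries published after the time recorded under the key,
    // then records the newest publication time seen so far.
    List<Entry> readNew(List<Entry> feed) {
        long lastSeen = Long.parseLong(metadataStore.getOrDefault(metadataKey, "-1"));
        long newest = lastSeen;
        List<Entry> fresh = new ArrayList<>();
        for (Entry entry : feed) {
            if (entry.publishedAt > lastSeen) {
                fresh.add(entry);
                newest = Math.max(newest, entry.publishedAt);
            }
        }
        metadataStore.put(metadataKey, Long.toString(newest));
        return fresh;
    }

    public static void main(String[] args) {
        DedupingReader reader = new DedupingReader(new ConcurrentHashMap<>(), "feedTest");
        List<Entry> feed = new ArrayList<>(List.of(new Entry("a", 1L), new Entry("b", 2L)));
        System.out.println(reader.readNew(feed).size()); // both entries are new
        feed.add(new Entry("c", 3L));
        System.out.println(reader.readNew(feed).size()); // only "c" is new
    }
}
```

Because the progress marker lives in the store rather than in the reader, swapping the in-memory map for a persistent store would let the marker survive a restart.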

Users can provide a Resource bean for reading the feed if no HTTP (or HTTPS) URL is available (such a URL can be set through the configuration property).

Let’s add a constructor to use these fields.

FeedSupplierConfiguration(FeedSupplierProperties feedSupplierProperties,
                   ConcurrentMetadataStore metadataStore,
                   @Nullable Resource resource) {
  this.feedSuppplierProperties = feedSupplierProperties;
  this.metadataStore = metadataStore;
  this.resource = resource;
}

Resource is nullable because, in most cases, we can simply pass the URL string as a configuration property and not provide a Resource bean.

The Spring Integration Feed adapter provides FeedEntryMessageSource, which is a MessageSource implementation. We will use this message source in our supplier. Let’s set it up as a Spring bean. The code below is fairly self-explanatory.

@Bean
public FeedEntryMessageSource feedEntryMessageSource() {
  final FeedEntryMessageSource feedEntryMessageSource = this.resource == null ? new FeedEntryMessageSource(this.feedSuppplierProperties.getFeedUrl(),
        this.feedSuppplierProperties.getMetadataKey()) :
       ...
  return feedEntryMessageSource;
}

Non-Reactive Supplier

Now that we have the MessageSource bean ready, it is straightforward to write a simple Supplier and invoke it programmatically by calling the supplier’s get method. Here it is.

@Bean
public Supplier<Message<SyndEntry>> feedSupplier() {
  return () -> feedEntryMessageSource().receive();
}

We can inject this Supplier bean into our application and call its get method programmatically. When this Supplier is used in a Spring Cloud Stream application (as we will see later), a default poller provided by Spring Cloud Stream triggers the supplier, every second by default. This schedule can be changed by configuring the poller.
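The polling behavior described above can be approximated in plain Java as follows. This is only a sketch of the concept, not how Spring Cloud Stream implements its poller: the PollingDemo class, the pollEvery helper, and the stand-in supplier are hypothetical, and a ScheduledExecutorService takes the place of the framework's scheduler.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of a poller that repeatedly calls Supplier.get().
class PollingDemo {

    // Calls supplier.get() at a fixed rate until `count` values have been collected.
    static <T> List<T> pollEvery(Supplier<T> supplier, long periodMillis, int count) {
        List<T> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> {
            // Guard against an extra run that may start before the scheduler is shut down.
            if (done.getCount() > 0) {
                received.add(supplier.get());
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while polling", e);
        } finally {
            scheduler.shutdownNow();
        }
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        Supplier<String> supplier = () -> "entry-" + counter.incrementAndGet();
        System.out.println(pollEvery(supplier, 100, 3));
    }
}
```

The supplier itself knows nothing about the schedule; the caller decides how often get is invoked, which is why the polling interval can be changed without touching the supplier.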


Reactive Supplier

The non-reactive polling solution works, but what if we don’t want to poll explicitly at intervals and instead want the data in a streaming manner, as soon as it is available in the message source? For that, we can develop a reactive supplier that delivers the data as soon as it becomes available. Let’s see the details.

Here again, Spring Integration provides some abstractions we can use to convert our FeedEntryMessageSource into a reactive publisher as shown below.

@Bean
public Supplier<Flux<Message<SyndEntry>>> feedSupplier() {
  return () -> IntegrationReactiveUtils.messageSourceToFlux(feedEntryMessageSource());
}

You may notice that this supplier returns a Flux<Message<SyndEntry>> rather than the Message<SyndEntry> returned by the initial non-reactive supplier, where we relied on programmatic invocation of the supplier or on some other polling mechanism.

Other Reactive Solutions

It was convenient that we had a MessageSource from Spring Integration and could use that utility method to convert it to a Flux. What if there were no such MessageSource and we had to handcraft the basic data retrieval for the system for which we want to write a reactive-style supplier? In those cases, we can use the various facilities provided by Project Reactor and feed the data to them programmatically. The bottom line is that when we write a reactive streaming supplier, we have to return the data as a Flux.

Unit Testing the Reactive Supplier

Let’s add a unit test for this reactive supplier. We can use the Atom feed example described in RFC 4287 - The Atom Syndication Format as our test data. Include it in src/test/resources as atom.xml, the file name used in the test properties.

Here is the test class.

@SpringBootTest(properties = {"feed.supplier.feedUrl=classpath:atom.xml",
     "feed.supplier.metadataKey=feedTest" })
@DirtiesContext
public class FeedSupplierTests {

  @Autowired
  Supplier<Flux<Message<SyndEntry>>> feedSupplier;

  @Test
  public void testFromSampleRssFile() {
     final Flux<Message<SyndEntry>> messageFlux = feedSupplier.get();

     StepVerifier.create(messageFlux)
           .assertNext((message) -> {
              assertThat(message.getPayload().getTitle().trim()).isEqualTo("Atom draft-07 snapshot");
              assertThat(message.getPayload().getContents().size()).isEqualTo(1);
              assertThat(message.getPayload().getContents().get(0).getValue().contains("The Atom draft is finished.")).isTrue();
           })
           .thenCancel()
           .verify();
  }

  @SpringBootApplication
  static class FeedSupplierTestApplication {

  }

}

Adding the Supplier function to the Maven BOM for functions

The functions project aggregates all the available functions in a Maven BOM. Add the feed-supplier to this BOM. This is primarily needed if you are generating the Spring Cloud Stream application based on this function.

Generating Spring Cloud Stream Applications from the Supplier

At this point in the process, we can submit a pull request to the repository with our supplier. If we also want to generate Spring Cloud Stream binder-based applications from the supplier, keep reading. Once generated, these applications can be run standalone or as part of a data orchestration pipeline in Spring Cloud Data Flow.

Go ahead and create a new module called feed-source under applications/source. As mentioned in the previous blogs, java.util.function.Supplier is mapped to a Spring Cloud Stream Source.

We don’t need to add any custom code on top of our feed supplier, as it can be used as is. However, now that we are dealing with a Spring Cloud Stream application, we need to use the test binder with the supplier function to see how the supplier works with Spring Cloud Stream.

We can use one of the existing sources as a template to guide us through the process. We can even copy one of them and make changes incrementally.

All the apps use the parent pom stream-applications-core, which brings in all the necessary test dependencies, such as the test binder mentioned above. It also provides the infrastructure for the application generator plugin that is responsible for generating the binder-based applications.

One point we would like to emphasize is that unless the application module contains custom code, the module simply becomes an application generator that generates the binder-based applications. In other words, we won’t add a class annotated with @SpringBootApplication to it; rather, it is generated for us.

Testing the supplier with the test binder

Add the following dependency for testing with the test binder:

<dependencies>
   <dependency>
       <groupId>org.springframework.cloud.fn</groupId>
       <artifactId>feed-supplier</artifactId>
       <scope>test</scope>
   </dependency>
</dependencies>

Now we can add a test to verify that the feed-supplier works with the test binder in Spring Cloud Stream. Basically, we need to ensure that the supplier produces the data through the test binder and that it is delivered to the destination on the test binder.

Here is the general idea behind the test:

public class FeedSourceTests {

  @Test
  public void testFileSource() throws Exception {
     try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
           TestChannelBinderConfiguration
                 .getCompleteConfiguration(FeedSourceTestApplication.class))
           .web(WebApplicationType.NONE)
           .run("--spring.cloud.function.definition=feedSupplier", "--feed.supplier.feedUrl=classpath:atom.xml", "--feed.supplier.metadataKey=feedTest")) {

        OutputDestination target = context.getBean(OutputDestination.class);
        Message<byte[]> sourceMessage = target.receive(10000);
        Object title = JsonPath.parse(new String(sourceMessage.getPayload())).read("$.title");
        assertThat(title).isEqualTo("Atom draft-07 snapshot");
     }
  }

  @SpringBootApplication
  @Import(FeedSupplierConfiguration.class)
  public static class FeedSourceTestApplication {

  }
}

The test is largely similar to the unit test we added for the supplier, with one big difference. In the unit test, we invoked the supplier directly and verified the data it produced. Here, we do not invoke the supplier directly; the binding mechanism in Spring Cloud Stream does that for us automatically. We receive the data from the output destination and then verify it.

Once the test passes, it is time for us to generate the applications.

Generating the binder-based applications

By default, the plugin generates applications for both the Kafka and Rabbit binders in Spring Cloud Stream. This is configured in the parent pom in stream-applications-core. If we need to customize the generation for different binders, we make those changes there. Below is the configuration for the application generator plugin.

<plugin>
   <groupId>org.springframework.cloud.stream.app.plugin</groupId>
   <artifactId>spring-cloud-stream-app-maven-plugin</artifactId>
   <configuration>
       <generatedApp>
           <name>feed</name>
           <type>source</type>
           <version>${project.version}</version>
           <configClass>org.springframework.cloud.fn.supplier.feed.FeedSupplierConfiguration.class</configClass>
       </generatedApp>
       <dependencies>
           <dependency>
               <groupId>org.springframework.cloud.fn</groupId>
               <artifactId>feed-supplier</artifactId>
           </dependency>
           <dependency>
               <groupId>org.springframework.cloud.stream.app</groupId>
               <artifactId>stream-applications-composite-function-support</artifactId>
               <version>${stream-apps-core.version}</version>
           </dependency>
       </dependencies>
   </configuration>
</plugin>

Let’s quickly go over some details here. We ask the plugin to create an application named feed-source and to use the configuration class of the Supplier we developed above as its main configuration class. Within the dependencies section for the plugin, we also add any dependencies that the app needs, feed-supplier in this case. We need to include all our processor functions in all the generated source applications, because the source can then be composed with other processors without requiring them to run as individual microservices, as we have seen in the previous blog. More details on function composition with the processors can be found here as well. This is why we add the stream-applications-composite-function-support dependency in the dependencies section of the plugin.
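Conceptually, composing a source with a processor amounts to applying a Function to whatever the Supplier produces, inside the same process. The plain Java sketch below illustrates only that concept; the compose helper and the sample functions are hypothetical, and in the generated applications the composition is handled by the framework rather than by code like this.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration of source-plus-processor composition in a single JVM.
class CompositionDemo {

    // Builds a new Supplier whose output is the source's output passed through `processor`.
    static <T, R> Supplier<R> compose(Supplier<T> source,
                                      Function<? super T, ? extends R> processor) {
        return () -> processor.apply(source.get());
    }

    public static void main(String[] args) {
        Supplier<String> titleSource = () -> "  Atom draft-07 snapshot  ";
        Function<String, String> trim = String::trim;
        Function<String, Integer> length = String::length;

        // The source and both processors run together, with no messaging hop between them.
        Supplier<Integer> composed = compose(titleSource, trim.andThen(length));
        System.out.println(composed.get());
    }
}
```

The composed result is itself a Supplier, so from the outside it still looks like a single source.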

Build the application module and we will see the binder-based apps in the apps folder. They will be named feed-source-kafka and feed-source-rabbit. We can go to either of those applications, build it, and then use it as a standalone application or as part of a pipeline on Spring Cloud Data Flow.
