2020-06-04

Java 8 - stream

stream

Stream is an API introduced in Java 8 that makes heavy use of lambda expressions. Much like querying a database with SQL statements, Stream provides a high-level, declarative abstraction over operations on Java collections. Declarative means that, when writing code, developers only need to focus on the result they want rather than on the specific steps needed to achieve it. In this chapter, we will explain why we need a new data processing API, the difference between Collection and Stream, and how to apply the Stream API in our code.

Why do we need a new abstract concept of data processing?

In my opinion, there are two main points:

1. The Collection API does not provide higher-level constructs for querying data, so developers have to write a lot of boilerplate code for even the most trivial tasks.
2. Processing collection data in parallel is limited. How to use Java's concurrency constructs, how to process data efficiently, and how to achieve efficient concurrency are all left for programmers to work out and implement themselves.


Data processing before Java 8

Read the code below and guess what it does.

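import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class Example1_Java7 {

    public static void main(String[] args) {
        List<Task> tasks = getTasks(); // getTasks() returns the sample data set defined later in this chapter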

        List<Task> readingTasks = new ArrayList<>();
        for (Task task : tasks) {
            if (task.getType() == TaskType.READING) {
                readingTasks.add(task);
            }
        }
        Collections.sort(readingTasks, new Comparator<Task>() {
            @Override
            public int compare(Task t1, Task t2) {
                return t1.getTitle().length() - t2.getTitle().length();
            }
        });
        for (Task readingTask : readingTasks) {
            System.out.println(readingTask.getTitle());
        }
    }
}
The code above prints the titles of all READING tasks, ordered by title length. Java developers write code like this every day, and even this simple program takes about 15 lines of Java. However, the biggest problem with this code is not its length but that it fails to clearly convey the developer's intention: filter out all READING tasks, sort them by title length, and then produce their titles.

Data processing in Java8
With the Stream API in Java 8, you can achieve the same result as the code above as follows:

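import java.util.List;
import java.util.stream.Collectors;

public class Example1_Stream {

    public static void main(String[] args) {
        List<Task> tasks = getTasks(); // getTasks() returns the sample data set defined later in this chapter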

        List<String> readingTasks = tasks.stream()
                .filter(task -> task.getType() == TaskType.READING)
                .sorted((t1, t2) -> t1.getTitle().length() - t2.getTitle().length())
                .map(Task::getTitle)
                .collect(Collectors.toList());

        readingTasks.forEach(System.out::println);
    }
}
The code above forms a pipeline of several stream operations:

stream(): creates a stream pipeline by calling the stream() method on the source collection, here tasks of type List<Task>.

filter(Predicate<T>): extracts the elements of the stream that match the rule defined by the predicate. Once you have a stream, you can call zero or more intermediate operations on it. The lambda expression task -> task.getType() == TaskType.READING defines a rule that keeps only READING tasks.

sorted(Comparator<T>): returns a stream consisting of all the stream elements sorted by the Comparator defined by a lambda expression. In the example above, the comparator is (t1, t2) -> t1.getTitle().length() - t2.getTitle().length().

map(Function<T,R>): returns a stream in which each element is the result of applying the Function<T,R> to the corresponding element of the original stream.

collect(toList()): a terminal operation that collects the results of the stream operations into a list.
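To see the whole pipeline run end to end, here is a minimal self-contained sketch. The Task and TaskType classes below are hypothetical, trimmed-down stand-ins (only a title and a type), not the article's full definitions, and the comparator uses Comparator.comparingInt instead of subtracting lengths by hand.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class ReadingTitlesSketch {

    // Hypothetical stand-ins for the article's TaskType and Task classes.
    enum TaskType { READING, CODING, WRITING }

    static final class Task {
        private final String title;
        private final TaskType type;

        Task(String title, TaskType type) {
            this.title = title;
            this.type = type;
        }

        String getTitle() { return title; }
        TaskType getType() { return type; }
    }

    // stream -> filter -> sorted -> map -> collect, as described above.
    static List<String> readingTitlesByLength(List<Task> tasks) {
        return tasks.stream()
                .filter(task -> task.getType() == TaskType.READING)          // keep READING tasks only
                .sorted(Comparator.comparingInt(t -> t.getTitle().length())) // shortest title first
                .map(Task::getTitle)                                          // Task -> String
                .collect(Collectors.toList());                                // terminal operation
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
                new Task("Read Java 8 Lambdas book", TaskType.READING),
                new Task("Write a blog on Java 8 Streams", TaskType.WRITING),
                new Task("Read DDD book", TaskType.READING));
        // Prints "Read DDD book" and then "Read Java 8 Lambdas book"
        readingTitlesByLength(tasks).forEach(System.out::println);
    }
}
```

Nothing runs until collect is called; the four calls before it only describe the pipeline.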

Why Java 8 is better

In my opinion, Java 8 code is better for the following reasons:

The Java8 code can clearly express the developer's intention of data filtering, sorting and other operations.

By using the higher abstraction in the Stream API format, developers express what they want rather than how to get these results.

The Stream API provides a unified language for data processing, giving developers a common vocabulary when they talk about it. When two developers discuss the filter function, both understand that it performs a data filtering operation.

Developers no longer need to write boilerplate code for data processing, such as redundant loops or temporary collections to hold intermediate results. The Stream API handles all of this.

Stream does not modify the underlying collection; stream operations are non-mutating.
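A small sketch of that last point, using a hypothetical list of tag strings: the stream sorts and transforms the data into a new list, while the source list keeps its original contents and order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SourceUnchangedSketch {

    // Sorts and upper-cases via a stream; the source list is only read, never modified.
    static List<String> sortedUpper(List<String> source) {
        return source.stream()
                .sorted()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tags = new ArrayList<>(Arrays.asList("git", "books", "java8"));
        System.out.println(sortedUpper(tags)); // [BOOKS, GIT, JAVA8]
        System.out.println(tags);              // [git, books, java8] (unchanged)
    }
}
```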

What is Stream

A Stream is an abstract view of some data. For example, a Stream can be a view of a list, of the lines in a file, or of any other sequence of elements. The Stream API provides a set of operations that can be executed sequentially or in parallel. Developers need to understand that a Stream is a higher-level abstraction, not a data structure: it does not store data. Streams are inherently lazy and perform computation only when their results are needed. This laziness lets us create infinite streams of data. In Java 8, you can easily write code that generates an unlimited number of identifiers, like the following:

public static void main(String[] args) {
    Stream<String> uuidStream = Stream.generate(() -> UUID.randomUUID().toString());
}
The Stream interface has several static factory methods, such as of, generate, and iterate, that can be used to create stream instances. The generate method used above takes a Supplier, a functional interface that describes a function which takes no input and produces a value. We pass a supplier to generate, and each time it is called it produces a new identifier:

Supplier<String> uuids = () -> UUID.randomUUID().toString();
Running the code above does nothing, because streams are lazy and are not executed until their results are needed. If we change it to the following code, the UUIDs are printed to the console, and the program keeps running indefinitely.

public static void main(String[] args) {
    Stream<String> uuidStream = Stream.generate(() -> UUID.randomUUID().toString());
    uuidStream.forEach(System.out::println);
}
In Java 8, developers can also create streams by calling the stream method on a Collection. Streams support data processing operations, so developers can express computations with higher-level data processing constructs.
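The three factory methods mentioned above can be compared side by side. In this sketch (the values are illustrative), limit turns the infinite streams from iterate and generate into finite ones so they can be collected.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamFactoriesSketch {

    // Stream.of: a finite stream of explicit values.
    static List<String> fromValues() {
        return Stream.of("java", "git", "lambdas").collect(Collectors.toList());
    }

    // Stream.iterate: seed, f(seed), f(f(seed)), ... (infinite), cut off by limit.
    static List<Integer> powersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2).limit(count).collect(Collectors.toList());
    }

    // Stream.generate: an infinite stream of Supplier results, made finite with limit.
    static List<String> uuids(int count) {
        return Stream.generate(() -> UUID.randomUUID().toString())
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromValues());   // [java, git, lambdas]
        System.out.println(powersOfTwo(5)); // [1, 2, 4, 8, 16]
        uuids(3).forEach(System.out::println);
    }
}
```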


Collection vs Stream

The following table illustrates the differences between Collection and Stream



Let's discuss the difference between internal iteration and external iteration, and the concept of lazy evaluation.

External iteration vs. internal iteration
The difference between the Java 8 Stream API code and the Collection API code shown above is who controls the iteration: the library or the developer. With the Stream API, developers only specify the operations they want to perform, and the library applies those operations to each element of the underlying Collection. When the iteration over the underlying Collection is controlled by the library itself, it is called internal iteration; conversely, when the iteration is controlled by the developer, it is called external iteration. Using the for-each construct with the Collection API is an example of external iteration.

Some people will say that with the Collection API we don't have to work with the underlying iterator either, because the for-each construct handles it for us. However, for-each is just syntactic sugar over the iterator API. Although simple, for-each has some disadvantages: 1) it can only process elements sequentially, in their inherent order; 2) it encourages imperative code; 3) it is difficult to parallelize.
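The contrast can be shown with a simple sum. In this sketch the first method drives an Iterator explicitly (external iteration), while the second only describes the computation and leaves the looping to the stream library (internal iteration).

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IterationStylesSketch {

    // External iteration: our code drives the Iterator and decides what happens at each step.
    static int sumExternal(List<Integer> numbers) {
        int sum = 0;
        Iterator<Integer> it = numbers.iterator();
        while (it.hasNext()) {
            sum += it.next();
        }
        return sum;
    }

    // Internal iteration: we describe the operation; the stream library runs the loop.
    static int sumInternal(List<Integer> numbers) {
        return numbers.stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sumExternal(numbers)); // 15
        System.out.println(sumInternal(numbers)); // 15
    }
}
```

Because the library owns the loop in the second version, it is free to change how the loop runs, for example by splitting it across threads with parallel().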

Lazy evaluation

A stream expression is not evaluated until a terminal operation is called on it. Most operations in the Stream API return a Stream; these operations do not execute anything, they only build the pipeline. Look at the code below and predict its output.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream().map(n -> n / 0).filter(n -> n % 2 == 0);
In the code above, we divide each element of the stream by 0. We might expect this code to throw an ArithmeticException at runtime, but it does not, because stream expressions are evaluated only when a terminal operation is called. If we add a terminal operation to the stream, the pipeline executes and the exception is thrown.

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> stream = numbers.stream().map(n -> n / 0).filter(n -> n % 2 == 0);
stream.collect(toList());
We will get the following stack trace:

Exception in thread "main" java.lang.ArithmeticException: / by zero
    at org._7dayswithx.java8.day2.EagerEvaluationExample.lambda$main$0(EagerEvaluationExample.java:13)
    at org._7dayswithx.java8.day2.EagerEvaluationExample$$Lambda$1/1915318863.apply(Unknown Source)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
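Laziness can also be observed without an exception. This sketch counts how often the map lambda runs: the count is still 0 after the pipeline is built and reaches 5 only once the terminal operation collect executes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessSketch {

    // Returns {map calls before the terminal operation, map calls after it}.
    static int[] countMapCalls(List<Integer> numbers) {
        AtomicInteger mapCalls = new AtomicInteger();

        // Building the pipeline only records the operation; the lambda does not run yet.
        Stream<Integer> doubled = numbers.stream().map(n -> {
            mapCalls.incrementAndGet();
            return n * 2;
        });
        int before = mapCalls.get();

        // collect is a terminal operation, so the whole pipeline executes here.
        List<Integer> result = doubled.collect(Collectors.toList());
        System.out.println(result); // [2, 4, 6, 8, 10]
        return new int[] {before, mapCalls.get()};
    }

    public static void main(String[] args) {
        int[] calls = countMapCalls(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println("before: " + calls[0] + ", after: " + calls[1]); // before: 0, after: 5
    }
}
```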

Use Stream API

The Stream API provides many operations that developers can use to query data from a collection. These operations fall into two types: intermediate operations and terminal operations.

Intermediate operations produce a new stream from an existing stream, for example filter, map, and sorted.

Terminal operations produce a non-stream result from a stream, for example collect(toList()), forEach, and count.

Intermediate operations let developers build pipelines that execute only when a terminal operation is called. The following figure lists some of the functions of the Stream API:

[Figure: stream-api, an overview of Stream API operations (https://whyjava.files.wordpress.com/2015/07/stream-api.png)]

Example class

In this tutorial, we will use a task management domain to explain these concepts. The example has a class called Task, which represents a user's task and is defined as follows:

import java.time.LocalDate;
import java.util.*;

public class Task {
    private final String id;
    private final String title;
    private final TaskType type;
    private final LocalDate createdOn;
    private boolean done = false;
    private Set<String> tags = new HashSet<>();
    private LocalDate dueOn;

    // removed constructor, getter, and setter for brevity
}
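Because the constructor and accessors are omitted above, the sample data below will not compile on its own. The following is one hypothetical way to fill them in, not the author's actual code. It assumes a three-argument constructor that generates the id with UUID (the sample data never passes an id), an addTag method that returns this so calls can be chained, and a TaskType enum containing only the values the sample data uses.

```java
import java.time.LocalDate;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical: only the task types that appear in the sample data.
enum TaskType { READING, CODING, WRITING }

public class Task {
    private final String id;
    private final String title;
    private final TaskType type;
    private final LocalDate createdOn;
    private boolean done = false;
    private Set<String> tags = new HashSet<>();
    private LocalDate dueOn;

    // Assumption: the id is generated here, since the sample data never supplies one.
    public Task(String title, TaskType type, LocalDate createdOn) {
        this.id = UUID.randomUUID().toString();
        this.title = title;
        this.type = type;
        this.createdOn = createdOn;
    }

    // Returns this so calls can be chained: new Task(...).addTag("git").addTag("books")
    public Task addTag(String tag) {
        this.tags.add(tag);
        return this;
    }

    public String getId() { return id; }
    public String getTitle() { return title; }
    public TaskType getType() { return type; }
    public LocalDate getCreatedOn() { return createdOn; }
    public boolean isDone() { return done; }
    public void setDone(boolean done) { this.done = done; }
    public Set<String> getTags() { return Collections.unmodifiableSet(tags); }
    public LocalDate getDueOn() { return dueOn; }
    public void setDueOn(LocalDate dueOn) { this.dueOn = dueOn; }
}
```

This sketch deliberately does not override equals and hashCode, so distinct() (Example 2) would treat two separately constructed tasks as different even if their fields match.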
The sample data set below is used throughout the Stream API examples.

Task task1 = new Task("Read Version Control with Git book", TaskType.READING, LocalDate.of(2015, Month.JULY, 1)).addTag("git").addTag("reading").addTag("books");

Task task2 = new Task("Read Java 8 Lambdas book", TaskType.READING, LocalDate.of(2015, Month.JULY, 2)).addTag("java8").addTag("reading").addTag("books");

Task task3 = new Task("Write a mobile application to store my tasks", TaskType.CODING, LocalDate.of(2015, Month.JULY, 3)).addTag("coding").addTag("mobile");

Task task4 = new Task("Write a blog on Java 8 Streams", TaskType.WRITING, LocalDate.of(2015, Month.JULY, 4)).addTag("blogging").addTag("writing").addTag("streams");

Task task5 = new Task("Read Domain Driven Design book", TaskType.READING, LocalDate.of(2015, Month.JULY, 5)).addTag("ddd").addTag("books").addTag("reading");

List<Task> tasks = Arrays.asList(task1, task2, task3, task4, task5);
This chapter does not cover the Java 8 Date and Time API; here we simply treat it as an ordinary date API.

Example 1: Find the titles of all READING Tasks and sort them according to their creation time.
The first example finds the titles of all READING tasks in the task list and sorts them by creation time. The steps are as follows:

Filter out all tasks whose TaskType is READING.
Sort tasks according to creation time.
Get the title of each task.
Pack these titles into a List.
These four steps translate directly into the following code:

private static List<String> allReadingTasks(List<Task> tasks) {
        List<String> readingTaskTitles = tasks.stream().
                filter(task -> task.getType() == TaskType.READING).
                sorted((t1, t2) -> t1.getCreatedOn().compareTo(t2.getCreatedOn())).
                map(task -> task.getTitle()).
                collect(Collectors.toList());
        return readingTaskTitles;
}
In the above code, we used the following methods in the Stream API:

filter: lets developers define a predicate to extract the elements of the stream that satisfy it. The rule task -> task.getType() == TaskType.READING selects all elements whose TaskType is READING.

sorted: lets developers define a comparator to sort the stream. In the example above, we sort by creation time. The lambda expression (t1, t2) -> t1.getCreatedOn().compareTo(t2.getCreatedOn()) implements the compare method of the functional interface Comparator.

map: takes a lambda expression of type Function<? super T, ? extends R> that converts each element of the stream into a new value, producing a new stream. The lambda expression task -> task.getTitle() turns a task into its title.

collect(toList()): a terminal operation that collects the titles of all READING tasks into a list.

We can simplify the code above by using the comparing method of the Comparator interface together with method references:

public List<String> allReadingTasks(List<Task> tasks) {
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            sorted(Comparator.comparing(Task::getCreatedOn)).
            map(Task::getTitle).
            collect(Collectors.toList());

}
Starting with Java 8, interfaces can contain static and default methods, which were introduced in ch01.

In the code above, we use the static helper method comparing of the Comparator interface. It takes a Function that extracts a Comparable sort key and returns a Comparator that compares elements by that key. The method reference Task::getCreatedOn supplies this Function<Task, LocalDate>.

Using function composition, we can easily reverse the order by calling the reversed() method on the Comparator, as shown below.

public List<String> allReadingTasksSortedByCreatedOnDesc(List<Task> tasks) {
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            sorted(Comparator.comparing(Task::getCreatedOn).reversed()).
            map(Task::getTitle).
            collect(Collectors.toList());
}
Example 2: Remove duplicate tasks
Suppose we have a data set with many duplicate tasks. We can easily remove the duplicate elements from the stream by calling the distinct method, as shown below:

public List<Task> allDistinctTasks(List<Task> tasks) {
    return tasks.stream().distinct().collect(Collectors.toList());
}
The distinct() method turns a stream into a stream without duplicate elements, using each object's equals method to decide whether two objects are equal. If two objects are equal according to equals, they are considered duplicates and only one of them is kept in the result stream.
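Since distinct() depends entirely on equals, the element class has to define what "duplicate" means. This sketch uses a hypothetical Tag value class whose equality is based on its name, with a matching hashCode as Java requires whenever equals is overridden. For an ordered stream, the first occurrence of each duplicate is the one kept.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

public class DistinctSketch {

    // Hypothetical value class: two tags are equal when their names match.
    static final class Tag {
        private final String name;

        Tag(String name) { this.name = name; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Tag)) return false;
            return name.equals(((Tag) o).name);
        }

        // Overriding equals requires a consistent hashCode; without it,
        // equal tags could still be treated as distinct.
        @Override
        public int hashCode() { return Objects.hash(name); }

        @Override
        public String toString() { return name; }
    }

    static List<Tag> distinctTags(List<Tag> tags) {
        return tags.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Tag> tags = Arrays.asList(new Tag("git"), new Tag("books"), new Tag("git"));
        System.out.println(distinctTags(tags)); // [git, books]
    }
}
```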

Example 3: Find the top 5 READING tasks, sorted by creation time
The limit method restricts the result to a given number of elements. limit is a short-circuiting operation, which means it does not need to process every element to produce its result.

public List<String> topN(List<Task> tasks, int n){
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            sorted(comparing(Task::getCreatedOn)).
            map(Task::getTitle).
            limit(n).
            collect(toList());
}
You can combine the skip and limit methods to implement paging, as shown below.

// page starts from 0, so to view the second page, `page` is 1 and n is 5.
List<String> readingTaskTitles = tasks.stream().
                filter(task -> task.getType() == TaskType.READING).
                sorted(comparing(Task::getCreatedOn).reversed()).
                map(Task::getTitle).
                skip(page * n).
                limit(n).
                collect(toList());
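The paging formula is easy to check on a small list. This sketch wraps it in a hypothetical page helper over plain strings: skip discards the page * n elements of the earlier pages, then limit keeps at most n. A page past the end simply comes back empty.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PagingSketch {

    // Returns page `page` (0-based) of size `n`: skip the earlier pages, then take n items.
    static List<String> page(List<String> items, int page, int n) {
        return items.stream()
                .skip((long) page * n)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        System.out.println(page(titles, 0, 3)); // [a, b, c]
        System.out.println(page(titles, 1, 3)); // [d, e, f]
        System.out.println(page(titles, 2, 3)); // [g]
    }
}
```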
Example 4: Count the READING tasks
To get the number of READING tasks, we can use the count method on the stream, which is a terminal operation.

public long countAllReadingTasks(List<Task> tasks) {
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            count();
}
Example 5: List all distinct tags across all tasks
To find unique tags, we need the following steps

Get the tags in each task.
Put all the tags in one stream.
Remove duplicate tags.
Put the final result into a list.
The first and second steps can be done by calling flatMap on the stream. flatMap merges the streams obtained by calling task.getTags().stream() on each task into a single stream. Once all the tags are in one stream, we can call distinct to get the unique tags.

private static List<String> allDistinctTags(List<Task> tasks) {
        return tasks.stream().flatMap(task -> task.getTags().stream()).distinct().collect(toList());
}
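The effect of flatMap is easiest to see without the Task class. In this sketch each inner list stands in for one task's tags (lists are used instead of HashSets only to keep the output order predictable). flatMap flattens the nested lists into one Stream<String> before distinct removes the repeats.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlatMapSketch {

    // Each inner list plays the role of one task's tags.
    static List<String> distinctTags(List<List<String>> tagsPerTask) {
        return tagsPerTask.stream()
                .flatMap(List::stream)   // Stream<List<String>> -> Stream<String>
                .distinct()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> tagsPerTask = Arrays.asList(
                Arrays.asList("git", "reading", "books"),
                Arrays.asList("java8", "reading", "books"),
                Arrays.asList("coding", "mobile"));
        System.out.println(distinctTags(tagsPerTask));
        // [git, reading, books, java8, coding, mobile]
    }
}
```

With map instead of flatMap, the result would be a stream of lists, and distinct would compare whole lists rather than individual tags.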
Example 6: Check whether all READING tasks have the books tag
The Stream API provides methods for checking whether elements of a data set have a given property: allMatch, anyMatch, noneMatch, findFirst, and findAny. To determine whether all READING tasks carry the books tag, you can use the following code:

public boolean isAllReadingTasksWithTagBooks(List<Task> tasks) {
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            allMatch(task -> task.getTags().contains("books"));
}
To determine whether any READING task has the java8 tag, use anyMatch as follows:

public boolean isAnyReadingTasksWithTagJava8(List<Task> tasks) {
    return tasks.stream().
            filter(task -> task.getType() == TaskType.READING).
            anyMatch(task -> task.getTags().contains("java8"));
}
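The other methods listed above, noneMatch and findFirst, work the same way. This sketch uses a plain list of hypothetical tag strings. It also shows an edge case relevant to isAllReadingTasksWithTagBooks: on an empty stream (for example, when there are no READING tasks at all) allMatch returns true, while anyMatch returns false.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class MatchSketch {

    // noneMatch: true when no element satisfies the predicate.
    static boolean noTagIsEmpty(List<String> tags) {
        return tags.stream().noneMatch(String::isEmpty);
    }

    // findFirst: the first element (in encounter order) that passes the filter, if any.
    static Optional<String> firstTagStartingWith(List<String> tags, String prefix) {
        return tags.stream().filter(t -> t.startsWith(prefix)).findFirst();
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("git", "java8", "books", "javascript");
        System.out.println(noTagIsEmpty(tags));                 // true
        System.out.println(firstTagStartingWith(tags, "java")); // Optional[java8]
        System.out.println(firstTagStartingWith(tags, "ddd"));  // Optional.empty

        // On an empty stream, allMatch is vacuously true and anyMatch is false.
        List<String> none = Collections.emptyList();
        System.out.println(none.stream().allMatch(t -> t.contains("books"))); // true
        System.out.println(none.stream().anyMatch(t -> t.contains("java8"))); // false
    }
}
```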
Example 7: Create a summary of all titles
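To create a summary of all titles, you can use the reduce operation, which reduces a stream to a single value. Here reduce takes a lambda expression that concatenates the elements of the stream. Because this form of reduce returns an Optional, get() is called to extract the result; get() throws NoSuchElementException if the list is empty.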

public String joinAllTaskTitles(List<Task> tasks) {
    return tasks.stream().
            map(Task::getTitle).
            reduce((first, second) -> first + " *** " + second).
            get();
}
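Two variations are worth knowing here. This sketch works on a plain list of title strings rather than Task objects. It replaces get() with orElse("") so that an empty list yields an empty string instead of an exception, and it shows Collectors.joining, a standard collector that produces the same " *** "-separated string.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class JoinTitlesSketch {

    // reduce without an identity returns Optional<String>; orElse("") handles an empty list
    // instead of letting get() throw NoSuchElementException.
    static String joinWithReduce(List<String> titles) {
        return titles.stream()
                .reduce((first, second) -> first + " *** " + second)
                .orElse("");
    }

    // Collectors.joining builds the same string and returns "" for an empty stream.
    static String joinWithCollector(List<String> titles) {
        return titles.stream().collect(Collectors.joining(" *** "));
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("Read Git book", "Read Java 8 Lambdas book");
        System.out.println(joinWithReduce(titles));    // Read Git book *** Read Java 8 Lambdas book
        System.out.println(joinWithCollector(titles)); // Read Git book *** Read Java 8 Lambdas book
        System.out.println("[" + joinWithReduce(Collections.<String>emptyList()) + "]"); // []
    }
}
```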
Example 8: Working with primitive streams
In addition to the usual object streams, Java 8 provides specialized streams for the primitive types int, long, and double. Let's look at some examples of primitive streams.

To create a range of values, call the range method. The following code creates a stream of the values 0 to 9, excluding 10.

IntStream.range(0, 10).forEach(System.out::println);
The rangeClosed method creates a stream that includes the upper bound, so the following code generates a stream from 1 to 10.

IntStream.rangeClosed(1, 10).forEach(System.out::println);
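Besides creating ranges, primitive streams add numeric terminal operations that object streams lack. This sketch uses sum, average, and count on IntStream. Note that average returns an OptionalDouble, because an empty range has no average.

```java
import java.util.stream.IntStream;

public class PrimitiveStreamSketch {

    // IntStream works on int values directly, without boxing them into Integer objects.
    static int sumClosed(int from, int to) {
        return IntStream.rangeClosed(from, to).sum();
    }

    // average() returns OptionalDouble; NaN is used here to signal an empty range.
    static double averageOf(int from, int toExclusive) {
        return IntStream.range(from, toExclusive).average().orElse(Double.NaN);
    }

    static long countEvens(int from, int to) {
        return IntStream.rangeClosed(from, to).filter(i -> i % 2 == 0).count();
    }

    public static void main(String[] args) {
        System.out.println(sumClosed(1, 10));  // 55
        System.out.println(averageOf(0, 10));  // 4.5
        System.out.println(countEvens(1, 10)); // 5
    }
}
```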
You can also create infinite streams by calling the iterate method on primitive streams, as follows:

LongStream infiniteStream = LongStream.iterate(1, el -> el + 1);
To filter out all even numbers from an infinite stream, you can use the following code to achieve:

infiniteStream.filter(el -> el % 2 == 0).forEach(System.out::println);
We can limit the resulting stream with the limit operation, as shown below. Because a stream can be consumed only once, create a new infiniteStream before running this line if you have already run the previous one.

infiniteStream.filter(el -> el % 2 == 0).limit(100).forEach(System.out::println);
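Since each stream object can be consumed only once, a common pattern is to wrap the infinite stream in a Supplier and ask it for a fresh stream whenever one is needed. This sketch assumes that pattern (the NATURALS name is illustrative). It also shows that reusing an already-consumed stream throws IllegalStateException.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class InfiniteStreamSketch {

    // Each get() builds a new infinite stream 1, 2, 3, ...
    static final Supplier<LongStream> NATURALS = () -> LongStream.iterate(1, el -> el + 1);

    static List<Long> firstEvens(int count) {
        return NATURALS.get()
                .filter(el -> el % 2 == 0)
                .limit(count)   // without limit this pipeline would never finish
                .boxed()        // LongStream -> Stream<Long> so it can be collected into a List
                .collect(Collectors.toList());
    }

    static boolean reuseFails() {
        LongStream stream = NATURALS.get();
        stream.limit(3).sum();         // the terminal operation consumes the pipeline
        try {
            stream.limit(3).sum();     // second use of the same stream object
            return false;
        } catch (IllegalStateException e) {
            return true;               // "stream has already been operated upon or closed"
        }
    }

    public static void main(String[] args) {
        System.out.println(firstEvens(5)); // [2, 4, 6, 8, 10]
        System.out.println(reuseFails());  // true
    }
}
```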
Example 9: Creating a stream for an array
You can create a stream from an array by calling the static stream method of the Arrays class, as follows:

String[] tags = {"java", "git", "lambdas", "machine-learning"};
Arrays.stream(tags).map(String::toUpperCase).forEach(System.out::println);
You can also create a stream from part of an array by passing a start index and an end index, as follows. The start index is inclusive and the end index is exclusive.

Arrays.stream(tags, 1, 3).map(String::toUpperCase).forEach(System.out::println);

Parallel Streams

One advantage of streams is that, because they use internal iteration, the Java library can manage concurrency effectively. You can call the parallel method on a stream to make it parallel. Parallel streams are built on the fork/join framework introduced in JDK 7 and, by default, run on the common ForkJoinPool, whose parallelism is one less than the number of available processors; the calling thread also takes part in the work. In the following code, we group the numbers by the thread that processes them. We will cover collect and the groupingBy function in section 4; for now, just know that groupingBy groups elements by a key.

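import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.groupingBy;

public class ParallelStreamExample {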

    public static void main(String[] args) {
        Map<String, List<Integer>> numbersPerThread = IntStream.rangeClosed(1, 160)
                .parallel()
                .boxed()
                .collect(groupingBy(i -> Thread.currentThread().getName()));

        numbersPerThread.forEach((k, v) -> System.out.println(String.format("%s >> %s", k, v)));
    }
}
On my machine, the printed results are as follows:

ForkJoinPool.commonPool-worker-7 >> [46, 47, 48, 49, 50]
ForkJoinPool.commonPool-worker-1 >> [41, 42, 43, 44, 45, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130]
ForkJoinPool.commonPool-worker-2 >> [146, 147, 148, 149, 150]
main >> [106, 107, 108, 109, 110]
ForkJoinPool.commonPool-worker-5 >> [71, 72, 73, 74, 75]
ForkJoinPool.commonPool-worker-6 >> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160]
ForkJoinPool.commonPool-worker-3 >> [21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 76, 77, 78, 79, 80]
ForkJoinPool.commonPool-worker-4 >> [91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145]
The worker threads do not process equal shares of the numbers. You can control the parallelism of the common fork/join pool by setting a system property, for example System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "2"); the property must be set before the common pool is first used.
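To see what the common pool looks like on a given machine, and to confirm that splitting the work does not change the result, the following sketch prints the processor count and the common pool's parallelism, then computes the same sum sequentially and in parallel. The printed numbers depend on the machine; only the two sums are fixed.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

public class CommonPoolSketch {

    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    // Same computation on a parallel stream, executed by the common ForkJoinPool.
    static long parallelSum(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        // sum is associative, so splitting the range across threads does not change the result.
        System.out.println(sequentialSum(160) + " " + parallelSum(160)); // 12880 12880
    }
}
```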

Another example where the parallel operation is useful is processing a list of URLs, like this:

String[] urls = {"https://www.google.co.in/", "https://twitter.com/", "http://www.facebook.com/"};
Arrays.stream(urls).parallel().map(url -> getUrlContent(url)).forEach(System.out::println);
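The snippet above depends on a getUrlContent method that the article does not define. In the sketch below, getUrlContent is a hypothetical stand-in that only simulates a slow call with Thread.sleep, so no real network I/O happens. It also shows one ordering detail: forEach on a parallel stream may print results in any order, while collect(Collectors.toList()) keeps the original encounter order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ParallelUrlsSketch {

    // Hypothetical stand-in for getUrlContent: simulates latency instead of fetching the URL.
    static String getUrlContent(String url) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "content of " + url;
    }

    // The calls run in parallel, but the returned list keeps the order of the urls array.
    static List<String> fetchAll(String[] urls) {
        return Arrays.stream(urls)
                .parallel()
                .map(ParallelUrlsSketch::getUrlContent)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String[] urls = {"https://www.google.co.in/", "https://twitter.com/", "http://www.facebook.com/"};
        fetchAll(urls).forEach(System.out::println);
    }
}
```

Because parallel streams share the common pool, long blocking calls such as real HTTP requests occupy its worker threads for the whole wait, so use this pattern with care for I/O-heavy work.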
