Working with Streams and Lambda expressions
- Implement functional interfaces using lambda expressions, including interfaces from the java.util.function package
- Use Java Streams to filter, transform and process data
- Perform decomposition and reduction, including grouping and partitioning on sequential and parallel streams
- Functional interfaces and lambda expressions
- Basic principle of streams
- Streams and lambda expressions
- Streams for filtering
- Data processing: map, reduce…
- Technical issues: non-reusability…
- Streams and parallelism
- Streams enhancements from Java 9
Functional interfaces
Rule(s)
- Functional interfaces have only one (abstract) method. In the Java library, functional interfaces are annotated with @java.lang.FunctionalInterface and are intimately associated with lambda expressions. java.lang.Runnable is a functional interface. It has a single run method that requires a (deferred) body through an inheriting class. Threads require runnable objects as arguments for later executing run (based on start) in split threads of control. To avoid the creation of such a class (which involves the creation of an associated .java file) just for providing the body of run, a lambda expression is useful.
Example Lambda_expression.Java.zip
public class Functional_interface_test {
    public static void main(String[] args) {
        // No reference to the 'run' method is required since 'java.lang.Runnable' only owns this single method:
        Runnable to_do = () -> {
            try {
                Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
            } catch (InterruptedException ie) {
                java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
            }
        };
        new Thread(to_do).start();
        …
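For comparison, the pre-lambda alternative is an anonymous class implementing Runnable; the following sketch (not part of the downloadable archive) is equivalent to the lambda expression above and shows the boilerplate that the lambda removes:
Runnable to_do = new Runnable() { // Anonymous class in place of the lambda expression
    @Override
    public void run() {
        try {
            Thread.sleep(1000L); // Nothing to do for 1 sec. <=> a kind of delay!
        } catch (InterruptedException ie) {
            java.util.logging.Logger.getLogger("Why it doesn't work?").log(java.util.logging.Level.SEVERE, null, ie);
        }
    }
};
new Thread(to_do).start();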
Expressiveness
Rule(s)
- While the syntax of a lambda expression is compact, it may be decomposed starting from the @FunctionalInterface annotation.
Example Lambda_expression.Java.zip
@FunctionalInterface
public interface My_functional_interface {
    void my_print(Object o);
}
…
My_functional_interface mfi = (o) -> System.out.println(o.toString()); // Java 7, which is deeply inspired by JavaScript!
mfi.my_print("This is Java 7 style...");
…
My_functional_interface mfi2 = System.out::println; // This is called "method reference"
mfi2.my_print("But Java 8 goes beyond...");
Example Heap.Java.zip
java.util.Comparator<Short> comparator = (Short i, Short j) -> { return i - j; };
Example (instead of…)
java.util.Comparator<Short> comparator = new java.util.Comparator<>() {
    @Override
    public int compare(Short i, Short j) {
        if (i < j) {
            return -1;
        }
        if (i.equals(j)) { // 'i == j' would compare 'Short' references, not values
            return 0;
        }
        // if (i > j) {
        //     return 1;
        // }
        return 1; // Mandatory without 'if'
    }
};
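For the record (a minimal sketch, not taken from Heap.Java.zip), the very same comparator may also be obtained from library methods, in the spirit of the method references discussed above:
// Method reference on the static 'Short.compare' method:
java.util.Comparator<Short> comparator_ = Short::compare;
// Or, equivalently, the natural ordering of 'Short' (which implements 'Comparable'):
java.util.Comparator<Short> comparator__ = java.util.Comparator.naturalOrder();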
Limitation
Example (more than one method prevents the utilization of a lambda expression)
// @FunctionalInterface -> compilation error because of *TWO* methods!
public interface A_given_interface {
    void a_given_method();
    void another_given_method();
}
…
/*
A_given_interface get_one() {
    return new A_given_interface() {
        @Override
        public void a_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }
        @Override
        public void another_given_method() {
            throw new UnsupportedOperationException("Don't be lazy! Please define body...");
        }
    };
}
*/
In conjunction with streams…
Rule(s)
- Lambda expressions are natural counterparts of stream programming.
Example
java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream().filter(p -> p > 1
        && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
java.util.List<Integer> primes = stream.collect(java.util.stream.Collectors.toList());
primes.forEach(p -> System.out.print(" " + p)); // '2 3 5 7 11' is displayed...
Rule(s)
- Java streams were introduced in Java 8, with amendments in later releases.
- Streams support transparent data processing, especially of “big” data sets. Streams are logically related data that, unlike collections, a priori cannot stay in computer memory as a whole (i.e., “streams are lazily constructed”). Differences between collections and streams are discussed ☛.
- Streams entail the idea of parallel processing without the explicit (complicated) use of threads. Transparency then comes from facilities within the Java stream API that hide multithreading.
- Typically, primitive array concatenation ☛ may be advantageously replaced by stream usage.
Example (use of the method reference :: mechanism) Primitive_array_concatenation.Java.zip
public class Primitive_array_concatenation {
    private final static String[] _Author = {"Franck", "Barbier"};
    // Delimited word: [^\w] <=> [^a-zA-Z_0-9]
    private final static java.util.regex.Pattern _Word = java.util.regex.Pattern.compile("[^\\w]");

    public static void main(String[] args) {
        String string = "Primitive array concatenation from";
        String[] words = _Word.split(string);
        String[] concatenation = java.util.stream.Stream.of(words, _Author).flatMap(java.util.stream.Stream::of).toArray(String[]::new);
        // 'of': stream of two arrays
        // 'flatMap': stream of the two arrays' elements
    }
}
Rule(s)
- The move from collections to streams is straightforward.
java.util.stream.Stream<String> s = java.util.Arrays.asList("Franck", "Sophie", "Oscar", "Léna", "Joseph").stream();
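Collections are not the only possible stream sources; as a minimal sketch (names are illustrative only), arrays, explicit values and integer ranges also produce streams:
java.util.stream.Stream<String> from_array = java.util.Arrays.stream(new String[]{"Franck", "Sophie"});
java.util.stream.Stream<String> from_values = java.util.stream.Stream.of("Oscar", "Léna", "Joseph");
java.util.stream.IntStream from_range = java.util.stream.IntStream.rangeClosed(1, 10); // Primitive specialization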
Streams work together with lambda expressions for code leanness
Rule(s)
- java.util.function.Function<T, R> (a Java functional interface) is the type of the unique parameter of the map method for streams. The map method returns a stream of R elements from T elements. The two examples below, first without and then with a lambda expression, show two equivalent forms of instantiating java.util.function.Function<T, R>.
Example (without lambda expression)
final javax.json.stream.JsonParser.Event event = parser.next();
assert (event == javax.json.stream.JsonParser.Event.START_ARRAY); // The JSON parser points to a JSON array
// For test only:
// parser.getArrayStream().forEach(System.out::println); // Stream is no longer usable afterwards!
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
        .map(new java.util.function.Function<javax.json.JsonValue, javax.json.JsonObject>() {
            @Override
            public javax.json.JsonObject apply(javax.json.JsonValue json_value) {
                // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
                return (javax.json.JsonObject) json_value; // Be sure that array elements are actually JSON objects!
            }
        });
Example (with lambda expression) Carrefour_Items.Java.zip
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
        // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()'
        .map(json_value -> (javax.json.JsonObject) json_value); // Be sure that array elements are actually JSON objects!
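Assuming, as above, that array elements are actually JSON objects, the cast-based lambda expression may itself be replaced by a method reference (a sketch, not the downloadable code):
java.util.stream.Stream<javax.json.JsonObject> stream = parser.getArrayStream()
        .map(javax.json.JsonValue::asJsonObject); // 'asJsonObject' throws 'ClassCastException' for non-object elements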
Rule(s)
- Streams offer high-end interoperability with collections (and vice versa). Specifically, java.util.Collections (on the “collections” side) and java.util.stream.Collectors (on the “streams” side) are utility classes that foster this interoperability.
Example Horner.Java.zip
public class Polynomial {
    private java.util.Map<Integer, Double> _polynomial = new java.util.HashMap<>();

    Polynomial() {
        _polynomial.put(1, -12.D);
        _polynomial.put(39, 8.D);
    }

    public double horner_method(final double x) {
        java.util.List<Integer> x_powers = _polynomial.keySet().stream().collect(java.util.stream.Collectors.toList());
        java.util.Collections.sort(x_powers, java.util.Collections.reverseOrder());
        …
    }
}
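The elided body of horner_method may, as a hypothetical completion (the downloadable code may differ), apply the sparse Horner scheme to the sorted powers:
public double horner_method(final double x) {
    java.util.List<Integer> x_powers = _polynomial.keySet().stream().collect(java.util.stream.Collectors.toList());
    java.util.Collections.sort(x_powers, java.util.Collections.reverseOrder());
    double result = 0.D;
    int previous_power = x_powers.get(0);
    for (Integer power : x_powers) { // Decreasing powers: multiply by the power gap, then add the coefficient
        result = result * Math.pow(x, previous_power - power) + _polynomial.get(power);
        previous_power = power;
    }
    return result * Math.pow(x, previous_power); // Remaining lowest power, e.g., 8x^39 - 12x for the polynomial above
}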
Rule(s)
- Web programming (HTTP, HTTPS, WebSockets…) is in essence the processing of incoming data, for instance, incoming data filtering. The following example shows how a message receiver identifies the message sender's session among multiple WebSocket sessions, i.e., multiple connected participants.
Example (filter, findAny and findFirst)
@javax.websocket.OnMessage
public void onMessage(javax.websocket.Session sender, String message) throws java.io.IOException {
    // Get the sender session among the opened sessions:
    java.util.Optional<javax.websocket.Session> sender_session = sender.getOpenSessions().stream()
            .filter(session -> sender.getId().equals(session.getId()) && !session.getPathParameters().values().isEmpty())
            .findAny(); // A little bit stupid, but for illustration only...
    /* Test */
    assert (sender_session.isPresent());
    /* End of test */
    // Not empty by construction, see filter above:
    String sender_name = sender_session.get().getPathParameters().values().iterator().next();
    // Get peers and select the (only) one (Java <-> JavaScript) that has to receive this in-progress message:
    for (java.util.Iterator<javax.websocket.Session> i = sender.getOpenSessions().iterator(); i.hasNext();) {
        javax.websocket.Session session = i.next();
        if (!sender.getId().equals(session.getId())) { // This prevents some echo... ('getId' returns a 'String', hence 'equals')
            /* Test */
            assert (!session.getPathParameters().values().isEmpty());
            /* End of test */
            java.util.Optional<String> receiver_name = session.getPathParameters().values().stream().findFirst();
            /* Test */
            assert (receiver_name.isPresent());
            /* End of test */
            if (sender_name.equals(receiver_name.get())) {
                session.getBasicRemote().sendText(message);
                break;
            }
        }
    }
}
Scenario(s)
Carrefour.io (unfortunately, the site is no longer operational) was the digital vision of the French retailer Carrefour. Carrefour.io offered the Items and the Stores APIs. Queries about items (i.e., common consumer products) return raw JSON data that may require post-processing, as in the following example:
private final static String _POULAIN = "{\"size\": 200,\"queries\": [{\"query\": \"poulain\",\"field\": \"barcodeDescription\"}]}";
This JSON query expects at most 200 items whose ‘brand’ or ‘description’ value field (according to the API documentation, ‘barcodeDescription’ implies searching in both ‘brand’ and ‘description’) contains the "poulain" literal (the search is not case-sensitive), which, for most people in France, corresponds to a chocolate brand with derivatives such as cocoa powder, etc. Around 100 items are returned.
private final static String _POULAIN_NOISETTE = "{\"queries\": [{\"query\": \"poula\",\"field\": \"barcodeDescription\"},{\"query\": \"noiset\",\"field\": \"barcodeDescription\"}]}";
This JSON query returns fewer than 10 items because the "noiset" literal (i.e., looking for chocolate with nuts) must also be present in the ‘brand’ or ‘description’ value field ("oiset" instead of "noiset" retrieves no item). More generally, the Carrefour.io Items API allows (cursory) filters that imply further filtering on the client side (i.e., in the client program).
Rule(s)
- Query post-processing is the ability to deal with returned data for further filtering. For queries retrieving big data sets, the use of Java streams aims at accelerating filtering, in particular through parallel processing.
- As an illustration, the following example simply gets a stream from raw JSON data (items returned by the Carrefour.io Items API).
Example Carrefour_Items.Java.zip
try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
    while (parser.hasNext()) {
        if (parser.next() == javax.json.stream.JsonParser.Event.START_ARRAY) { // The JSON parser points to a JSON array
            java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
            …
Rule(s)
- A stream cannot be reused; attempting to do so raises an IllegalStateException. The following example fails because allMatch (as a terminal operation) makes the stream object non-reusable, so the subsequent map-reduce check on the same stream fails.
Example Carrefour_Items.Java.zip
public void check_format() {
    try (javax.json.stream.JsonParser parser = javax.json.Json.createParser(new java.io.StringReader(_json_result))) {
        while (parser.hasNext()) {
            // '_LIST' key is associated with a value being an array whose elements are the returned items (by default, only 20 are returned):
            if (parser.next() == javax.json.stream.JsonParser.Event.KEY_NAME && parser.getString().equals(_LIST)) {
                final javax.json.stream.JsonParser.Event event = parser.next();
                assert (event == javax.json.stream.JsonParser.Event.START_ARRAY);
                java.util.stream.Stream<javax.json.JsonValue> stream = parser.getArrayStream();
                // Check that all members are 'javax.json.JsonValue.ValueType.OBJECT':
                assert (stream
                        .allMatch(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT));
                // Check (based on map-reduce principle) that all members are 'javax.json.JsonValue.ValueType.OBJECT':
                java.util.Optional<Boolean> optional = stream
                        .map(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
                        .reduce((Boolean a, Boolean b) -> a & b);
                assert (optional.isPresent() && optional.get());
            }
        }
    }
}
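A common workaround (a minimal sketch, assuming the underlying source, unlike a JSON parser, can be traversed several times) is to wrap stream creation in a java.util.function.Supplier so that each terminal operation works on a fresh stream:
java.util.List<Integer> data = java.util.Arrays.asList(1, 2, 3);
java.util.function.Supplier<java.util.stream.Stream<Integer>> streams = data::stream;
assert (streams.get().allMatch(i -> i > 0)); // First terminal operation on a first stream
assert (streams.get().map(i -> i > 0).reduce(Boolean::logicalAnd).orElse(false)); // Second terminal operation on a *new* stream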
Rule(s)
- Lazy evaluation of streams (principle ☛) means that intermediate operations (e.g., map) on streams are effectively executed at the time a terminal operation (e.g., reduce) occurs. The distinction between intermediate and terminal operations is described ☛. The benefit of lazy evaluation only shows up with big, possibly infinite, data streams. Issues about the way lazy evaluation may improve performance are addressed ☛.
- The following example applies the filter intermediate operation at the time the forEach terminal operation is called. The peek intermediate operation is only devoted to debugging (explanation ☛). So, given json_value1, json_value2 … json_valueN, data is processed as follows: forEach(peek(filter(json_value1))), forEach(peek(filter(json_value2))) … forEach(peek(filter(json_valueN))). Parallel streams then provide the ability to possibly assign these N (or fewer) pieces of work to threads.
- Note that filtering is a key facility of streams. The following example extracts members that obey a predicate before searching for “words” in each member's content. While filter and peek are based on a lambda expression, forEach is not (i.e., the java.util.function.Consumer<T> functional interface is instantiated explicitly).
Example Carrefour_Items.Java.zip
stream
        .filter(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
        .peek(json_value -> System.out.println("Filtered value: " + json_value.asJsonObject().toString()))
        .forEach(new java.util.function.Consumer<javax.json.JsonValue>() {
            @Override
            public void accept(javax.json.JsonValue json_value) {
                _search_items_(json_value.asJsonObject(), words);
            }
        });
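For uniformity, forEach may of course also rely on a lambda expression; the following sketch is equivalent to the example above:
stream
        .filter(json_value -> json_value.getValueType() == javax.json.JsonValue.ValueType.OBJECT)
        .peek(json_value -> System.out.println("Filtered value: " + json_value.asJsonObject().toString()))
        .forEach(json_value -> _search_items_(json_value.asJsonObject(), words)); // Lambda in place of the explicit 'Consumer'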
Parallel computing and streams
Rule(s)
- A stream may be made explicitly parallel, but the use of the java.util.Spliterator interface is another, more implicit, way of dealing with parallelism.
Example (the stream object is made parallel from the parallel Boolean variable at line 1)
java.util.stream.Stream<javax.json.JsonValue> stream = parallel ? parser.getArrayStream().parallel() : parser.getArrayStream();
assert (stream.isParallel() || !parallel);
stream
        // '(javax.json.JsonObject) json_value' <=> 'json_value.asJsonObject()':
        .map(json_value -> (javax.json.JsonObject) json_value) // Be sure that array elements are actually JSON objects!
        .forEach(new java.util.function.Consumer<javax.json.JsonObject>() {
            @Override
            public void accept(javax.json.JsonObject json_object) {
                _search_items_(json_object, words);
            }
        });
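Parallel streams transparently rely on the common fork/join pool. A small standalone sketch (independent of the Carrefour example) makes the worker threads visible:
java.util.stream.IntStream.rangeClosed(1, 10)
        .parallel()
        .forEach(i -> System.out.println(i + " processed by " + Thread.currentThread().getName()));
// Worker threads typically show up as 'ForkJoinPool.commonPool-worker-...' beside 'main'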
Rule(s)
- The java.util.Spliterator interface offers the trySplit method in order to divide data processing into pieces of work that are (virtually) assignable to threads ⇒ parallelism. The following example recursively divides a java.util.Spliterator object as much as possible.
Example (trySplit)
private void _max_splitting(java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators, java.util.Spliterator<javax.json.JsonObject> split_iterator) {
    // Line for test only:
    long split_iterator_estimateSize = split_iterator.estimateSize();
    java.util.Spliterator<javax.json.JsonObject> splitting = split_iterator.trySplit();
    if (splitting == null) {
        split_iterators.add(split_iterator);
    } else {
        // Line for test only:
        assert (split_iterator_estimateSize >= split_iterator.estimateSize() + splitting.estimateSize());
        _max_splitting(split_iterators, split_iterator);
        _max_splitting(split_iterators, splitting);
    }
}
Rule(s)
- Once split, parallel data processing occurs through the tryAdvance or forEachRemaining methods.
Example
java.util.List<javax.json.JsonObject> list = parser.getArrayStream()
        .map(json_value -> json_value.asJsonObject())
        .collect(java.util.stream.Collectors.toList()); // Reduction principle
java.util.Spliterator<javax.json.JsonObject> root_split_iterator = list.spliterator();
assert (root_split_iterator.characteristics() == (java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));
// Alternative:
// java.util.Spliterator<javax.json.JsonObject> root_split_iterator = java.util.Spliterators.spliterator(list, (java.util.Spliterator.IMMUTABLE | java.util.Spliterator.NONNULL | java.util.Spliterator.ORDERED | java.util.Spliterator.SIZED | java.util.Spliterator.SUBSIZED));
java.util.Set<java.util.Spliterator<javax.json.JsonObject>> split_iterators = new java.util.HashSet<>();
_max_splitting(split_iterators, root_split_iterator); // Versus '_min_splitting(split_iterators, root_split_iterator);'
split_iterators.forEach((split_iterator) -> {
    split_iterator.forEachRemaining(json_object -> _search_items_(json_object, words));
    // Alternative:
    // boolean result;
    // do {
    //     result = split_iterator.tryAdvance(json_object -> _search_items_(json_object, words));
    // } while (result);
});
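The example above still consumes the splits one after the other in a single thread. To effectively assign the pieces of work to threads, each split may, for instance, be submitted to an executor (a minimal sketch, assuming the split_iterators set, words and _search_items_ from the example above, _search_items_ being thread-safe and split_iterators non-empty):
java.util.concurrent.ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(split_iterators.size());
split_iterators.forEach(split_iterator -> executor.submit(
        () -> split_iterator.forEachRemaining(json_object -> _search_items_(json_object, words))));
executor.shutdown(); // No new tasks are accepted; already submitted ones run to completion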
Scenario(s)
The retrieval of 9,500 (requested size) items among (around) 20,000 items matching the query is followed by the application of the same (local) filter. This filtering is called 500 times for each method in order to determine an average elapsed time. The "No stream" method is realized with the help of filter_items, while filter_items_ supports both the "Sequential stream" and "Parallel stream" methods (the 'parallel' argument equals false or true). The "java.util.Spliterator" method is supported by filter_items__.
Measures
- No stream: ≈ 250 ms. Common (end-to-end) JSON parsing.
- Sequential stream: ≈ 250 ms. Stream of JSON values (i.e., javax.json.JsonValue) obtained from the JSON parser: parser.getArrayStream().
- Parallel stream: ≈ 250 ms. The stream of JSON values (i.e., javax.json.JsonValue) is switched to “parallel” as follows: parser.getArrayStream().parallel().
- java.util.Spliterator: ≈ 275 ms. Variations on trySplit (minimum versus maximum splitting) and characteristics have no effect on performance!
Surprisingly, using java.util.Spliterator leads to a slight overhead while the other methods lead to the same performance! However, Java performance benchmarking should preferably be carried out with the tool described ☛.
Rule(s)
- Streams benefit from slight API enhancements in Java 9 ☛.
Example From_Java_9.Java.zip
java.util.List<Integer> positives = java.util.Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
java.util.stream.Stream<Integer> stream = positives.stream().filter(p -> p > 1
        && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream.takeWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[2, 3, 5]' is displayed...
java.util.stream.Stream<Integer> stream_ = positives.stream().filter(p -> p > 1
        && java.util.stream.IntStream.rangeClosed(2, (int) Math.sqrt(p)).noneMatch(i -> (p % i == 0)));
System.out.println(stream_.dropWhile(p -> !p.equals(7)).collect(java.util.stream.Collectors.toList())); // '[7, 11]' is displayed...
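Java 9 also adds an iterate overload with a termination predicate (a hedged sketch, not part of the downloadable archive), which avoids an explicit limit on an otherwise infinite stream:
// 'iterate(seed, hasNext, next)' stops as soon as the predicate fails:
java.util.stream.Stream<Integer> counters = java.util.stream.Stream.iterate(0, i -> i <= 12, i -> i + 1);
System.out.println(counters.collect(java.util.stream.Collectors.toList())); // '[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]' is displayed...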
Rule(s)
- Java 8 streams have no dedicated support for null values. The ofNullable static method in Java 9 improves the way of dealing with null: it returns an empty stream for null and a single-element stream otherwise.
Example From_Java_9.Java.zip
java.util.List<Object> objects = java.util.Arrays.asList(new Object(), null, new Object());
java.util.stream.Stream<Object> stream__ = objects.stream().flatMap(o -> java.util.stream.Stream.ofNullable(o));
System.out.println(stream__.count()); // '2' is displayed...
See also
Eugen Baeldung's website has a rich section on Java streams ☛.