Introduction
Apache Flink is a big data framework that allows programmers to process huge amounts of data in a very efficient and scalable way. This article will introduce some basic API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink's central construct: the distributed collection. First, we'll look at Flink's DataSet API transformations and use them to implement a word count program. Then we'll take a brief look at Flink's DataStream API, which allows you to process streams of events in real time.
Maven Dependencies

Before we start, we need to add the Flink libraries as dependencies to our pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test</scope>
</dependency>
Basic API Concepts

Every Flink program performs transformations on distributed collections of data. The DataSet API is used for bounded, batch-style data, while the DataStream API is used for unbounded streams of events; both are driven from an execution environment, as sketched below.
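As a quick orientation, here is a minimal sketch of the two entry points (the class and variable names are illustrative, not part of the original examples):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EntryPoints {
    public static void main(String[] args) throws Exception {
        // Batch entry point: produces DataSet instances
        ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> numbers = batchEnv.fromElements(1, 2, 3);

        // Streaming entry point: produces DataStream instances
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> words = streamEnv.fromElements("hello", "flink");
    }
}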
DataSet API Transformations

Data transformation functions turn one DataSet into another. The entry point for a batch program is an ExecutionEnvironment:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Suppose we want to keep only the numbers above a certain threshold and then sum them:

// a sample input; the values are chosen so that the assertion below holds
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();

assertThat(collect.get(0)).isEqualTo(90);
To map over a custom type, we can define a simple Person class:

private static class Person {
    private int age;
    private String name;

    // standard constructors/getters/setters
}
DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));
We can extract only the age field from every person:

List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();

assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);
Join

To join two datasets on a key, we can use the join transformation. Suppose we have datasets of transactions and addresses:
Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));
The first field in both tuples is of type Integer, and this is the id field on which we want to join the two datasets. We express the join keys with KeySelector classes:
private static class IdKeySelectorTransaction
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}
Using the selectors, we can perform the join and assert the result:

List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joined
  = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();

assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
Sort

Suppose we have the following collection of Tuple2 and we want to sort it by the first field:
Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");

DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);
List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();

assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
Word Count

To implement a word count program, we first split the incoming text into lowercase word tokens using a FlatMapFunction:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}
Then we group the tuples by the word (field 0) and sum the counts (field 1):

public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);

    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}
We can test the job on a couple of lines; note that the expected counts match the actual words in the input:

List<String> lines = Arrays.asList(
  "This is the first sentence",
  "This is the second one-word sentence");

DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
List<Tuple2<String, Integer>> collect = result.collect();

assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("this", 2), new Tuple2<>("is", 2), new Tuple2<>("the", 2),
  new Tuple2<>("first", 1), new Tuple2<>("second", 1),
  new Tuple2<>("one", 1), new Tuple2<>("word", 1), new Tuple2<>("sentence", 2));
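If you want to run the word count as a standalone job rather than from a test, you can wrap it in a main method; a minimal sketch, assuming the startWordCount method above lives in a WordCount class (the WordCountJob class name is illustrative):

public class WordCountJob {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        List<String> lines = Arrays.asList(
          "This is the first sentence",
          "This is the second one-word sentence");

        // print() is a sink, so it also triggers execution of the batch plan
        WordCount.startWordCount(env, lines).print();
    }
}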
DataStream API

Flink also supports processing streams of events through its DataStream API. To start consuming events, we first use the StreamExecutionEnvironment class:
StreamExecutionEnvironment executionEnvironment
  = StreamExecutionEnvironment.getExecutionEnvironment();
Next, we create a stream of events:

DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is the first sentence",
  "This is the second one-word sentence");
We can transform every element of the stream, for example to uppercase:

SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);
To trigger the job, we register a sink and call execute(); print() only declares the sink, and nothing runs until execute() is invoked:

upperCase.print();
executionEnvironment.execute();
The output looks like this; the 1> and 2> prefixes identify the parallel subtask that produced each line:

1> THIS IS THE FIRST SENTENCE
2> THIS IS THE SECOND ONE-WORD SENTENCE
Windowing of Events

When processing a stream in real time, we sometimes need to group events into time windows. Since events can arrive out of order, we first assign event-time timestamps and watermarks; here the BoundedOutOfOrdernessTimestampExtractor tolerates events that are up to 20 seconds late:
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = executionEnvironment.fromElements(
    new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
    new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Integer, Long>>(Time.seconds(20)) {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
            // the second tuple field holds epoch seconds; Flink expects milliseconds
            return element.f1 * 1000;
        }
    });
Then we group the events into five-second tumbling windows and select the maximum (by the first field) within each window:

SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);

reduced.print();
1> (15,1491221519)
Conclusion
- We implemented a word count program using Flink's fluent and functional DataSet API. Then we looked at the DataStream API and implemented a simple real-time transformation on a stream of events.
- The implementation of all these examples and code snippets can be found on GitHub – this is a Maven project, so it should be easy to import and run as is.
Flink transformations are lazy: they are not executed until a sink operation such as collect(), print(), or execute() is invoked. The API has two modes of operation, batch and real-time. If you are dealing with a bounded data source that can be processed in batch mode, you use the DataSet API; for an unbounded stream of events processed in real time, you use the DataStream API.
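To make the laziness concrete, here is a minimal sketch (the sample values are illustrative): building a transformation chain only constructs an execution plan, and no data moves until a sink is invoked.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// this only builds the execution plan; nothing is processed yet
DataSet<Integer> doubled = env.fromElements(1, 2, 3)
  .map(i -> i * 2);

// collect() is a sink operation, so it triggers actual execution
List<Integer> result = doubled.collect();  // [2, 4, 6]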