Nexmark Benchmark for Apache Flink
1 What is Nexmark
Nexmark is a benchmark suite for queries over continuous data streams. This benchmark is inspired by the NEXMark research paper and Apache Beam.
These are multiple queries over a three entities model representing on online auction system:
Person represents a person submitting an item for auction and/or making a bid on an auction.
Auction represents an item under auction.
Bid represents a bid for an item under auction.
2 The Queries
Nexmark Benchmark Suite | ||||
Query |
| Name |
| Summary |
q0 |
| Pass Through |
| Measures the monitoring overhead including the source generator. |
q1 |
| Currency Conversion |
| Convert each bid value from dollars to euros. |
q2 |
| Selection |
| Find bids with specific auction ids and show their bid price. |
q3 |
| Local Item Suggestion |
| Who is selling in OR, ID or CA in category 10, and for what auction ids? |
q4 |
| Average Price for a Category |
| Select the average of the wining bid prices for all auctions in each category. |
q5 |
| Hot Items |
| Which auctions have seen the most bids in the last period? |
q6 |
| Average Selling Price by Seller |
| What is the average selling price per seller for their last 10 closed auctions. |
q7 |
| Highest Bid |
| Select the bids with the highest bid price in the last period. |
q8 |
| Monitor New Users |
| Select people who have entered the system and created auctions in the last period. |
q9 |
| Winning Bids |
| Find the winning bid for each auction. |
q10 |
| Log to File System |
| Log all events to file system. Illustrates windows streaming data into partitioned file system. |
q11 |
| User Sessions |
| How many bids did a user make in each session they were active? Illustrates session windows. |
q12 |
| Processing Time Windows |
| How many bids does a user make within a fixed processing time limit? Illustrates working in processing time window. |
q13 |
| Bounded Side Input Join |
| Joins a stream to a bounded side input, modeling basic stream enrichment. |
q1 ~ q8 are from original NEXMark queries, q9 ~ q13 are from Apache Beam.
3 Benchmark Guideline
The Nexmark benchmark framework runs Flink queries on standalone cluster (Session Mode). The cluster should consist of one master node and one or more worker nodes. All of them should be Linux environment (the CPU monitor script requries to run on Linux). Please make sure you have the following software installed on each node:
JDK 1.8.x or higher (Nexmark scripts uses some tools of JDK)
ssh (sshd must be running to use the Flink and Nexmark scripts that manage remote components)
Having (passwordless SSH) and the same directory structure on all your cluster nodes will allow you to use our scripts to control everything.
3.1 Build Nexmark
Before start to run the benchmark, you should build the Nexmark benchmark first to have a benchmark package. Please make sure you have installed maven in your build machine. And run the emph{./build.sh} command under nexmark-flink directoy. Then you will get the nexmark-flink.tgz archive under the directory.
shell
git clone https://github.com/nexmark/nexmark $ cd nexmark-flink $ nexmark-flink ~] mvn package -DskipTests [nexmark-flink ~] ls target/ [nexmark-flink-0.2-SNAPSHOT.jar original-nexmark-flink-0.2-SNAPSHOT.jar nexmark-flink ~] ls [build.sh* nexmark-flink.tgz pom.xml src/ target/
3.2 Setup Flink Cluster
- Download the latest Flink package from the download page.
shell
wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz --no-check-certificate $ tar xzf flink-1.13.3-bin-scala_2.12.tgz; tar xzf nexmark-flink.tgz $ mv flink-1.13.3 flink; mv nexmark-flink nexmark $
Copy the jars under nexmark/lib to flink/lib which contains the Nexmark source generator.
- Configure Flink
Edit flink/conf/workers and enters the IP address of each worker node. Recommand to set 8 entries.
Replace flink/conf/sql-client-defaults.yaml by nexmark/conf/sql-client-defaults.yaml
- Replace flink/conf/flink-conf.yaml by nexmark/conf/flink-conf.yaml. Remember to update the following configurations:
Set jobmanager.rpc.address to you master IP address
Set state.checkpoints.dir to your local file path (recommend to use SSD), e.g. file:///home/username/checkpoint.
Set state.backend.rocksdb.localdir to your local file path (recommend to use SSD), e.g. /home/username/rocksdb.
Configure Nexmark benchmark. Set nexmark.metric.reporter.host in nexmark/conf/nexmark.yaml to your master IP address.
Copy flink and nexmark folders to your worker nodes using scp.
Start Flink Cluster by running flink/bin/start-cluster.sh on the master node.
Start Nexmark benchmark by running nexmark/bin/setup_cluster.sh on the master node.
3.3 Run Nexmark Benchmark
You can run the Nexmark benchmark by running nexmark/bin/run_query.sh all on the master node. It will run all the queries one by one, and collect benchmark metrics automatically. It will take 50 minutes to finish the benchmark by default. At last, it will print the benchmark summary result (Cores * Time(s) for each query) on the console. You can also run specific queries by running the following command:
nexmark/bin/run_query.sh q1,q2
You can also tune the workload of the queries by editing nexmark/conf/nexmark.yaml with the nexmark.workload.* prefix options.
References
Pete Tucker, Kristin Tufte, Vassilis Papadimos, David Maier. NEXMark – A Benchmark for Queries over Data Streams. June 2010.
Apache Beam, https://beam.apache.org/
Nexmark, https://web.archive.org/web/20100620010601/http://datalab.cs.pdx.edu/niagaraST/NEXMark/
Nexmark Benchmark Suite, https://github.com/nexmark/nexmark