
Nexmark Benchmark for Apache Flink

1 What is Nexmark

Nexmark is a benchmark suite for queries over continuous data streams. This benchmark is inspired by the NEXMark research paper and Apache Beam.

The suite consists of multiple queries over a three-entity model (Person, Auction, and Bid) representing an online auction system.

2 The Queries

Nexmark Benchmark Suite

| Query | Name | Summary |
|-------|------|---------|
| q0 | Pass Through | Measures the monitoring overhead including the source generator. |
| q1 | Currency Conversion | Convert each bid value from dollars to euros. |
| q2 | Selection | Find bids with specific auction ids and show their bid price. |
| q3 | Local Item Suggestion | Who is selling in OR, ID or CA in category 10, and for what auction ids? |
| q4 | Average Price for a Category | Select the average of the winning bid prices for all auctions in each category. |
| q5 | Hot Items | Which auctions have seen the most bids in the last period? |
| q6 | Average Selling Price by Seller | What is the average selling price per seller for their last 10 closed auctions? |
| q7 | Highest Bid | Select the bids with the highest bid price in the last period. |
| q8 | Monitor New Users | Select people who have entered the system and created auctions in the last period. |
| q9 | Winning Bids | Find the winning bid for each auction. |
| q10 | Log to File System | Log all events to the file system. Illustrates windowing streaming data into a partitioned file system. |
| q11 | User Sessions | How many bids did a user make in each session they were active? Illustrates session windows. |
| q12 | Processing Time Windows | How many bids does a user make within a fixed processing time limit? Illustrates working with processing-time windows. |
| q13 | Bounded Side Input Join | Joins a stream to a bounded side input, modeling basic stream enrichment. |

Queries q1 through q8 are from the original NEXMark queries; q9 through q13 are from Apache Beam.

3 Benchmark Guideline

The Nexmark benchmark framework runs Flink queries on a standalone cluster (Session Mode). The cluster should consist of one master node and one or more worker nodes, all running Linux (the CPU monitor script requires Linux). Please make sure you have the following software installed on each node:

Having passwordless SSH and the same directory structure on all your cluster nodes lets you use the provided scripts to control everything.
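
Passwordless SSH can be checked up front by looping over the worker list. The sketch below assumes a plain-text file with one host per line (the format used for flink/conf/workers in section 3.2). The function name check_workers and the SSH_CMD override are illustrative and not part of Nexmark or Flink.

```shell
# check_workers FILE: try a non-interactive SSH login to every host in FILE.
# BatchMode=yes makes ssh fail instead of prompting for a password, so a
# failure means passwordless login is not working for that host.
# SSH_CMD is a hypothetical override, used only to dry-run the loop.
check_workers() {
  workers_file="$1"
  failed=0
  while IFS= read -r host; do
    # Skip blank lines and comment lines.
    case "$host" in ''|'#'*) continue ;; esac
    # -n keeps ssh from consuming the rest of the worker list on stdin.
    if ${SSH_CMD:-ssh} -n -o BatchMode=yes -o ConnectTimeout=5 "$host" true; then
      echo "ok: $host"
    else
      echo "FAILED: $host"
      failed=1
    fi
  done < "$workers_file"
  return "$failed"
}
```

Running check_workers flink/conf/workers on the master node prints one line per worker and returns a non-zero status if any host could not be reached without a password.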

3.1 Build Nexmark

Before running the benchmark, build Nexmark to produce the benchmark package. Make sure Maven is installed on your build machine, then run the ./build.sh command in the nexmark-flink directory. This produces the nexmark-flink.tgz archive in that directory.

shell

$ git clone https://github.com/nexmark/nexmark
$ cd nexmark/nexmark-flink
[nexmark-flink ~] mvn package -DskipTests
[nexmark-flink ~] ls target/
nexmark-flink-0.2-SNAPSHOT.jar  original-nexmark-flink-0.2-SNAPSHOT.jar
[nexmark-flink ~] ls
build.sh*  nexmark-flink.tgz  pom.xml  src/  target/
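
The build step can also be wrapped in a small script that fails early when a prerequisite is missing. This is a sketch, not part of the Nexmark repository: the helper names require_cmd and build_nexmark are made up for illustration, and it assumes, as the text above states, that ./build.sh is run from the nexmark-flink directory and leaves nexmark-flink.tgz there.

```shell
# require_cmd NAME: succeed only if NAME is an executable found on PATH.
require_cmd() {
  command -v "$1" >/dev/null 2>&1 || {
    echo "missing prerequisite: $1" >&2
    return 1
  }
}

# build_nexmark DIR: run ./build.sh inside DIR and confirm the archive exists.
build_nexmark() {
  require_cmd mvn || return 1
  (cd "$1" && ./build.sh) || return 1
  [ -f "$1/nexmark-flink.tgz" ] || {
    echo "build finished but nexmark-flink.tgz was not found" >&2
    return 1
  }
  echo "built $1/nexmark-flink.tgz"
}
```

Calling build_nexmark with the path to your nexmark-flink checkout either prints the location of the archive or stops with a message naming what went wrong.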

3.2 Setup Flink Cluster
  1. Download the latest Flink package from the download page.

    shell

    $ wget https://dlcdn.apache.org/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.12.tgz --no-check-certificate
    $ tar xzf flink-1.13.3-bin-scala_2.12.tgz; tar xzf nexmark-flink.tgz
    $ mv flink-1.13.3 flink; mv nexmark-flink nexmark

  2. Copy the jars under nexmark/lib, which contain the Nexmark source generator, to flink/lib.

  3. Configure Flink
    • Edit flink/conf/workers and enter the IP address of each worker node. Setting 8 entries is recommended.

    • Replace flink/conf/sql-client-defaults.yaml with nexmark/conf/sql-client-defaults.yaml.

    • Replace flink/conf/flink-conf.yaml with nexmark/conf/flink-conf.yaml. Remember to update the following configurations:
      • Set jobmanager.rpc.address to your master IP address.

      • Set state.checkpoints.dir to a local file path (an SSD is recommended), e.g. file:///home/username/checkpoint.

      • Set state.backend.rocksdb.localdir to a local file path (an SSD is recommended), e.g. /home/username/rocksdb.

  4. Configure Nexmark benchmark. Set nexmark.metric.reporter.host in nexmark/conf/nexmark.yaml to your master IP address.

  5. Copy flink and nexmark folders to your worker nodes using scp.

  6. Start Flink Cluster by running flink/bin/start-cluster.sh on the master node.

  7. Start Nexmark benchmark by running nexmark/bin/setup_cluster.sh on the master node.
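
The configuration edits in steps 3 and 4 can be scripted rather than made by hand. The helper below is a minimal sketch: set_conf is a hypothetical name, and it assumes the files are flat "key: value" YAML with one key per line. It rewrites the line for a key if one exists and appends the entry otherwise.

```shell
# set_conf FILE KEY VALUE: set "KEY: VALUE" in a flat YAML file.
# Limitations of this sketch: dots in KEY act as regex wildcards, and
# VALUE must not contain '|' or '&', which are special to sed here.
set_conf() {
  file="$1" key="$2" value="$3"
  if grep -q "^$key:" "$file"; then
    # Rewrite the existing line via a temp file (portable, no sed -i).
    sed "s|^$key:.*|$key: $value|" "$file" > "$file.tmp" && mv "$file.tmp" "$file"
  else
    printf '%s: %s\n' "$key" "$value" >> "$file"
  fi
}

# Example for steps 2 to 4 (MASTER_IP and the paths are placeholders):
# cp nexmark/lib/*.jar flink/lib/
# cp nexmark/conf/sql-client-defaults.yaml nexmark/conf/flink-conf.yaml flink/conf/
# set_conf flink/conf/flink-conf.yaml jobmanager.rpc.address "$MASTER_IP"
# set_conf flink/conf/flink-conf.yaml state.checkpoints.dir file:///home/username/checkpoint
# set_conf flink/conf/flink-conf.yaml state.backend.rocksdb.localdir /home/username/rocksdb
# set_conf nexmark/conf/nexmark.yaml nexmark.metric.reporter.host "$MASTER_IP"
```

Doing the edits before step 5 means the copies sent to the worker nodes already carry the final configuration.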

3.3 Run Nexmark Benchmark

You can run the Nexmark benchmark by running nexmark/bin/run_query.sh all on the master node. It runs all the queries one by one and collects benchmark metrics automatically. By default, the full benchmark takes 50 minutes to finish. When it completes, it prints the benchmark summary (Cores * Time(s) for each query) to the console. To run specific queries instead, pass them as a comma-separated list:

nexmark/bin/run_query.sh q1,q2
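
For longer selections, the comma-separated argument can be generated instead of typed. The helper below is an illustrative sketch (query_list is not part of Nexmark), and it assumes run_query.sh accepts more than two names in the same q1,q2 form shown above.

```shell
# query_list FIRST LAST: print a comma-separated list such as "q3,q4,q5".
query_list() {
  i="$1" list=""
  while [ "$i" -le "$2" ]; do
    # Add a comma only when the list already has an entry.
    list="${list:+$list,}q$i"
    i=$((i + 1))
  done
  printf '%s\n' "$list"
}

# Example (run on the master node): the original NEXMark queries q1..q8,
# keeping a copy of the console summary in a log file of your choosing.
# nexmark/bin/run_query.sh "$(query_list 1 8)" 2>&1 | tee nexmark-q1-q8.log
```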

You can also tune the workload of the queries by editing the options with the nexmark.workload.* prefix in nexmark/conf/nexmark.yaml.
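
Before changing anything, it can help to see which workload options are currently active. The one-function sketch below lists them; list_workload_options is a hypothetical name, and the sketch assumes each option sits on its own line in the file.

```shell
# list_workload_options FILE: print every active (uncommented) line whose
# key starts with "nexmark.workload.".
list_workload_options() {
  grep -E '^[[:space:]]*nexmark\.workload\.' "$1"
}

# Example:
# list_workload_options nexmark/conf/nexmark.yaml
```

Lines that begin with a comment marker are skipped, so the output shows only the settings the next benchmark run would pick up.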

References
  1. Pete Tucker, Kristin Tufte, Vassilis Papadimos, David Maier. NEXMark – A Benchmark for Queries over Data Streams. June 2010.

  2. Apache Beam, https://beam.apache.org/

  3. Nexmark, https://web.archive.org/web/20100620010601/http://datalab.cs.pdx.edu/niagaraST/NEXMark/

  4. Nexmark Benchmark Suite, https://github.com/nexmark/nexmark