CREATE TABLE metrics ( host STRING NOT NULL, metric STRING NOT NULL, time INT64 NOT NULL, value DOUBLE NOT NULL, PRIMARY KEY (host, metric, time) );
Kudu tables have a structured data model similar to tables in a traditional RDBMS. Schema design is critical for achieving the best performance and operational stability from Kudu. Every workload is unique, and there is no single schema design that is best for every table. This document outlines effective schema design philosophies for Kudu, paying particular attention to where they differ from approaches used for traditional RDBMS schemas.
At a high level, there are three concerns when creating Kudu tables: column design, primary key design, and partitioning design. Of these, only partitioning will be a new concept for those familiar with traditional non-distributed relational databases. The final sections discuss altering the schema of an existing table, and known limitations with regard to schema design.
The perfect schema would accomplish the following:
Data would be distributed in such a way that reads and writes are spread evenly across tablet servers. This is impacted by partitioning.
Tablets would grow at an even, predictable rate and load across tablets would remain steady over time. This is most impacted by partitioning.
Scans would read the minimum amount of data necessary to fulfill a query. This is impacted mostly by primary key design, but partitioning also plays a role via partition pruning.
The perfect schema depends on the characteristics of your data, what you need to do with it, and the topology of your cluster. Schema design is the single most important thing within your control to maximize the performance of your Kudu cluster.
A Kudu Table consists of one or more columns, each with a defined type. Columns that are not part of the primary key may be nullable. Supported column types include:
8-bit signed integer
16-bit signed integer
32-bit signed integer
64-bit signed integer
date (32-bit days since the Unix epoch)
unixtime_micros (64-bit microseconds since the Unix epoch)
single-precision (32-bit) IEEE-754 floating-point number
double-precision (64-bit) IEEE-754 floating-point number
decimal (see Decimal Type for details)
varchar (see Varchar Type for details)
UTF-8 encoded string (up to 64KB uncompressed)
binary (up to 64KB uncompressed)
Kudu takes advantage of strongly-typed columns and a columnar on-disk storage format to provide efficient encoding and serialization. To make the most of these features, columns should be specified as the appropriate type, rather than simulating a 'schemaless' table using string or binary columns for data which may otherwise be structured. In addition to encoding, Kudu allows compression to be specified on a per-column basis.
No Version or Timestamp ColumnKudu does not provide a version or timestamp column to track changes to a row. If version or timestamp information is needed, the schema should include an explicit version or timestamp column.
decimal type is a numeric data type with fixed scale and precision suitable for
financial and other arithmetic calculations where the imprecise representation and
rounding behavior of
double make those types impractical. The
type is also useful for integers larger than int64 and cases with fractional values
in a primary key.
decimal type is a parameterized type that takes precision and scale type
Precision represents the total number of digits that can be represented by the column, regardless of the location of the decimal point. This value must be between 1 and 38 and has no default. For example, a precision of 4 is required to represent integer values up to 9999, or to represent values up to 99.99 with two fractional digits. You can also represent corresponding negative values, without any change in the precision. For example, the range -9999 to 9999 still only requires a precision of 4.
Scale represents the number of fractional digits. This value must be between 0 and the precision. A scale of 0 produces integral values, with no fractional part. If precision and scale are equal, all of the digits come after the decimal point. For example, a decimal with precision and scale equal to 3 can represent values between -0.999 and 0.999.
Kudu stores each value in as few bytes as possible depending on the precision specified for the decimal column. For that reason it is not advised to just use the highest precision possible for convenience. Doing so could negatively impact performance, memory and storage.
Before encoding and compression:
Decimal values with precision of 9 or less are stored in 4 bytes.
Decimal values with precision of 10 through 18 are stored in 8 bytes.
Decimal values with precision greater than 18 are stored in 16 bytes.
The precision and scale of
varchar type is a UTF-8 encoded string (up to 64KB uncompressed) with a
fixed maximum character length. This type is especially useful when migrating
from or integrating with legacy systems that support the
If a maximum character length is not required the
string type should be
varchar type is a parameterized type that takes a length attribute.
Length represents the maximum number of UTF-8 characters allowed. Values with characters greater than the limit will be truncated. This value must be between 1 and 65535 and has no default. Note that some other systems may represent the length limit in bytes instead of characters. That means that Kudu may be able to represent longer values in the case of multi-byte UTF-8 characters.
Each column in a Kudu table can be created with an encoding, based on the type of the column.
int8, int16, int32, int64
plain, bitshuffle, run length
plain, bitshuffle, run length
float, double, decimal
plain, run length
string, varchar, binary
plain, prefix, dictionary
Data is stored in its natural format. For example,
values are stored as fixed-size 32-bit little-endian integers.
A block of values is rearranged to store the most significant bit of every value, followed by the second most significant bit of every value, and so on. Finally, the result is LZ4 compressed. Bitshuffle encoding is a good choice for columns that have many repeated values, or values that change by small amounts when sorted by primary key. The bitshuffle project has a good overview of performance and use cases.
Runs (consecutive repeated values) are compressed in a column by storing only the value and the count. Run length encoding is effective for columns with many consecutive repeated values when sorted by primary key.
A dictionary of unique values is built, and each column value is encoded as its corresponding index in the dictionary. Dictionary encoding is effective for columns with low cardinality. If the column values of a given row set are unable to be compressed because the number of unique values is too high, Kudu will transparently fall back to plain encoding for that row set. This is evaluated during flush.
Common prefixes are compressed in consecutive column values. Prefix encoding can be effective for values that share common prefixes, or the first column of the primary key, since rows are sorted by primary key within tablets.
Kudu allows per-column compression using the
compression codecs. By default, columns that are Bitshuffle-encoded are
inherently compressed with LZ4 compression. Otherwise, columns are stored
uncompressed. Consider using compression if reducing storage space is more
important than raw scan performance.
Every data set will compress differently, but in general LZ4 is the most
performant codec, while
zlib will compress to the smallest data sizes.
Bitshuffle-encoded columns are automatically compressed using LZ4, so it is not
recommended to apply additional compression on top of this encoding.
Every Kudu table must declare a primary key comprised of one or more columns. Like an RDBMS primary key, the Kudu primary key enforces a uniqueness constraint. Attempting to insert a row with the same primary key values as an existing row will result in a duplicate key error.
Primary key columns must be non-nullable, and may not be a boolean, float or double type.
Once set during table creation, the set of columns in the primary key may not be altered.
Unlike an RDBMS, Kudu does not provide an auto-incrementing column feature, so the application must always provide the full primary key during insert.
Row delete and update operations must also specify the full primary key of the row to be changed. Kudu does not natively support range deletes or updates.
The primary key values of a column may not be updated after the row is inserted. However, the row may be deleted and re-inserted with the updated value.
As with many traditional relational databases, Kudu’s primary key is in a clustered index. All rows within a tablet are sorted by its primary key.
When scanning Kudu rows, use equality or range predicates on primary key columns to efficiently find the rows.
|Primary key indexing optimizations apply to scans on individual tablets. See the Partition Pruning section for details on how scans can use predicates to skip entire tablets.|
This section discuss a primary key design consideration for timeseries use cases where the primary key is a timestamp, or the first column of the primary key is a timestamp.
Each time a row is inserted into a Kudu table, Kudu looks up the primary key in the primary key index storage to check whether that primary key is already present in the table. If the primary key exists in the table, a "duplicate key" error is returned. In the typical case where data is being inserted at the current time as it arrives from the data source, only a small range of primary keys are "hot". So, each of these "check for presence" operations is very fast. It hits the cached primary key storage in memory and doesn’t require going to disk.
In the case when you load historical data, which is called "backfilling", from an offline data source, each row that is inserted is likely to hit a cold area of the primary key index which is not resident in memory and will cause one or more HDD disk seeks. For example, in a normal ingestion case where Kudu sustains a few million inserts per second, the "backfill" use case might sustain only a few thousand inserts per second.
To alleviate the performance issue during backfilling, consider the following options:
Make the primary keys more compressible.
For example, with the first column of a primary key being a random ID of 32-bytes, caching one billion primary keys would require at least 32 GB of RAM to stay in cache. If caching backfill primary keys from several days ago, you need to have several times 32 GB of memory. By changing the primary key to be more compressible, you increase the likelihood that the primary keys can fit in cache and thus reducing the amount of random disk I/Os.
Use SSDs for storage as random seeks are orders of magnitude faster than spinning disks.
Change the primary key structure such that the backfill writes hit a continuous range of primary keys.
In order to provide scalability, Kudu tables are partitioned into units called tablets, and distributed across many tablet servers. A row always belongs to a single tablet. The method of assigning rows to tablets is determined by the partitioning of the table, which is set during table creation.
Choosing a partitioning strategy requires understanding the data model and the expected workload of a table. For write-heavy workloads, it is important to design the partitioning such that writes are spread across tablets in order to avoid overloading a single tablet. For workloads involving many short scans, where the overhead of contacting remote servers dominates, performance can be improved if all of the data for the scan is located in the same tablet. Understanding these fundamental trade-offs is central to designing an effective partition schema.
No Default PartitioningKudu does not provide a default partitioning strategy when creating tables. It is recommended that new tables which are expected to have heavy read and write workloads have at least as many tablets as tablet servers.
Kudu provides two types of partitioning: range partitioning and hash partitioning. Tables may also have multilevel partitioning, which combines range and hash partitioning, or multiple instances of hash partitioning.
Range partitioning distributes rows using a totally-ordered range partition key. Each partition is assigned a contiguous segment of the range partition keyspace. The key must be comprised of a subset of the primary key columns. If the range partition columns match the primary key columns, then the range partition key of a row will equal its primary key. In range partitioned tables without hash partitioning, each range partition will correspond to exactly one tablet.
The initial set of range partitions is specified during table creation as a set of partition bounds and split rows. For each bound, a range partition will be created in the table. Each split will divide a range partition in two. If no partition bounds are specified, then the table will default to a single partition covering the entire key space (unbounded below and above). Range partitions must always be non-overlapping, and split rows must fall within a range partition.
|see the Range Partitioning Example for further discussion of range partitioning.|
Kudu allows range partitions to be dynamically added and removed from a table at runtime, without affecting the availability of other partitions. Removing a partition will delete the tablets belonging to the partition, as well as the data contained in them. Subsequent inserts into the dropped partition will fail. New partitions can be added, but they must not overlap with any existing range partitions. Kudu allows dropping and adding any number of range partitions in a single transactional alter table operation.
Dynamically adding and dropping range partitions is particularly useful for time series use cases. As time goes on, range partitions can be added to cover upcoming time ranges. For example, a table storing an event log could add a month-wide partition just before the start of each month in order to hold the upcoming events. Old range partitions can be dropped in order to efficiently remove historical data, as necessary.
Hash partitioning distributes rows by hash value into one of many buckets. In single-level hash partitioned tables, each bucket will correspond to exactly one tablet. The number of buckets is set during table creation. Typically the primary key columns are used as the columns to hash, but as with range partitioning, any subset of the primary key columns can be used.
Hash partitioning is an effective strategy when ordered access to the table is not needed. Hash partitioning is effective for spreading writes randomly among tablets, which helps mitigate hot-spotting and uneven tablet sizes.
|see the Hash Partitioning Example for further discussion of hash partitioning.|
Kudu allows a table to combine multiple levels of partitioning on a single table. Zero or more hash partition levels can be combined with an optional range partition level. The only additional constraint on multilevel partitioning beyond the constraints of the individual partition types, is that multiple levels of hash partitions must not hash the same columns.
When used correctly, multilevel partitioning can retain the benefits of the individual partitioning types, while reducing the downsides of each. The total number of tablets in a multilevel partitioned table is the product of the number of partitions in each level.
Kudu scans will automatically skip scanning entire partitions when it can be determined that the partition can be entirely filtered by the scan predicates. To prune hash partitions, the scan must include equality predicates on every hashed column. To prune range partitions, the scan must include equality or range predicates on the range partitioned columns. Scans on multilevel partitioned tables can take advantage of partition pruning on any of the levels independently.
To illustrate the factors and trade-offs associated with designing a partitioning strategy for a table, we will walk through some different partitioning scenarios. Consider the following table schema for storing machine metrics data (using SQL syntax and date-formatted timestamps for clarity):
CREATE TABLE metrics ( host STRING NOT NULL, metric STRING NOT NULL, time INT64 NOT NULL, value DOUBLE NOT NULL, PRIMARY KEY (host, metric, time) );
A natural way to partition the
metrics table is to range partition on the
time column. Let’s assume that we want to have a partition per year, and the
table will hold data for 2014, 2015, and 2016. There are at least two ways that
the table could be partitioned: with unbounded range partitions, or with bounded
The image above shows the two ways the
metrics table can be range partitioned
time column. In the first example (in blue), the default range
partition bounds are used, with splits at
results in three tablets: the first containing values before 2015, the second
containing values in the year 2015, and the third containing values after 2016.
The second example (in green) uses a range partition bound of
(2017-01-01)], and splits at
2016-01-01. The second example
could have equivalently been expressed through range partition bounds of
[(2015-01-01), (2016-01-01)], and
[(2016-01-01), (2017-01-01)], with no splits. The first example has unbounded
lower and upper range partitions, while the second example includes bounds.
Each of the range partition examples above allows time-bounded scans to prune partitions falling outside of the scan’s time bound. This can greatly improve performance when there are many partitions. When writing, both examples suffer from potential hot-spotting issues. Because metrics tend to always be written at the current time, most writes will go into a single range partition.
The second example is more flexible than the first, because it allows range
partitions for future years to be added to the table. In the first example, all
writes for times after
2016-01-01 will fall into the last partition, so the
partition may eventually become too large for a single tablet server to handle.
Another way of partitioning the
metrics table is to hash partition on the
In the example above, the
metrics table is hash partitioned on the
metric columns into four buckets. Unlike the range partitioning example
earlier, this partitioning strategy will spread writes over all tablets in the
table evenly, which helps overall write throughput. Scans over a specific host
and metric can take advantage of partition pruning by specifying equality
predicates, reducing the number of scanned tablets to one. One issue to be
careful of with a pure hash partitioning strategy, is that tablets could grow
indefinitely as more and more data is inserted into the table. Eventually
tablets will become too big for an individual tablet server to hold.
|Although these examples number the tablets, in reality tablets are only given UUID identifiers. There is no natural ordering among the tablets in a hash partitioned table.|
The previous examples showed how the
metrics table could be range partitioned
time column, or hash partitioned on the
These strategies have associated strength and weaknesses:
✗ - all writes go to latest partition
✓ - time-bounded scans can be pruned
✓ - new tablets can be added for future time periods
✓ - writes are spread evenly among tablets
✓ - scans on specific hosts and metrics can be pruned
✗ - tablets could grow too large
Hash partitioning is good at maximizing write throughput, while range partitioning avoids issues of unbounded tablet growth. Both strategies can take advantage of partition pruning to optimize scans in different scenarios. Using multilevel partitioning, it is possible to combine the two strategies in order to gain the benefits of both, while minimizing the drawbacks of each.
In the example above, range partitioning on the
time column is combined with
hash partitioning on the
metric columns. This strategy can be
thought of as having two dimensions of partitioning: one for the hash level and
one for the range level. Writes into this table at the current time will be
parallelized up to the number of hash buckets, in this case 4. Reads can take
advantage of time bound and specific host and metric predicates to prune
partitions. New range partitions can be added, which results in creating 4
additional tablets (as if a new column were added to the diagram).
Kudu can support any number of hash partitioning levels in the same table, as long as the levels have no hashed columns in common.
In the example above, the table is hash partitioned on
host into 4 buckets,
and hash partitioned on
metric into 3 buckets, resulting in 12 tablets.
Although writes will tend to be spread among all tablets when using this
strategy, it is slightly more prone to hot-spotting than when hash partitioning
over multiple independent columns, since all values for an individual host or
metric will always belong to a single tablet. Scans can take advantage of
equality predicates on the
metric columns separately to prune
Multiple levels of hash partitioning can also be combined with range partitioning, which logically adds another dimension of partitioning.
You can alter a table’s schema in the following ways:
Rename the table
Rename primary key columns
Rename, add, or drop non-primary key columns
Add and drop range partitions
Multiple alteration steps can be combined in a single transactional operation.
Kudu currently has some known limitations that may factor into schema design.
By default, Kudu will not permit the creation of tables with more than 300 columns. We recommend schema designs that use fewer columns for best performance.
No individual cell may be larger than 64KB before encoding or compression. The cells making up a composite key are limited to a total of 16KB after the internal composite-key encoding done by Kudu. Inserting rows not conforming to these limitations will result in errors being returned to the client.
Although individual cells may be up to 64KB, and Kudu supports up to 300 columns, it is recommended that no single row be larger than a few hundred KB.
Identifiers such as table and column names must be valid UTF-8 sequences and no longer than 256 bytes.
Kudu does not allow you to update the primary key columns of a row.
Kudu does not allow you to alter the primary key columns after table creation.
Kudu does not allow you to change how a table is partitioned after creation, with the exception of adding or dropping range partitions.
Kudu does not allow the type of a column to be altered.
Partitions cannot be split or merged after table creation.
The disk space occupied by a deleted
row is only reclaimable via compaction, and only when the deletion’s age
exceeds the "tablet history maximum age" (controlled by the
--tablet_history_max_age_sec flag). Furthermore, Kudu currently only schedules
compactions in order to improve read/write performance; a tablet will never be
compacted purely to reclaim disk space. As such, range partitioning should be
used when it is expected that large swaths of rows will be discarded. With range
partitioning, individual partitions may be dropped to discard data and reclaim
disk space. See KUDU-1625