Kudu has tight integration with Impala, allowing you to use Impala to insert, query, update, and delete data from Kudu tablets using Impala’s SQL syntax, as an alternative to using the Kudu APIs to build a custom Kudu application. In addition, you can use JDBC or ODBC to connect existing or new applications written in any language, framework, or business intelligence tool to your Kudu data, using Impala as the broker.
The following instructions assume a Cloudera Manager deployment. However, you can use Kudu with Impala without Cloudera Manager.
This integration relies on features that released versions of Impala do not have yet. In the interim, you need to install a fork of Impala, which this document will refer to as Impala_Kudu.
You can install Impala_Kudu using parcels or packages.
Kudu itself requires CDH 5.4.3 or later. To use Cloudera Manager with Impala_Kudu, you need Cloudera Manager 5.4.3 or later. Cloudera Manager 5.4.7 is recommended, as it adds support for collecting metrics from Kudu.
If you have an existing Impala instance on your cluster, you can install Impala_Kudu alongside the existing Impala instance if you use parcels. The new instance does not share configurations with the existing instance and is completely independent. A script is provided to automate this type of installation. See Manual Installation.
It is especially important that the cluster has adequate unreserved RAM for the Impala_Kudu instance.
Consider shutting down the original Impala service when testing Impala_Kudu if you want to be sure it is not impacted.
Before installing Impala_Kudu, you must have already installed and configured services for HDFS (though it is not used by Kudu), the Hive Metastore (where Impala stores its metadata), and Kudu. You may need HBase, YARN, Sentry, and ZooKeeper services as well. Meeting the Impala installation requirements is out of the scope of this document. See Impala Prerequisites in the official Impala documentation for more information.
If you use Cloudera Manager, you can install Impala_Kudu using parcels or packages. However, if you have an existing Impala instance, you must use parcels, and you should follow the deploy.py script procedure rather than these instructions.
Manual installation of Impala_Kudu is only supported where there is no other Impala service already running in the cluster, and when you use parcels.
Obtain the Impala_Kudu parcel either by using the parcel repository or downloading it manually.
To use the parcel repository:
Go to Hosts / Parcels.
Click Edit Settings. Add http://archive.cloudera.com/beta/impala-kudu/parcels/latest/ as a Remote Parcel Repository URL. Click Save Changes.
To download the parcel manually:
Download the parcel for your operating system from http://archive.cloudera.com/beta/impala-kudu/parcels/latest/ and upload it to /opt/cloudera/parcel-repo/ on the Cloudera Manager server.
Create a SHA1 file for the parcel. Cloudera Manager expects the SHA1 file to have exactly the same name as the parcel, with a .sha extension added, and to contain only the SHA1 hash itself, not the name of the parcel.
sha1sum <name_of_parcel_file> | awk '{print $1}' > <name_of_parcel_file>.sha
Go to Hosts / Parcels. Click Check for New Parcels. Verify that Impala_Kudu is in the list.
Download (if necessary), distribute, and activate the Impala_Kudu parcel.
Add a new Impala service. This service will use the Impala_Kudu parcel.
Go to the cluster and click Actions / Add a Service.
Choose one host to run the Catalog Server, one to run the StateStore, and one or more to run Impala Daemon instances. Click Continue.
Choose one or more Impala scratch directories. Click Continue. The Impala service starts. However, the features that Impala needs in order to work with Kudu are not enabled yet.
Enable the features that allow Impala to work with Kudu.
Go to the new Impala service. Click Configuration.
Search for the Impala Service Environment Advanced Configuration Snippet (Safety Valve) configuration item. Add the following to the text field and save your changes:
IMPALA_KUDU=1
Restart the Impala service.
You can verify that the Kudu features are available to Impala by running the following query in Impala Shell:
select if(version() like '%KUDU%', "all set to go!", "check your configs") as s;
Query: select if(version() like '%KUDU%', "all set to go!", "check your configs") as s
+----------------+
| s |
+----------------+
| all set to go! |
+----------------+
Fetched 1 row(s) in 0.02s
If you do not see 'all set to go!', carefully review the previous instructions to be sure that you have not missed a step.
deploy.py Script
If you use parcels, Cloudera recommends using the included deploy.py script to install and deploy the Impala_Kudu service into your cluster. If your cluster does not have an existing Impala instance, the script is optional. However, if you do have an existing Impala instance and want to install Impala_Kudu side-by-side, you must use the script.
The script depends upon the Cloudera Manager API Python bindings. Install the bindings using sudo pip install cm-api (or, as an unprivileged user, with the --user option to pip), or see http://cloudera.github.io/cm_api/docs/python-client/ for more details.
You need the following information to run the script:
The IP address or fully-qualified domain name of the Cloudera Manager server.
The IP address or fully-qualified domain name of the host that should run the Kudu master process, if different from the Cloudera Manager server.
The cluster name, if Cloudera Manager manages multiple clusters.
If you have an existing Impala service and want to clone its configuration, you need to know the name of the existing service.
If your cluster has more than one instance of an HDFS, Hive, HBase, or other CDH service that this Impala_Kudu service depends upon, the name of the service this new Impala_Kudu service should use.
A name for the new Impala service.
A user name and password with Full Administrator privileges in Cloudera Manager.
A comma-separated list of local (not HDFS) scratch directories which the new Impala_Kudu service should use, if you are not cloning an existing Impala service.
Your Cloudera Manager server needs network access to reach the parcel repository hosted on cloudera.com.
Download the deploy.py script from https://github.com/apache/incubator-impala/blob/master/infra/deploy/deploy.py using curl or another utility of your choice.
$ curl -O https://raw.githubusercontent.com/apache/incubator-impala/master/infra/deploy/deploy.py
Run the deploy.py script. The syntax below creates a standalone IMPALA_KUDU service called IMPALA_KUDU-1 on a cluster called Cluster 1. Exactly one HDFS, Hive, and HBase service exist in Cluster 1, so service dependencies are not required. The cluster should not already have an Impala instance.
$ python deploy.py create IMPALA_KUDU-1 --cluster 'Cluster 1' \
--master_host <FQDN_of_Kudu_master_server> \
--host <FQDN_of_cloudera_manager_server>
If you do not specify --master_host, the Kudu master is configured to run on the Cloudera Manager server (the value specified by the --host parameter).
If two HDFS services are available, called HDFS-1 and HDFS-2, use the following syntax to create the same IMPALA_KUDU-1 service using HDFS-2. You can specify multiple types of dependencies; use the deploy.py create -h command for details.
$ python deploy.py create IMPALA_KUDU-1 --cluster 'Cluster 1' --hdfs_dependency HDFS-2 \
--host <FQDN_of_cloudera_manager_server>
Run the deploy.py script with the following syntax to clone an existing IMPALA service called IMPALA-1 to a new IMPALA_KUDU service called IMPALA_KUDU-1, where Cloudera Manager only manages a single cluster. This new IMPALA_KUDU-1 service can run side by side with the IMPALA-1 service if there is sufficient RAM for both. IMPALA_KUDU-1 should be given at least 16 GB of RAM, and possibly more depending on the complexity of the workload and the query concurrency level.
$ python deploy.py clone IMPALA_KUDU-1 IMPALA-1 --host <FQDN_of_cloudera_manager_server>
Additional parameters are available for deploy.py. To view them, use the -h argument. You can also use commands such as deploy.py create -h or deploy.py clone -h to get information about additional arguments for individual operations.
The service is created but not started. Review the configuration in Cloudera Manager and start the service.
Before installing Impala_Kudu packages, you need to uninstall any existing Impala packages, using operating system utilities. For this reason, you cannot use Impala_Kudu alongside another Impala instance if you use packages.
OS | Repository | Individual Packages
---|---|---
RHEL or CentOS | |
Ubuntu | |
Download and configure the Impala_Kudu repositories for your operating system, or manually download individual RPMs, using the appropriate link from Impala_Kudu Package Locations.
An Impala cluster has at least one impala-kudu-server and at most one impala-kudu-catalog and impala-kudu-state-store. To connect to Impala from the command line, install the impala-kudu-shell package.
Add a new Impala service in Cloudera Manager.
Go to the cluster and click Actions / Add a Service.
Choose one host to run the Catalog Server, one to run the Statestore, and at least three to run Impala Daemon instances. Click Continue.
Choose one or more Impala scratch directories. Click Continue.
The Impala service starts.
Before installing Impala_Kudu packages, you need to uninstall any existing Impala packages, using operating system utilities. For this reason, you cannot use Impala_Kudu alongside another Impala instance if you use packages.
Do not use these command-line instructions if you use Cloudera Manager. Instead, follow Installing Impala_Kudu Using Packages.
OS | Repository | Individual Packages
---|---|---
RHEL or CentOS | |
Ubuntu | |
Download and configure the Impala_Kudu repositories for your operating system, or manually download individual RPMs, using the appropriate link from Impala_Kudu Package Locations.
An Impala cluster has at least one impala-kudu-server and at most one impala-kudu-catalog and impala-kudu-state-store. To connect to Impala from the command line, install the impala-kudu-shell package.
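For example, on a RHEL or CentOS host, you might install all four packages at once. This is a hedged sketch, assuming the Impala_Kudu repository has already been configured on the host; the package names are those listed above.
$ sudo yum install impala-kudu-server impala-kudu-catalog impala-kudu-state-store impala-kudu-shell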
Use the Impala start-up scripts to start each service on the relevant hosts:
$ sudo service impala-state-store start
$ sudo service impala-catalog start
$ sudo service impala-server start
This is only a small subset of Impala Shell functionality. For more details, see the Impala Shell documentation.
Neither Kudu nor Impala needs special configuration in order for you to use the Impala Shell or the Impala API to insert, update, delete, or query Kudu data using Impala. However, you do need to create a mapping between the Impala and Kudu tables. Kudu provides, in its web UI, the Impala statement needed to map an existing Kudu table.
Be sure you are using the impala-shell binary provided by the Impala_Kudu package, rather than the default CDH Impala binary. The following shows how to verify this using the alternatives command on a RHEL or CentOS host.
$ sudo alternatives --display impala-shell
impala-shell - status is auto.
link currently points to /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.1007/bin/impala-shell
/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.1007/bin/impala-shell - priority 10
/opt/cloudera/parcels/IMPALA_KUDU-2.3.0-1.cdh5.5.0.p0.119/bin/impala-shell - priority 5
Current `best' version is /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.1007/bin/impala-shell.
$ sudo alternatives --set impala-shell /opt/cloudera/parcels/IMPALA_KUDU-2.3.0-1.cdh5.5.0.p0.119/bin/impala-shell
Start Impala Shell using the impala-shell command. By default, impala-shell attempts to connect to the Impala daemon on localhost on port 21000. To connect to a different host, use the -i <host:port> option. To automatically connect to a specific Impala database, use the -d <database> option. For instance, if all your Kudu tables are in Impala in the database impala_kudu, use -d impala_kudu to use this database.
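For example, the following invocation connects to an Impala daemon on a remote host and starts in the impala_kudu database. This is an illustration only; the host name is a placeholder.
$ impala-shell -i impala-daemon.example.com:21000 -d impala_kudu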
To quit the Impala Shell, use the following command: quit;
When creating a new Kudu table using Impala, you can create the table as an internal table or an external table.
An internal table is managed by Impala, and when you drop it from Impala, the data and the table truly are dropped. When you create a new table using Impala, it is generally an internal table.
An external table (created by CREATE EXTERNAL TABLE) is not managed by Impala, and dropping such a table does not drop the table from its source location (here, Kudu). Instead, it only removes the mapping between Impala and Kudu. This is the mode used in the syntax provided by Kudu for mapping an existing table to Impala. See the Impala documentation for more information about internal and external tables.
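A generated mapping statement takes roughly the following shape. This is a hedged sketch assembled from the table properties described later in this document; the Impala table name, Kudu table name, master address, and key column are placeholders.
-- Hypothetical mapping of an existing Kudu table into Impala
CREATE EXTERNAL TABLE my_mapping_table
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'existing_kudu_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id'
);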
Go to http://kudu-master.example.com:8051/tables/, where kudu-master.example.com is the address of your Kudu master.
Click the table ID for the relevant table.
Scroll to the bottom of the page, or search for Impala CREATE TABLE statement.
Copy the entire statement.
Paste the statement into Impala. Impala now has a mapping to your Kudu table.
Creating a new table in Kudu from Impala is similar to mapping an existing Kudu table to an Impala table, except that you need to write the CREATE statement yourself. Use the following example as a guideline. Impala first creates the table, then creates the mapping.
CREATE TABLE my_first_table
(
id BIGINT,
name STRING
)
DISTRIBUTE BY HASH INTO 16 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'my_first_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id'
);
In the CREATE TABLE statement, the columns that comprise the primary key must be listed first. Additionally, primary key columns are implicitly marked NOT NULL. The following table properties are required, and the kudu.key_columns property must contain at least one column.
storage_handler: the mechanism used by Impala to determine the type of data source. For Kudu tables, this must be com.cloudera.kudu.hive.KuduStorageHandler.
kudu.table_name: the name of the table that Impala will create (or map to) in Kudu.
kudu.master_addresses: the list of Kudu masters Impala should communicate with.
kudu.key_columns: the comma-separated list of primary key columns, whose contents should not be nullable.
When creating a new Kudu table, you are required to specify a distribution scheme. See Partitioning Tables. The table creation example above is distributed into 16 buckets by hashing the id column, for simplicity. See Partitioning Rules of Thumb for guidelines on partitioning.
CREATE TABLE AS SELECT
You can create a table by querying any other table or tables in Impala, using a CREATE TABLE … AS SELECT statement. The following example imports all rows from an existing table old_table into a Kudu table new_table. The columns in new_table will have the same names and types as the columns in old_table, but you need to populate the kudu.key_columns property. In this example, the primary key columns are ts and name.
CREATE TABLE new_table
DISTRIBUTE BY HASH INTO 16 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'new_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'ts, name'
)
AS SELECT * FROM old_table;
You can refine the SELECT statement to only match the rows and columns you want to be inserted into the new table. You can also rename the columns by using syntax like SELECT name AS new_name.
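For instance, the new_table example above could be refined like this. This is a hedged sketch; the old_table column value, the renamed column, and the ts filter are illustrative placeholders.
-- Hypothetical refinement: keep only some rows and rename a column
CREATE TABLE new_table
DISTRIBUTE BY HASH INTO 16 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'new_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'ts, name'
)
AS SELECT ts, name, value AS new_value FROM old_table WHERE ts > 0;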
Tables are divided into tablets which are each served by one or more tablet servers. Ideally, tablets should split a table’s data relatively equally. Kudu currently has no mechanism for automatically (or manually) splitting a pre-existing tablet. Until this feature has been implemented, you must pre-split your table when you create it. When designing your table schema, consider primary keys that will allow you to pre-split your table into tablets which grow at similar rates. You can provide split points using a DISTRIBUTE BY clause when creating a table using Impala:
Impala keywords, such as group, are enclosed by back-tick characters when they are not used in their keyword sense.
CREATE TABLE cust_behavior (
_id BIGINT,
salary STRING,
edu_level INT,
usergender STRING,
`group` STRING,
city STRING,
postcode STRING,
last_purchase_price FLOAT,
last_purchase_date BIGINT,
category STRING,
sku STRING,
rating INT,
fulfilled_date BIGINT
)
DISTRIBUTE BY RANGE (_id)
SPLIT ROWS((1439560049342),
(1439566253755),
(1439572458168),
(1439578662581),
(1439584866994),
(1439591071407))
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'cust_behavior',
'kudu.master_addresses' = 'a1216.halxg.cloudera.com:7051',
'kudu.key_columns' = '_id',
'kudu.num_tablet_replicas' = '3'
);
If you have multiple primary key columns, you can specify split points by separating them with commas within the inner brackets: (('va',1), ('ab',2)). The expression must be valid JSON.
Impala uses a database containment model. In Impala, you can create a table within a specific scope, referred to as a database. To create the database, use a CREATE DATABASE statement. To use the database for further Impala operations such as CREATE TABLE, use the USE statement. For example, to create a table in a database called impala_kudu, use the following statements:
CREATE DATABASE impala_kudu;
USE impala_kudu;
CREATE TABLE my_first_table (
id BIGINT,
name STRING
)
DISTRIBUTE BY HASH INTO 4 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'my_first_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id'
);
The my_first_table table is created within the impala_kudu database. To refer to this table in the future, without using a specific USE statement, you can refer to it using <database>.<table> syntax. For example, to specify the my_first_table table in database impala_kudu, as opposed to any other table with the same name in another database, use impala_kudu.my_first_table. This also applies to INSERT, UPDATE, DELETE, and DROP statements.
Currently, Kudu does not encode the Impala database into the table name in any way. This means that even though you can create Kudu tables within Impala databases, the actual Kudu tables need to be unique within Kudu. For example, if you create database_1.my_kudu_table and database_2.my_kudu_table, you will have a naming collision within Kudu, even though this would not cause a problem in Impala. This can be resolved by specifying a unique Kudu table name in the kudu.table_name property.
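For example, the second table could be given a distinct Kudu-side name through kudu.table_name. This is a hedged sketch; the database, table names, and schema are placeholders.
-- Hypothetical: the Impala name and the Kudu name differ to avoid a collision
CREATE TABLE database_2.my_kudu_table (
id BIGINT,
name STRING
)
DISTRIBUTE BY HASH INTO 16 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'database_2_my_kudu_table',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id'
);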
The following Impala keywords are not supported when creating Kudu tables:
- PARTITIONED
- STORED AS
- LOCATION
- ROWFORMAT
If the WHERE clause of your query includes comparisons with the operators =, <=, or >=, Kudu evaluates the condition directly and only returns the relevant results. This provides optimum performance, because Kudu only returns the relevant results to Impala. For predicates <, >, !=, or any other predicate type supported by Impala, Kudu does not evaluate the predicates directly, but returns all results to Impala and relies on Impala to evaluate the remaining predicates and filter the results accordingly. This may cause differences in performance, depending on the delta of the result set before and after evaluating the WHERE clause.
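As a brief illustration using the my_first_table example from earlier, the first query below can be evaluated directly by Kudu, while the second returns all rows to Impala for filtering:
SELECT * FROM my_first_table WHERE id >= 10;  -- =, <=, >= are evaluated by Kudu
SELECT * FROM my_first_table WHERE id != 10;  -- != is evaluated by the Impala scan node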
In the CREATE TABLE statement, the first column must be the primary key. Additionally, the primary key can never be NULL when inserting or updating a row. All properties in the TBLPROPERTIES statement are required, and the kudu.key_columns property must contain at least one column.
Tables are partitioned into tablets according to a partition schema on the primary key columns. Each tablet is served by at least one tablet server. Ideally, a table should be split into tablets that are distributed across a number of tablet servers to maximize parallel operations. The details of the partitioning schema you use will depend entirely on the type of data you store and how you access it. For a full discussion of schema design in Kudu, see Schema Design.
Kudu currently has no mechanism for splitting or merging tablets after the table has been created. Until this feature has been implemented, you must provide a partition schema for your table when you create it. When designing your tables, consider using primary keys that will allow you to partition your table into tablets which grow at similar rates.
You can partition your table using Impala’s DISTRIBUTE BY keyword, which supports distribution by RANGE or HASH. The partition scheme can contain zero or more HASH definitions, followed by an optional RANGE definition. The RANGE definition can refer to one or more primary key columns. Examples of basic and advanced partitioning are shown below.
Impala keywords, such as group, are enclosed by back-tick characters when they are used as identifiers, rather than as keywords.
DISTRIBUTE BY RANGE
You can specify split rows for one or more primary key columns that contain integer or string values. Range partitioning in Kudu allows splitting a table based on the lexicographic order of its primary keys. This allows you to balance parallelism in writes with scan efficiency.
The split row does not need to exist. It defines an exclusive bound in the form of (START_KEY, SplitRow), [SplitRow, STOP_KEY). In other words, the split row, if it exists, is included in the tablet after the split point. For instance, if you specify a split row abc, a row abca would be in the second tablet, while a row abb would be in the first.
Suppose you have a table that has columns state, name, and purchase_count. The following example creates 50 tablets, one per US state.
Monotonically Increasing Values
If you partition by range on a column whose values are monotonically increasing, the last tablet will grow much larger than the others. Additionally, all data being inserted will be written to a single tablet at a time, limiting the scalability of data ingest. In that case, consider distributing by HASH instead of, or in addition to, RANGE.
CREATE TABLE customers (
state STRING,
name STRING,
purchase_count INT
)
DISTRIBUTE BY RANGE (state)
SPLIT ROWS (('al'),
('ak'),
('ar'),
...
('wv'),
('wy'))
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'customers',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'state, name'
);
DISTRIBUTE BY HASH
Instead of distributing by an explicit range, or in combination with range distribution, you can distribute into a specific number of 'buckets' by hash. You specify the primary key columns you want to partition by, and the number of buckets you want to use. Rows are distributed by hashing the specified key columns. Assuming that the values being hashed do not themselves exhibit significant skew, this will serve to distribute the data evenly across buckets.
You can specify multiple definitions, and you can specify definitions which use compound primary keys. However, one column cannot be mentioned in multiple hash definitions. Consider two columns, a and b:
* HASH(a), HASH(b) is valid
* HASH(a,b) is valid
* HASH(a), HASH(a,b) is not valid, because a is mentioned in more than one hash definition
DISTRIBUTE BY HASH with no column specified is a shortcut to create the desired number of buckets by hashing all primary key columns.
Hash partitioning is a reasonable approach if primary key values are evenly distributed in their domain and no data skew is apparent, such as timestamps or serial IDs.
The following example creates 16 tablets by hashing the id column. This spreads writes across all 16 tablets. In this example, a query for a range of sku values is likely to need to read all 16 tablets, so this may not be the optimum schema for this table. See Advanced Partitioning for an extended example.
CREATE TABLE cust_behavior (
id BIGINT,
sku STRING,
salary STRING,
edu_level INT,
usergender STRING,
`group` STRING,
city STRING,
postcode STRING,
last_purchase_price FLOAT,
last_purchase_date BIGINT,
category STRING,
rating INT,
fulfilled_date BIGINT
)
DISTRIBUTE BY HASH INTO 16 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'cust_behavior',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id, sku'
);
You can combine HASH and RANGE partitioning to create more complex partition schemas. You can specify zero or more HASH definitions, followed by zero or one RANGE definition. Each definition can encompass one or more columns. While enumerating every possible distribution schema is out of the scope of this document, a few examples illustrate some of the possibilities.
DISTRIBUTE BY RANGE Using Compound Split Rows
This example creates 100 tablets, two for each US state. Per state, the first tablet holds names starting with characters before 'm', and the second tablet holds names starting with 'm'-'z'. Writes are spread across at least 50 tablets, and possibly up to 100. A query for a range of names in a given state is likely to only need to read from one tablet, while a query for a range of names across every state will likely read from at most 50 tablets.
CREATE TABLE customers (
state STRING,
name STRING,
purchase_count INT
)
DISTRIBUTE BY RANGE (state, name)
SPLIT ROWS (('al', ''),
('al', 'm'),
('ak', ''),
('ak', 'm'),
...
('wy', ''),
('wy', 'm'))
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'customers',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'state, name'
);
DISTRIBUTE BY HASH and RANGE
Consider the simple hashing example above. If you often query for a range of sku values, you can optimize the example by combining hash partitioning with range partitioning.
The following example still creates 16 tablets, by first hashing the id column into 4 buckets, and then applying range partitioning to split each bucket into four tablets, based upon the value of the sku string. Writes are spread across at least four tablets (and possibly up to 16). When you query for a contiguous range of sku values, you have a good chance of only needing to read from a quarter of the tablets to fulfill the query.
By default, the entire primary key is hashed when you use DISTRIBUTE BY HASH. To hash on only part of the primary key, specify it by using syntax like DISTRIBUTE BY HASH (id, sku).
CREATE TABLE cust_behavior (
id BIGINT,
sku STRING,
salary STRING,
edu_level INT,
usergender STRING,
`group` STRING,
city STRING,
postcode STRING,
last_purchase_price FLOAT,
last_purchase_date BIGINT,
category STRING,
rating INT,
fulfilled_date BIGINT
)
DISTRIBUTE BY HASH (id) INTO 4 BUCKETS,
RANGE (sku)
SPLIT ROWS (('g'),
('o'),
('u'))
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'cust_behavior',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id, sku'
);
Multiple DISTRIBUTE BY HASH Definitions
Again expanding the example above, suppose that the query pattern will be unpredictable, but you want to ensure that writes are spread across a large number of tablets. You can achieve maximum distribution across the entire primary key by hashing on both primary key columns.
CREATE TABLE cust_behavior (
id BIGINT,
sku STRING,
salary STRING,
edu_level INT,
usergender STRING,
`group` STRING,
city STRING,
postcode STRING,
last_purchase_price FLOAT,
last_purchase_date BIGINT,
category STRING,
rating INT,
fulfilled_date BIGINT
)
DISTRIBUTE BY HASH (id) INTO 4 BUCKETS,
HASH (sku) INTO 4 BUCKETS
TBLPROPERTIES(
'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
'kudu.table_name' = 'cust_behavior',
'kudu.master_addresses' = 'kudu-master.example.com:7051',
'kudu.key_columns' = 'id, sku'
);
The example creates 16 buckets. You could also use HASH (id, sku) INTO 16 BUCKETS. However, a scan for sku values would almost always impact all 16 buckets, rather than possibly being limited to 4.
For large tables, such as fact tables, aim for as many tablets as you have cores in the cluster.
For small tables, such as dimension tables, aim for a large enough number of tablets that each tablet is at least 1 GB in size.
In general, be mindful that the number of tablets limits the parallelism of reads in the current implementation. Increasing the number of tablets significantly beyond the number of cores is likely to have diminishing returns.
Impala allows you to use standard SQL syntax to insert data into Kudu.
This example inserts a single row.
INSERT INTO my_first_table VALUES (99, "sarah");
This example inserts three rows using a single statement.
INSERT INTO my_first_table VALUES (1, "john"), (2, "jane"), (3, "jim");
When inserting in bulk, there are at least three common choices. Each may have advantages and disadvantages, depending on your data and circumstances.
Multiple INSERT statements
This approach has the advantage of being easy to understand and implement, but it is likely to be inefficient because Impala has a high query start-up cost compared to Kudu’s insertion performance. This will lead to relatively high latency and poor throughput.
INSERT statement with multiple VALUES
If you include more than 1024 VALUES statements, Impala batches them into groups of 1024 (or the value of batch_size) before sending the requests to Kudu. This approach may perform slightly better than multiple sequential INSERT statements by amortizing the query start-up penalties on the Impala side. To set the batch size for the current Impala Shell session, use the following syntax: set batch_size=10000;
Increasing the Impala batch size causes Impala to use more memory. You should verify the impact on your cluster and tune accordingly.
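For example, within an Impala Shell session, you might combine the two. This is an illustration only; the rows extend the my_first_table example from earlier.
set batch_size=10000;
INSERT INTO my_first_table VALUES (4, "kim"), (5, "lee"), (6, "pat");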
The approach that usually performs best, from the standpoint of both Impala and Kudu, is to import the data using a SELECT FROM statement in Impala.
If your data is not already in Impala, one strategy is to import it from a text file, such as a TSV or CSV file.
Create the Kudu table, being mindful that the columns designated as primary keys cannot have null values.
Insert values into the Kudu table by querying the table containing the original data, as in the following example:
INSERT INTO my_kudu_table
SELECT * FROM legacy_data_import_table;
In many cases, the appropriate ingest path is to use the C++ or Java API to insert directly into Kudu tables. Unlike other Impala tables, data inserted into Kudu tables via the API becomes available for query in Impala without the need for any INVALIDATE METADATA statements or other statements needed for other Impala storage types.
INSERT and the IGNORE Keyword
Normally, if you try to insert a row that has already been inserted, the insertion will fail because the primary key would be duplicated. See Failures During INSERT, UPDATE, and DELETE Operations.
If an insert fails part of the way through, you can re-run the insert, using the IGNORE keyword, which will ignore only those errors returned from Kudu indicating a duplicate key.
The first example will cause an error if a row with the primary key 99 already exists. The second example will still not insert the row, but will ignore any error and continue on to the next SQL statement.
INSERT INTO my_first_table VALUES (99, "sarah");
INSERT IGNORE INTO my_first_table VALUES (99, "sarah");
UPDATE my_first_table SET name="bob" where id = 3;
The UPDATE statement only works in Impala when the target table is in Kudu.
You can update in bulk using the same approaches outlined in Inserting In Bulk.
UPDATE and the IGNORE Keyword
Similarly to INSERT and the IGNORE Keyword, you can use the IGNORE operation to ignore an UPDATE which would otherwise fail. For instance, a row may be deleted while you are attempting to update it. In Impala, this would cause an error. The IGNORE keyword causes the error to be ignored.
UPDATE IGNORE my_first_table SET name="bob" where id = 3;
DELETE FROM my_first_table WHERE id < 3;
You can also delete using more complex syntax. A comma in the FROM sub-clause is one way that Impala specifies a join query. For more information about Impala joins, see http://www.cloudera.com/content/cloudera/en/documentation/core/latest/topics/impala_joins.html.
DELETE c FROM my_second_table c, stock_symbols s WHERE c.name = s.symbol;
The DELETE statement only works in Impala when the target table is in Kudu.
You can delete in bulk using the same approaches outlined in Inserting In Bulk.
DELETE and the IGNORE Keyword
Similarly to INSERT and the IGNORE Keyword, you can use the IGNORE operation to ignore a DELETE which would otherwise fail. For instance, a row may be deleted by another process while you are attempting to delete it. In Impala, this would cause an error. The IGNORE keyword causes the error to be ignored.
DELETE IGNORE FROM my_first_table WHERE id < 3;
Failures During INSERT, UPDATE, and DELETE Operations
INSERT, UPDATE, and DELETE statements cannot be considered transactional as a whole. If one of these operations fails part of the way through, the keys may have already been created (in the case of INSERT) or the records may have already been modified or removed by another process (in the case of UPDATE or DELETE). You should design your application with this in mind. See INSERT and the IGNORE Keyword.
You can change Impala’s metadata relating to a given Kudu table by altering the table’s properties. These properties include the table name, the list of Kudu master addresses, and whether the table is managed by Impala (internal) or externally. You cannot modify a table’s split rows after table creation.
Altering table properties only changes Impala’s metadata about the table, not the underlying table itself. These statements do not modify any table metadata in Kudu.
ALTER TABLE my_table RENAME TO my_new_table;
ALTER TABLE my_table
SET TBLPROPERTIES('kudu.master_addresses' = 'kudu-new-master.example.com:7051');
ALTER TABLE my_table SET TBLPROPERTIES('EXTERNAL' = 'TRUE');
If the table was created as an internal table in Impala, using CREATE TABLE, the standard DROP TABLE syntax drops the underlying Kudu table and all its data. If the table was created as an external table, using CREATE EXTERNAL TABLE, the mapping between Impala and Kudu is dropped, but the Kudu table is left intact, with all its data.
DROP TABLE my_first_table;
The examples above have only explored a fraction of what you can do with Impala Shell.
Learn about the Impala project.
Read the Impala documentation.
View the Impala SQL reference.
Read about Impala internals or learn how to contribute to Impala on the Impala Wiki.
Read about the native Kudu APIs.
Kudu tables with a name containing uppercase or non-ASCII characters must be assigned an alternate name when used as an external table in Impala.
Kudu tables with a column name containing uppercase or non-ASCII characters may not be used as an external table in Impala. Non-primary-key columns may be renamed in Kudu to work around this issue.
When creating a Kudu table, the CREATE TABLE statement must include the primary key columns before other columns, in primary key order.
Kudu tables containing UNIXTIME_MICROS-typed columns may not be used as an external table in Impala.
Impala cannot create Kudu tables with TIMESTAMP or nested-typed columns.
Impala cannot update values in primary key columns.
NULL, NOT NULL, !=, and IN predicates are not pushed to Kudu, and instead will be evaluated by the Impala scan node.
Impala cannot specify column encoding or compression during Kudu table creation, or alter a column’s encoding or compression.
Impala cannot create Kudu tables with bounded range partitions, and cannot alter a table to add or remove range partitions.
When bulk writing to a Kudu table, performance may be improved by setting the batch_size option (see Inserting In Bulk).