Package org.apache.drill.exec.physical.impl.scan.project
Overview
The projection framework looks at schema as a set of transforms:
- Scan level: physical plan projection list and optional provided schema information.
- File level: materializes implicit file and partition columns.
- Reader level: integrates the actual schema discovered by the reader with the scan-level projection list.
Projection turns out to be a very complex operation in a schema-on-read system such as Drill. Provided schema helps resolve ambiguities inherent in schema-on-read, but at the cost of some additional complexity.
Background
The scan-level projection holds the list of columns (or the wildcard) as requested by the user in the query. The planner determines which columns to project. In Drill, projection is speculative: it is a list of names which the planner hopes will appear in the data files. The reader must make up columns (the infamous nullable INT) when it turns out that no such column exists. Otherwise, the reader must figure out the data type for each column that does exist.

With the advent of provided schema in Drill 1.16, the scan-level projection integrates that schema information with the projection list provided in the physical operator. If a schema is provided, then each scan-level column tracks the schema information for that column.
The scan-level projection also implements the special rules for a "strict" provided schema: if the operator projection list contains a wildcard, a schema is provided, and the schema is strict, then the scan-level projection expands the wildcard into the set of columns in the provided schema. Doing so ensures that the scan output contains exactly the columns from the schema, even if some of those columns must be filled with nulls or a default value. (The result set loader does additional filtering as well.)
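To make the strict-schema rule concrete, here is a minimal, self-contained sketch of the wildcard expansion. The class and method names are illustrative only, not Drill's actual API:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class StrictWildcardSketch {

      // Expand the wildcard into exactly the provided columns, in schema
      // order. Reader columns outside the schema are ignored; schema
      // columns missing from the reader are filled with nulls or defaults.
      static List<String> expandWildcard(Map<String, String> strictSchema) {
        return List.copyOf(strictSchema.keySet());
      }

      public static void main(String[] args) {
        Map<String, String> provided = new LinkedHashMap<>();
        provided.put("a", "INT");
        provided.put("b", "VARCHAR");
        System.out.println(expandWildcard(provided)); // prints [a, b]
      }
    }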
The scan project list defines the set of columns which the scan operator is obliged to send downstream. Ideally, the scan operator sends exactly the same schema (the project list with types filled in) for all batches. Since batches may come from different files, the scan operator is obligated to unify the schemas from those files (or blocks.)
Reader (file)-level projection occurs for each reader. A single scan may use multiple readers to read data. From the reader's perspective, it offers the schema it discovers in the file. The reader itself is rather inflexible: it must deal with the data it finds, of the type found in the data source.
The reader thus tells the result set loader that it has such-and-so schema. It does that either at open time (so-called "early" schema, such as for CSV, JDBC or Parquet) or as it discovers the columns (so-called "late" schema as in JSON.) Again, in each case, the data source schema is what it is; it can't be changed due to the wishes of the scan-level projection.
Readers obtain column schema from the file or data source. For example, a Parquet reader can obtain schema information from the Parquet headers. A JDBC reader obtains schema information from the returned schema. As noted above, we use the term "early schema" when type information is available at open time, before reading the first row of data.
By contrast, readers such as JSON and CSV are "late schema": they don't know the data schema until they read the file. This is true "schema on read." Further, for JSON, the schema may change from one batch to the next as the reader "discovers" fields that did not appear in earlier batches. This requires some amount of "schema smoothing": the ability to preserve a consistent output schema even as the input schema jiggles around a bit.
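The early/late distinction can be sketched with a toy loader. The names below (Loader, addColumn) are hypothetical stand-ins for the result set loader's schema tracking, not Drill's actual interfaces:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class SchemaDeclarationSketch {

      // Minimal stand-in for the loader's schema tracking.
      static class Loader {
        final Map<String, String> schema = new LinkedHashMap<>();

        // Early or late: idempotently add a column when first seen.
        void addColumn(String name, String type) {
          schema.putIfAbsent(name, type);
        }
      }

      public static void main(String[] args) {
        // Early schema (CSV, JDBC, Parquet): declared at open time.
        Loader early = new Loader();
        early.addColumn("a", "VARCHAR");
        early.addColumn("b", "VARCHAR");

        // Late schema (JSON): columns appear as the reader discovers
        // them, possibly several batches into the file.
        Loader late = new Loader();
        late.addColumn("a", "BIGINT");  // seen in the first batch
        late.addColumn("c", "VARCHAR"); // first seen many batches later

        System.out.println(early.schema + " / " + late.schema);
      }
    }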
Drill supports many kinds of data sources via plugins. The DFS plugin works with files in a distributed store such as HDFS. Such file-based readers add implicit file or partition columns. Since these columns are generic to all format plugins, they are factored out into a file scan framework which inserts the "implicit" columns separately from the reader-provided columns.
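The implicit column values themselves are simple functions of the file path. The helper below is hypothetical, though the column names (fqn, filename, dir0, dir1, ...) match Drill's defaults:

    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class ImplicitColumnsSketch {

      public static void main(String[] args) {
        Path scanRoot = Paths.get("/data/sales");
        Path file = Paths.get("/data/sales/2023/06/orders.csv");

        Path relative = scanRoot.relativize(file); // 2023/06/orders.csv
        System.out.println("fqn      = " + file);
        System.out.println("filename = " + file.getFileName());
        // Each directory level below the scan root becomes a nullable
        // VARCHAR partition column: dir0, dir1, ...
        for (int i = 0; i < relative.getNameCount() - 1; i++) {
          System.out.println("dir" + i + "     = " + relative.getName(i));
        }
      }
    }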
Design
This leads to a multi-stage merge operation. The result set loader is presented with each column one-by-one (either at open time or during read.) When a column is presented, the projection framework makes a number of decisions:
- Is the column projected? For example, if a query is SELECT a, b, c and the reader offers column d, then column d will not be projected. In the wildcard case, "special" columns will be omitted from the column expansion and will be unprojected.
- Is type conversion needed? If a schema is provided, and the type of the column in the provided schema differs from that offered by the reader, the framework can insert a type-conversion "shim", assuming that the framework knows how to do the conversion. Otherwise, an error is raised.
- Is the column type and mode consistent with the projection list? Suppose the query is SELECT a, b[10], c.d. Column `a` matches any reader column. But, column `b` is valid only for an array (not a map and not a scalar.) Column `c` must be a map (or array of maps.) And so on. (These checks are sketched in code below.)
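A minimal sketch of these per-column checks, with illustrative names (Drill's real checks also handle bare names matching any shape, nested maps, and so on):

    import java.util.Map;

    public class ColumnDecisionSketch {

      enum Shape { SCALAR, ARRAY, MAP }

      // SELECT a, b[10], c.d implies a shape for each projected name.
      // (A bare name like `a` really matches any shape; SCALAR keeps
      // the sketch simple.)
      static final Map<String, Shape> PROJECTED = Map.of(
          "a", Shape.SCALAR, "b", Shape.ARRAY, "c", Shape.MAP);

      static boolean accept(String name, Shape readerShape) {
        Shape wanted = PROJECTED.get(name);
        if (wanted == null) {
          return false; // not projected: the loader creates a dummy writer
        }
        if (wanted != readerShape) {
          throw new IllegalStateException(name + ": projection expects "
              + wanted + " but the reader offers " + readerShape);
        }
        return true; // projected, and consistent with the projection list
      }

      public static void main(String[] args) {
        System.out.println(accept("a", Shape.SCALAR)); // true
        System.out.println(accept("d", Shape.SCALAR)); // false: unprojected
        try {
          accept("b", Shape.SCALAR); // b[10] requires an array
        } catch (IllegalStateException e) {
          System.out.println(e.getMessage());
        }
      }
    }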
The result is a refined schema: the scan level schema with more information filled in. For Parquet, all projection information can be filled in. For CSV or JSON, we can only add file metadata information, but not yet the actual data schema.
Batch-level schema: once a reader reads actual data, it knows exactly what it read. This is the "schema-on-read" model. Thus, after reading a batch, any remaining uncertainty about the projected schema is removed. The actual data defines the data types and so on.
The goal of this mechanism is to handle the above use cases cleanly, in a common set of classes, and to avoid the need for each reader to figure out all these issues for themselves (as was the case with earlier versions of Drill.)
Because these issues are complex, the code itself is complex. To make the code easier to manage, each bit of functionality is encapsulated in a distinct class. Classes combine via composition to create a "framework" suitable for each kind of reader: whether it be early or late schema, file-based or something else, etc.
Nuances of Reader-Level Projection
We've said that the scan-level projection identifies what the query wants. We've said that the reader identifies what the external data actually is. We've mentioned how we bridge between the two. Here we explore this in more detail.

Run-time schema resolution occurs at various stages:
- The per-column resolution identified earlier: matching types, type conversion, and so on.
- The reader provides some set of columns. We don't know which columns until the end of the first (or, more generally, every) batch. Suppose the query wants SELECT a, b, c but the reader turns out to provide only `a` and `b`. Only after the first batch do we realize that we need column `c` as a "null" column (of a type defined in the provided schema, specified by the plugin, or good-old nullable INT.)
- The result set loader will have created "dummy" columns for unprojected columns. The reader can still write to such columns (because they represent data in the file), but the associated column writer simply ignores the data. As a result, the result set loader produces only the projected subset (possibly the full set) of the reader's columns.
- After each reader batch, the projection framework goes to work filling in implicit columns and filling in missing columns. It is important to remember that this pass *must* be done *after* a batch is read, since we don't know the columns that the reader can provide until after a batch is read. (See the sketch after this list.)
- Some readers, such as JSON, can "change their minds" about the schema across batches. For example, the first batch may include only columns a and b. Later in the JSON file, the reader may discover column c. This means that the above post-batch analysis must be repeated each time the reader changes the schema. (The result set loader tracks schema changes for this purpose.)
- File schemas evolve. The same changes noted above can occur across files. Maybe file 1 has column `x` as a BIGINT, while file 2 has column `x` as an INT. A "smoothing" step attempts to avoid hard schema changes where possible. While smoothing is a clever idea, it handles only some cases. Provided schema is a more reliable solution (but is not yet widely deployed.)
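The missing-column decision in particular can be sketched as follows. The method and type names are illustrative; the fallback order (provided schema first, then nullable INT) is the rule described above:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class PostBatchResolutionSketch {

      // Build the output schema once a batch reveals the reader's columns.
      static Map<String, String> resolve(List<String> projected,
          Map<String, String> readerSchema, Map<String, String> providedSchema) {
        Map<String, String> output = new LinkedHashMap<>();
        for (String col : projected) {
          String type = readerSchema.get(col);
          if (type == null) {
            // Missing column: prefer the provided schema's type, else
            // fall back to the infamous nullable INT.
            type = providedSchema.getOrDefault(col, "NULLABLE INT");
          }
          output.put(col, type);
        }
        return output;
      }

      public static void main(String[] args) {
        Map<String, String> reader = Map.of("a", "VARCHAR", "b", "BIGINT");
        Map<String, String> provided = Map.of("c", "DATE");
        System.out.println(resolve(List.of("a", "b", "c"), reader, provided));
        // {a=VARCHAR, b=BIGINT, c=DATE}; without a provided schema, c
        // would become NULLABLE INT.
      }
    }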
Reader-Level Projection Set
The Projection Set mechanism is designed to handle the increasing nuances of Drill run-time projection by providing a source of information about each column that the reader may discover (a sketch of this contract follows the list):
- Is the column projected?
- If the query is explicit (SELECT a, b, c), is the column in the projection list?
- If the query is a wildcard (SELECT *), is the column marked as special (not included in the wildcard)?
- If the query is a wildcard and a strict schema is provided, is the column part of the provided schema?
- Is the column's type and mode consistent with the projection list?
- Is type conversion needed?
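A hypothetical shape for this contract (Drill's actual interface differs in its details) might be:

    import java.util.Map;
    import java.util.Set;

    public class ProjectionSetSketch {

      record Decision(boolean projected, String outputType) {}

      interface ProjectionSet {
        // Called once for each column as the reader discovers it.
        Decision project(String name, String readerType);
      }

      // Explicit-list flavor, with an optional provided schema used to
      // request type conversions.
      static ProjectionSet explicit(Set<String> projList,
                                    Map<String, String> providedSchema) {
        return (name, readerType) -> {
          if (!projList.contains(name)) {
            return new Decision(false, readerType); // dummy writer case
          }
          // The provided schema wins; a conversion shim bridges the types.
          String outputType = providedSchema.getOrDefault(name, readerType);
          return new Decision(true, outputType);
        };
      }

      public static void main(String[] args) {
        ProjectionSet ps = explicit(Set.of("a", "b"), Map.of("a", "BIGINT"));
        System.out.println(ps.project("a", "VARCHAR")); // projected, shim to BIGINT
        System.out.println(ps.project("d", "VARCHAR")); // not projected
      }
    }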
Projection Via Rewrites
The core concept is one of successive refinement of the project list through a set of rewrites:
- Scan-level rewrite: convert SchemaPath entries into internal column nodes, tagging the nodes with the column type: wildcard, unresolved table column, or special columns (such as file metadata.) The scan-level rewrite is done once per scan operator.
- Reader-level rewrite: convert the internal column nodes into other internal nodes, leaving table column nodes unresolved. The typical use is to fill in metadata columns with information about a specific file.
- Schema-level rewrite: given the actual schema of a record batch, rewrite the reader-level projection to describe the final projection from incoming data to output container. This step fills in missing columns, expands wildcards, etc. (These stages are sketched in code after the diagram below.)
                 Scan Plan
                     |
                     v
             +--------------+
             | Project List |
             |    Parser    |
             +--------------+
                     |
                     v
              +------------+
              | Scan Level |      +----------------+
              | Projection | ---> | Projection Set |
              +------------+      +----------------+
                     |                    |
                     v                    v
+------+      +------------+      +------------+      +-----------+
| File | ---> | File Level |      | Result Set | ---> | Data File |
| Data |      | Projection |      |   Loader   | <--- |  Reader   |
+------+      +------------+      +------------+      +-----------+
                     |                    |
                     v                    |
             +--------------+   Reader   |
             | Reader Level |   Schema   |
             |  Projection  | <----------+
             +--------------+            |
                     |                    |
                     v                    |
                 +--------+   Loaded     |
                 | Output |   Vectors    |
                 | Mapper | <------------+
                 +--------+
                     |
                     v
                Output Batch
The left side can be thought of as the "what we want" description of the schema, with the right side being "what the reader actually discovered."
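As promised above, the three rewrites can be sketched as successive passes over a list of column nodes. The node kinds below loosely echo this package's unresolved and resolved column classes, but the code is illustrative only, not the framework's implementation:

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class RewriteSketch {

      enum Kind { UNRESOLVED, IMPLICIT, TABLE, NULL }

      record Node(String name, Kind kind, String value) {}

      // Scan-level rewrite: tag each projected name (done once per scan).
      static Node parse(String name) {
        return name.equals("filename")
            ? new Node(name, Kind.IMPLICIT, null)
            : new Node(name, Kind.UNRESOLVED, null);
      }

      // Reader-level rewrite: fill in metadata values for a specific file.
      static Node readerRewrite(Node n, String fileName) {
        return n.kind() == Kind.IMPLICIT
            ? new Node(n.name(), n.kind(), fileName) : n;
      }

      // Schema-level rewrite: resolve table columns against the batch
      // schema; anything still unresolved becomes a null column.
      static Node schemaRewrite(Node n, Map<String, String> batchSchema) {
        if (n.kind() != Kind.UNRESOLVED) { return n; }
        String type = batchSchema.get(n.name());
        return type == null
            ? new Node(n.name(), Kind.NULL, null)
            : new Node(n.name(), Kind.TABLE, type);
      }

      public static void main(String[] args) {
        List<Node> cols = List.of("a", "c", "filename").stream()
            .map(RewriteSketch::parse)
            .map(n -> readerRewrite(n, "orders.csv"))
            .map(n -> schemaRewrite(n, Map.of("a", "VARCHAR")))
            .collect(Collectors.toList());
        System.out.println(cols);
        // a resolves to a table column, c to a null column, and
        // filename to an implicit column carrying the file name.
      }
    }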
The output mapper includes mechanisms to populate implicit columns, create null columns, and to merge implicit, null and data columns, omitting unprojected data columns.
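For example, here is a toy merge, with illustrative names, that assembles the output row in projected order from the three column sources, dropping the unprojected data column d:

    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class OutputMapperSketch {

      public static void main(String[] args) {
        List<String> outputOrder = List.of("a", "b", "c", "filename");

        Map<String, Object> data = Map.of("a", 10, "b", "x", "d", "unprojected");
        Map<String, Object> nulls = new LinkedHashMap<>();
        nulls.put("c", null);                      // missing from the reader
        Map<String, Object> implicit = Map.of("filename", "orders.csv");

        Map<String, Object> merged = new LinkedHashMap<>();
        for (String col : outputOrder) {
          if (data.containsKey(col)) {
            merged.put(col, data.get(col));        // reader-provided column
          } else if (nulls.containsKey(col)) {
            merged.put(col, nulls.get(col));       // manufactured null column
          } else {
            merged.put(col, implicit.get(col));    // implicit file column
          }
        }
        System.out.println(merged); // {a=10, b=x, c=null, filename=orders.csv}
      }
    }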
In all cases, projection must handle maps, which are a recursive structure much like a row. That is, a Drill row consists of nested tuples (the row and its maps), each of which contains columns, any of which can themselves be maps. Thus, there is a set of alternating layers of tuples, columns, tuples, and so on until we get to leaf (non-map) columns. As a result, most of the above structures are in the form of tuple trees, requiring recursive algorithms to apply rules down through the nested layers of tuples.
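The recursion can be sketched with maps standing in for tuples; all names here are illustrative:

    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TupleTreeSketch {

      // A tuple maps column names either to a leaf type or to a nested
      // tuple, mirroring the row/map alternation described above.
      static void visit(String path, Map<String, Object> tuple) {
        for (Map.Entry<String, Object> col : tuple.entrySet()) {
          String name = path.isEmpty() ? col.getKey() : path + "." + col.getKey();
          if (col.getValue() instanceof Map) {
            @SuppressWarnings("unchecked")
            Map<String, Object> nested = (Map<String, Object>) col.getValue();
            visit(name, nested); // a map column is itself a tuple: recurse
          } else {
            System.out.println(name + " : " + col.getValue());
          }
        }
      }

      public static void main(String[] args) {
        Map<String, Object> map = new LinkedHashMap<>();
        map.put("d", "VARCHAR");
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("a", "INT");
        row.put("c", map);
        visit("", row); // prints: a : INT, then c.d : VARCHAR
      }
    }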
The above mechanism is done at runtime, in each scan fragment. Since Drill is schema-on-read, and has no plan-time schema concept, run-time projection is required. On the other hand, if Drill were ever to support the "classic" plan-time schema resolution, then much of this work could be done at plan time rather than (redundantly) at runtime. The main change would be to do the work abstractly, working with column and row descriptions, rather than concretely with vectors as is done here. Then, that abstract description would feed directly into these mechanisms with the "final answer" about projection, batch layout, and so on. The parts of this mechanism that create and populate vectors would remain.
Class Summary
- Represents a projected column that has not yet been bound to a table column, special column or a null column.
- Represents an unresolved table column to be provided by the reader (or filled in with nulls.) May be associated with a provided schema column.
- Core interface for a projected column.
- Populate metadata columns either file metadata (AKA "implicit columns") or directory metadata (AKA "partition columns.") In both cases the column type is nullable Varchar and the column value is predefined by the projection planner; this class just copies that value into each row.
- Perform a schema projection for the case of an explicit list of projected columns.
- Queries can contain a wildcard (*), table columns, or special system-defined columns (the file metadata columns AKA implicit columns, the `columns` column of CSV, etc.).
- Do-nothing implementation of the metadata manager.
- Manages null columns by creating a null column loader for each set of non-empty null columns.
- Create and populate null columns for the case in which a SELECT statement refers to columns that do not exist in the actual table.
- Computes the full output schema given a table (or batch) schema.
- Reader-level projection is customizable.
- Orchestrates projection tasks for a single reader within the set that the scan operator manages.
- A resolved column has a name, and a specification for how to project data from a source vector to a vector in the final output container.
- Represents a column which is implicitly a map (because it has children in the project list), but which does not match any column in the table.
- Projected column that serves as both a resolved column (provides projection mapping) and a null column spec (provides the information needed to create the required null vectors.)
- Column that matches one provided by the table.
- Drill rows are made up of a tree of tuples, with the row being the root tuple.
- Represents a map implied by the project list, whether or not the map actually appears in the table schema.
- Represents a map tuple (not the map column, rather the value of the map column.) When projecting, we create a new repeated map vector, but share the offsets vector from input to output.
- Represents the top-level tuple which is projected to a vector container.
- Parses and analyzes the projection list passed to the scanner.
- Interface for add-on parsers, avoids the need to create a single, tightly-coupled parser for all types of columns.
- Identifies the kind of projection done for this scan.
- Performs projection of a record reader, along with a set of static columns, to produce the final "public" result set (record batch) for the scan operator.
- Implements a "schema smoothing" algorithm.
- Exception thrown if the prior schema is not compatible with the new table schema.
- Resolve a table schema against the prior schema.
- Base class for columns that take values based on the reader, not individual rows.
- Generic mechanism for retrieving vectors from a source tuple when projecting columns to the output tuple.
- Perform a wildcard projection.
- Perform a wildcard projection with an associated output schema.