A Fare Cache in a Sharded Data Cluster

Hipmunk is all about discovering a better travel experience, and that requires knowing the alternatives. So each night, Hipmunk conducts proactive searches of a little over 100 million possible trips, which are then placed in a database called the fare cache.

The fare cache allows a travel search to easily show alternatives. What if, instead of Monday, you left on Tuesday? What if you could fly from another nearby airport? What if you just want to spend the weekend on an island? With the help of the fare cache, any of the Hipmunk front ends can answer these questions.

So the problem becomes: how do you put all these fares into a database that scales easily? Ideally, that scaling should require no application changes and no downtime. What follows describes a solution using a distributed, or sharded, database architecture.

Now, for any modern database, 100 million data points is pretty simple. What makes the problem interesting is the frequency of data refresh and the complexity of the searches. As an example, most queries to the fare cache ask for all fares between multiple cities across a range of departure dates spanning a week to a month. These queries run against a table that is likely being updated at the same time.

One solution is to break the data up into sub-tables. An algorithm could select which sub-table to query, or query many of them if required. The sub-tables could be placed on smaller instances. Both reads and writes could be performed in parallel. Database architects call this a sharded system; a shard is simply one part of a relation.

However, implementing this requires a way to break up the query, route the pieces to one or more database servers, then collect and organize the results. PostgreSQL has an extension that does this automatically: it is called Citus. Citus hooks into the PostgreSQL query planner and executor to permit distributed tables over a cluster of servers. It also provides a set of SQL functions to manage the cluster.
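
As a rough sketch of the setup (assuming a self-managed installation; the exact steps vary by deployment), the extension is preloaded at server start and then created in the database on every node:

-- In postgresql.conf on the coordinator and each worker:
--     shared_preload_libraries = 'citus'
-- Then, in the database on each node:
CREATE EXTENSION citus;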

Within a Citus cluster, servers have two roles. One, the coordinator, is the engine that breaks queries apart and combines their results. The application interacts with the coordinator exactly as it would with any other PostgreSQL server. The other role is the worker, which holds the data and executes the query fragments. A worker is fundamentally a PostgreSQL server, so all the usual server tuning and management techniques apply. Of course, the workers must grant trusted access to the coordinator.
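
The coordinator learns which workers exist through a set of management functions. Here is a minimal sketch of registering workers; the hostnames and port are placeholders, and these function names assume a recent Citus release (older releases call them master_add_node and master_get_active_worker_nodes):

-- Run on the coordinator to register each worker node
SELECT citus_add_node('worker-1.example.internal', 5432);
SELECT citus_add_node('worker-2.example.internal', 5432);

-- Verify which workers are active
SELECT * FROM citus_get_active_worker_nodes();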

Citus extends table creation so that a table can be split into any number of shards placed on the workers. Since every shard has a unique name within the cluster, there is no requirement as to the number of shards per worker. To design for later scale-out, it is good practice to begin with more shards than workers. A scale issue can then be solved simply by adding workers and redistributing the shards.
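
A sketch of that scale-out path might look like the following; the citus.shard_count setting and the shard rebalancer are taken from the Citus documentation, and the rebalancer's availability depends on the Citus version and edition:

-- Choose more shards than workers before distributing tables
SET citus.shard_count = 32;

-- Later, when the cluster needs to grow: add a worker, then move shards onto it
SELECT citus_add_node('worker-3.example.internal', 5432);
SELECT rebalance_table_shards('fares');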

There are several excellent documents on how to create a Citus cluster. It is actively developed and supported, and its deployment options are as flexible as PostgreSQL's, from a Docker container all the way up to a hosted environment. What follows shows how to define a data model and build a method that can ingest and search at scale.

Defining the Data Model

The elements of an airline fare are as follows:

  • Origin
  • Destination
  • Departure Date
  • Return Date
  • Cabin Class
  • Point of Sale
  • Price

Note that the model omits several details, such as the exact airline and flight times. That is because this data is used to discover what kinds of fares are possible; a more detailed search is required for additional information. The point of sale column indicates the market in which the sale will take place; it is used for international flights that are marketed in more than one location.

Citus permits distributing a table either by time of data ingestion or by a hash of a column's value. The queries generated by the application made the per-column hash the better option. However, only a single column can be the shard key, and the best choice here is the departure date. About the same number of flights depart on each date, and most queries into the cache compare fares over days or weeks. Thus, with 32 shards, an entire month of fares is available with one query per shard.

However, there is one more complication. Each night, fares may be changed as the result of a proactive search. The search really only changes one thing, the price, and leaves the other aspects of the fare untouched. The way to incorporate this constraint is to make the departure date shard key part of a compound primary key. Citus will then distribute the data by departure date and, as long as that field appears in the SQL WHERE clause, can process the query efficiently. With these changes, the data model looks like this:

CREATE TABLE fares (
    origin character varying(3),
    destination character varying(3),
    departure_date date,
    return_date date,
    cabin varchar(25),
    point_of_sale varchar(2),
    price double precision,
    PRIMARY KEY(origin, destination, departure_date, return_date, cabin, point_of_sale)
    );

SELECT create_distributed_table('fares', 'departure_date');

The SELECT statement tells Citus to divide the table by departure date and distribute it to the workers. Citus takes care of the DDL on the workers. The number of shards and their placement are controlled by configuration variables. No data will be stored on the coordinator.
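
To see where the shards actually landed, the Citus metadata can be queried from the coordinator. This is a sketch using the citus_shards view from newer Citus releases; older releases expose the same information through the pg_dist_shard and pg_dist_shard_placement catalogs:

-- Show each shard of the fares table and the worker that holds it
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE table_name = 'fares'::regclass;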

The compound primary key takes the place of a separate index and semantically binds the relation together. Instead of seven independent fields, there are effectively two: the primary key and the fare amount. Fare data is always requested with the parameters in the primary key.
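
As an illustration of the kind of lookup the cache serves (the airports, dates, cabin, and point of sale values below are made up for the example), a query for the cheapest round trip per departure date filters on the shard key, so the coordinator fans it out across the shards in parallel, one fragment per shard:

-- Cheapest economy round trip per departure date, SFO to JFK,
-- over a month of departures (example values only)
SELECT departure_date, min(price) AS best_price
FROM fares
WHERE origin = 'SFO'
  AND destination = 'JFK'
  AND cabin = 'economy'
  AND point_of_sale = 'US'
  AND departure_date BETWEEN DATE '2018-06-01' AND DATE '2018-07-01'
GROUP BY departure_date
ORDER BY departure_date;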

Ingesting Data

With the data model defined, the next task is data ingestion. Proactive search happens over a period of hours, and the searcher makes the results available in batches of records in a CSV file. Importing the CSV file requires a destination, and Citus supports COPY into a sharded table; at the time of writing, COPY is the only bulk-load method it supports. To perform more complex data manipulation, first COPY the data into an ingest table, then write a query that moves rows from the ingest table into the main table.

The ingest table has the same column names and types as the fare cache, except that it has no primary key. (If a primary key were required, it would have to include the shard key.) Here is the create:

CREATE TABLE fares_ingest (
    origin character varying(3),
    destination character varying(3),
    departure_date date,
    return_date date,
    cabin varchar(25),
    point_of_sale varchar(2),
    price double precision
    );

SELECT create_distributed_table('fares_ingest', 'departure_date');

The copy then looks like this; in practice the command is issued through the client library, for example the COPY support in libpq.

COPY fares_ingest(origin, destination, departure_date,
    return_date, cabin, price, point_of_sale)
FROM
    'proactive_search_results.csv' WITH (FORMAT csv, DELIMITER ',');

After the copy, the shards of the ingest table and the corresponding shards of the main table are on the same Citus worker, because both tables are distributed by the same column and are therefore co-located. This means that the query that moves the data into place can run in parallel on every shard. Here is an example using a PostgreSQL upsert:

INSERT INTO fares (origin, destination, departure_date,
    return_date, cabin, price, point_of_sale)
SELECT
    origin, destination, departure_date, return_date, cabin, price, point_of_sale
FROM
    fares_ingest
ON CONFLICT (origin, destination, departure_date, return_date, cabin, point_of_sale)
DO UPDATE SET
    price = EXCLUDED.price;

The Citus coordinator runs this query on each shard, allowing for maximum concurrency. Tuning the batch size and server parameters can further optimize the process.
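
Once the rows have been merged, the ingest table can be cleared for the next batch. This housekeeping step is an assumption about the workflow rather than part of the original description; TRUNCATE on a distributed table is propagated to every shard:

-- Empty the ingest table so the next batch starts clean
TRUNCATE fares_ingest;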

Conclusion

A sharded architecture works well for this problem. The Citus extension provides a simple way to adopt one with a minimum of application changes. As long as all data relationships can be expressed on a per-shard basis, queries run in parallel, allowing both reads and writes to scale.

References

For more information on Citus, see https://docs.citusdata.com.

