
Tracking progress in Timely Dataflow

Timely dataflow is a neat way of describing dataflow computations, one which gives you some pretty sweet latency and throughput properties. Although there are many interesting parts to timely dataflow, one of the relatively new parts (shared with the Naiad project) is its approach to tracking the progress of a dataflow computation.

Dataflow computations are a bit unlike the traditional Von Neumann architecture in which some central processor announces what the computer should do next. In a dataflow setting you have multiple independent places computation happens, and it happens because data arrive, get processed, and turn into output data. It doesn't happen because someone says "now it is your turn to do the work".

This flexibility in execution can lead to significant performance improvements, but it does introduce the complexity that each part of the computation can't rely on detailed knowledge about what is going on in the rest of the computation. Sometimes you would like to know if the dataflow operators upstream of you have finished and it is now "your turn" to execute.

We are going to talk through progress tracking, but using differential dataflow for the algorithms rather than the hand-rolled code in the timely repository (differential dataflow is built on timely dataflow, and relies on progress tracking, so self-hosting is not as easy as you might think). All of the code we will spin up now exists as examples/progress.rs in the differential dataflow repository.

This post is a bit long, so I've tried to break it up into independent components that you can read through and mentally checkpoint your understanding after each.

  1. Background on tracking progress in a timely dataflow.
  2. Types and traits to model a timely dataflow.
  3. Detecting cycles in a timely dataflow graph.
  4. Tracking progress in a running timely dataflow.

These topics get at the core of what goes on in timely dataflow, but there are some advanced topics too! In particular, topics related to how timely dataflows can present themselves as operators contained in a larger timely dataflow.

  1. Summarizing timely dataflow structure.
  2. Extended progress tracking for nested dataflows.

These advanced topics are important for timely dataflow's modular implementation, which avoids anyone needing to have a full view of the whole dataflow, but you can understand most of what goes on in timely dataflow progress tracking without needing these additional details.

Tracking progress in timely dataflow

In timely dataflow each datum that flows through the system is stamped with a logical time. We say "logical time" in contrast with "physical time" to distinguish time recorded somewhere else ("logical") from time as observed locally ("physical"); there will be no assumed relationship between the logical timestamps and the time of the clock on the wall near our computer. The logical timestamps are simply presented to us with the data, and we are obliged to respect them.

From the point of view of the data moving through the system, a logical timestamp is attached to each record that enters the system from outside, and is maintained (or advanced) as the data flow through operators in the system. Throughout the life of some data, and other data that may result from it, the associated timestamp should never decrease, though it need not increase either.

From the point of view of the locations in the timely dataflow graph, data stream by with their timestamps. The timestamps are probably mostly in order, but there are no guarantees here: the data may have taken multiple paths to get to the operator and so had their times shuffled, and there were no guarantees in the first place that data needed to enter the system in order of their timestamps. Data arrive with timestamps, and the trend is that the times increase and eventually we stop seeing some of them, but we would like to know this for sure.

Timely dataflow's position is that the information an operator needs most is some description of the timestamps the operator may yet see on any of its inputs. From this information, an operator can draw conclusions about times that it may never see again, and perform any work it may have for those times.

What an operator really needs (and timely dataflow provides) is a continually evolving view of the timestamps that may yet arrive at the operator's inputs. The view evolves because as data are produced, transformed, and eventually retired, the set of possible future timestamps changes. Ideally the set decreases, in that times that were not possible should not again become possible, as otherwise we would have a bit of a bug.
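For totally ordered timestamps this view could just be a single "low-water mark", but timely dataflow allows partially ordered timestamps, in which case the view is an antichain: the set of minimal times that may still arrive. Here is a minimal sketch of that idea, using pairs of integers ordered coordinate-wise as a stand-in for a partially ordered timestamp (names here are illustrative, not from the timely codebase):

fn minimal_elements(mut times: Vec<(u32, u32)>) -> Vec<(u32, u32)> {
    times.sort();
    times.dedup();
    let all = times.clone();
    // Keep only times with no other time less than them (the lower envelope).
    times.retain(|t| !all.iter().any(|s| s != t && s.0 <= t.0 && s.1 <= t.1));
    times
}

fn main() {
    // (1,2) and (2,1) are incomparable, so both are minimal; (2,2) is covered.
    assert_eq!(minimal_elements(vec![(2,2), (1,2), (2,1)]), vec![(1,2), (2,1)]);
}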

An example: PageRank

Let's look at an introductory example. The following is the timely dataflow graph for a PageRank computation, taken from the timely dataflow diagnostics repository.

[Figure: the timely dataflow graph of the PageRank computation, showing Input, PageRank, Probe, and Feedback operators.]

In this dataflow there are four operators, with integer identifiers you can read out from their names (they have a leading zero to indicate that they are in the zero-th dataflow). These operators also have edges between them, and in particular the PageRank operator stands out as having two inputs, one for graph edges and one for circulating ranks. As edges flow in from the Input operator, they prompt the PageRank operator to start the circulation of ranks, on past a Probe operator, along a Feedback operator, and back into the PageRank operator from whence they came.

As it turns out, the PageRank operator is written to await all of its input edges before it starts circulating ranks, and it awaits the contribution of all ranks from one iteration before producing new ranks for the next iteration. These seem like smart things to do, but how does the operator know when this happens if we are not directly coordinating the execution of operators?

Timely dataflow's progress tracking machinery is the answer!

Tracking progress

In the timely dataflow repository there is a module of code that deals with tracking progress in a timely dataflow graph: reachability.rs. This file does most of the thinking, and it is only about 350 lines of code, plus a fair amount of comments.

We are going to take a bit of a tour of the functionality of this module, through the lens of differential dataflow.

It turns out that it is easier to describe what is going on in the module using high-level programming constructs, like differential dataflow, which can get across the intent of the computation without stressing about the specific implementation details at the same time. We will need careful and clever algorithms when actually doing progress tracking, because at that point we haven't yet built up differential dataflow (which itself relies on progress tracking; it's dataflows all the way down).

Types and traits to model timely dataflows.

Let's start with some warm-up, where we name the moving parts in a timely dataflow, from our progress-tracking point of view.

A timely dataflow graph is defined with respect to a timestamp type T, which needs to implement the Timestamp trait. This trait isn't wildly complicated, and we will call out the uses of its non-standard requirements at the few moments we see them. You are welcome to think of it as an integer for now.

A timely dataflow graph is described by a collection of operators, numbered using integers, that each have a number of input and output ports. The ports are connected to each other by edges, which specifically connect output ports of some operator with input ports of what may be some other operator (or the same operator, even).

There are some helpful types in timely dataflow's progress module:

/// A timely dataflow location.
pub struct Location {
    /// A scope-local operator identifier.
    pub node: usize,
    /// An operator port identifier.
    pub port: Port,
}

/// An operator port.
pub enum Port {
    /// An operator input.
    Target(usize),
    /// An operator output.
    Source(usize),
}

In this world, a Location is either an input or output port at a specific operator; ports are classified as Target or Source ports because they are, respectively, the targets or sources of timestamped messages.

When we want to be more specific about a Location, we also have types that specifically name source and target locations:

/// Names a source of a data stream.
pub struct Source {
    /// Index of the source operator.
    pub node: usize,
    /// Number of the output port from the operator.
    pub port: usize,
}

/// Names a target of a data stream.
pub struct Target {
    /// Index of the target operator.
    pub node: usize,
    /// Number of the input port to the operator.
    pub port: usize,
}

I've elided some of the documentation comments, which get ahead of our current exposition.
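The code below will use Location::from(...) to convert between these types; here is a sketch of what those conversions presumably look like (mirroring the intent of timely's progress module, rather than quoting it):

impl From<Target> for Location {
    /// View an operator input as a general dataflow location.
    fn from(target: Target) -> Location {
        Location { node: target.node, port: Port::Target(target.port) }
    }
}

impl From<Source> for Location {
    /// View an operator output as a general dataflow location.
    fn from(source: Source) -> Location {
        Location { node: source.node, port: Port::Source(source.port) }
    }
}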

We are going to model the state of a timely dataflow, from a progress tracking point of view, with three typed differential dataflow collections. Two of these might not be very surprising, and the third may require a bit of motivation.

  1. We will model the edges in a timely dataflow as a collection edges of pairs (Source, Target) describing the source and target of each edge. Each edge describes the source node and port, and the target node and port.

  2. We will model the times in a timely dataflow as a collection times of pairs (Location, T) where T is the timestamp type we are tracking. Each element of the collection describes a location where a timestamped datum may yet appear.

Each of these collections is probably unsurprising, as we probably couldn't say any less about the edges and times than we have. We will make up for this by describing the dataflow nodes with much more detail.

  3. We will model the nodes in a timely dataflow by a collection nodes of triples (Target, Source, T::Summary) describing an internal connection from an operator input to one of its outputs, where the T::Summary term describes a minimal increment that must happen to timestamps that traverse the internal connection.

The nodes are for sure less intuitive, especially without an understanding of what this T::Summary type is. Informally, T::Summary is a type that describes increments to the timestamp T, and it is used to guarantee a minimal increment along the connection: what the operator promises it will do to each timestamp. For example, the Feedback operator has one input port and one output port, and guarantees that it will increment any timestamp that flows along it by one. Most operators have internal connections with zero guarantee about incrementing, which just indicates that there is a connection from the input port to the output port.

This summary type might seem a bit odd, and it will be crucial only for iterative dataflows: those with cycles in them. For now you can think of it as an integer also, or perhaps not think of it much at all, and save understanding it for your next pass through the post.
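To make this concrete, here is a sketch of the PageRank dataflow from the figure, described by these collections with integer timestamps (whose summaries are integer increments). The operator and port indices are illustrative readings of the figure, not extracted from a real dataflow:

// Internal connections: (input port, output port, minimal timestamp increment).
let nodes = vec![
    (Target { node: 2, port: 0 }, Source { node: 2, port: 0 }, 0), // PageRank: edges input to ranks output.
    (Target { node: 2, port: 1 }, Source { node: 2, port: 0 }, 0), // PageRank: ranks input to ranks output.
    (Target { node: 3, port: 0 }, Source { node: 3, port: 0 }, 0), // Probe: passes ranks through unchanged.
    (Target { node: 4, port: 0 }, Source { node: 4, port: 0 }, 1), // Feedback: increments timestamps by one.
];

// Edges: connections from output ports to input ports.
let edges = vec![
    (Source { node: 1, port: 0 }, Target { node: 2, port: 0 }), // Input to PageRank (graph edges).
    (Source { node: 2, port: 0 }, Target { node: 3, port: 0 }), // PageRank to Probe (ranks).
    (Source { node: 3, port: 0 }, Target { node: 4, port: 0 }), // Probe to Feedback (ranks).
    (Source { node: 4, port: 0 }, Target { node: 2, port: 1 }), // Feedback back to PageRank, one iteration later.
];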

Detecting cycles in timely dataflows

For timely dataflow's progress tracking protocol to be correct, the timely dataflow graph needs to have an important structural property: there should be no cycles that do not strictly advance the timestamp. If this property holds then we can be certain of conclusions about irreversible progress made by the system. If this property does not hold then there isn't very much we can say: the cycle is effectively capable of manufacturing data out of thin air, at timestamps that appear to have passed.

With this in mind, the progress::reachability module will check any timely dataflow graph for non-incrementing cycles, explicitly. It uses a fairly simple algorithm that restricts itself to edges and non-incrementing internal connections, and then repeatedly discards locations with no incoming transitions.

/// Identifies cycles along paths that do not increment timestamps.
///
/// The algorithm retains edges and internal connections within nodes
/// that have default timestamp summaries, and attempts to determine
/// that this graph is acyclic by repeatedly removing nodes with no
/// incoming edges. If the fixed point of this process has a non-empty
/// set of edges, then each edge participates in some cycle.
fn find_cycles<G: Scope, T: Timestamp>(
    nodes: Collection<G, (Target, Source, T::Summary)>,
    edges: Collection<G, (Source, Target)>,
) -> Collection<G, (Location, Location)>
where
    G::Timestamp: Lattice+Ord,
    T::Summary: differential_dataflow::ExchangeData,
{
    // Retain node connections along "default" timestamp summaries.
    let nodes = nodes.flat_map(|(target, source, summary)| {
        if summary == Default::default() {
            Some((Location::from(target), Location::from(source)))
        }
        else {
            None
        }
    });
    let edges = edges.map(|(source, target)| (Location::from(source), Location::from(target)));
    let transitions: Collection<G, (Location, Location)> = nodes.concat(&edges);

    // Repeatedly restrict to locations with an incoming transition from an active location.
    transitions
        .iterate(|locations| {
            let active =
            locations
                .map(|(_source, target)| target)
                .distinct();
            transitions
                .enter(&locations.scope())
                .semijoin(&active)
        })
        .consolidate()
}

This computation will return any transitions that participate in a non-incrementing cycle. Ideally it will be empty! But if it is not empty, we get to see which edges are the trouble-makers and report these back to the user. This is a great first step, before we start up with the progress tracking.

The actual implementation uses a more direct algorithm, again because we don't yet have access to differential dataflow. The computation above has the cool property that it will interactively track the presence of cycles, and could warn you as you were building up a dataflow graph, but before you submit it.
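As a hedged sketch of how one might drive this computation interactively, using differential dataflow's usual input idioms (the new_collection method and InputSession type; the specific nodes and edges are from the illustrative PageRank description above):

use differential_dataflow::input::Input;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let (mut nodes_in, mut edges_in) = worker.dataflow::<usize,_,_>(|scope| {
            let (nodes_in, nodes) = scope.new_collection();
            let (edges_in, edges) = scope.new_collection();
            find_cycles::<_, usize>(nodes, edges)
                .inspect(|(transition, _time, _diff)| {
                    println!("transition on a non-incrementing cycle: {:?}", transition);
                });
            (nodes_in, edges_in)
        });

        // Describe the dataflow; an empty output means no problematic cycles.
        nodes_in.insert((Target { node: 4, port: 0 }, Source { node: 4, port: 0 }, 1));
        edges_in.insert((Source { node: 4, port: 0 }, Target { node: 2, port: 1 }));
        // ... and similarly for the remaining nodes and edges.
    }).unwrap();
}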

Tracking progress in a running timely dataflow

With a suitable pair of nodes and edges, meaning one that has no non-incrementing cycles, we are ready to think through progress tracking as that timely dataflow runs.

Progress in a timely dataflow is relative to our collection times, which contains pairs of location and timestamp for which data are believed to exist. Anything downstream of these should be warned that it might receive data bearing that timestamp, or whatever the timestamp would become having traversed a possibly incrementing path from the element of times to the location in question.

We can describe this computation by iteratively developing the times that can reach each location in the timely dataflow. For any timestamp that could reach a location, the timestamp can also traverse any outbound edges from the location, and the timestamp may cross any operator-internal connection although it may need to be transformed as it does. The times can then be collected by location (along with the initial times) and we can retain one copy of each timestamp that is not strictly greater than some other timestamp at the location.

The resulting differential dataflow code looks like so:

/// Propagates times along a timely dataflow graph.
///
/// Timely dataflow graphs are described by nodes with interconnected input and output ports,
/// and edges which connect output ports to input ports of what may be other nodes.
///
/// A set of times at various locations (input or output ports) could traverse nodes and
/// edges to arrive at various other locations. Each location can then track minimal times
/// that can reach them: those times not greater than some other time that can reach it.
///
/// The computation to determine this, and to maintain it as times change, is an iterative
/// computation that propagates times and maintains the minimal elements at each location.
fn frontier<G: Scope, T: Timestamp>(
    nodes: Collection<G, (Target, Source, T::Summary)>,
    edges: Collection<G, (Source, Target)>,
    times: Collection<G, (Location, T)>,
) -> Collection<G, (Location, T)>
where
    G::Timestamp: Lattice+Ord,
    T::Summary: differential_dataflow::ExchangeData,
{
    // Translate node and edge transitions into `(Location, Location, Summary)`.
    let nodes = nodes.map(|(target, source, summary)| (Location::from(target), (Location::from(source), summary)));
    let edges = edges.map(|(source, target)| (Location::from(source), (Location::from(target), Default::default())));
    let transitions: Collection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);

    times
        .iterate(|reach| {
            transitions
                .enter(&reach.scope())
                .join_map(&reach, |_from, (dest, summ), time| (dest.clone(), summ.results_in(time)))
                .flat_map(|(dest, time)| time.map(move |time| (dest, time)))
                .concat(&times.enter(&reach.scope()))
                .reduce(|_location, input, output: &mut Vec<(T, isize)>| {
                    // retain the lower envelope of times.
                    for (t1, _count1) in input.iter() {
                        if !input.iter().any(|(t2, _count2)| t2.less_than(t1)) {
                            output.push(((*t1).clone(), 1));
                        }
                    }
                })
        })
        .consolidate()
}
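The join_map above advances each time across a transition using summ.results_in(time), which returns an Option<T>: the least timestamp that can result, or None if no timestamp can (overflow, say). Here is a toy model for integer timestamps, to pin down the semantics this code (and followed_by, used in a later section) relies on; the real versions live behind timely's PathSummary trait, and this sketch is only an analogy:

/// A toy path summary for u64 timestamps: advance by a fixed amount.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
struct IntSummary(u64);

impl IntSummary {
    /// The least time that can result from traversing the path, if any.
    fn results_in(&self, time: &u64) -> Option<u64> {
        time.checked_add(self.0)
    }
    /// The summary of traversing this path and then another, if any.
    fn followed_by(&self, other: &IntSummary) -> Option<IntSummary> {
        self.0.checked_add(other.0).map(IntSummary)
    }
}

fn main() {
    let feedback = IntSummary(1);                  // the Feedback operator's increment.
    assert_eq!(feedback.results_in(&3), Some(4));  // a time-3 rank re-enters at time 4.
    assert_eq!(feedback.followed_by(&IntSummary(2)), Some(IntSummary(3)));
}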

The output of this computation will update as its inputs change. The change we expect in a traditional timely dataflow computation is that the collection times would evolve as various operators consume and produce things that hold on to timestamps (messages, and internal capabilities). These changes happen across workers, but the computation above is performed privately by each worker in response to the changes it hears from its peer workers. The entire progress tracking protocol, including what each worker chooses to announce to each of its peers, is beyond this incarnation of this blog post.

As with cycle detection, the above is not the actual code, which needs to be hand-written in the absence of differential dataflow (and also, it is much faster than differential dataflow would be, using a less-synchronous algorithm).

Although not the same as the actual code, the above gives us the same interface: changes to the collection times stream in, and we need to produce any changes (Location, T) to the frontiers of locations in the timely dataflow. These changes are then surfaced to operators, who are given the chance to react to them.
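As a worked example of that interface (a sketch; times_in is a hypothetical InputSession feeding the times collection, and the indices follow the illustrative PageRank description): suppose the Feedback operator consumes the last time-3 message at its input and produces the corresponding time-4 message at its output.

// Feedback consumes the last time-3 message at its input ...
times_in.remove((Location::from(Target { node: 4, port: 0 }), 3));
// ... and produces the resulting time-4 message at its output.
times_in.insert((Location::from(Source { node: 4, port: 0 }), 4));
times_in.advance_to(1);
times_in.flush();

Downstream locations (PageRank's ranks input, and onward around the loop) should then see their frontiers step forward, reported as changes like ((location, 3), -1) and ((location, 4), +1) in the output of frontier.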

The description of the computation above shows that we can actually respond to arbitrary changes in the inputs, including changes to the timely dataflow graph topology, changes to the internal structure of operators, and changes to the timestamps that do not follow the standard rules of timely dataflow's progress tracking protocol (which requires that messages and capabilities must be consumed to produce new messages or capabilities). This is all well and good, and possibly interesting, but without the additional protocol requirements we are not able to guarantee that frontiers will never move backwards.

Said differently: the computation above will compute and maintain the correct frontier, but only under some assumptions will the output of the computation be a guarantee about the future of the system.

Bonus: Summarizing timely dataflow structure

Timely dataflows may be nested within other timely dataflows, in which they present upwards as timely dataflow operators themselves.

In timely dataflow we model the external surface of a timely dataflow by its zero-th operator, which represents the outside world, and whose inputs and outputs correspond to the dataflow's outputs to and inputs from the outside world, respectively.

To present upwards as a timely dataflow operator, with its own input and output ports, a timely dataflow needs to summarize itself as connections between its inputs and outputs along with summaries for each. We will want to determine summaries for all possible internal connections between the output ports of operator zero and its input ports. Much like the progress tracking maintenance, we will start to propagate information through the dataflow graph, collecting summaries as we go.

While we could start at the outputs of operator zero and flow summary information forward, for reasons that the next section will make clear we prefer to flow backwards from the inputs to operator zero. Doing this will provide summary information from each location in the dataflow graph to each of the inputs to operator zero.

/// Summary paths from locations to operator zero inputs.
fn summarize<G: Scope, T: Timestamp>(
    nodes: Collection<G, (Target, Source, T::Summary)>,
    edges: Collection<G, (Source, Target)>,
) -> Collection<G, (Location, (Location, T::Summary))>
where
    G::Timestamp: Lattice+Ord,
    T::Summary: differential_dataflow::ExchangeData+std::hash::Hash,
{
    // Start from trivial reachability from each input to itself.
    let zero_inputs =
    edges
        .map(|(_source, target)| Location::from(target))
        .filter(|location| location.node == 0)
        .map(|location| (location, (location, Default::default())));

    // Reverse node and edge transitions, keyed by destination, to flow backwards.
    let nodes = nodes.map(|(target, source, summary)| (Location::from(source), (Location::from(target), summary)));
    let edges = edges.map(|(source, target)| (Location::from(target), (Location::from(source), Default::default())));
    let transitions: Collection<G, (Location, (Location, T::Summary))> = nodes.concat(&edges);

    zero_inputs
        .iterate(|summaries| {
            transitions
                .enter(&summaries.scope())
                .join_map(summaries, |_middle, (from, summ1), (to, summ2)| (from.clone(), to.clone(), summ1.followed_by(summ2)))
                .flat_map(|(from, to, summ)| summ.map(move |summ| (from, (to, summ))))
                .concat(&zero_inputs.enter(&summaries.scope()))
                .map(|(from, (to, summary))| ((from, to), summary))
                .reduce(|_from_to, input, output| {
                    for (summary, _count) in input.iter() {
                        if !input.iter().any(|(sum2, _count2)| sum2.less_than(*summary)) {
                            output.push(((*summary).clone(), 1));
                        }
                    }
                })
                .map(|((from, to), summary)| (from, (to, summary)))

        })
        .consolidate()
}

This method produces all summaries from all locations in the timely dataflow, to all inputs of operator zero. If we select out the summaries from the outputs of operator zero, we will have the summary of the dataflow upwards as an operator.
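A sketch of that selection: retain the summaries whose starting location is an output port of operator zero, which is where the dataflow's external inputs arrive (the ending location is an input of operator zero by construction; upward_summary is an illustrative name):

// Paths from operator zero's outputs (the dataflow's external inputs)
// to operator zero's inputs (the dataflow's external outputs).
let upward_summary =
summarize::<_, u64>(nodes, edges)
    .filter(|(from, (_to, _summary))| {
        from.node == 0 && matches!(from.port, Port::Source(_))
    });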

Savvy readers may also notice that in addition to presenting upwards as an operator, the timely dataflow hosting that operator must present a summary back downwards to the nested dataflow: we'll need the operator-internal connections for operator zero.

Bonus: Extended progress tracking for nested dataflows

When we nest timely dataflows as operators in other timely dataflows, there is a bit of a negotiation that needs to happen between the levels. When the inner dataflow circulates messages, it needs to warn the outer dataflow that it might still produce timestamped messages at its outputs, even though it does not yet have the message to produce.

In timely dataflow these are expressed through "timestamp capabilities", which can be retained by operators and which provide the ability to produce timestamped messages. They are a bit like pre-paid envelopes, which may eventually be used to send timestamped data, but at the moment are just a warning to others that messages may arrive.
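A toy model of a capability, to make the metaphor concrete (not timely's actual type, which is wired into the progress tracking machinery): it names a time at which messages may be sent, and can only be downgraded toward later times, never earlier ones.

/// A toy timestamp capability: permission to send messages at `time` or later.
struct Capability<T: Clone + PartialOrd> {
    time: T,
}

impl<T: Clone + PartialOrd> Capability<T> {
    /// The earliest time at which this capability can send messages.
    fn time(&self) -> &T {
        &self.time
    }
    /// Downgrade to a later time; a capability can never move backwards.
    fn downgrade(&mut self, new_time: &T) {
        assert!(self.time <= *new_time, "capabilities may only be downgraded");
        self.time = new_time.clone();
    }
}

Dropping such a capability is exactly what allows downstream frontiers to advance past its time.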

As timestamped data circulate in a nested timely dataflow, we need to track the potential for timestamped data to emerge from the graph, by way of operator zero's inputs (which lead to the wider world). This is almost what we are doing with our vanilla progress tracking, with one exception: we are only supposed to report the potential for timestamped data to emerge from messages in the nested dataflow; we don't want to accidentally mirror operator zero's capability to send timestamped data back at it. That is, if the wider world might send some data into the nested dataflow, it is not appropriate to report that back out as something the dataflow might produce (technically speaking, we are permitted to do this, but then the progress tracking algorithm will stall on loops the same way traditional stream processors do).

The remedy is to use the summaries of the previous section, from each location to each input of operator zero, to surface information about the activity in the nested subgraph, messages and capabilities contained within, to the wider world. By applying these to all operators other than operator zero, we produce the transitive effects of timestamps owned by the nested dataflow.
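A sketch of that remedy in the same differential idiom: join the times held by the nested dataflow itself (everything other than operator zero) with the summaries to operator zero's inputs, yielding the times at which the dataflow might still send data to the wider world.

// Times held within the nested dataflow, excluding operator zero itself.
let internal = times.filter(|(location, _time)| location.node != 0);

// Surface each internal time at each reachable input of operator zero,
// advanced by the summary of the path it would traverse to get there.
let external =
summarize::<_, u64>(nodes, edges)
    .join_map(&internal, |_location, (input, summary), time| {
        (input.clone(), summary.results_in(time))
    })
    .flat_map(|(input, time)| time.map(move |time| (input, time)));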

