Sonata: Query-Driven Streaming Network Telemetry

Arpit Gupta, Rob Harrison, Marco Canini, Nick Feamster, Jennifer Rexford, and Walter Willinger. 2018. Sonata: query-driven streaming network telemetry. In Proceedings of the 2018 Conference of the ACM Special Interest Group on Data Communication (SIGCOMM ’18). Association for Computing Machinery, New York, NY, USA, 357–371. DOI:https://doi.org/10.1145/3230543.3230555

Sonata is a query-driven network telemetry system that is both flexible and scalable. The onus of keeping the network working is on network operators. In recent years, with the proliferation of Internet-connected devices and applications, network management has become significantly more complex. To keep networks running, operators have to handle events such as outages, congestion, and cyberattacks. The first step in handling these events is detecting them in real time, which requires network monitoring.

To understand the requirements of a network monitoring system, let’s consider a simple example. An attacker sends spoofed DNS request messages to open resolvers, and the responses to these requests are reflected to the victim. As the number of responses increases, the victim gets overwhelmed. If I’m a network operator and I want to figure out whether a host in my network is a victim of this attack, I need to find out whether there are hosts that receive DNS responses from many distinct sources. In this task, the network operator is trying to extract an aggregate metric from a subset of the traffic. There are many ways to define which subset of traffic we are interested in and what kind of metric we want to extract.

Traffic: address, protocol, payload, device, location, …

Metrics: jitter, distinct hosts, volume, delay, loss, …

When designing network monitoring systems, it is important to make them as flexible as possible, so that they can support a wide range of tasks: performance diagnosis, malware detection, fault localization, DDoS detection.

The storage and compute resources available to these network monitoring systems are limited. This creates a gap between the desired flexibility and scalability, and Sonata tries to bridge it. Sonata provides the abstractions that let network operators express a wide range of network monitoring tasks, the algorithms that make the best use of limited storage and compute resources, and the system that glues the high-level abstractions to the low-level algorithms.

Building Sonata is challenging:

Programming Abstractions

A packet in a network carries a lot of information: metadata (traversed path, queue size, number of bytes) + header (source/dest address, protocol, ports) + payload. With recent developments in programmable switches, a packet can also carry a lot of metadata that captures network state. Unlike many existing network monitoring systems, which operate at a much coarser granularity, Sonata operates at packet-level granularity. It lets network operators treat packets as tuples: Packet = (path,qsize,nbytes,sIP,dIP,proto,sPort,dPort,...,payload). This lets them express their monitoring tasks as dataflow queries.

For example, detecting a DNS reflection attack is about identifying whether the number of unique DNS servers sending DNS response messages to a single host exceeds a threshold (Th). We want to find victimIPs, and we operate over a stream of packet tuples, pktStream.

victimIPs = pktStream
    .filter(p=>p.udp.sport==53)      // DNS responses
    .map(p=>(p.dstIP,p.srcIP))
    .distinct()                      // from unique DNS servers
    .map((dstIP,srcIP)=>(dstIP,1))
    .reduce(keys=(dstIP,),sum)       // to a single host
    .filter((dstIP,count)=>count>Th) // exceeds a threshold

This is helpful because we can express a wide range of network monitoring tasks, each in fewer than 20 lines of code.
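To make the semantics of the query above concrete, here is a minimal Python sketch that evaluates the same operator chain over one window of packets held in memory. It is only an illustration of what the query computes, not how Sonata executes it; the simplified `Packet` tuple, the function name `dns_reflection_victims`, and the sample addresses are all assumptions made for this example.

```python
from collections import Counter, namedtuple

# Hypothetical, simplified packet tuple: only the fields this query touches.
Packet = namedtuple("Packet", ["srcIP", "dstIP", "proto", "sPort", "dPort"])

def dns_reflection_victims(pkt_stream, th):
    """Return {dstIP: count} for hosts that receive DNS responses
    from more than `th` distinct sources within one window."""
    # filter: keep DNS responses (UDP packets with source port 53)
    responses = (p for p in pkt_stream if p.proto == "udp" and p.sPort == 53)
    # map + distinct: unique (dstIP, srcIP) pairs
    pairs = {(p.dstIP, p.srcIP) for p in responses}
    # map + reduce: number of distinct sources per destination
    counts = Counter(dst for dst, _src in pairs)
    # filter: keep destinations whose count exceeds the threshold
    return {dst: n for dst, n in counts.items() if n > th}

window = [
    Packet("1.1.1.1", "10.0.0.5", "udp", 53, 4000),
    Packet("8.8.8.8", "10.0.0.5", "udp", 53, 4001),
    Packet("9.9.9.9", "10.0.0.5", "udp", 53, 4002),
    Packet("9.9.9.9", "10.0.0.5", "udp", 53, 4003),  # repeated source, counted once
    Packet("1.1.1.1", "10.0.0.7", "udp", 53, 4004),  # only one source
    Packet("2.2.2.2", "10.0.0.5", "tcp", 80, 4005),  # not a DNS response
]
victims = dns_reflection_victims(window, th=2)
print(victims)  # {'10.0.0.5': 3}
```

The set and the counter in this sketch are exactly the state that the distinct and reduce operators have to keep, which is why those two operators dominate the memory discussion later on.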

Scalability

What target are we going to use for compiling and executing these queries? In the past, all the network monitoring systems that offered this level of flexibility used CPUs for query execution (SIGMOD’03 Gigascope, SIGCOMM’17 NetQRE). On a CPU we can extract and parse header and payload fields and apply any action, and the memory available for stateful operators is O(Gb). The problem is that CPUs process packets in O(µs), which is very slow compared to line rate.

One can argue that a state-of-the-art scalable stream processor (like Apache Spark) can horizontally scale the computation, but the underlying problem remains: CPUs cannot process packets at line rate, and you still need to capture packets from the data plane and send them to the CPU for processing.

An alternative is to use programmable switches. These switches can process packets at line rate, O(ns), but they offer limited flexibility: they can only match on headers, can only apply simple actions (add, subtract, bit operations), and have O(Mb) of memory for stateful operations. Examples: UnivMon (SIGCOMM’16), Marple (SIGCOMM’17).

We can use both switches and CPUs.

PISA (RMT SIGCOMM’13) Processing Model: Devices based on PISA, a protocol-independent switch architecture, have a programmable parser that can extract user-defined packet formats and store the parsed values in a fixed-length packet header vector. The switch has a pipeline with multiple stages; each stage has multiple reconfigurable match-action tables built from TCAMs, ALUs, and SRAM. At the end of the pipeline, a programmable deparser serializes the packet header vector back into a packet. This is not very different from the dataflow processing model that is inherent to stream processors.
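The parser, match-action stages, and deparser described above can be mimicked in a few lines of Python. This is a toy software model meant only to show the shape of the processing, not real switch behaviour: the 6-byte header layout, the `meta` scratch area, the one-table-per-stage simplification, and the first-match-wins rule are all assumptions made for this sketch.

```python
import struct

HEADER = struct.Struct("!HHH")  # hypothetical header: sport, dport, length

def parse(raw):
    """Programmable parser: extract header fields into a packet header vector (PHV)."""
    sport, dport, length = HEADER.unpack(raw[:HEADER.size])
    return {"sport": sport, "dport": dport, "length": length,
            "payload": raw[HEADER.size:], "meta": {}}

def deparse(phv):
    """Programmable deparser: serialize the PHV back into packet bytes."""
    return HEADER.pack(phv["sport"], phv["dport"], phv["length"]) + phv["payload"]

counters = {"dns": 0}  # stands in for stateful switch memory

def set_is_dns(phv):
    phv["meta"]["is_dns"] = True

def count_dns(phv):
    counters["dns"] += 1

# Each stage is modeled as a single table: a list of (match, action) entries.
stages = [
    [(lambda phv: phv["sport"] == 53, set_is_dns)],
    [(lambda phv: phv["meta"].get("is_dns", False), count_dns)],
]

def run_pipeline(raw, stages):
    phv = parse(raw)
    for table in stages:
        for match, action in table:
            if match(phv):
                action(phv)
                break  # first matching entry wins in this simplified model
    return deparse(phv), phv

dns_pkt = HEADER.pack(53, 4000, 3) + b"abc"
web_pkt = HEADER.pack(80, 4001, 2) + b"hi"
out, phv = run_pipeline(dns_pkt, stages)
run_pipeline(web_pkt, stages)
```

The packet flows through the stages in a fixed order and each stage sees only the PHV, which is what makes the model resemble a chain of dataflow operators.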

Packets can be represented as tuples. But how can we translate operators to match-action tables?

Which dataflow operators can be compiled to match-action tables?

Case for stateless operators: filter(p) is a stateless operator with predicate p. It receives a stream of elements as input and outputs the stream of elements that satisfy p. This operator can be translated simply by supplying the predicate p as the match field of a match-action table.

Case for stateful operators: reduce(f) is a stateful operator that applies a function f. It receives a stream of elements as input and returns a single output value (one per key when keys are given), which is the result of applying f over all the elements. Because it is stateful, it requires some memory, which can be implemented in a hardware switch as a hash-indexed array. Compiling this operator requires two match-action tables, one to compute the index and one to update the state. For example, .reduce(keys=(dstIP,),sum) compiles to:

Match |  Action
-------------------------
 *    |  idx=hash(m.dstIP)
 *    |  stateful[idx]+=1

To compile a query, apply the match-action tables of its individual operators in sequence.
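As a rough illustration of applying these tables in sequence, the following Python sketch chains a filter table with the two reduce tables over a hash-indexed register array. It is a software stand-in under stated assumptions: the array size `NUM_SLOTS`, the use of CRC32 as the hash, and the function names are hypothetical, and hash collisions are simply ignored, so two keys that land in the same slot would share a counter. The distinct operator of the full query is also left out.

```python
import zlib

NUM_SLOTS = 1024                 # hypothetical size of the register array
registers = [0] * NUM_SLOTS      # stands in for the switch's stateful memory

def slot(dst_ip):
    """Hash a key to an index in the register array (CRC32 is an assumption)."""
    return zlib.crc32(dst_ip.encode()) % NUM_SLOTS

def table_filter(meta):
    # Stateless filter: the predicate sits in the match field.
    return meta["sport"] == 53

def table_hash(meta):
    # Reduce, table 1: match *, action idx = hash(dstIP)
    meta["idx"] = slot(meta["dstIP"])

def table_update(meta):
    # Reduce, table 2: match *, action stateful[idx] += 1
    registers[meta["idx"]] += 1

def process(meta):
    """Apply the tables in sequence; packets failing the filter stop early."""
    if not table_filter(meta):
        return
    table_hash(meta)
    table_update(meta)

packets = [
    {"sport": 53, "dstIP": "10.0.0.5"},
    {"sport": 53, "dstIP": "10.0.0.5"},
    {"sport": 53, "dstIP": "10.0.0.5"},
    {"sport": 53, "dstIP": "10.0.0.7"},
    {"sport": 80, "dstIP": "10.0.0.5"},  # filtered out
]
for pkt in packets:
    process(pkt)

print(registers[slot("10.0.0.5")])
```

The register array has a fixed size regardless of how many keys appear in the traffic, which is the property that makes switch memory the limiting resource for stateful operators.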

Query Partitioning

The resources available on a switch are limited. The system therefore needs a “Query Planner” that decides which portions of each query are executed on the switch and which portions on the CPU. After the switch has done its part of the processing, the remaining tuples are sent to the CPU for further processing.

A Query Planner needs to answer two basic questions for each decision:

We have to make partitioning decisions for all queries together, because they share the data-plane resources.

The query partitioning problem is modeled as an Integer Linear Program (ILP). The goal is to minimize the tuples sent to the stream processor. This is subject to dataplane constraints:

We use representative packet traces to estimate the resource requirements of the individual match-action tables and the number of tuples that will be sent to the stream processor.
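The flavour of the partitioning decision can be shown with a toy example. The sketch below does not solve an ILP; it swaps in brute-force enumeration over every combination of partition points, which is only feasible for tiny inputs. All numbers are invented for illustration, as if they had been estimated from a packet trace, and the query names, the `plans` layout, and the single memory budget are assumptions of this sketch.

```python
from itertools import product

# plans[q][k] = (switch memory in Kb, tuples sent to the stream processor)
# when the first k operators of query q run on the switch. Invented numbers.
plans = {
    "dns_reflection": [(0, 1_000_000), (0, 50_000), (400, 5_000), (600, 10)],
    "port_scan":      [(0, 1_000_000), (0, 200_000), (500, 20_000), (900, 50)],
}
MEMORY_BUDGET_KB = 1000  # hypothetical total stateful memory on the switch

def best_partitioning(plans, budget):
    """Enumerate every combination of partition points and keep the one that
    sends the fewest tuples to the stream processor within the memory budget."""
    names = list(plans)
    best = None
    for choice in product(*(range(len(plans[n])) for n in names)):
        memory = sum(plans[n][k][0] for n, k in zip(names, choice))
        if memory > budget:
            continue  # violates the data-plane memory constraint
        tuples = sum(plans[n][k][1] for n, k in zip(names, choice))
        if best is None or tuples < best[0]:
            best = (tuples, dict(zip(names, choice)))
    return best

best = best_partitioning(plans, MEMORY_BUDGET_KB)
print(best)  # (25000, {'dns_reflection': 2, 'port_scan': 2})
```

With these numbers neither query fits entirely on the switch: giving one query all the memory it wants starves the other, so the best plan splits the budget between them.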

How effective is Query Partitioning?

Consider 8 monitoring tasks and a 100 Gbps workload, and compare the number of tuples sent to the stream processor. If we send everything to the CPU, the number of tuples is O(10^9). If we could execute the stateful operators in the switch itself, we would expect a reduction of multiple orders of magnitude in the tuples sent to the stream processor. But we observe only a 1 order of magnitude reduction, to O(10^8). This is because the stateful memory required by operators such as distinct and reduce was more than what was available in the switch, so the Query Planner was forced to execute all the stateful operators on the CPU rather than the switch.

How can we reduce the memory footprint of stateful operators?

Observations:

They combined all these observations to devise a scheme called Iterative Query Refinement. We first execute the query at a coarser level. At the end of the first window interval we get the coarse-level output, which is possible because the query is keyed on a hierarchical packet field. This output contains the needles we’re interested in, but also some false positives. For the fresh set of packets arriving in the next window interval, we apply the same query at a finer level, but this time preceded by a filter operator that makes sure it only operates over the subset of traffic that satisfied the query in the previous interval. The remaining operators are applied as before, at the finer refinement level. The effective memory footprint over these two intervals is reduced, at the cost of additional detection delay: it takes longer to find the needles we were interested in.
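A small Python sketch can make the two refinement steps concrete. It runs a distinct-sources query first on /8 destination prefixes and then on full addresses, filtered by the prefixes that passed the coarse step, and reports the number of reduce keys as a rough proxy for memory. The choice of /8 then /32, the reuse of the same threshold at both levels, the helper names, and the sample traffic are assumptions of this sketch, and the same list of packets is reused for both windows on the assumption that the traffic pattern persists.

```python
from collections import Counter

def prefix(ip, plen):
    """Truncate a dotted-quad IPv4 address to a /8, /16, /24 or /32 prefix."""
    return ".".join(ip.split(".")[: plen // 8])

def run_query(pairs, plen, th, allowed=None, parent_plen=None):
    """Count distinct sources per destination prefix of length `plen`.
    `pairs` holds (dstIP, srcIP) of the DNS responses seen in one window.
    If `allowed` is given, only traffic whose /parent_plen prefix is in it is kept.
    Returns (prefixes with count > th, number of reduce keys used)."""
    seen = set()
    for dst, src in pairs:
        if allowed is not None and prefix(dst, parent_plen) not in allowed:
            continue  # refinement filter: skip traffic ruled out earlier
        seen.add((prefix(dst, plen), src))
    counts = Counter(key for key, _src in seen)
    return {k for k, n in counts.items() if n > th}, len(counts)

window = (
    [("10.0.0.5", s) for s in ("1.1.1.1", "2.2.2.2", "3.3.3.3")]  # the victim
    + [("10.9.9.9", "8.8.8.8")]                                   # same /8, not a victim
    + [(f"20.0.0.{i}", "8.8.8.8") for i in range(1, 6)]
    + [(f"30.0.0.{i}", "8.8.4.4") for i in range(1, 6)]
)
TH = 2

# Window 1: coarse level. A prefix sees at least as many distinct sources as
# any host inside it, so reusing TH here cannot miss a victim.
coarse_hits, coarse_keys = run_query(window, plen=8, th=TH)
# Window 2: fine level, restricted to prefixes that passed the coarse level.
fine_hits, fine_keys = run_query(window, plen=32, th=TH,
                                 allowed=coarse_hits, parent_plen=8)
# Baseline: the fine-level query on its own, without refinement.
base_hits, base_keys = run_query(window, plen=32, th=TH)

print(coarse_hits, coarse_keys)  # {'10'} 3
print(fine_hits, fine_keys)      # {'10.0.0.5'} 2
print(base_hits, base_keys)      # {'10.0.0.5'} 12
```

Both approaches find the same victim, but the refined plan never holds more than 3 keys in a window while the baseline holds 12, and the price is that the answer arrives one window later.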

How does the Query Planner make these refinement decisions?

The goal is still to minimize the tuples sent to the stream processor, given the set of queries and representative packet traces. In addition to the partitioning decisions it was already making, the planner needs to answer:

They augmented the partitioning ILP to compute both refinement and partitioning plans.

Sonata’s Performance: Compared to sending everything to the CPU, partitioning alone gave only a 1 order of magnitude reduction in the tuples sent to the stream processor, but partitioning combined with refinement gives a 4 orders of magnitude reduction.

Strengths

Weaknesses

Future Work