Work with us

Writing your own flux functions

Writing your own flux functions - Part 1

To anyone that regularly uses some form of “monitoring tool”, you will know that feeling you get when the day comes that you need to answer a question that requires you to join bits of information from seperate sets of time series data.

Now, if we are working on a traditional relational database (think: SQL), this is relatively easy to answer – but, nowadays it is equally likely that we’re working on any number of special TSDBs…and answering this kind of question can be very difficult.

Yes, we could resort to using the “read the whole DB, and filter by a regex”, but we then have to put up with running the query for a few hours, and apologising to the storage guys because their hardware is on fire.

Well, now, there’s a better way – Flux1 - It was designed for playing with data, and makes this task easy when you know how.

I will present this by first looking at a specific scenario that you can try for yourself, and the logic that could be used to solve it, and then the code.

In the next installment (Part 2), we will go through exactly what all of these steps do, and why we need them. But enough for now, let's see some code!

So, let’s suppose that we need to find “the 3 hosts with the busiest cpus, as measured by user time” – seems easy right? Well, the niave way would be to simply use the inbuilt “top("user_time", 3)” function – except that this will actually go through all of the data, and return the highest 3 values – which may well come from the same host. When what we actually need is the following:

  1. Out of all the data covering our time range, find the 3 hosts that have the highest CPU values.
  2. Return all of the data points, over our time range, for the 3 hosts identified in step 1. 

Ok, so here are the two functions that give us this, first, we get the names of the hosts with the highest cpu:

hosts_ranked_by_cpu = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => 
    r._measurement == "cpu" and 
    r._field == "usage_user"
  )
  |> group(columns:["host","_measurement"], mode:"by")
  |>sort(columns:["_value"],desc:true)
  |>limit(n:1)
  |>group()
  |>sort(columns:["_value"],desc:true)
  |>limit(n:3)
  |>rename(columns: {_time: "ignore_time"})

 

 

Then, we need to grab the cpu series for all hosts:

all_hosts_cpu_series = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => 
    r._measurement == "cpu" and 
    r._field == "usage_user"
  )
  |>rename(columns: {_start: "start", _stop: "stop"})
  |>group()


Once we have defined these two data-streams, then we need to join them (calling this causes the two streams to execute) and produce our final result:

join(
  tables: {cpu:hosts_ranked_by_cpu, all:all_hosts_cpu_series},
  on: ["host"]
)
|>group(columns:["host"], mode: "by")
|>map(fn:(r) => ({
  _time: r._time,
  _value: r._value_all,
}))
Not bad at all, but we can do better. Let's break some of the boiler plate into its own function, which would allow us to reuse some of this logic more easily:
top_x_by = (tables=<-, by_measurement, x) =>
  tables
  |> group(columns:[by_measurement,"_measurement"], mode:"by")
  |>sort(columns:["_value"],desc:true)
  |>limit(n:1)
  |>group()
  |>sort(columns:["_value"],desc:true)
  |>limit(n:x)
  |>rename(columns: {_time: "ignore_time"})

This allows us to adjust the parameters to handle other things as required - and we can group by other tags (not only by host, as we did before), as well as the number of series we want to return (can be top 1, 30, 21 - you get the picture).

So, for completeness, the full code now looks like this:

top_x_by = (tables=<-, by_measurement, x) =>
  tables
  |> group(columns:[by_measurement,"_measurement"], mode:"by")
  |>sort(columns:["_value"],desc:true)
  |>limit(n:1)
  |>group()
  |>sort(columns:["_value"],desc:true)
  |>limit(n:x)
  |>rename(columns: {_time: "ignore_time"})
 
hosts_ranked_by_cpu = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => 
    r._measurement == "cpu" and 
    r._field == "usage_user"
  )
  |>top_x_by(by_measurement:"host", x:3)
 
all_hosts_cpu_series = from(bucket: "telegraf/autogen")
  |> range(start: dashboardTime)
  |> filter(fn: (r) => 
      r._measurement == "cpu" and 
      r._field == "usage_user"
  )
  |>rename(columns: {_start: "start", _stop: "stop"})
  |>group()
 
join(
  tables: {cpu:hosts_ranked_by_cpu, all:all_hosts_cpu_series},
  on: ["host"]
)
|>group(columns:["host"], mode: "by")
|>map(fn:(r) => ({
  _time: r._time,
  _value: r._value_all,
 }))

We can now generate the result: a graphic showing the full timeseries data for the "top 3":

Code is available here, enjoy ;-)

1. Flux is in active development at Influxdata, and as such, is still considered as experimental, and not yet stable.

Back to Blog

We make data work for you

Our Expertise

Learn More