How to pair specific logs in Flux (InfluxDB) to calculate average duration?

Hello Community,

I’m trying to calculate the average duration between POST and PUT events in InfluxDB using Flux in Grafana, but I’m encountering limitations.

Objective:

I have several POST events (POST /cases) and PUT events (PUT /cases/{id}).

I want to pair each POST with the next PUT from the same user, calculate the time difference for each pair, and then compute the overall average. The user is filterable; if I select all users, I should get the overall average.

This should work even if there are multiple POSTs in a row before the next PUT. In other words, each PUT should be paired with the closest preceding POST, and any earlier unmatched POSTs should be ignored (e.g., POST 08:00, POST 08:05, PUT 08:10 should yield only the 08:05 → 08:10 pair).

Example logs:

| action_type    | _time               |
| -------------- | ------------------- |
| POST /cases    | 2025-10-09 08:07:49 |
| PUT /cases/123 | 2025-10-09 08:13:33 |
| POST /cases    | 2025-10-09 08:18:45 |
| PUT /cases/456 | 2025-10-09 08:20:08 |
| POST /cases    | 2025-10-09 09:26:07 |
| PUT /cases/789 | 2025-10-09 09:27:40 |

Based on this example, the idea is to calculate the durations:

08:07:49 → 08:13:33 = 5 min 44 s
08:18:45 → 08:20:08 = 1 min 23 s
09:26:07 → 09:27:40 = 1 min 33 s

And then calculate the overall average: (344 s + 83 s + 93 s) / 3 ≈ 173 s ≈ 2.89 minutes.
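For reference, the same arithmetic as a standalone Flux snippet, with the three example durations hard-coded just to sanity-check the expected result:

import "array"

// hard-coded durations from the example logs, in seconds
array.from(rows: [{duration_sec: 344.0}, {duration_sec: 83.0}, {duration_sec: 93.0}])
  |> mean(column: "duration_sec")
  |> map(fn: (r) => ({ avg_minutes: r.duration_sec / 60.0 }))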

Current issue:

I’m trying to use join + group + sort + limit(1) in Flux, but it only captures the last POST→PUT and ignores the others.

I tried experimental.stateTracking, but I couldn’t create the pairs correctly without errors.

Question:

Is there a way in Flux/Grafana to:

1. Pair each POST with the next PUT, sequentially, for each user?
2. Calculate the duration between them and compute the overall average?

Thanks for any help or suggestions! 🙏

This is my current code, but it only returns the last difference (1 min 33 s):

import "strings"

// POST /cases events per user (candidate start times)
starts = from(bucket: "example1")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
      (r["_measurement"] == "success" or r["_measurement"] == "error") and
      r["environment"] =~ /^$environment$/ and
      r["user_id"] =~ /^$userId$/ and
      r["action_type"] == "POST /cases"
  )
  |> keep(columns: ["_time", "user_id"])
  |> rename(columns: {_time: "start_time"})
  |> sort(columns: ["start_time"])

// PUT /cases/{id} events per user (candidate end times)
ends = from(bucket: "example1")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) =>
      (r["_measurement"] == "success" or r["_measurement"] == "error") and
      r["environment"] =~ /^$environment$/ and
      r["user_id"] =~ /^$userId$/ and
      r["action_type"] =~ /^PUT \/cases\/[0-9]+$/
  )
  |> keep(columns: ["_time", "user_id"])
  |> rename(columns: {_time: "end_time"})
  |> sort(columns: ["end_time"])

// Cross-join POSTs and PUTs on user_id, keep only PUTs that happen after the POST,
// then try to reduce each group to a single pair via sort + limit(1)
sessions = join(
    tables: {start: starts, end: ends},
    on: ["user_id"],
    method: "inner"
)
  |> filter(fn: (r) => r.end_time > r.start_time)
  |> group(columns: ["user_id", "start_time"])
  |> sort(columns: ["end_time"])
  |> limit(n: 1)
  // duration of each POST → PUT pair in seconds
  |> map(fn: (r) => ({
      user_id: r.user_id,
      start_time: r.start_time,
      end_time: r.end_time,
      duration_sec: float(v: int(v: r.end_time) - int(v: r.start_time)) / 1000000000.0
  }))

// Overall average across the pairs, converted to minutes
sessions
  |> mean(column: "duration_sec")
  |> map(fn: (r) => ({ AVG_case_plan_general: r.duration_sec / 60.0 }))
  |> yield(name: "average_general")