How to connect streaming data source to websocket?

I would like to develop a streaming datasource fro grafana analogous to the telegraf “Stream using WebSocket endpoint” mentioned here: Stream metrics from Telegraf to Grafana.
If I understand well, this could be an independent websocket client (or websocket server?), for example written in python or node.js and running as an independent process (for data acquisition).
From the configuration information for telegraf I infer that the websocket client would connect to

url = “ws://localhost:3000/api/live/push/custom_stream_id”

where “custom_stream_id” could be any identifier.
Also it seems that an authorization in the websocket header is needed:

Authorization = "Bearer "

as well as the

data_format = “influx”

which is well documented at the telegraf site.

Nevertheless, I did not find de grafana documentation that explains in more detail, what such an independent websocket client (or websocket server) has to take into account to connect to grafana in an similar way the WebSocket output plugin of Telegraf does.
I would be very grateful if somebody could help me out with this.
Thank you very much, Olivier Rickmers

Hi Olivier,

I agree with you that the grafana documentation is far from adequate on this topic. I was also interested in using websocket for data streaming since it seems to work a bit better for “high” data rates (about 100 samples/s) compared to running it through a database like Redistimeseries first. I actually ended up having to sniff my own network traffic with Wireguard to figure out what the mysterious “influx data format” that the little piece of rushed documentation talks about looks like. Unfortunately I don’t have this capture at hand anymore, but I came up with this very quick and dirty piece op Python 3.7 script which can stream a sinewave to the Grafana websocket for test purposes:

import math
import time
import asyncio
import websockets

async def stream():
    async with websockets.connect("ws://localhost:3000/api/live/push/sinewave_test", extra_headers={'Authorization': 'Bearer THETOKENSHOULDHAVEADMINRIGHTSIFIREMEMBERCORRECTLY'}) as websocket:
        i = 0
        while True:
            await websocket.send("test Sinewave=" + str((math.sin(math.radians(i))+1)*20+80) +" " + str(time.time_ns()+1000000000))
            if (i == 361):
                i = 0
                i = i + 15
            await asyncio.sleep(0.03)

If I remember correctly, you can add more signals in the websocket.send() part in Name=Value format, maybe with a comma behind the previous one.

I hope this will get you started!

Hi grafana89e9
Thank you so much for your great reply. I will try it out as soon as I have some time to do it and keep you all posted.
By the way, the influx data format is documented here:
and here:
My idea is indeed to use grafana for real time data display and the websocket streaming interface would be ideal for it, as it could decouple, via websockets, the data presentation (grafana) from the multiple considerations related to the specific real time data acquisition. And yes, you are right, passing through some other package, like telegraf, might be useful in some cases, but in other cases not appropriate for the process from which the data is obtained.

Thanks again, have a great day,

I did what the previous person suggested with wireshark and saw that the way telegraf sends data is with http post method and has a format that can be written in python as follows:

import requests
import time

x1 = 0
x2 = 0
x3 = 0

for i in range(3000) :
  response ='http://localhost:3000/api/live/push/custom_stream_id', data='sma,sma=cpu5,host=smar usage_softirq='+str(x1)+',usage_guest='+str(x2)+',usage_guest_nice='+str(x3), headers={'Authorization':'Bearer '+ 'eyJrIjoiMm03c0lXRWY0VGxrM0Vmd2hBelN6NWdtQ3ZSMHZLMTAiLCJuIjoidGVsIiwiaWQiOjF9'})

it works perfectly and i can make dashboards with my data
change host name so that it works for you and also change the authentication
note: there has to be a space between host=smar and the data part and also a space between bearer and api token