How could I pull Loki records from a Python script?

Hello team:
I would like to dig into the Loki records in order to find patterns. For instance, the following sequence of records coming from a LAN switch tells me one switch port is flapping once each 10 seconds:

In cases like this, if the sequence repeats beyond some upper bound of time, I would like to notify the network guys in order to fix the potential issue in the network device.

But I do not want to waste my time looking to the Grafana dashboards. I want this task to be delegated to a program.

I wonder how to address the requirement from a programming point of view. I would have liked to automate this from a Python script but AFAIK there is no Python library from which to call Loki for its records. At least the ones I found (loki-client, python-loki-client) do not work.

How are these tasks being carried out currently?

Any hints will be greatly appreciated
Best regards

Rogelio

you could use the api

Thank you Raymond!

I forgot to say that I am already working with the HTTP API. It seems to be that for the time being this is the way to go.

Best regards, Rogelio

1 Like

I am not so proud of my piece of code, but the following gave me access to the Loki records :slightly_smiling_face:

The following is the payload that will be used as argument of the POST message to Grafana to ask it for the Loki records:

def payload(switch):
argument = {
“queries”: [
{
“refId”: “A”,
“key”: “Q-f03bc30a-9f33-45a4-8fd0-9d9c3662b890-0”,
“datasource”: {
“type”: “loki”,
“uid”: “JKn0-PDSk”
},
“editorMode”: “builder”,
“expr”: “{hostname="”+switch+“", loglevel="5"} |= changed state to”,
“queryType”: “range”,
“maxLines”: Loki_lines_to_retrieve,
“legendFormat”: “”,
“datasourceId”: 22,
“intervalMs”: 15000,
“maxDataPoints”: 1485
}
],
“from”: “2024-01-08T00:00:00.000Z”,
“to”: “2024-01-08T23:59:59.000Z”,
}
return(argument)

The following are the other arguments of the POST:

apikey=‘<JKKJHJKKLLLJKJLLHUJNK;BJ >’

headers = {‘Authorization’: apikey,‘Accept’: ‘aplication/json’,‘Content-Type’: ‘application/json’}

url=‘http://admin:admin1@10.235.2.19:3000/api/ds/query

And then the following is the POST executed to gather the actual records:

PLD=payload(str(LANswitch[0]))
response= requests.post(url, data=json.dumps(PLD),headers=headers)

Once I have the answer from Grafana, I iterate on the results (provided that the POST returned a 200 OK). the following lines are executed in a loop:
while index >= 0:
LOG=response.json()[‘results’][‘A’][‘frames’][0][‘data’][‘values’][2][index]
TIMESTAMP=int(response.json()[‘results’][‘A’][‘frames’][0][‘data’][‘values’][1][index])
Filtered=LOG.split(“Interface “,1)[1]
INT=Filtered.split(”,”,1)[0]
STATE=Filtered.split(“state to “,1)[1].replace(”\n”,‘’)
(… more lines follow but not shown here…)

It worked quite well, although I would have liked a library like the one I use to ask InfluxDB for its records…

Thanks!!!

1 Like

Thanks for your response as well. It helped me to understand, that Loki does not have a Python client to work with. Even though I persists all logs to S3 but since there’s no client to parse or read those logs from S3, I too tried the POST method. Here’s the code that I wrote. It might help others.

Do note, you need to run this code from within the server (EKS, EC2, etc) if you’re using an internal endpoint like mine.

import requests
from datetime import datetime, timedelta
import os 

def query_loki(base_url, query, start_time, end_time):
    endpoint = f"{base_url}/loki/api/v1/query_range"
    
    payload = {
        "query": query,
        "start": start_time,
        "end": end_time
    }
    
    headers = {
        "Accept": "application/json",
        "X-Scope-OrgID": "1"  # Adjust as per your Loki setup
    }
    
    print(f"Sending request to: {endpoint}")
    print(f"Payload: {payload}")
    print(f"Headers: {headers}")
    
    try:
        response = requests.get(endpoint, params=payload, headers=headers)
        print(f"Status code: {response.status_code}")
        print(f"Content-Type: {response.headers.get('Content-Type', 'Not specified')}")
        print(f"Response headers: {dict(response.headers)}")
        
        if response.status_code == 200:
            try:
                json_response = response.json()
                print("Successfully parsed JSON response")
                return json_response
            except json.JSONDecodeError:
                print("Received a 200 status, but response is not valid JSON")
                print(f"First 500 characters of response: {response.text[:500]}")
        else:
            print(f"Received non-200 status code. Response text: {response.text[:500]}")
        
        return None
    except requests.exceptions.RequestException as e:
        print(f"Request Exception: {e}")
        return None

# Example usage
base_url = "http://loki-gateway.monitoring.svc.cluster.local:80"
query = '{app="backend"}'
current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
start_time = (datetime.utcnow() - timedelta(minutes=5)).strftime('%Y-%m-%dT%H:%M:%SZ')
end_time = current_time

result = query_loki(base_url, query, start_time, end_time)

if result:
    print("Response received and parsed as JSON")
    print(json.dumps(result, indent=2))
else:
    print("No valid result returned")
1 Like