Pushing an offline logs from the past

Hello Grafana/Loki community,

I trying to push an offline log streams to loki, could you please help me with analysing why it doesn’t work? My precognitions are:

  • I’m trying to use grafana/loki to fetch offline logs that were collected in the past (possibly long time ago).
  • Each log line has a timestamp which is the log line date which I need to use (push to loki)
  • I can’t install any new software on the monitored host so I have written a simple python client that fetches the logs with scp and pushes them to loki via rest
  • I musing a monoltic grafana/loki setup from GitHub - grafana/loki: Like Prometheus, but for logs. as described in Getting started with Grafana and Loki in under 4 minutes | Grafana Labs.
  • promtail is not running as I have no use for it.
  • current date is 14.11.2022

The container setup (docker-compose.yaml) is:

version: "3"

networks:
  loki:

services:
  loki:
    image: grafana/loki:2.6.1
    ports:
      - "3100:3100"
    command: -config.file=/etc/loki/local-config.yaml -log.level=debug
    networks:
      - loki

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    networks:
      - loki

In the loki.yaml the relevant (to my knowledge) sections are:

limits_config:
  max_cache_freshness_per_query: '10m'
  enforce_metric_name: false
  reject_old_samples: false
  reject_old_samples_max_age: 8064h #336d
  ingestion_rate_mb: 10
  ingestion_burst_size_mb: 20
  # parallelize queries in 15min intervals
  split_queries_by_interval: 15m
  unordered_writes: true
  # Default: 721h
  max_query_length: 5000h

chunk_store_config:
  max_look_back_period: 8064h #336d

Simplified, relevant parts of client code are:

    ssh = createSSHClient('10.10.0.10', 614, 'root')
    scp = SCPClient(ssh.get_transport())
    sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
    file_list = sftp.listdir("/var/opt/some-log-dir/")
    for item in file_list:
       scp.get("/var/opt/some-log-dir/{}".format(item), local_path=host_path)

    for filename in os.listdir(host_path):
        entries, payload = init_trace_log_stream(filename)
        prev_timestamp = None
        with open(os.path.join(host_path, filename)) as f:
            prev_line = None
            print("File: '{}'".format(f))
            for line in f:
                time_stamp, component, level, thread, reminder = parse_trace_log_line(line)
                if not time_stamp and len(entries):
                    entries[-1][-1] += "\n" + line.strip()
                    continue                
                unix_ts = time.mktime(time_stamp.timetuple())
                # TIME NEEDS TO BE IN NANOSECONDS!!
                entries.append([str(int(unix_ts * 1000000000)), line])
                    
                # Send logs in a per-day chunks
                if prev_timestamp is None:
                    prev_timestamp = unix_ts
                elif unix_ts - prev_timestamp > 10:
                    answer = push_to_loki(payload)
                    print("Anwser: {}/{}".format(answer, answer.text))
                    entries, payload = init_trace_log_stream(filename)
                    prev_timestamp = unix_ts
        
        answer = push_to_loki(payload)
        print("Anwser: {}/{}".format(answer, answer.text))

Depending on the log line date, some log stręams are rejected:

POSTING: {"streams": [{"stream": {"filename": "aosDomLag.log"}, "values": [["1665503572000000000", "2022-10-11T17:52:52.739|eth.com.proc   |NOTICE|THD:7f0dc4ff9640|\u001b[32mAppMgrBase.cpp\u001b[0m:\u001b[31m263 \u001b[32mcheckHealth()\u001b[0m|delay release model service:1\n"], ["1665503577000000000", "2022-10-11T17:52:57.739|eth.com.proc   |NOTICE|THD:7f0dc4ff9640|\u001b[32mAppMgrBase.cpp\u001b[0m:\u001b[31m263 \u001b[32mcheckHealth()\u001b[0m|delay release model service:0\n"], ["1665503577000000000", "2022-10-11T17:52:57.739|eth.com.proc   |NOTICE|THD:7f0dc4ff9640|\u001b[32mModelHelper.cpp\u001b[0m:\u001b[31m192 \u001b[32mreleaseModel()\u001b[0m|Destroy CapMgr\n"], ["1665503577000000000", "2022-10-11T17:52:57.739|eth.com.proc   |NOTICE|THD:7f0dc4ff9640|\u001b[32mModelHelper.cpp\u001b[0m:\u001b[31m196 \u001b[32mreleaseModel()\u001b[0m|Destroy Model API\n"]]}]}

Anwser: <Response [400]>/entry for stream '{filename="aosDomLag.log"}' has timestamp too old: 2022-10-11T15:52:57Z, oldest acceptable timestamp is: 2022-11-07T19:06:56Z

and some are accepted:

POSTING: {"streams": [{"stream": {"filename": "snmpd.log"}, "values": [["1668446652000000000", "2022-11-14T18:24:12.584|fw.snmp.app    |NOTICE|THD:7f30756f5640|libAosSnmpFwCache.so|CachePlugin.cpp:312 syncExecute()| MIT sync start\n"], ["1668446652000000000", "2022-11-14T18:24:12.940|core.sirm.adapt|NOTICE|THD:7f305effd640|Starting phase: START\n"], ["1668446652000000000", "2022-11-14T18:24:12.940|core.sirm.adapt|NOTICE|THD:7f305effd640|Finished phase: START\n"], ["1668446654000000000", "2022-11-14T18:24:14.332|fw.snmp.app    |NOTICE|THD:7f30756f5640|libAosSnmpFwCache.so|CachePlugin.cpp:357 syncExecute()| MIT sync complete\n"], ["1668446654000000000", "2022-11-14T18:24:14.332|fw.snmp.app    |NOTICE|THD:7f30756f5640|libAosSnmpFwCache.so|CachePlugin.cpp:361 syncExecute()| snmp sync took: 1748ms\n"], ["1668446664000000000", "2022-11-14T18:24:24.260|core.sirm.adapt|NOTICE|THD:7f305effd640|Starting phase: ENABLE\n"]]}]}

Anwser: <Response [204]>/

Do you know why the log lines are rejected and how to fix it?

Best Regards,
Marcin

When you are replaying old logs you need to make sure they are relatively in order. Basically you can’t send old log to a stream if there are newer entries already, and I think it has something to do with max_chunk_age.

There is a post on this forum that explains it pretty well, but I can’t find it at the moment.

1 Like

Wow it worked. Thank you!
I thought that setting

limits_config:
  unordered_writes: true

would allow me to send the logs out of order. I’ll look into the docs. more.

This topic was automatically closed 365 days after the last reply. New replies are no longer allowed.