Multiline stage duplicating and wrongly sorting log lines

To clarify: I’m currently on Grafana Agent 0.39.2, which is obviously outdated. Due to various encountered issues with newer versions of Agent and Alloy in a Docker Swarm setup (see e.g. Debian 12: v1.1.0: panic loop · Issue #868 · grafana/alloy · GitHub) which were not fixed at the time and eroded our trust in the maturity of the solution, we have not upgraded (yet).

So now I’m seeing issues with Grafana Agent’s multiline stage in flow mode, i.e. logs are duplicated, triplicated, out-of-order etc. (no pattern recognisable), incl. both the first line and stack trace lines. The regex is simply checking line starts as per below:

stage.multiline {
   firstline = "^(x|y|z).*?$"
   max_wait_time = "3s" // Defaults to 3s
   max_lines = 128 // Defaults to 128
}

As I can’t find any issues/topics related to this, I’m wondering what might be causing it and if not a config issue, if this is fixed in Alloy/is a known issue. When I disable the multiline stage, everything looks fine, no duplicate/out-of-order entries.

Hello! Thank you for your message. I am personally not aware of issues which the multiline stage which would cause this.

Would it be possible to provide us with an example log file and config file so that we can reproduce the issue please?

Thanks for your reply! I attach a redacted sample config.river file that we currently use for the agent (note I commented out the 2 multiline parsing stages).

I also attach an example from our db logs - note as I said, I couldn’t see any pattern. It’s both application and db logs, sometimes it’s duplicate lines, sometimes it’s 3x/4x etc., sometimes it is the first line, sometimes not. Also out of order and then appearing in multiple log chunks. The max_lines config seems to work. Changing it (to 64) does not help.

I also attach our Dockerfile, docker-compose file (for Docker swarm), loki config, even if I don’t think it matters much for this topic.

Thanks in advance & let me know if you need further info.

example log:

2024-07-11 08:02:50.824 UTC [55186] LOG:  execute <unnamed>: /* delete from Object object
	where object.createDate < ?1 and not exists (select 1
	where object.createDate < ?1 and not exists (select 1
	from Object object
	where object.id = otherObject.id) */ delete from table1 bte1_0 where bte1_0.create_date<$1 and not exists(select 1 from table2 bpe1_0 where bte1_0.id=bpe1_0.trip_id)
	from Object object 

Dockerfile for custom agent image:

FROM grafana/agent:v0.39.2

COPY config.river /etc/agent/config.river

ENV AGENT_MODE=flow

CMD ["run", "--server.http.listen-addr=0.0.0.0:12345", "/etc/agent/config.river"]

docker-compose.yaml-excerpt:

  agent:
    image: internal-registry/grafana-agent:latest
    volumes:
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
      - /var/run/docker.sock:/var/run/docker.sock
    environment:
      - HOSTNAME=redacted
      - LOKI_URL=http://loki:3100/loki/api/v1/push
      - TENANT_ID=redacted
    ports:
      - "12345:12345"
    depends_on:
      - loki
    networks:
      - internal
      - prometheus
    labels:
      logging: "grafana-agent"
      prometheus.scrape: "true"
      prometheus.port: "12345"
    deploy:
      labels: (redacted)

config.river:

discovery.docker "docker_scrape" {
	host             = "unix:///var/run/docker.sock"
	refresh_interval = "5s"

	filter {
		name   = "label"
		values = ["logging=grafana-agent"]
	}
}

discovery.relabel "docker_scrape" {
	targets = []

	rule {
		source_labels = ["__meta_docker_container_id"]
		target_label  = "docker_container_id"
	}

	rule {
		source_labels = ["__meta_docker_container_name"]
		regex         = "/(.*)"
		target_label  = "container_name"
	}

	rule {
		source_labels = ["__meta_docker_container_log_stream"]
		target_label  = "logstream" // i.e. stdout/stderrr.
		// Containers will typically log to either one.
		// (This rule does not appear in the Grafana Agent documentation, but works as a Promtail configuration.)
	}

	rule {
		source_labels = ["__meta_docker_container_label_com_docker_swarm_service_name"]
		target_label = "docker_service_name"
	}

	rule {
		source_labels = ["__meta_docker_container_label_com_docker_stack_namespace"]
		target_label = "docker_stack_namespace"
	}

}

loki.source.docker "docker_scrape" {
	host             = "unix:///var/run/docker.sock"
	targets          = discovery.docker.docker_scrape.targets
	forward_to       = [loki.process.docker_scrape.receiver]
	relabel_rules    = discovery.relabel.docker_scrape.rules
	refresh_interval = "5s"
}

loki.write "default" {
	endpoint {
		url = env("LOKI_URL")
		tenant_id = env("TENANT_ID")
	}
	external_labels = {}
}

loki.process "docker_scrape" {
	forward_to = [loki.write.default.receiver]

	stage.static_labels {
		values = {
			host_name = env("HOSTNAME"),
		}
	}

	// ############################### APPLICATION ###############################

	stage.match {
		// This stage will parse non-JSON logs.
		selector = "{docker_service_name=~\".*application_name.*\", log_status!=\"processed\"} !~ \"^{.*.application.:.application_name.}$\""
		// The wildcards are '"',but single quotes are not allowed.
		// We use 'processed' as a custom label to only process logs in following stages not yet processed.

//		Multiline currently disabled, as leading to duplicate and out-of-order logs.
//		stage.multiline {
//			firstline = "^(x|y|z|v|other).*?$"
//			max_wait_time = "3s" // Defaults to 3s
//			max_lines = 128 // Defaults to 128
//		}

		stage.template {
			source = "docker_stack_namespace"
			template = "{{if .Value }} {{- .Value -}} {{else}} {{- \"(Not running on Docker Swarm)\" -}} {{end}}"
		}

		stage.regex {
			expression = "^.*?(?P<level>DEBUG|INFO|WARN|ERROR|FATAL).*?$"
		}

		stage.labels {
			values = {
				level = null,
			}
		}

		stage.static_labels {
			values = {
				pipeline 	= "application_name_plain",
				processing_status  = "processed",
			}
		}

	}

// ###################### APPLICATION JSON ######################

	stage.match {
		// This stage will parse application JSON logs.
		selector = "{docker_service_name=~\".*application_name.*\", processing_status!=\"processed\"} |~ \"^{.*.application.:.application_name.}$\""
		// The end is a custom JSON attribute added to the end of all application_name log4j logs via the JSON appender.
		// The wildcards are '"',but single quotes are not allowed.

		stage.json {
			// For examples of raw JSONs produced by application_name, see: REDACTED
			expressions = {
				time       = "timeMillis",
				class_name = "loggerName",
				level  = "level",
				message    = "message",
				thread     = "thread",
				thrown     = "thrown",
				marker     = "marker",
			}
		}

		stage.json {
			source = "marker"
			expressions = {
				marker_name  = "name",
			}
		}

		stage.timestamp {
			source = "time"
			format = "UnixMS"
			action_on_failure = "skip"
		}

		stage.labels {
			values = {
				level   = null,
				thread      = null,
				marker_name = null,
			}
		}

		stage.template {
			source = "docker_stack_namespace"
			template = "{{if .Value }} {{- .Value -}} {{else}} {{- \"(Not running on Docker Swarm)\" -}} {{end}}"
		}

		stage.template {
			// The thrown JSON is currently just passed without processing for potential subsequent evaluation.
			source   = "thrown"
			template = "{{if .Value }} {{- \"\\n\\nThrown:\\n\" -}}{{- .Value -}} {{else}} {{- \"\u200B\" -}} {{end}}" // \u200B = zero width space
		}

		stage.template {
			source   = "marker_name"
			template = "{{if .Value }} {{- .Value -}} {{else}} {{- \"\u200B\" -}} {{end}}" // \u200B = zero width space
		}

		stage.template {
			source   = "class_name"
			template = "{{if .Value }} {{- .Value -}} {{else}} {{- \"-\" -}} {{end}}"
		}

		stage.template {
			source   = "output"
			template = "{{ .docker_stack_namespace }}: {{ ToUpper .level }} [{{ .thread }}] {{ .class_name }}[{{ .marker_name }}] - {{ .message}}{{- .thrown -}}"
		}

		stage.output {
			source = "output"
		}

		stage.static_labels {
			values = {
				pipeline	 	= "log_application_name_json",
				processing_status  	= "processed",
			}
		}
	}

	// ############################### DB ###############################

	stage.match {
			selector = "{docker_service_name=~\".*db.*\", processing_status!=\"processed\"} !~ \"^{.*}$\"" // Assuming we do not have any other services to log with 'db' in the name.

		//		Multiline currently disabled, as leading to duplicate and out-of-order logs.
//		stage.multiline {
//			firstline = "^\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}.\\d{3}\\sUTC.*?$"
//			max_wait_time = "3s" // Defaults to 3s
//		}

		stage.regex {
			expression = "^.*?(?P<level>DEBUG|DETAIL|LOG|INFO|WARNING|ERROR|FATAL|PANIC)(.|\\n)*?$"
		}

		stage.regex {
			expression = "^(?P<time>\\d{4}-\\d{2}-\\d{2}\\s\\d{2}:\\d{2}:\\d{2}.\\d{3}\\sUTC)(.|\\n)*?$"
		}

		stage.timestamp {
			source = "time"
			format = "2006-01-02 15:04:05.999 MST" // See: https://grafana.com/docs/loki/latest/send-data/promtail/stages/timestamp/
            action_on_failure = "skip"
		}

		stage.labels {
			values = {
				level = null,
			}
		}

		stage.static_labels {
			values = {
				pipeline = "log_db_plain",
				processing_status   = "processed",
			}
		}
	}

	// ############################### AGENT ###############################

	stage.match {
		selector = "{docker_service_name=~\".*agent.*\", processing_status!=\"processed\"}"

		stage.static_labels {
			values = {
				pipeline = "log_agent",
				processing_status   = "processed",
			}
		}

		stage.regex {
			expression = "^ts=(?P<time>[T\\d-:.Z]*) level=(?P<level>[a-zA-Z]+).*?"
		}

		stage.template {
			source   = "level"
			template = "{{ ToUpper .level }}"
		}

		stage.labels {
			values = {
				level = null,
			}
		}

		stage.timestamp {
			source = "time"
			format = "2006-01-02T15:04:05.999999999Z"
            action_on_failure = "skip"
		}
	}

	// ############################### LOKI ###############################

	stage.match {
		selector = "{docker_service_name=~\".*loki.*\", processing_status!=\"processed\"}"

		stage.static_labels {
			values = {
				pipeline = "log_loki",
				processing_status   = "processed",
			}
		}

		stage.regex {
			expression = "^level=(?P<level>[a-zA-Z]+) ts=(?P<time>[T\\d-:.Z]*).*?"
		}

		stage.template {
			source   = "level"
			template = "{{ ToUpper .level }}"
		}

		stage.labels {
			values = {
				level = null,
			}
		}

		stage.timestamp {
			source = "time"
			format = "2006-01-02T15:04:05.999999999Z"
            action_on_failure = "skip"
        }

	}

	// ############################### GRAFANA ###############################

	stage.match {
		selector = "{docker_service_name=~\".*grafana.*\", processing_status!=\"processed\"}"

		stage.static_labels {
			values = {
				pipeline = "log_grafana",
				processing_status   = "processed",
			}
		}

		stage.regex {
			expression = ".*?(logger=(?P<logger>[a-zA-Z\\.]+))?.*?t=(?P<time>[T\\d-:.Z]*) level=(?P<level>[a-zA-Z]+)?.*?"
		}

		stage.template {
			source   = "level"
			template = "{{ ToUpper .level }}"
		}

		stage.labels {
			values = {
				level  = null,
				logger = null,
			}
		}

		stage.timestamp {
			source = "time"
			format = "2006-01-02T15:04:05.999999999Z"
            action_on_failure = "skip"
        }

	}

	// ############################### UNPROCESSED ###############################

	stage.match {
		selector = "{processing_status!=\"processed\"}"

		stage.regex {
			expression = "^.*?(?P<level>DEBUG|INFO|WARN|WARNING|ERROR|FATAL|debug|info|warn|warning|error|fatal).*?$"
		}

		stage.labels {
			values = {
				level  = null,
			}
		}

		stage.static_labels {
			values = {
				processing_status 	= "unprocessed",
			}
		}
	}
}

loki.yaml:

# Adapted from: https://github.com/grafana/loki/blob/main/cmd/loki/loki-local-config.yaml and https://github.com/grafana/loki/issues/4613#issuecomment-1018367471

# The module to run Loki with. Supported values
# all, distributor, ingester, querier, query-frontend, table-manager.
# [target: <string> | default = "all"]
target: all

auth_enabled: true # Header X-Scope-OrgID must be present if true (multi-tenant mode). This is not about an actual auth layer: https://github.com/grafana/loki/blob/main/docs/sources/operations/authentication.md

server:
  http_listen_port: ${HTTP_LISTEN_PORT:-3100}
  http_server_read_timeout: 300s
  http_server_write_timeout: 300s
  log_level: ${LOG_LEVEL:-info} #Supported values [debug, info, warn, error]

# Again, this is adapted from the link above, and partly differing from config found in the docs using 'storage_config'
common:
  instance_addr: ${INSTANCE_ADDR:-127.0.0.1}
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks # When run in single tenant mode, all chunks are put in a folder named 'fake' (auth_enabled: false)
      rules_directory: /loki/rules
  ring:
    kvstore:
      store: inmemory
    heartbeat_timeout: 10m # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/
  replication_factor: 1 # 3 would be recommended for availability. However you need to have rings (distributed hashtables)
    # set up explicitly and have additional ingester members join them.
  # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/

schema_config:
  configs:
    - from: 2024-01-01
      store: tsdb
      object_store: filesystem
      schema: v12# Adapted from: https://github.com/grafana/loki/blob/main/cmd/loki/loki-local-config.yaml and https://github.com/grafana/loki/issues/4613#issuecomment-1018367471

# The module to run Loki with. Supported values
# all, distributor, ingester, querier, query-frontend, table-manager.
# [target: <string> | default = "all"]
target: all

auth_enabled: true # Header X-Scope-OrgID must be present if true (multi-tenant mode). This is not about an actual auth layer: https://github.com/grafana/loki/blob/main/docs/sources/operations/authentication.md

server:
  http_listen_port: ${HTTP_LISTEN_PORT:-3100}
  http_server_read_timeout: 300s
  http_server_write_timeout: 300s
  log_level: ${LOG_LEVEL:-info} #Supported values [debug, info, warn, error]

# Again, this is adapted from the link above, and partly differing from config found in the docs using 'storage_config'
common:
  instance_addr: ${INSTANCE_ADDR:-127.0.0.1}
  path_prefix: /loki
  storage:
    filesystem:
      chunks_directory: /loki/chunks # When run in single tenant mode, all chunks are put in a folder named 'fake' (auth_enabled: false)
      rules_directory: /loki/rules
  ring:
    kvstore:
      store: inmemory
    heartbeat_timeout: 10m # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/
  replication_factor: 1 # 3 would be recommended for availability. However you need to have rings (distributed hashtables)
  # set up explicitly and have additional ingester members join them.
  # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/

schema_config:
  configs:
    - from: 2024-01-01
      store: tsdb
      object_store: filesystem
      schema: v12
      index:
        prefix: index_
        period: 24h # The index period must be 24h for the compactor.

ingester:
  chunk_encoding: snappy # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/
  #  Less compressibility but faster performance than gzip (default encoding).
  chunk_target_size: 1572864 # https://grafana.com/docs/loki/latest/configure/bp-configure/#use-chunk_target_size
  # We could additionally consider WAL enabling to prevent log loss in event of a crash. https://grafana.com/docs/loki/latest/operations/storage/wal/#write-ahead-log
  chunk_idle_period: 2h # To match 2h default of ingester.max_chunk_age (https://grafana.com/docs/loki/latest/operations/storage/filesystem/)

query_range:
  max_retries: 5 # | default = 5]
  cache_results: true # | default = false]

  results_cache:
    cache:
      embedded_cache:
        enabled: true # | default = false]
        max_size_mb: 100 # | default = 100]

chunk_store_config:
  chunk_cache_config:
    embedded_cache:
      enabled: true # | default = false]

compactor:
  shared_store: filesystem
  delete_request_store: filesystem # See https://grafana.com/docs/loki/latest/operations/storage/logs-deletion/#configuration
  working_directory: /loki/tsdb-shipper-compactor
  compaction_interval: 10m # dictates how often compaction and/or retention is applied. If the Compactor falls behind, compaction and/or retention occur as soon as possible.
  retention_enabled: true # Without this, the Compactor will only compact tables.
  retention_delete_delay: 2h # is the delay after which the Compactor will delete marked chunks.
  retention_delete_worker_count: 150 # specifies the maximum quantity of goroutine workers instantiated to delete chunks.

limits_config:
  # For more granular retention, see https://grafana.com/docs/loki/latest/operations/storage/retention/#configuring-the-retention-period (e.g. log retention per service)
  retention_period: ${RETENTION_PERIOD:-30d}

  # Limit how far back in time series data and metadata can be queried,
  # up until lookback duration ago.
  # This limit is enforced in the query frontend, the querier and the ruler.
  # If the requested time range is outside the allowed range, the request will not fail,
  # but will be modified to only query data within the allowed time range.
  # The default value of 0 does not set a limit.
  # CLI flag: -querier.max-query-lookback
  max_query_lookback: 90d

  # Examples for more advanced config such as query parallelization, see e.g. https://github.com/grafana/loki/issues/10257

frontend:
  # Maximum number of outstanding requests per tenant per frontend; requests
  # beyond this error with HTTP 429.
  # CLI flag: -querier.max-outstanding-requests-per-tenant
  max_outstanding_per_tenant: 2048 # default = 2048]

  # Compress HTTP responses.
  # CLI flag: -querier.compress-http-responses
  compress_responses: true # default = true]

  # Log queries that are slower than the specified duration. Set to 0 to disable.
  # Set to < 0 to enable on all queries.
  # CLI flag: -frontend.log-queries-longer-than
  log_queries_longer_than: 20s # | default = 0s ]

query_scheduler:
  # Maximum number of outstanding requests per tenant per query-scheduler.
  # In-flight requests above this limit will fail with HTTP response status code
  # 429.
  # CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
  max_outstanding_requests_per_tenant: 32000 # default = 32000
    
    
    #ruler:
    #  storage:
    #    type: local
    #    local:
    #      directory: /data/loki/rules # volume, directory to scan for rules
    #  rule_path: /data/loki/rules-temp # volume, store temporary rule files
    #  alertmanager_url: "https://alertmanager.example.com"
    #  enable_alertmanager_v2: true
    #  alertmanager_client:
    #    basic_auth_username: "{{ loki_alertmanager_username }}"
    #    basic_auth_password: "{{ loki_alertmanager_password }}"
    
    # By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
    # analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
    #
    # Statistics help us better understand how Loki is used, and they show us performance
    # levels for most users. This helps us prioritize features and documentation.
    # For more information on what's sent, look at
    # https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
    # Refer to the buildReport method to see what goes into a report.
    #
    # If you would like to disable reporting, uncomment the following lines:
    #analytics:
    #  reporting_enabled: false
      index:
        prefix: index_
        period: 24h # The index period must be 24h for the compactor.

ingester:
  chunk_encoding: snappy # https://grafana.com/blog/2021/02/16/the-essential-config-settings-you-should-use-so-you-wont-drop-logs-in-loki/
  #  Less compressibility but faster performance than gzip (default encoding).
  chunk_target_size: 1572864 # https://grafana.com/docs/loki/latest/configure/bp-configure/#use-chunk_target_size
  # We could additionally consider WAL enabling to prevent log loss in event of a crash. https://grafana.com/docs/loki/latest/operations/storage/wal/#write-ahead-log
  chunk_idle_period: 2h # To match 2h default of ingester.max_chunk_age (https://grafana.com/docs/loki/latest/operations/storage/filesystem/)

query_range:
  max_retries: 5 # | default = 5]
  cache_results: true # | default = false]

  results_cache:
    cache:
      embedded_cache:
        enabled: true # | default = false]
        max_size_mb: 100 # | default = 100]

chunk_store_config:
  chunk_cache_config:
    embedded_cache:
      enabled: true # | default = false]

compactor:
  shared_store: filesystem
  delete_request_store: filesystem # See https://grafana.com/docs/loki/latest/operations/storage/logs-deletion/#configuration
  working_directory: /loki/tsdb-shipper-compactor
  compaction_interval: 10m # dictates how often compaction and/or retention is applied. If the Compactor falls behind, compaction and/or retention occur as soon as possible.
  retention_enabled: true # Without this, the Compactor will only compact tables.
  retention_delete_delay: 2h # is the delay after which the Compactor will delete marked chunks.
  retention_delete_worker_count: 150 # specifies the maximum quantity of goroutine workers instantiated to delete chunks.

limits_config:
  # For more granular retention, see https://grafana.com/docs/loki/latest/operations/storage/retention/#configuring-the-retention-period (e.g. log retention per service)
  retention_period: ${RETENTION_PERIOD:-30d}

  # Limit how far back in time series data and metadata can be queried,
  # up until lookback duration ago.
  # This limit is enforced in the query frontend, the querier and the ruler.
  # If the requested time range is outside the allowed range, the request will not fail,
  # but will be modified to only query data within the allowed time range.
  # The default value of 0 does not set a limit.
  # CLI flag: -querier.max-query-lookback
  max_query_lookback: 90d

  # Examples for more advanced config such as query parallelization, see e.g. https://github.com/grafana/loki/issues/10257

frontend:
  # Maximum number of outstanding requests per tenant per frontend; requests
  # beyond this error with HTTP 429.
  # CLI flag: -querier.max-outstanding-requests-per-tenant
  max_outstanding_per_tenant: 2048 # default = 2048]

  # Compress HTTP responses.
  # CLI flag: -querier.compress-http-responses
  compress_responses: true # default = true]

  # Log queries that are slower than the specified duration. Set to 0 to disable.
  # Set to < 0 to enable on all queries.
  # CLI flag: -frontend.log-queries-longer-than
  log_queries_longer_than: 20s # | default = 0s ]

query_scheduler:
  # Maximum number of outstanding requests per tenant per query-scheduler.
  # In-flight requests above this limit will fail with HTTP response status code
  # 429.
  # CLI flag: -query-scheduler.max-outstanding-requests-per-tenant
  max_outstanding_requests_per_tenant: 32000 # default = 32000


#ruler:
#  storage:
#    type: local
#    local:
#      directory: /data/loki/rules # volume, directory to scan for rules
#  rule_path: /data/loki/rules-temp # volume, store temporary rule files
#  alertmanager_url: "https://alertmanager.example.com"
#  enable_alertmanager_v2: true
#  alertmanager_client:
#    basic_auth_username: "{{ loki_alertmanager_username }}"
#    basic_auth_password: "{{ loki_alertmanager_password }}"

# By default, Loki will send anonymous, but uniquely-identifiable usage and configuration
# analytics to Grafana Labs. These statistics are sent to https://stats.grafana.org/
#
# Statistics help us better understand how Loki is used, and they show us performance
# levels for most users. This helps us prioritize features and documentation.
# For more information on what's sent, look at
# https://github.com/grafana/loki/blob/main/pkg/analytics/stats.go
# Refer to the buildReport method to see what goes into a report.
#
# If you would like to disable reporting, uncomment the following lines:
#analytics:
#  reporting_enabled: false