Using Alloy to parse a JSON log file to both metrics (Mimir) and logs (Loki)

This topic seems closely related to Alloy config to get metrics from log files, a case that unfortunately still doesn't seem solved for good; a solution here would be of great help.

The case I need to raise is related: I also need to split a JSON log file by the field "logger", where all entries with logger=metrics need to be forwarded to Mimir and the rest to Loki.

In general we have built an Alloy infrastructure (for Windows), with a base config.alloy in %ProgramFiles%\Alloy, where modules for logs vs. metrics get imported based on the applications and/or OS to monitor, and lastly "destination" files metrics.alloy vs. logs.alloy, where the endpoints are defined.
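Simplified, the wiring in the base config.alloy looks roughly like this (the file path, component labels, and the Loki endpoint below are placeholders, not our real values):

import.file "logs" {
	filename = "C:\\Program Files\\Alloy\\logs.alloy" // placeholder path
}

// Invoke the module's declared "fetch" component and point it at the destination
logs.fetch "app" {
	forward_to = [loki.write.default.receiver]
}

loki.write "default" {
	endpoint {
		url = "https://loki.example.com/loki/api/v1/push" // placeholder endpoint
	}
}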

The challenge is log events like this:

{
	"timestamp": "2025-08-04T10:25:14.6342802Z",
	"level": "INFO",
	"logger": "Metrics",
	"details": {
		"content": {
			"lastUpdated": "2025-08-04T10:25:14.4242787+00:00",
			"dayOfWeek": 1,
			"month": 8,
			"weekOfYear": 32,
			"value": {
				"value": 7.0573165,
				"portId": 207,
				"elapsedMilliseconds": 89,
				"requestTime": "2025-08-04T10:00:13.7582003+00:00",
				"binId": 137900,
				"productId": "P58255900"
			}
		},
		"name": "OperatorHandlingTimeForBinSeconds"
	},
	"correlationStarter": "PickView.ascx.ProcessAjaxCommand",
	"correlationId": "d6e229e13b32456ebabe6d86a7fa50c7",
	"appVersion": "1.53.21.0",
	"domainUser": "NT AUTHORITY\\NETWORK SERVICE",
	"hostname": "my_host",
	"companyName": "CompanyName - prod",
	"thread": "62"
}

My current plan was to use loki.process with a stage.match { selector = "{logger=\"metrics\"}" } block containing stage.metrics / metric.gauge to capture the metrics events, and vice versa:

a stage.match { selector = "{logger!=\"metrics\"}" } block to capture the log events.

The logs arrive at Loki, but I still haven't managed to see any metrics in Mimir yet.

The really confusing part is figuring out exactly how stage.metrics works, and moreover how forward_to works for the metrics.

So far I haven't been able to find clear documentation on how to accomplish this task, which is why I'd kindly ask for a tested, simple configuration for splitting the log lines above.

My current logs.alloy looks like this:

declare "fetch" {
argument "forward_to" {
	comment = "Must be a list(LogsReceiver) where collected logs should be forwarded to"
}

argument "tail_from_end" {
	comment  = "Whether to start reading from the end of the file or not (default: true)"
	optional = true
}

argument "path_target" {
	comment  = "Path to the test app log files (default: D:\\Logs\\eManager\\*)"
	optional = true
}

// Get and process the test app logfiles
local.file_match "default" {
	path_targets = [
		{
			__address__ = "localhost",
			__path__    = coalesce(argument.path_target.value, "D:\\Logs\\eManager\\*"),
		},
	]
	sync_period = "60s"
}

loki.source.file "default" {
	targets       = local.file_match.default.targets
	forward_to    = [loki.process.split.receiver]
	tail_from_end = coalesce(argument.tail_from_end.value, true)

	file_watch {
		min_poll_frequency = "1s"
		max_poll_frequency = "10s"
	}
}

loki.process "split" {
	forward_to = argument.forward_to.value

	stage.json {
		expressions = {
			_ts       = "timestamp",
			_level    = "level",
			_logger   = "logger",
		}
	}

	stage.timestamp {
		source = "_ts"
		format = "RFC3339Nano"
	}

	stage.replace {
		expression = "\"(dayOfWeek|month|weekOfYear)\"\\s*:\\s*(\"[^\"]*\"|[0-9]+),?"
		replace    = ""
	}

	stage.static_labels {
		values = {
			service_name = "app/test_app",
		}
	}


	// For metrics entries
	stage.match {
		selector = "{logger=\"metrics\"}"

		// Extract exactly the fields you described
		stage.json {
			expressions = {
			  metric_name     = "details.name",                                // e.g. "PortClosed"
			  elapsed_ms      = "details.content.value.elapsedMilliseconds",   // numeric value
			}
		}
		
		// Promote 'logger' and a safe label for the metric name (name stays dynamic as a *label*)
		stage.labels {
			values = {
			  logger      = "",
			  metric_name = "metric_name",
			}
		}

		// Extract metrics and generate Prometheus metrics here
		stage.metrics {
			metric.gauge {
				name        = "elapsed_Milliseconds"
				description = "elapsed Milliseconds for process"
				source      = "elapsedMilliseconds"
				action      = "set"
				prefix      = "test_app_"
				/*	labels      = {
					portOperation = "portOpereation"
					portId        = "portId"
					binId         = "binId"
					productId     = "productId"
				} */
			}
		}

		//action = "drop" // Optionally drop after extracting metrics so they don't go to Loki
	}
	
	// For non-metrics entries
	stage.match {
		selector = "{logger!=\"metrics\"}"
		stage.labels {
			values = {
				level     = "_level",
				component = "_logger",
			}
		}

		stage.structured_metadata {
			values = {
				file_name = "filename",
			}
		}

		stage.label_drop {
			values = [
				"filename",
				"hostname",
				"_level",
				"_logger",
			]
		}

		stage.tenant {
			value = string.format("%s_app", coalesce(sys.env("TENANT_PREFIX"), "cust"))
		}
	}
}

}

I'm looking forward to seeing your input.

I don't use stage.metrics myself, so I can't really say, but the test suite has some examples that might be helpful to you; see alloy/internal/component/loki/process/stages/metric_test.go at main · grafana/alloy · GitHub
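One thing I can say, though: the metrics that stage.metrics produces never travel through forward_to at all; forward_to only carries log lines. The generated series are registered on Alloy's internal Prometheus registry and exposed on Alloy's own HTTP endpoint (http://127.0.0.1:12345/metrics by default), so to get them into Mimir you scrape Alloy itself and remote-write the result. An untested sketch, with a placeholder Mimir URL:

prometheus.exporter.self "alloy" { }

prometheus.scrape "alloy" {
	targets    = prometheus.exporter.self.alloy.targets
	forward_to = [prometheus.remote_write.mimir.receiver]
}

prometheus.remote_write "mimir" {
	endpoint {
		url = "https://mimir.example.com/api/v1/push" // placeholder endpoint
	}
}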

I will try to spend a bit of time trying your configuration out later and see if I can make it work.

I finally found the solution myself, though the way of doing it was a bit confusing until I nailed it.

Here is the final working solution:

declare "fetch" {
argument "forward_to" {
	comment = "Must be a list(LogsReceiver) where collected logs should be forwarded to"
}

argument "tail_from_end" {
	comment  = "Whether to start reading from the end of the file or not (default: true)"
	optional = true
}

argument "path_target" {
	comment  = "Path to the testManager log files (default: D:\\Logs\\testManager\\*)"
	optional = true
}

// Get and process the testManager logfiles
local.file_match "default" {
	path_targets = [
		{
			__address__ = "localhost",
			__path__    = coalesce(argument.path_target.value, "D:\\Logs\\testManager\\*"),
		},
	]
	sync_period = "60s"
}

loki.source.file "default" {
	targets       = local.file_match.default.targets
	forward_to    = [loki.process.split.receiver]
	tail_from_end = coalesce(argument.tail_from_end.value, true)

	file_watch {
		min_poll_frequency = "1s"
		max_poll_frequency = "10s"
	}
}

loki.process "split" {
	forward_to = argument.forward_to.value

	// Get the base JSON structure and values
	stage.json {
		expressions = {
			_ts     = "timestamp",
			_level  = "level",
			_logger = "logger",
			details = "details",
		}
	}

	// Normalize "_logger" so that "Metrics" always matches the lowercase "metrics" selector
	stage.replace {
		expression = "([Mm]etrics)"
		replace    = "metrics"
		source     = "_logger"
	}

	// Parse the 'details' object, and get the name and content
	stage.json {
		source      = "details"
		expressions = {
			_name   = "name",
			content = "",
		}
	}

	// Parse the 'content' object, and get the value
	stage.json {
		source      = "content"
		expressions = {
			value = "value",
		}
	}

	// Parse the 'value' object to get elapsedMilliseconds and the other fields
	stage.json {
		source      = "value"
		expressions = {
			elapsed_ms        = "elapsedMilliseconds",
			value             = "",
			portId            = "",
			productId         = "",
		}
	}

	// Create labels from the values extracted above
	stage.labels {
		values = {
			component         = "_logger",
			name              = "_name",
			portId            = "",
		}
	}

	stage.structured_metadata {
		values = {
			productId         = "",
		}
	}

	// Set timestamp according to log source
	stage.timestamp {
		source = "_ts"
		format = "RFC3339Nano"
	}

	stage.static_labels {
		values = {
			service_name = "app/testManager",
		}
	}

	// For metrics entries
	stage.match {
		selector = "{component=\"metrics\"}"

		// Convert logs to (Prometheus) metrics. Results can be seen here: http://127.0.0.1:12345/metrics
		stage.metrics {
			metric.gauge {
				name        = "elapsed_milliseconds"
				description = "elapsed Milliseconds for process"
				source      = "elapsed_ms"
				action      = "set"
				prefix      = "testManager_"
			}
		}
	}


	// Make sure metrics entries are dropped and not sent to Loki
	stage.match {
		selector            = "{component=\"metrics\"}"
		drop_counter_reason = "Data already sent to Mimir"
		action              = "drop" // Optionally drop after extracting metrics so they don't go to Loki
	}

	// For non-metrics entries
	stage.match {
		selector = "{component!=\"metrics\"}"

		stage.labels {
			values = {
				level = "_level",
			}
		}

		stage.structured_metadata {
			values = {
				file_name = "filename",
			}
		}

		stage.label_drop {
			values = [
				"filename",
				"hostname",
				"_level",
				"_logger",
			]
		}

		stage.tenant {
			value = string.format("%s_app", coalesce(sys.env("TENANT_PREFIX"), "cust"))
		}
	}
}

}
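For reference, the module's optional arguments can be overridden at the call site. Assuming the declare above lives in an imported logs.alloy, an invocation overriding the defaults looks roughly like this (the import label, path, and receiver are placeholders for our real destination config):

import.file "logs" {
	filename = "logs.alloy" // placeholder path
}

logs.fetch "testManager" {
	forward_to    = [loki.write.default.receiver] // defined in the destination file
	path_target   = "E:\\Logs\\testManager\\*"    // placeholder override
	tail_from_end = false
}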