InfluxDB with Scala

Hello,

I am pulling data from an address and trying to write it to InfluxDB. The data is pulled successfully, but nothing arrives in InfluxDB. I am working in Scala; the service is running, but no data reaches InfluxDB. I want to monitor EMC RecoverPoint.

Could you help me with this please?

My Collector.xml code

<?xml version="1.0" encoding="UTF-8"?>
<collector>
  <extractors>
    <extractor name="rp">
      <baseUrl>https://EMC-RP-IP/fapi/rest/5_1</baseUrl>
    </extractor>
  </extractors>
  <outputs>
    <output name="influxdb">
      <type>influxdb</type>
      <address>localhost</address>
      <port>8086</port>
      <dbname>RP</dbname>
    </output>
  </outputs>
  <systems>
    <system>
      <name>rp01</name>
      <class>rp</class>
      <interval>10 min</interval>
      <extractor name="rp">
        <address>EMC-RP-IP</address>
        <username>admin</username>
        <password>pass</password>
        <ver>5_1</ver>
      </extractor>
      <output>influxdb</output>
    </system>
  </systems>
</collector>

My Influxdb.scala code

package universalstoragecollector

import scala.util.Properties.propOrElse
import scala.util.Try
import java.util.concurrent.TimeUnit

import org.influxdb.{InfluxDB, InfluxDBFactory}
import org.influxdb.InfluxDB.ConsistencyLevel
import org.influxdb.dto.Point.Builder
import org.influxdb.dto.{BatchPoints, Point}

// Output to InfluxDB
class Influx(name: String, config: Map[String, String],
               sysConfig: Map[String, Option[String]])
  extends Output(name, config, sysConfig) with Logger {

  val loggerFile: String = propOrElse("USC_HOME", "") + "log/collector-error.log"

  // Common parameters
  val address: Option[String] = config.get("address")

  val port: Option[Int] = config.get("port").map(_.toInt)

  val dbname: Option[String] = config.get("dbname")

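  var influxDB: InfluxDB = _

  // Database to write to: the configured <dbname>, falling back to the
  // previously hard-coded "foo". Declared at class level so out() can see it.
  val databaseName: String = dbname.getOrElse("foo")

  // Validation by common parameters
  //def isValid: Boolean = config.contains("address") & config.contains("port") &
    //config.contains("dbname")

  // Connection initialization before data output
  def start(): Unit = {
    // connect() returns a ready-to-use client; the InfluxDB interface has no open() method
    influxDB = InfluxDBFactory.connect("http://Influx-IP","username","pass")
    influxDB.query(new org.influxdb.dto.Query("CREATE DATABASE " + databaseName, databaseName))
    influxDB.setDatabase(databaseName)
  }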

  // Convert numbers in data from String type to Double type
  def parseDouble(s: String): Option[Double] = Try { s.toDouble }.toOption

  // Output data comes in 'msg' and 'data' structures
  def out(msg: Map[Int, (String, String)], timestamp: Long, data: Map[String, String]): Unit = {

    val header: Map[String, String] = (msg.values map (v => v._1 -> v._2)).toMap

    val batchPoints: BatchPoints = BatchPoints
      .database(databaseName)
      .retentionPolicy("autogen")
      .consistency(ConsistencyLevel.ALL)
      .build()

    val point: Builder = Point.measurement(header("measurement"))
      .time(timestamp.toLong, TimeUnit.SECONDS)
      .tag("storage", sysConfig("name").get)
      .tag("class", sysConfig("class").get)

    if (sysConfig("type").isDefined) point.tag("type", sysConfig("type").get)

    (header.keySet - "measurement") foreach {k => point.tag(k, header(k))}

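    // Numeric values become Double fields, everything else stays a String field
    data.foreach { case (key, value) =>
      parseDouble(value) match {
        case Some(number) => point.addField(key, number)
        case None         => point.addField(key, value)
      }
    }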

    batchPoints.point(point.build())
    influxDB.write(batchPoints)
  }

  // Closing connection after data output
  def stop(): Unit = {
    influxDB.close()
  }
}

And my logs:
Wed Nov 30 09:24:36 UTC 2022 rp01, /clusters/statistics: java.util.NoSuchElementException: key not found: measurement
Wed Nov 30 09:24:37 UTC 2022 rp01, /rpas/statistics: java.util.NoSuchElementException: key not found: measurement
Wed Nov 30 09:24:37 UTC 2022 rp01, /groups: net.liftweb.json.MappingException: No usable value for innerSet
No usable value for groupCopiesInformation
No usable value for vmsInformation
Do not know how to convert JObject(List(JField(vmUID,JObject(List(JField(uuid,JString(500823dd-6738-b445-c705-7ad0d2b4e67c)), JField(virtualCenterUID,JObject(List(JField(uuid,JString(1a5e7115-0def-4796-b155-89d0c20e4ea5)))))))), JField(vmName,JString(YalcinMustafa_Management-win)), JField(vmBiosUuid,JString(4208652e-3aae-11bb-03a8-47d5ad33e1fc)))) into class java.lang.String

Thanks in advance
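
For what it's worth, the first two log lines look like they come from `header("measurement")` in `out`. Calling a Scala `Map` with a missing key throws `java.util.NoSuchElementException: key not found: measurement`, so for `/clusters/statistics` and `/rpas/statistics` the `msg` header apparently has no `measurement` entry, and the method fails before `influxDB.write` is reached. Below is a minimal sketch of a guarded lookup. The helper name `measurementFor`, the sample maps, and the fallback value are assumptions for illustration, not part of the collector.

```scala
import scala.util.Try

object MeasurementLookup {
  // Hypothetical helper: returns the measurement name from the header,
  // or the given fallback when the key is absent, instead of throwing.
  def measurementFor(header: Map[String, String], fallback: String): String =
    header.getOrElse("measurement", fallback)

  def main(args: Array[String]): Unit = {
    // Made-up sample headers, one with and one without the key
    val withKey    = Map("measurement" -> "clusters", "cluster" -> "c1")
    val withoutKey = Map("cluster" -> "c1")

    // Map.apply on a missing key fails the same way as in the log
    val failure = Try(withoutKey("measurement"))
    println(failure.failed.map(_.getMessage).getOrElse("no error"))

    // The guarded lookup never throws
    println(measurementFor(withKey, "unknown"))
    println(measurementFor(withoutKey, "unknown"))
  }
}
```

Whether to skip such points or write them under a fallback name is a design choice. Either way, logging the header when the key is missing should show what the extractor actually sends.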

I am not sure the Grafana community forum is the best place to ask that question. Have you tried the InfluxDB forum?

Thanks for your reply

Is this XML the result of an API call, or is it a file on disk?

It's a file on disk. I created it.

What is the source of the xml data?

You could also try using Telegraf to load this data into InfluxDB:

[[inputs.exec]]
  ## Commands array
  commands = [
    "python.exe push_xml.py"
  ]

Or use pure Python; here are some samples for CSV import to InfluxDB