// Package openstack implements an OpenStack input plugin for Telegraf // // The OpenStack input plug is a simple two phase metric collector. In the first // pass a set of gatherers are run against the API to cache collections of resources. // In the second phase the gathered resources are combined and emitted as metrics. // // No aggregation is performed by the input plugin, instead queries to InfluxDB should // be used to gather global totals of things such as tag frequency. package openstack import ( "context" "fmt" "regexp" "sort" "strconv" "strings" "time" "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack" "github.com/gophercloud/gophercloud/openstack/blockstorage/extensions/schedulerstats" "github.com/gophercloud/gophercloud/openstack/blockstorage/extensions/volumetenants" "github.com/gophercloud/gophercloud/openstack/blockstorage/v2/volumes" "github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/aggregates" "github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/diagnostics" "github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/hypervisors" nova_services "github.com/gophercloud/gophercloud/openstack/compute/v2/extensions/services" "github.com/gophercloud/gophercloud/openstack/compute/v2/flavors" "github.com/gophercloud/gophercloud/openstack/compute/v2/servers" "github.com/gophercloud/gophercloud/openstack/identity/v3/projects" "github.com/gophercloud/gophercloud/openstack/identity/v3/services" "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/agents" "github.com/gophercloud/gophercloud/openstack/networking/v2/networks" "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" "github.com/gophercloud/gophercloud/openstack/networking/v2/subnets" "github.com/gophercloud/gophercloud/openstack/orchestration/v1/stacks" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal/choice" httpconfig "github.com/influxdata/telegraf/plugins/common/http" "github.com/influxdata/telegraf/plugins/inputs" ) var ( typePort = regexp.MustCompile(`_rx$|_rx_drop$|_rx_errors$|_rx_packets$|_tx$|_tx_drop$|_tx_errors$|_tx_packets$`) typeCPU = regexp.MustCompile(`cpu[0-9]{1,2}_time$`) typeStorage = regexp.MustCompile(`_errors$|_read$|_read_req$|_write$|_write_req$`) ) // volume is a structure used to unmarshal raw JSON from the API into. type volume struct { volumes.Volume volumetenants.VolumeTenantExt } // OpenStack is the main structure associated with a collection instance. type OpenStack struct { // Configuration variables IdentityEndpoint string `toml:"authentication_endpoint"` Domain string `toml:"domain"` Project string `toml:"project"` Username string `toml:"username"` Password string `toml:"password"` EnabledServices []string `toml:"enabled_services"` ServerDiagnotics bool `toml:"server_diagnotics"` OutputSecrets bool `toml:"output_secrets"` TagPrefix string `toml:"tag_prefix"` TagValue string `toml:"tag_value"` HumanReadableTS bool `toml:"human_readable_timestamps"` MeasureRequest bool `toml:"measure_openstack_requests"` Log telegraf.Logger `toml:"-"` httpconfig.HTTPClientConfig // Locally cached clients identity *gophercloud.ServiceClient compute *gophercloud.ServiceClient volume *gophercloud.ServiceClient network *gophercloud.ServiceClient stack *gophercloud.ServiceClient // Locally cached resources openstackFlavors map[string]flavors.Flavor openstackHypervisors []hypervisors.Hypervisor diag map[string]interface{} openstackProjects map[string]projects.Project openstackServices map[string]services.Service } // containsService indicates whether a particular service is enabled func (o *OpenStack) containsService(t string) bool { for _, service := range o.openstackServices { if service.Type == t { return true } } return false } // convertTimeFormat, to convert time format based on HumanReadableTS func (o *OpenStack) convertTimeFormat(t time.Time) interface{} { if o.HumanReadableTS { return t.Format("2006-01-02T15:04:05.999999999Z07:00") } return t.UnixNano() } // Description returns a description string of the input plugin and implements // the Input interface. func (o *OpenStack) Description() string { return "Collects performance metrics from OpenStack services" } // sampleConfig is a sample configuration file entry. var sampleConfig = ` ## The recommended interval to poll is '30m' ## The identity endpoint to authenticate against and get the service catalog from. authentication_endpoint = "https://my.openstack.cloud:5000" ## The domain to authenticate against when using a V3 identity endpoint. # domain = "default" ## The project to authenticate as. # project = "admin" ## User authentication credentials. Must have admin rights. username = "admin" password = "password" ## Available services are: ## "agents", "aggregates", "flavors", "hypervisors", "networks", "nova_services", ## "ports", "projects", "servers", "services", "stacks", "storage_pools", "subnets", "volumes" # enabled_services = ["services", "projects", "hypervisors", "flavors", "networks", "volumes"] ## Collect Server Diagnostics # server_diagnotics = false ## output secrets (such as adminPass(for server) and UserID(for volume)). # output_secrets = false ## Amount of time allowed to complete the HTTP(s) request. # timeout = "5s" ## HTTP Proxy support # http_proxy_url = "" ## Optional TLS Config # tls_ca = /path/to/cafile # tls_cert = /path/to/certfile # tls_key = /path/to/keyfile ## Use TLS but skip chain & host verification # insecure_skip_verify = false ## Options for tags received from Openstack # tag_prefix = "openstack_tag_" # tag_value = "true" ## Timestamp format for timestamp data recieved from Openstack. ## If false format is unix nanoseconds. # human_readable_timestamps = false ## Measure Openstack call duration # measure_openstack_requests = false ` // SampleConfig return a sample configuration file for auto-generation and // implements the Input interface. func (o *OpenStack) SampleConfig() string { return sampleConfig } // initialize performs any necessary initialization functions func (o *OpenStack) Init() error { if len(o.EnabledServices) == 0 { o.EnabledServices = []string{"services", "projects", "hypervisors", "flavors", "networks", "volumes"} } if o.Username == "" || o.Password == "" { return fmt.Errorf("username or password can not be empty string") } if o.TagValue == "" { return fmt.Errorf("tag_value option can not be empty string") } sort.Strings(o.EnabledServices) o.openstackFlavors = map[string]flavors.Flavor{} o.openstackHypervisors = []hypervisors.Hypervisor{} o.diag = map[string]interface{}{} o.openstackProjects = map[string]projects.Project{} o.openstackServices = map[string]services.Service{} // Authenticate against Keystone and get a token provider authOption := gophercloud.AuthOptions{ IdentityEndpoint: o.IdentityEndpoint, DomainName: o.Domain, TenantName: o.Project, Username: o.Username, Password: o.Password, } provider, err := openstack.NewClient(authOption.IdentityEndpoint) if err != nil { return fmt.Errorf("unable to create client for OpenStack endpoint %v", err) } ctx := context.Background() client, err := o.HTTPClientConfig.CreateClient(ctx, o.Log) if err != nil { return err } provider.HTTPClient = *client if err := openstack.Authenticate(provider, authOption); err != nil { return fmt.Errorf("unable to authenticate OpenStack user %v", err) } // Create required clients and attach to the OpenStack struct if o.identity, err = openstack.NewIdentityV3(provider, gophercloud.EndpointOpts{}); err != nil { return fmt.Errorf("unable to create V3 identity client %v", err) } if err := o.gatherServices(); err != nil { return fmt.Errorf("failed to get resource openstack services %v", err) } if o.compute, err = openstack.NewComputeV2(provider, gophercloud.EndpointOpts{}); err != nil { return fmt.Errorf("unable to create V2 compute client %v", err) } // Create required clients and attach to the OpenStack struct if o.network, err = openstack.NewNetworkV2(provider, gophercloud.EndpointOpts{}); err != nil { return fmt.Errorf("unable to create V2 network client %v", err) } // The Orchestration service is optional if o.containsService("orchestration") { if o.stack, err = openstack.NewOrchestrationV1(provider, gophercloud.EndpointOpts{}); err != nil { return fmt.Errorf("unable to create V1 stack client %v", err) } } // The Cinder volume storage service is optional if o.containsService("volumev2") { if o.volume, err = openstack.NewBlockStorageV2(provider, gophercloud.EndpointOpts{}); err != nil { return fmt.Errorf("unable to create V2 volume client %v", err) } } return nil } // Gather gathers resources from the OpenStack API and accumulates metrics. This // implements the Input interface. func (o *OpenStack) Gather(acc telegraf.Accumulator) error { // Gather resources. Note service harvesting must come first as the other // gatherers are dependant on this information. gatherers := map[string]func(telegraf.Accumulator) error{ "projects": o.gatherProjects, "hypervisors": o.gatherHypervisors, "flavors": o.gatherFlavors, "servers": o.gatherServers, "volumes": o.gatherVolumes, "storage_pools": o.gatherStoragePools, "subnets": o.gatherSubnets, "ports": o.gatherPorts, "networks": o.gatherNetworks, "aggregates": o.gatherAggregates, "nova_services": o.gatherNovaServices, "agents": o.gatherAgents, "stacks": o.gatherStacks, } callDuration := map[string]interface{}{} for _, service := range o.EnabledServices { // As Services are already gathered in Init(), using this to accumulate them. if service == "services" { o.accumulateServices(acc) continue } start := time.Now() gatherer := gatherers[service] if err := gatherer(acc); err != nil { acc.AddError(fmt.Errorf("failed to get resource %q %v", service, err)) } callDuration[service] = time.Since(start).Nanoseconds() } if o.MeasureRequest { for service, duration := range callDuration { acc.AddFields("openstack_request_duration", map[string]interface{}{service: duration}, map[string]string{}) } } if o.ServerDiagnotics { if !choice.Contains("servers", o.EnabledServices) { if err := o.gatherServers(acc); err != nil { acc.AddError(fmt.Errorf("failed to get resource server diagnostics %v", err)) return nil } } o.accumulateServerDiagnostics(acc) } return nil } // gatherServices collects services from the OpenStack API. func (o *OpenStack) gatherServices() error { page, err := services.List(o.identity, &services.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list services %v", err) } extractedServices, err := services.ExtractServices(page) if err != nil { return fmt.Errorf("unable to extract services %v", err) } for _, service := range extractedServices { o.openstackServices[service.ID] = service } return nil } // gatherStacks collects and accumulates stacks data from the OpenStack API. func (o *OpenStack) gatherStacks(acc telegraf.Accumulator) error { page, err := stacks.List(o.stack, &stacks.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list stacks %v", err) } extractedStacks, err := stacks.ExtractStacks(page) if err != nil { return fmt.Errorf("unable to extract stacks %v", err) } for _, stack := range extractedStacks { tags := map[string]string{ "description": stack.Description, "name": stack.Name, } for _, stackTag := range stack.Tags { tags[o.TagPrefix+stackTag] = o.TagValue } fields := map[string]interface{}{ "status": strings.ToLower(stack.Status), "id": stack.ID, "status_reason": stack.StatusReason, "creation_time": o.convertTimeFormat(stack.CreationTime), "updated_time": o.convertTimeFormat(stack.UpdatedTime), } acc.AddFields("openstack_stack", fields, tags) } return nil } // gatherNovaServices collects and accumulates nova_services data from the OpenStack API. func (o *OpenStack) gatherNovaServices(acc telegraf.Accumulator) error { page, err := nova_services.List(o.compute, &nova_services.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list nova_services %v", err) } novaServices, err := nova_services.ExtractServices(page) if err != nil { return fmt.Errorf("unable to extract nova_services %v", err) } for _, novaService := range novaServices { tags := map[string]string{ "name": novaService.Binary, "host_machine": novaService.Host, "state": novaService.State, "status": strings.ToLower(novaService.Status), "zone": novaService.Zone, } fields := map[string]interface{}{ "id": novaService.ID, "disabled_reason": novaService.DisabledReason, "forced_down": novaService.ForcedDown, "updated_at": o.convertTimeFormat(novaService.UpdatedAt), } acc.AddFields("openstack_nova_service", fields, tags) } return nil } // gatherSubnets collects and accumulates subnets data from the OpenStack API. func (o *OpenStack) gatherSubnets(acc telegraf.Accumulator) error { page, err := subnets.List(o.network, &subnets.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list subnets %v", err) } extractedSubnets, err := subnets.ExtractSubnets(page) if err != nil { return fmt.Errorf("unable to extract subnets %v", err) } for _, subnet := range extractedSubnets { var allocationPools []string for _, pool := range subnet.AllocationPools { allocationPools = append(allocationPools, pool.Start+"-"+pool.End) } tags := map[string]string{ "network_id": subnet.NetworkID, "name": subnet.Name, "description": subnet.Description, "ip_version": strconv.Itoa(subnet.IPVersion), "cidr": subnet.CIDR, "gateway_ip": subnet.GatewayIP, "tenant_id": subnet.TenantID, "project_id": subnet.ProjectID, "ipv6_address_mode": subnet.IPv6AddressMode, "ipv6_ra_mode": subnet.IPv6RAMode, "subnet_pool_id": subnet.SubnetPoolID, } for _, subnetTag := range subnet.Tags { tags[o.TagPrefix+subnetTag] = o.TagValue } fields := map[string]interface{}{ "id": subnet.ID, "dhcp_enabled": subnet.EnableDHCP, "dns_nameservers": strings.Join(subnet.DNSNameservers[:], ","), "allocation_pools": strings.Join(allocationPools[:], ","), } acc.AddFields("openstack_subnet", fields, tags) } return nil } // gatherPorts collects and accumulates ports data from the OpenStack API. func (o *OpenStack) gatherPorts(acc telegraf.Accumulator) error { page, err := ports.List(o.network, &ports.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list ports %v", err) } extractedPorts, err := ports.ExtractPorts(page) if err != nil { return fmt.Errorf("unable to extract ports %v", err) } for _, port := range extractedPorts { tags := map[string]string{ "network_id": port.NetworkID, "name": port.Name, "description": port.Description, "status": strings.ToLower(port.Status), "tenant_id": port.TenantID, "project_id": port.ProjectID, "device_owner": port.DeviceOwner, "device_id": port.DeviceID, } for _, portTag := range port.Tags { tags[o.TagPrefix+portTag] = o.TagValue } fields := map[string]interface{}{ "id": port.ID, "mac_address": port.MACAddress, "admin_state_up": port.AdminStateUp, "fixed_ips": len(port.FixedIPs), "allowed_address_pairs": len(port.AllowedAddressPairs), "security_groups": strings.Join(port.SecurityGroups[:], ","), } if len(port.FixedIPs) > 0 { for _, ip := range port.FixedIPs { fields["subnet_id"] = ip.SubnetID fields["ip_address"] = ip.IPAddress acc.AddFields("openstack_port", fields, tags) } } else { acc.AddFields("openstack_port", fields, tags) } } return nil } // gatherNetworks collects and accumulates networks data from the OpenStack API. func (o *OpenStack) gatherNetworks(acc telegraf.Accumulator) error { page, err := networks.List(o.network, &networks.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list networks %v", err) } extractedNetworks, err := networks.ExtractNetworks(page) if err != nil { return fmt.Errorf("unable to extract networks %v", err) } for _, network := range extractedNetworks { tags := map[string]string{ "name": network.Name, "description": network.Description, "status": strings.ToLower(network.Status), "tenant_id": network.TenantID, "project_id": network.ProjectID, } for _, networkTag := range network.Tags { tags[o.TagPrefix+networkTag] = o.TagValue } fields := map[string]interface{}{ "id": network.ID, "admin_state_up": network.AdminStateUp, "subnets": len(network.Subnets), "shared": network.Shared, "availability_zone_hints": strings.Join(network.AvailabilityZoneHints[:], ","), "updated_at": o.convertTimeFormat(network.UpdatedAt), "created_at": o.convertTimeFormat(network.CreatedAt), } if len(network.Subnets) > 0 { for _, subnet := range network.Subnets { fields["subnet_id"] = subnet acc.AddFields("openstack_network", fields, tags) } } else { acc.AddFields("openstack_network", fields, tags) } } return nil } // gatherAgents collects and accumulates agents data from the OpenStack API. func (o *OpenStack) gatherAgents(acc telegraf.Accumulator) error { page, err := agents.List(o.network, &agents.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list neutron agents %v", err) } extractedAgents, err := agents.ExtractAgents(page) if err != nil { return fmt.Errorf("unable to extract neutron agents %v", err) } for _, agent := range extractedAgents { tags := map[string]string{ "agent_type": agent.AgentType, "availability_zone": agent.AvailabilityZone, "binary": agent.Binary, "description": agent.Description, "agent_host": agent.Host, "topic": agent.Topic, } fields := map[string]interface{}{ "id": agent.ID, "admin_state_up": agent.AdminStateUp, "alive": agent.Alive, "resources_synced": agent.ResourcesSynced, "created_at": o.convertTimeFormat(agent.CreatedAt), "started_at": o.convertTimeFormat(agent.StartedAt), "heartbeat_timestamp": o.convertTimeFormat(agent.HeartbeatTimestamp), } acc.AddFields("openstack_neutron_agent", fields, tags) } return nil } // gatherAggregates collects and accumulates aggregates data from the OpenStack API. func (o *OpenStack) gatherAggregates(acc telegraf.Accumulator) error { page, err := aggregates.List(o.compute).AllPages() if err != nil { return fmt.Errorf("unable to list aggregates %v", err) } extractedAggregates, err := aggregates.ExtractAggregates(page) if err != nil { return fmt.Errorf("unable to extract aggregates %v", err) } for _, aggregate := range extractedAggregates { tags := map[string]string{ "availability_zone": aggregate.AvailabilityZone, "name": aggregate.Name, } fields := map[string]interface{}{ "id": aggregate.ID, "aggregate_hosts": len(aggregate.Hosts), "deleted": aggregate.Deleted, "created_at": o.convertTimeFormat(aggregate.CreatedAt), "updated_at": o.convertTimeFormat(aggregate.UpdatedAt), "deleted_at": o.convertTimeFormat(aggregate.DeletedAt), } if len(aggregate.Hosts) > 0 { for _, host := range aggregate.Hosts { fields["aggregate_host"] = host acc.AddFields("openstack_aggregate", fields, tags) } } else { acc.AddFields("openstack_aggregate", fields, tags) } } return nil } // gatherProjects collects and accumulates projects data from the OpenStack API. func (o *OpenStack) gatherProjects(acc telegraf.Accumulator) error { page, err := projects.List(o.identity, &projects.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list projects %v", err) } extractedProjects, err := projects.ExtractProjects(page) if err != nil { return fmt.Errorf("unable to extract projects %v", err) } for _, project := range extractedProjects { o.openstackProjects[project.ID] = project tags := map[string]string{ "description": project.Description, "domain_id": project.DomainID, "name": project.Name, "parent_id": project.ParentID, } for _, projectTag := range project.Tags { tags[o.TagPrefix+projectTag] = o.TagValue } fields := map[string]interface{}{ "id": project.ID, "is_domain": project.IsDomain, "enabled": project.Enabled, "projects": len(extractedProjects), } acc.AddFields("openstack_identity", fields, tags) } return nil } // gatherHypervisors collects and accumulates hypervisors data from the OpenStack API. func (o *OpenStack) gatherHypervisors(acc telegraf.Accumulator) error { page, err := hypervisors.List(o.compute, hypervisors.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list hypervisors %v", err) } extractedHypervisors, err := hypervisors.ExtractHypervisors(page) if err != nil { return fmt.Errorf("unable to extract hypervisors %v", err) } o.openstackHypervisors = extractedHypervisors if choice.Contains("hypervisors", o.EnabledServices) { for _, hypervisor := range extractedHypervisors { tags := map[string]string{ "cpu_vendor": hypervisor.CPUInfo.Vendor, "cpu_arch": hypervisor.CPUInfo.Arch, "cpu_model": hypervisor.CPUInfo.Model, "status": strings.ToLower(hypervisor.Status), "state": hypervisor.State, "hypervisor_hostname": hypervisor.HypervisorHostname, "hypervisor_type": hypervisor.HypervisorType, "hypervisor_version": strconv.Itoa(hypervisor.HypervisorVersion), "service_host": hypervisor.Service.Host, "service_id": hypervisor.Service.ID, "service_disabled_reason": hypervisor.Service.DisabledReason, } for _, cpuFeature := range hypervisor.CPUInfo.Features { tags["cpu_feature_"+cpuFeature] = "true" } fields := map[string]interface{}{ "id": hypervisor.ID, "host_ip": hypervisor.HostIP, "cpu_topology_sockets": hypervisor.CPUInfo.Topology.Sockets, "cpu_topology_cores": hypervisor.CPUInfo.Topology.Cores, "cpu_topology_threads": hypervisor.CPUInfo.Topology.Threads, "current_workload": hypervisor.CurrentWorkload, "disk_available_least": hypervisor.DiskAvailableLeast, "free_disk_gb": hypervisor.FreeDiskGB, "free_ram_mb": hypervisor.FreeRamMB, "local_gb": hypervisor.LocalGB, "local_gb_used": hypervisor.LocalGBUsed, "memory_mb": hypervisor.MemoryMB, "memory_mb_used": hypervisor.MemoryMBUsed, "running_vms": hypervisor.RunningVMs, "vcpus": hypervisor.VCPUs, "vcpus_used": hypervisor.VCPUsUsed, } acc.AddFields("openstack_hypervisor", fields, tags) } } return nil } // gatherFlavors collects and accumulates flavors data from the OpenStack API. func (o *OpenStack) gatherFlavors(acc telegraf.Accumulator) error { page, err := flavors.ListDetail(o.compute, &flavors.ListOpts{}).AllPages() if err != nil { return fmt.Errorf("unable to list flavors %v", err) } extractedflavors, err := flavors.ExtractFlavors(page) if err != nil { return fmt.Errorf("unable to extract flavors %v", err) } for _, flavor := range extractedflavors { o.openstackFlavors[flavor.ID] = flavor tags := map[string]string{ "name": flavor.Name, "is_public": strconv.FormatBool(flavor.IsPublic), } fields := map[string]interface{}{ "id": flavor.ID, "disk": flavor.Disk, "ram": flavor.RAM, "rxtx_factor": flavor.RxTxFactor, "swap": flavor.Swap, "vcpus": flavor.VCPUs, "ephemeral": flavor.Ephemeral, } acc.AddFields("openstack_flavor", fields, tags) } return nil } // gatherVolumes collects and accumulates volumes data from the OpenStack API. func (o *OpenStack) gatherVolumes(acc telegraf.Accumulator) error { page, err := volumes.List(o.volume, &volumes.ListOpts{AllTenants: true}).AllPages() if err != nil { return fmt.Errorf("unable to list volumes %v", err) } v := []volume{} if err := volumes.ExtractVolumesInto(page, &v); err != nil { return fmt.Errorf("unable to extract volumes %v", err) } for _, volume := range v { tags := map[string]string{ "status": strings.ToLower(volume.Status), "availability_zone": volume.AvailabilityZone, "name": volume.Name, "description": volume.Description, "volume_type": volume.VolumeType, "snapshot_id": volume.SnapshotID, "source_volid": volume.SourceVolID, "bootable": volume.Bootable, "replication_status": strings.ToLower(volume.ReplicationStatus), "consistency_group_id": volume.ConsistencyGroupID, } fields := map[string]interface{}{ "id": volume.ID, "size": volume.Size, "total_attachments": len(volume.Attachments), "encrypted": volume.Encrypted, "multiattach": volume.Multiattach, "created_at": o.convertTimeFormat(volume.CreatedAt), "updated_at": o.convertTimeFormat(volume.UpdatedAt), } if o.OutputSecrets { tags["user_id"] = volume.UserID } if len(volume.Attachments) > 0 { for _, attachment := range volume.Attachments { if !o.HumanReadableTS { fields["attachment_attached_at"] = attachment.AttachedAt.UnixNano() } else { fields["attachment_attached_at"] = attachment.AttachedAt.Format("2006-01-02T15:04:05.999999999Z07:00") } tags["attachment_attachment_id"] = attachment.AttachmentID tags["attachment_device"] = attachment.Device tags["attachment_host_name"] = attachment.HostName fields["attachment_server_id"] = attachment.ServerID acc.AddFields("openstack_volume", fields, tags) } } else { acc.AddFields("openstack_volume", fields, tags) } } return nil } // gatherStoragePools collects and accumulates storage pools data from the OpenStack API. func (o *OpenStack) gatherStoragePools(acc telegraf.Accumulator) error { results, err := schedulerstats.List(o.volume, &schedulerstats.ListOpts{Detail: true}).AllPages() if err != nil { return fmt.Errorf("unable to list storage pools %v", err) } storagePools, err := schedulerstats.ExtractStoragePools(results) if err != nil { return fmt.Errorf("unable to extract storage pools %v", err) } for _, storagePool := range storagePools { tags := map[string]string{ "name": storagePool.Capabilities.VolumeBackendName, "driver_version": storagePool.Capabilities.DriverVersion, "storage_protocol": storagePool.Capabilities.StorageProtocol, "vendor_name": storagePool.Capabilities.VendorName, "volume_backend_name": storagePool.Capabilities.VolumeBackendName, } fields := map[string]interface{}{ "total_capacity_gb": storagePool.Capabilities.TotalCapacityGB, "free_capacity_gb": storagePool.Capabilities.FreeCapacityGB, } acc.AddFields("openstack_storage_pool", fields, tags) } return nil } // gatherServers collects servers from the OpenStack API. func (o *OpenStack) gatherServers(acc telegraf.Accumulator) error { if !choice.Contains("hypervisors", o.EnabledServices) { if err := o.gatherHypervisors(acc); err != nil { acc.AddError(fmt.Errorf("failed to get resource hypervisors %v", err)) } } serverGather := choice.Contains("servers", o.EnabledServices) for _, hypervisor := range o.openstackHypervisors { page, err := servers.List(o.compute, &servers.ListOpts{AllTenants: true, Host: hypervisor.HypervisorHostname}).AllPages() if err != nil { return fmt.Errorf("unable to list servers %v", err) } extractedServers, err := servers.ExtractServers(page) if err != nil { return fmt.Errorf("unable to extract servers %v", err) } for _, server := range extractedServers { if serverGather { o.accumulateServer(acc, server, hypervisor.HypervisorHostname) } if !o.ServerDiagnotics || server.Status != "ACTIVE" { continue } diagnostic, err := diagnostics.Get(o.compute, server.ID).Extract() if err != nil { acc.AddError(fmt.Errorf("unable to get diagnostics for server(%v) %v", server.ID, err)) continue } o.diag[server.ID] = diagnostic } } return nil } // accumulateServices accumulates statistics of services. func (o *OpenStack) accumulateServices(acc telegraf.Accumulator) { for _, service := range o.openstackServices { tags := map[string]string{ "name": service.Type, } fields := map[string]interface{}{ "service_id": service.ID, "service_enabled": service.Enabled, } acc.AddFields("openstack_service", fields, tags) } } // accumulateServer accumulates statistics of a server. func (o *OpenStack) accumulateServer(acc telegraf.Accumulator, server servers.Server, hostName string) { tags := map[string]string{} // Extract the flavor details to avoid joins (ignore errors and leave as zero values) var vcpus, ram, disk int if flavorIDInterface, ok := server.Flavor["id"]; ok { if flavorID, ok := flavorIDInterface.(string); ok { tags["flavor"] = flavorID if flavor, ok := o.openstackFlavors[flavorID]; ok { vcpus = flavor.VCPUs ram = flavor.RAM disk = flavor.Disk } } } if imageIDInterface, ok := server.Image["id"]; ok { if imageID, ok := imageIDInterface.(string); ok { tags["image"] = imageID } } // Try derive the associated project project := "unknown" if p, ok := o.openstackProjects[server.TenantID]; ok { project = p.Name } tags["tenant_id"] = server.TenantID tags["name"] = server.Name tags["host_id"] = server.HostID tags["status"] = strings.ToLower(server.Status) tags["key_name"] = server.KeyName tags["host_name"] = hostName tags["project"] = project fields := map[string]interface{}{ "id": server.ID, "progress": server.Progress, "accessIPv4": server.AccessIPv4, "accessIPv6": server.AccessIPv6, "addresses": len(server.Addresses), "security_groups": len(server.SecurityGroups), "volumes_attached": len(server.AttachedVolumes), "fault_code": server.Fault.Code, "fault_details": server.Fault.Details, "fault_message": server.Fault.Message, "vcpus": vcpus, "ram_mb": ram, "disk_gb": disk, "fault_created": o.convertTimeFormat(server.Fault.Created), "updated": o.convertTimeFormat(server.Updated), "created": o.convertTimeFormat(server.Created), } if o.OutputSecrets { tags["user_id"] = server.UserID fields["adminPass"] = server.AdminPass } if len(server.AttachedVolumes) == 0 { acc.AddFields("openstack_server", fields, tags) } else { for _, AttachedVolume := range server.AttachedVolumes { fields["volume_id"] = AttachedVolume.ID acc.AddFields("openstack_server", fields, tags) } } } // accumulateServerDiagnostics accumulates statistics from the compute(nova) service. // currently only supports 'libvirt' driver. func (o *OpenStack) accumulateServerDiagnostics(acc telegraf.Accumulator) { for serverID, diagnostic := range o.diag { s, ok := diagnostic.(map[string]interface{}) if !ok { o.Log.Warnf("unknown type for diagnostics %T", diagnostic) continue } tags := map[string]string{ "server_id": serverID, } fields := map[string]interface{}{} portName := make(map[string]bool) storageName := make(map[string]bool) memoryStats := make(map[string]interface{}) for k, v := range s { if typePort.MatchString(k) { portName[strings.Split(k, "_")[0]] = true } else if typeCPU.MatchString(k) { fields[k] = v } else if typeStorage.MatchString(k) { storageName[strings.Split(k, "_")[0]] = true } else { memoryStats[k] = v } } fields["memory"] = memoryStats["memory"] fields["memory-actual"] = memoryStats["memory-actual"] fields["memory-rss"] = memoryStats["memory-rss"] fields["memory-swap_in"] = memoryStats["memory-swap_in"] tags["no_of_ports"] = strconv.Itoa(len(portName)) tags["no_of_disks"] = strconv.Itoa(len(storageName)) for key := range storageName { fields["disk_errors"] = s[key+"_errors"] fields["disk_read"] = s[key+"_read"] fields["disk_read_req"] = s[key+"_read_req"] fields["disk_write"] = s[key+"_write"] fields["disk_write_req"] = s[key+"_write_req"] tags["disk_name"] = key acc.AddFields("openstack_server_diagnostics", fields, tags) } for key := range portName { fields["port_rx"] = s[key+"_rx"] fields["port_rx_drop"] = s[key+"_rx_drop"] fields["port_rx_errors"] = s[key+"_rx_errors"] fields["port_rx_packets"] = s[key+"_rx_packets"] fields["port_tx"] = s[key+"_tx"] fields["port_tx_drop"] = s[key+"_tx_drop"] fields["port_tx_errors"] = s[key+"_tx_errors"] fields["port_tx_packets"] = s[key+"_tx_packets"] tags["port_name"] = key acc.AddFields("openstack_server_diagnostics", fields, tags) } } } // init registers a callback which creates a new OpenStack input instance. func init() { inputs.Add("openstack", func() telegraf.Input { return &OpenStack{ Domain: "default", Project: "admin", TagPrefix: "openstack_tag_", TagValue: "true", } }) }