# Sean Corrigan 2020 # Script sends data to influx in bits/sec import speedtest import os import json from influxdb import InfluxDBClient from time import time, sleep serverip = os.environ['INFLUX_IP'] serverport = os.environ['INFLUX_PORT', 8086] serverdatabase = os.environ['INFLUX_DB', 'telegraf'] servernickname = os.environ['NICKNAME', 'SERV01'] # Name to put under in influx server = os.environ['SPEEDTEST_SERVER','19249'] def speed(): # Actual Speedtest using speedtest-cli servers = [servertag] # If you want to test against a specific server eg. ['13030'] or [] for closest server threads = 16 # Choose the amount of threads to use for the test test = speedtest.Speedtest() test.get_servers(servers) test.get_best_server() test.download(threads=threads) test.upload(threads=threads) test.results.share() results = test.results.dict() result = {} result["UploadSpeed"] = results["upload"] result["DownloadSpeed"] = results["download"] result["Ping"] = results["ping"] print(results["share"]) # Show share link return(result) def uploadInfluxdata(host, port): # Main upload section query = 'select Float_value from cpu_load_short;' query_where = 'select Int_value from cpu_load_short where host=$host;' bind_params = {'host': servernickname} testdata = speed() json_body = [ { "measurement": "speedtest-influx-docker", "tags": { "host": servernickname, }, "fields": { "Upload": testdata["UploadSpeed"], "Download": testdata["DownloadSpeed"], "Ping": testdata["Ping"] } } ] client = InfluxDBClient(host, port, database = serverdatabase) # Init connection to Influx Server client.write_points(json_body) # Write Speedtest results while True: print("Running Test") uploadInfluxdata(serverip, serverport) sleep(600)