# Sean Corrigan 2020 # Script sends data to influx in bits/sec import speedtest import os import json from influxdb import InfluxDBClient from time import time, sleep serverip = os.getenv('INFLUX_IP') serverport = os.getenv('INFLUX_PORT', '8086') serverdatabase = os.getenv('INFLUX_DB', 'telegraf') servernickname = os.getenv('NICKNAME', 'SERV01') # Name to put under in influx stserver = os.getenv('SPEEDTEST_SERVER','19249') def speed(): # Actual Speedtest using speedtest-cli servers = [stserver] # If you want to test against a specific server eg. ['13030'] or [] for closest server threads = 16 # Choose the amount of threads to use for the test test = speedtest.Speedtest() test.get_servers(servers) test.get_best_server() test.download(threads=threads) test.upload(threads=threads) test.results.share() results = test.results.dict() result = {} result["UploadSpeed"] = results["upload"] result["DownloadSpeed"] = results["download"] result["Ping"] = results["ping"] print(results["share"]) # Show share link return(result) def uploadInfluxdata(host, port): # Main upload section query = 'select Float_value from cpu_load_short;' query_where = 'select Int_value from cpu_load_short where host=$host;' bind_params = {'host': servernickname} testdata = speed() json_body = [ { "measurement": "speedtest-influx-docker", "tags": { "host": servernickname, }, "fields": { "Upload": testdata["UploadSpeed"], "Download": testdata["DownloadSpeed"], "Ping": testdata["Ping"] } } ] client = InfluxDBClient(host, port, database = serverdatabase) # Init connection to Influx Server client.write_points(json_body) # Write Speedtest results # Check ENV variables # print(serverip) # print(serverport) # print(serverdatabase) # print(servernickname) # print(stserver) while True: print("Running Test") # Show in console uploadInfluxdata(serverip, serverport) sleep(600)