# Sean Corrigan 2020
# Script sends data to influx in bits/sec
import speedtest
import json
from influxdb import InfluxDBClient

# Connection settings for the target InfluxDB server.
serverip = "192.168.0.13"
serverport = 8086
serverdatabase = "telegraf"
# Value written as the "host" tag on every uploaded point.
servernickname = 'Yoo01pn.ddns.net'


def speed():
    """Run a speed test with speedtest-cli and return the key figures.

    Also requests a share link for the result and prints it to stdout.

    Returns:
        dict: ``{"UploadSpeed": ..., "DownloadSpeed": ..., "Ping": ...}``
        taken from the ``upload``, ``download`` and ``ping`` entries of the
        speedtest result. The file header states the speeds are in bits/sec.
    """
    # Left empty here; the original note says this list is for testing
    # against a specific server.
    servers = []
    # Number of threads to use for the download and upload tests.
    threads = 4

    test = speedtest.Speedtest()
    test.get_servers(servers)
    test.get_best_server()
    test.download(threads=threads)
    test.upload(threads=threads)
    # Must run before results.dict() is read so that "share" is populated.
    test.results.share()
    results = test.results.dict()

    print(results["share"])  # Show share link
    return {
        "UploadSpeed": results["upload"],
        "DownloadSpeed": results["download"],
        "Ping": results["ping"],
    }


def uploadInfluxdata(host=serverip, port=serverport):
    """Run a speed test and write the result to InfluxDB.

    One point is written to the ``PythonSpeedTest`` measurement in the
    ``serverdatabase`` database, tagged with ``host=servernickname`` and
    carrying the fields ``Upload``, ``Download`` and ``Ping``.

    Args:
        host: InfluxDB server address. Defaults to ``serverip``.
        port: InfluxDB server port. Defaults to ``serverport``.
    """
    testdata = speed()
    json_body = [
        {
            "measurement": "PythonSpeedTest",
            "tags": {
                "host": servernickname,
            },
            "fields": {
                "Upload": testdata["UploadSpeed"],
                "Download": testdata["DownloadSpeed"],
                "Ping": testdata["Ping"],
            },
        }
    ]

    # Init connection to Influx Server
    client = InfluxDBClient(host, port, database=serverdatabase)
    try:
        client.write_points(json_body)  # Write Speedtest results
    finally:
        # Release the underlying HTTP session even if the write fails.
        client.close()


if __name__ == "__main__":
    uploadInfluxdata()