# Sean Corrigan 2020
"""Run an internet speed test and write the result to an InfluxDB database.

When executed as a script, ``main()`` runs one speed test via the
``speedtest`` package and writes a single point (upload, download, ping)
to the InfluxDB server configured by the module-level settings below.
"""

import json

import speedtest
from influxdb import InfluxDBClient

# Connection settings for the InfluxDB server that receives the results.
serverip = "192.168.0.13"
serverport = 8086
serverdatabase = "telegraf"
# Value written as the "host" tag on every point.
servernickname = 'PythonSpeedtest'


def speed():  # To be called on schedule
    """Run one speed test and return its headline numbers.

    Returns:
        dict: ``{"UploadSpeed": ..., "DownloadSpeed": ..., "Ping": ...}``
        taken from the ``upload``, ``download`` and ``ping`` entries of the
        speedtest results. Units are whatever the ``speedtest`` package
        reports (presumably bits/s and milliseconds -- confirm against its
        documentation).

    The result dict is also printed to stdout.
    """
    servers = []  # If you want to test against a specific server
    threads = 4  # Choose the amount of threads to use for the test

    test = speedtest.Speedtest()
    test.get_servers(servers)
    test.get_best_server()
    test.download(threads=threads)
    test.upload(threads=threads)
    # NOTE(review): share() appears to publish the result to speedtest.net
    # and its return value is unused here -- confirm it is still wanted.
    test.results.share()

    results = test.results.dict()
    result = {
        "UploadSpeed": results["upload"],
        "DownloadSpeed": results["download"],
        "Ping": results["ping"],
    }
    print(result)
    return result


def influxupload():
    """Placeholder for a dedicated upload helper; currently does nothing.

    The upload is performed directly in ``main()``.
    """
    # TODO: Still having errors in uploading the data from this helper;
    # an earlier attempt created an InfluxDBClient and called write_points.
    pass


def main(host=serverip, port=serverport):
    """Run a speed test and write the result to InfluxDB.

    Args:
        host: Address of the InfluxDB server (defaults to ``serverip``).
        port: Port of the InfluxDB server (defaults to ``serverport``).

    The point is written to the ``serverdatabase`` database under the
    measurement "PythonSpeedTest", tagged with ``servernickname`` as host.
    """
    testdata = speed()

    # No explicit "time" field is supplied for the point.
    json_body = [
        {
            "measurement": "PythonSpeedTest",
            "tags": {
                "host": servernickname,
            },
            "fields": {
                "Upload": testdata["UploadSpeed"],
                "Download": testdata["DownloadSpeed"],
                "Ping": testdata["Ping"],
            },
        }
    ]

    client = InfluxDBClient(host, port, database=serverdatabase)
    try:
        client.write_points(json_body)
    finally:
        # Release the underlying HTTP session even if the write fails.
        client.close()


# Only run the test when executed as a script, not when imported
# (e.g. by a scheduler that calls speed() or main() itself).
if __name__ == "__main__":
    main()