This repository has been archived on 2023-03-31. You can view files and clone it, but cannot push or open issues or pull requests.
Python_Speedtest_Grafana/main.py

45 lines
1.2 KiB
Python

# Sean Corrigan 2020
import speedtest
import json
from influxdb import InfluxDBClient
serverip = "192.168.0.13"
serverport = 8086
serverdatabase = "telegraf"
def speed(): # To be called on schedule
servers = [] # If you want to test against a specific server
threads = 4
# Choose the amount of threads to use for the test
test = speedtest.Speedtest()
test.get_servers(servers)
test.get_best_server()
test.download(threads=threads)
test.upload(threads=threads)
test.results.share()
results = test.results.dict()
resultlist = []
resultupload = {}
resultupload["UploadSpeed"] = results["upload"]
resultdownload = {}
resultdownload["DownloadSpeed"] = results["download"]
resultlatency = {}
resultlatency["Latency"] = results["ping"]
resultlist.append(resultupload)
resultlist.append(resultdownload)
resultlist.append(resultlatency)
resultforupload = json.dumps(resultlist)
print(resultforupload)
return(resultforupload)
def influxupload(): # Still having errors in uploading the data..
jsondata = speed()
client = InfluxDBClient(host=serverip, port=8086, database=serverdatabase)
client.write(str(jsondata), protocol='json')
influxupload()