This repository has been archived on 2023-03-31. You can view files and clone it, but cannot push or open issues or pull requests.
Python_Speedtest_Grafana/main.py

56 lines
1.6 KiB
Python

# Sean Corrigan 2020
import speedtest
import json
from influxdb import InfluxDBClient
# --- Settings -------------------------------------------------------------
# Address and port of the InfluxDB server (same values as the defaults of
# uploadInfluxdata's host/port parameters).
serverip = "192.168.0.13"
serverport = 8086
# Database name handed to InfluxDBClient.
serverdatabase = "telegraf"
# Value written as the "host" tag on every uploaded point.
servernickname = "PythonSpeedtest"
def speed(servers=None, threads=4):  # Actual Speedtest using speedtest-cli
    """Run a download/upload speed test and return the headline figures.

    Args:
        servers: Optional list of server IDs handed to
            ``Speedtest.get_servers`` to test against specific servers.
            ``None`` (the default) is treated as an empty list, i.e. no
            restriction is requested.
        threads: Number of threads passed to both the download and the
            upload test.

    Returns:
        dict: ``{"UploadSpeed": ..., "DownloadSpeed": ..., "Ping": ...}``,
        copied from the ``"upload"``, ``"download"`` and ``"ping"`` entries
        of the speedtest results (units are whatever speedtest-cli reports;
        presumably bit/s and ms -- confirm against its documentation).
    """
    # Avoid a mutable default argument; an empty list matches the old
    # hard-coded behaviour.
    if servers is None:
        servers = []

    test = speedtest.Speedtest()
    test.get_servers(servers)
    test.get_best_server()
    test.download(threads=threads)
    test.upload(threads=threads)

    # The share link is a convenience only: a failure to create it must not
    # throw away the measurement that was just taken.
    try:
        test.results.share()
    except (speedtest.ShareResultsConnectFailure,
            speedtest.ShareResultsSubmitFailure) as err:
        print("Share link unavailable: {}".format(err))

    results = test.results.dict()
    if results["share"]:
        print(results["share"])  # Show share link

    return {
        "UploadSpeed": results["upload"],
        "DownloadSpeed": results["download"],
        "Ping": results["ping"],
    }
def uploadInfluxdata(host=None, port=None):  # Main upload section
    """Run one speed test and write its result to InfluxDB as a single point.

    The point goes to the ``PythonSpeedTest`` measurement, tagged with
    ``host=servernickname`` and carrying the fields ``Upload``, ``Download``
    and ``Ping`` taken from ``speed()``.

    Args:
        host: InfluxDB host name or address. ``None`` (the default) uses the
            module-level ``serverip`` setting.
        port: InfluxDB port. ``None`` (the default) uses the module-level
            ``serverport`` setting.
    """
    # Resolve the defaults at call time from the settings at the top of the
    # file, so that editing those settings actually changes the target.
    if host is None:
        host = serverip
    if port is None:
        port = serverport

    testdata = speed()

    # No "time" key is supplied; the timestamp is left for InfluxDB to
    # assign (presumably the server's receive time -- confirm in its docs).
    json_body = [
        {
            "measurement": "PythonSpeedTest",
            "tags": {
                "host": servernickname,
            },
            "fields": {
                "Upload": testdata["UploadSpeed"],
                "Download": testdata["DownloadSpeed"],
                "Ping": testdata["Ping"],
            },
        }
    ]

    # Init connection to Influx Server
    client = InfluxDBClient(host, port, database=serverdatabase)
    try:
        client.write_points(json_body)  # Write Speedtest results
    finally:
        # Release the underlying HTTP session even if the write fails.
        client.close()
# Script entry point: run one speed test and upload the result.
# NOTE: this call is not guarded by `if __name__ == "__main__":`, so it also
# runs whenever this module is imported.
uploadInfluxdata()