Table of contents
- Introduction
- Download
- Code explanation for Ping_script.py
- Ping Configuration file
- Ping Analysis
Introduction
In certain projects, it is essential to verify the connectivity of the 4G/LTE cellular network at specific locations. To do this, I plan to ping a chosen host (such as 8.8.8.8) over the mobile network once per second. The ping results will be saved to an SD card for later analysis, covering a span of 60 days. To carry out the test, I will use a Raspberry Pi 3B running Raspbian GNU/Linux 11 (bullseye), along with a 4G/LTE router. I have developed a Python program called ping_script.py to handle the ping operation.
Download
To access the full code, please go to this link:
https://github.com/FelixT123/CS-Student/tree/main/Raspberry_Pi_Ping_Testing
Code explanation for Ping_script.py
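The following functions are included in ping_script.py. They rely on the standard-library modules socket, os, subprocess, datetime, and time, which the script imports at the top.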
The get_ip_address() function, presented below, retrieves the local IP address of the device. First, it creates a UDP socket using the IPv4 address family (AF_INET) and the datagram socket type (SOCK_DGRAM). It then calls sock.connect(("8.8.8.8", 80)). The IP address 8.8.8.8 is Google’s public DNS server. Because UDP is connectionless, connect() sends no packets; it only makes the operating system choose the route, and therefore the local address, that would be used to reach that host, so the port number 80 is an arbitrary placeholder. If the call succeeds, the function reads the local IP address from the socket using sock.getsockname()[0]. If a socket.error exception is raised (for example, when there is no route to the host), it assigns None to the ip_address variable. In either case, the finally block closes the socket with sock.close() to release its resources, and the function returns ip_address.
def get_ip_address():
    # Create a socket object
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connect to a remote host (doesn't matter which one)
        sock.connect(("8.8.8.8", 80))
        # Get the local IP address from the connected socket
        ip_address = sock.getsockname()[0]
    except socket.error:
        # If an error occurs, return None or handle the exception
        ip_address = None
    finally:
        # Close the socket
        sock.close()
    return ip_address
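As a variation, the same lookup can be written with a with statement so that the socket is closed automatically. This is a minimal sketch, not the code from the repository; the function name get_ip_address_with and the probe_host and probe_port parameters are introduced here purely for illustration.

```python
import socket


def get_ip_address_with(probe_host="8.8.8.8", probe_port=80):
    """Return the local IPv4 address used to reach probe_host, or None."""
    try:
        # The with block closes the socket even if connect() raises
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            # For UDP, connect() only selects a route; no packet is sent
            sock.connect((probe_host, probe_port))
            return sock.getsockname()[0]
    except OSError:
        # socket.error is an alias of OSError in Python 3
        return None


if __name__ == "__main__":
    print("My IP address:", get_ip_address_with())
```

When the device has no route to the probe host, for example while the 4G/LTE router is offline, the function returns None instead of raising.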
The clear_screen() function, shown below, clears the terminal screen. The variable os.name is a string that identifies the operating system family. If os.name is ‘posix’, which indicates Linux or macOS, the function calls os.system() to run the ‘clear’ command. Otherwise, the function assumes the system is Windows and calls os.system() to run the ‘cls’ command. The return value of os.system() is assigned to an underscore (_), a common convention for a value that will not be used later in the code.
def clear_screen():
    if os.name == 'posix':  # for Linux/macOS
        _ = os.system('clear')
    else:  # for Windows
        _ = os.system('cls')
The read_configuration(filename) function, shown below, reads a configuration file and returns a dictionary containing the configuration settings. It opens the file named by the filename parameter and iterates over each line, assuming each line contains one key-value pair separated by an equal sign (=). It strips any leading or trailing whitespace from the key and value, and stores them in the config dictionary. Finally, it returns the populated dictionary. Note that a blank line, or a line with no equal sign or more than one, makes the unpacking fail with a ValueError, so the file must contain only key=value lines.
def read_configuration(filename):
    config = {}
    with open(filename, "r") as file:
        for line in file:
            key, value = line.strip().split("=")
            config[key.strip()] = value.strip()
    return config
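read_configuration() expects every line to be a key=value pair. If the configuration file might contain blank lines or notes, a more tolerant variant could look like the sketch below. The function name read_configuration_tolerant and the choice to treat lines starting with # as comments are assumptions made for this example; they are not part of the original script.

```python
def read_configuration_tolerant(filename):
    """Parse key=value lines, ignoring blank lines and '#' comments."""
    config = {}
    with open(filename, "r") as file:
        for line in file:
            line = line.strip()
            # Assumption: '#' starts a comment line
            if not line or line.startswith("#"):
                continue
            # partition() splits on the first "=" only,
            # so a value may itself contain "="
            key, sep, value = line.partition("=")
            if not sep:
                continue  # no "=" on this line; skip it
            config[key.strip()] = value.strip()
    return config
```

Calling read_configuration_tolerant('ping.conf') returns the same dictionary as the original function for a well-formed file, so it could be swapped in without changing main().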
The ping_ip_address(ip_address, packet_size) function, shown below, pings the specified ip_address with the system ping command and returns the result as a string. It first caps packet_size at 68 bytes, then uses the subprocess module to send a single packet (-c 1) with a one-second timeout (-W 1) and captures the output. It searches the output for the reply line containing ‘bytes from’, removes the ‘: icmp_seq=1’ field, changes ‘ttl’ to ‘TTL’, and returns the resulting line. If ping exits with a non-zero status, which is what happens when no reply arrives, check_output raises CalledProcessError and the function returns a string indicating the failure.
def ping_ip_address(ip_address, packet_size):
    if packet_size > 68:
        packet_size = 68
    try:
        # Ping one packet, timeout 1 sec, packet size capped at 68 bytes (max allowed is 68)
        ping_output = subprocess.check_output(["ping", "-c", "1", "-s", str(packet_size), "-W", "1", ip_address], universal_newlines=True)
        # Parse the output to extract the relevant information
        lines = ping_output.split('\n')
        for line in lines:
            if 'bytes from' in line:
                filtered_line = line.replace(': icmp_seq=1', '').replace('ttl', 'TTL')
                return filtered_line
    except subprocess.CalledProcessError:
        # ping exits with a non-zero status when no reply is received
        pass
    # No reply line was found, so report the failure
    return f"Ping {ip_address} failed"
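For later analysis it can be handy to pull the numeric TTL and round-trip time out of the filtered reply line. The sketch below shows one way to do that with a regular expression. The helper name parse_reply is hypothetical, the sample line is illustrative rather than a real measurement, and the pattern assumes the usual Linux ping reply format after the replacements made by ping_ip_address().

```python
import re

# Matches the TTL and round-trip time in a filtered reply line
REPLY_PATTERN = re.compile(r"TTL=(\d+)\s+time=([\d.]+)\s*ms")


def parse_reply(filtered_line):
    """Return (ttl, time_ms) from a reply line, or None if it does not match."""
    match = REPLY_PATTERN.search(filtered_line)
    if match is None:
        return None
    return int(match.group(1)), float(match.group(2))


# Illustrative input only; not a measurement from the test
sample = "76 bytes from 8.8.8.8 TTL=117 time=25.3 ms"
print(parse_reply(sample))
print(parse_reply("Ping 8.8.8.8 failed"))
```

A failure message contains neither field, so parse_reply() returns None for it, which makes failed pings easy to separate from successful ones.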
The save_result_to_file(result, filename) function, presented below, saves the result string to the file specified by filename. It opens the file in append mode, which creates the file if it does not yet exist, and writes the result string at the end.
def save_result_to_file(result, filename):
    with open(filename, "a") as file:
        file.write(result)
The get_current_date() function presented below returns the current date as a string in the format “YYYY-MM-DD”. It uses the datetime module to get the current date and time and formats it accordingly.
def get_current_date():
    return datetime.datetime.now().strftime("%Y-%m-%d")
The get_current_time() function displayed below returns the current time as a string in the format “HH:MM:SS”. It uses the datetime module to obtain the current date and time and formats it accordingly.
def get_current_time():
    return datetime.datetime.now().strftime("%H:%M:%S")
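Because get_current_date() and get_current_time() each read the clock separately, a date and a time taken right around midnight could in principle come from different days. One possible way to avoid that is to read the clock once and format both strings from the same value, as in the sketch below. The function name get_timestamp and its optional now parameter are assumptions for this example, not part of ping_script.py.

```python
import datetime


def get_timestamp(now=None):
    """Return (date, time) strings taken from a single clock reading."""
    if now is None:
        now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")


date_str, time_str = get_timestamp()
print(date_str, time_str)
```

The optional now argument also makes the formatting easy to check with a fixed datetime value.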
The main() function, shown below, repeats a series of actions in an infinite loop. It starts by reading the configuration from a file called “ping.conf” using the read_configuration function. From the configuration, it retrieves the ip_address and packet_size, converting packet_size to an integer. It also initializes an empty list named lines to hold the text shown on the console.
Inside the loop, the code builds a result string by calling get_current_date(), get_current_time(), and ping_ip_address() with ip_address and packet_size as arguments. It then builds the filename by concatenating “ping_results_”, the current date, and “.txt”, and appends the result to that file, which is created if it does not already exist.
After saving the data, the code builds the message displayed on the console. First, it splits the result string at its last colon, normally the one that follows the time, and assigns the two parts to the timestamp and ttl_and_time variables. It also removes the newline character from ttl_and_time. Second, it appends the timestamp to the lines list and removes the oldest element using pop(0) if the list holds more than 14 elements. Third, it appends ttl_and_time to the lines list and again removes the oldest element if the list holds more than 14 elements. Because each ping adds two entries, the list covers the seven most recent pings.
Next, the code clears the terminal with clear_screen(), prints the device’s local IP address obtained from get_ip_address(), and displays the last 14 entries of the lines list.
Between iterations, the code pauses for 1 second using time.sleep(), so each cycle takes one second plus the time taken by the ping itself.
def main():
    config = read_configuration('ping.conf')
    ip_address = config.get('ip_address')
    # Packet size in bytes; 68 is the max value allowed. A value larger than 68 will cause ping loss
    packet_size = int(config.get('packet_size'))
    # Create a list to store the lines
    lines = []
    while True:
        result = f"{get_current_date()} {get_current_time()}: {ping_ip_address(ip_address, packet_size)}\n"
        filename = "ping_results_" + get_current_date() + ".txt"
        save_result_to_file(result, filename)
        split_line = result.rsplit(":", 1)
        if len(split_line) == 2:
            timestamp = split_line[0]
            ttl_and_time = split_line[1]
            # Remove the newline character
            ttl_and_time = ttl_and_time.replace("\n", "")
            lines.append(timestamp)
            if len(lines) > 14:
                lines.pop(0)
            lines.append(ttl_and_time)
            if len(lines) > 14:
                lines.pop(0)
        else:
            lines.append(result)
            # If the number of lines exceeds 14, remove the oldest line
            if len(lines) > 14:
                lines.pop(0)
        # Clear the terminal
        clear_screen()
        # Call the function to get the IP address
        ip = get_ip_address()
        print("My IP address:", ip)
        # Display the last 14 lines
        for line in lines[-14:]:
            print(line)
        time.sleep(1)
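The append-then-pop(0) bookkeeping in main() could also be expressed with collections.deque, which discards the oldest entry automatically once a maximum length is reached. The sketch below is an alternative under that assumption, not the approach used in ping_script.py; make_display_buffer is a hypothetical helper name and the entries are placeholder strings.

```python
from collections import deque


def make_display_buffer(max_lines=14):
    """Return a buffer that keeps only the most recent max_lines entries."""
    return deque(maxlen=max_lines)


lines = make_display_buffer()
for i in range(10):
    # Two entries per ping, as in main(): timestamp, then reply text
    lines.append(f"timestamp {i}")
    lines.append(f"reply {i}")

# Only the 14 newest entries remain
for line in lines:
    print(line)
```

With a deque, the two length checks after each append are no longer needed, and the display loop can iterate over the buffer directly.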
The if __name__ == "__main__": block at the end of the program ensures that the main() function is executed only if the script is run directly, rather than imported as a module.
if __name__ == "__main__":
    main()
Ping Configuration file
The ping.conf file contains the following configuration settings for ping_script.py.
ip_address=8.8.8.8
packet_size=68
The ping results for each day are stored in individual files. For example, the results from February 2nd are saved in a text file named “ping_results_2024-02-02.txt”. Each line of the file holds the date, the time, and either the ping reply or a failure message. The size of a text file containing a complete day’s ping records is approximately 5.3 MB.
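As a rough check on SD card usage, the daily file size can be scaled to the planned 60-day test. The short calculation below is only an estimate: it assumes every day produces a full-size file, takes 1 MB as 1,000,000 bytes, and treats one line per second as an upper bound on the number of lines per day.

```python
MB_PER_DAY = 5.3                 # approximate daily file size quoted above
TEST_DAYS = 60                   # test duration from the introduction
SECONDS_PER_DAY = 24 * 60 * 60   # upper bound on lines per day at 1 ping/s

# Total storage needed if every day produces a full-size file
total_mb = MB_PER_DAY * TEST_DAYS

# Average line length implied if every second produced one line
# (assumes 1 MB = 1,000,000 bytes)
bytes_per_line = MB_PER_DAY * 1_000_000 / SECONDS_PER_DAY

print(f"about {total_mb:.0f} MB over {TEST_DAYS} days")
print(f"about {bytes_per_line:.0f} bytes per line")
```

Under these assumptions the whole test needs roughly 318 MB, which is small compared with typical SD card capacities.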
Ping Analysis
Furthermore, I have developed a Python program called “ping_analysis.py” that analyzes all the ping result files and consolidates the summary into a single file named “ping_stat.txt”. To execute this program, open a terminal and enter the following command:
python3 ping_analysis.py
The following is an example of “ping_stat.txt”:
date,ping_count,success_count,failure_count
2024-02-02,4689,4689,0
2024-02-03,14,14,0
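A summary in this format could be produced along the lines of the sketch below. This is not the actual ping_analysis.py; it is a minimal illustration that assumes a line containing “bytes from” is a successful ping and any other non-empty line is a failure. The function name summarize and its parameters are hypothetical.

```python
import csv
import glob
import os


def summarize(results_dir=".", output_name="ping_stat.txt"):
    """Count successful and failed pings in each daily result file."""
    rows = []
    pattern = os.path.join(results_dir, "ping_results_*.txt")
    for path in sorted(glob.glob(pattern)):
        # File names look like ping_results_YYYY-MM-DD.txt
        date = os.path.basename(path)[len("ping_results_"):-len(".txt")]
        success = failure = 0
        with open(path, "r") as file:
            for line in file:
                if "bytes from" in line:
                    success += 1      # assumption: a reply line means success
                elif line.strip():
                    failure += 1      # assumption: anything else is a failure
        rows.append([date, success + failure, success, failure])

    # Write one CSV row per day, preceded by a header row
    with open(os.path.join(results_dir, output_name), "w", newline="") as out:
        writer = csv.writer(out, lineterminator="\n")
        writer.writerow(["date", "ping_count", "success_count", "failure_count"])
        writer.writerows(rows)
    return rows


if __name__ == "__main__":
    summarize()
```

Sorting the file names keeps the rows in date order, since the YYYY-MM-DD format sorts chronologically as plain text.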
I will provide an explanation of the code in ping_analysis.py when time permits.