Raspberry Pi-Based Ping Testing for 60 Days

Table of contents

  • Introduction
  • Download
  • Code explanation for Ping_script.py
  • Ping Configuration file
  • Ping Analysis

Introduction

Some projects need to verify that a 4G/LTE cellular network stays reachable at a specific location. To test this, I ping a chosen host (such as 8.8.8.8) over the mobile network about once per second and save the results to an SD card for later analysis, over a span of 60 days. The test setup is a Raspberry Pi 3B running Raspbian GNU/Linux 11 (bullseye), connected to a 4G/LTE router. I wrote a Python program called ping_script.py to run the pings and log the results.

Download

To access the full code, please go to this link:

https://github.com/FelixT123/CS-Student/tree/main/Raspberry_Pi_Ping_Testing

Code explanation for Ping_script.py

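ping_script.py contains the following functions. They rely on the standard-library modules datetime, os, socket, subprocess, and time, which must be imported at the top of the script.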

The get_ip_address() function, presented below, retrieves the local IP address of the device. First, it creates a UDP socket using the IPv4 address family (AF_INET) and the datagram socket type (SOCK_DGRAM). It then calls sock.connect(("8.8.8.8", 80)). Because UDP is connectionless, this call sends no packets; it only asks the operating system to select a route, and therefore a local interface, for that destination. The address 8.8.8.8 is Google’s public DNS server, and port 80 is an arbitrary placeholder, since any port number works for this purpose. If the call succeeds, the function reads the local IP address from the socket using sock.getsockname()[0]. If a socket.error exception is raised (for example, when no network route is available), it sets ip_address to None instead. Finally, the socket is closed with sock.close() to release its resources.

def get_ip_address():
    # Create a socket object
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    try:
        # Connect to a remote host (doesn't matter which one)
        sock.connect(("8.8.8.8", 80))
        
        # Get the local IP address from the connected socket
        ip_address = sock.getsockname()[0]
    except socket.error:
        # If an error occurs, return None or handle the exception
        ip_address = None
    finally:
        # Close the socket
        sock.close()
    return ip_address
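
The same lookup can be written more compactly with a context manager, which closes the socket automatically. This is a minimal sketch rather than the code from ping_script.py; the name get_local_ip and its probe_host and probe_port parameters are hypothetical.

```python
import socket

def get_local_ip(probe_host="8.8.8.8", probe_port=80):
    """Return the local IPv4 address used to reach probe_host, or None."""
    try:
        # The with-block closes the socket even if connect() raises
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            # On a UDP socket, connect() sends nothing; it only selects a route
            sock.connect((probe_host, probe_port))
            return sock.getsockname()[0]
    except OSError:
        # socket.error is an alias of OSError in Python 3
        return None

if __name__ == "__main__":
    print("My IP address:", get_local_ip())
```

As in the original function, a return value of None means no route to the probe address was available.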

The clear_screen() function, shown below, clears the terminal screen. The variable os.name is a string that identifies the operating system family. If os.name is ‘posix’, which is the case on Linux and macOS, the function calls os.system() to run the ‘clear’ command. Otherwise, the function assumes the system is Windows and runs the ‘cls’ command instead. The return value of os.system() is assigned to an underscore (_), a common convention for a value that will not be used later in the code.

def clear_screen():
    if os.name == 'posix':  # for Linux/OS X
        _ = os.system('clear')
    else:  # for Windows
        _ = os.system('cls')

The read_configuration(filename) function, shown below, reads a configuration file and returns a dictionary containing the configuration settings. It opens the file named by the filename parameter and iterates over its lines, expecting each one to hold a key-value pair separated by an equal sign (=). It strips any leading or trailing whitespace from the key and value, and stores them in the config dictionary. Finally, it returns the populated dictionary.

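def read_configuration(filename):
    config = {}
    with open(filename, "r") as file:
        for line in file:
            line = line.strip()
            # Skip blank lines, which would otherwise break the split below
            if not line:
                continue
            # Split on the first "=" only, so a value may itself contain "="
            key, value = line.split("=", 1)
            config[key.strip()] = value.strip()
    return config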

The ping_ip_address(ip_address, packet_size) function, shown below, pings the specified ip_address once and returns the result as a string. It caps packet_size at 68 bytes, then uses the subprocess module to run the ping command with a one-second timeout and capture its output. It searches the output for the reply line (the one containing ‘bytes from’), removes the icmp_seq field, and returns the rest. If the ping fails, the command exits with a non-zero status and the function returns a string indicating the failure.

def ping_ip_address(ip_address, packet_size):
    # Cap the payload at 68 bytes; larger values caused ping loss in this setup
    if packet_size > 68:
        packet_size = 68
    try:
        # Send one packet (-c 1) of the given size (-s) with a 1-second timeout (-W 1)
        ping_output = subprocess.check_output(["ping", "-c", "1", "-s", str(packet_size), "-W", "1", ip_address], universal_newlines=True)
        # Parse the output to extract the reply line
        for line in ping_output.split('\n'):
            if 'bytes from' in line:
                return line.replace(': icmp_seq=1', '').replace('ttl', 'TTL')
    except subprocess.CalledProcessError:
        # ping exits with a non-zero status when no reply arrives
        pass
    # Reached on failure, or if the output held no reply line
    return f"Ping {ip_address} failed"
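
For later analysis, it can be handy to pull the numeric fields out of the reply line instead of treating it as free text. The sketch below is an illustration only: parse_reply_line is a hypothetical helper that is not part of ping_script.py, and the sample line assumes the usual reply format of the Linux ping command.

```python
import re

# Matches e.g. "ttl=117 time=35.2 ms" (case-insensitive, so "TTL=" also works)
REPLY_PATTERN = re.compile(r"ttl=(\d+)\s+time=([\d.]+)\s*ms", re.IGNORECASE)

def parse_reply_line(line):
    """Return (ttl, round_trip_ms) from a ping reply line, or None if absent."""
    match = REPLY_PATTERN.search(line)
    if match is None:
        return None
    return int(match.group(1)), float(match.group(2))

if __name__ == "__main__":
    # Made-up sample lines in the format produced by ping_ip_address()
    print(parse_reply_line("76 bytes from 8.8.8.8 TTL=117 time=35.2 ms"))
    print(parse_reply_line("Ping 8.8.8.8 failed"))
```

A failed ping yields None, so the caller can count failures and average round-trip times separately.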

The save_result_to_file(result, filename) function, presented below, saves the result string to the file specified by filename. It opens the file in append mode, which creates the file if it does not exist yet, and writes the result string at the end.

def save_result_to_file(result, filename):
    with open(filename, "a") as file:
        file.write(result)

The get_current_date() function presented below returns the current date as a string in the format “YYYY-MM-DD”. It uses the datetime module to get the current date and time and formats it accordingly.

def get_current_date():
    return datetime.datetime.now().strftime("%Y-%m-%d")

The get_current_time() function displayed below returns the current time as a string in the format “HH:MM:SS”. It utilizes the datetime module to obtain the current date and time and formats it accordingly.

def get_current_time():
    return datetime.datetime.now().strftime("%H:%M:%S")
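
Because get_current_date() and get_current_time() each call datetime.datetime.now(), two calls made right around midnight can return a date and a time that belong to different days. One way to avoid this, sketched here with a hypothetical helper named get_date_and_time, is to take a single timestamp and format it twice.

```python
import datetime

def get_date_and_time(now=None):
    """Return ("YYYY-MM-DD", "HH:MM:SS") taken from one timestamp."""
    if now is None:
        now = datetime.datetime.now()
    return now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")

if __name__ == "__main__":
    date_str, time_str = get_date_and_time()
    print(date_str, time_str)
```

The optional now argument exists only so that the function can be checked against a fixed timestamp.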

The main() function, shown at the end of this section, runs the test. It starts by reading the configuration from a file called “ping.conf” using the read_configuration() function and retrieves ip_address and packet_size from it. It also initializes an empty list named lines to hold the text shown on the console.

Inside an infinite loop, the code builds a result string from get_current_date(), get_current_time(), and ping_ip_address(), the last of which is called with ip_address and packet_size as arguments. It then builds a filename by concatenating “ping_results_” with the current date and the “.txt” extension, and appends the result to that file. Because the file is opened in append mode, a new file is created automatically when the date changes.

After saving the data, the code builds the message displayed on the console. First, it splits the result string in two at its last colon, which in this format separates the timestamp from the ping output, and assigns the parts to the timestamp and ttl_and_time variables. It also removes the newline character from ttl_and_time. Second, it appends timestamp to the lines list and, if the list then holds more than 14 elements, removes the oldest one using pop(0). Third, it appends ttl_and_time to the list and applies the same length check.

Following these actions, the code clears the terminal screen using the clear_screen() function and prints the IP address on the console by employing the get_ip_address() function. Finally, it displays the last 14 lines from the lines list on the terminal.

Between each iteration, the code pauses for 1 second using the time.sleep() function.

def main():
    config = read_configuration('ping.conf')
    ip_address = config.get('ip_address') 
    packet_size = int(config.get('packet_size'))  # packet size in bytes and 68 is the max value allowed. Value larger than 68 will cause ping loss

    # Create a list to store the lines
    lines = []

    while True:
        result = f"{get_current_date()} {get_current_time()}: {ping_ip_address(ip_address,packet_size)}\n"
        filename = "ping_results_" + get_current_date() + ".txt"
        save_result_to_file(result, filename)
        
        split_line = result.rsplit(":", 1)
        if len(split_line) == 2:
            timestamp = split_line[0]
            ttl_and_time = split_line[1]
            # Remove the newline character
            ttl_and_time = ttl_and_time.replace("\n", "")
            lines.append(timestamp)
            if len(lines) > 14:
                lines.pop(0)
            lines.append(ttl_and_time)
            if len(lines) > 14:
                lines.pop(0)

        else:
            lines.append(result)
            # If the number of lines exceeds 14, remove the oldest line
            if len(lines) > 14:
                lines.pop(0)

        # Clear the terminal
        clear_screen()

        # Call the function to get the IP address
        ip = get_ip_address()
        print("My IP address:", ip)

        # Display the last 14 lines
        for line in lines[-14:]:
            print(line)

        time.sleep(1)
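
The rolling 14-line buffer in main() can also be expressed with collections.deque, whose maxlen argument discards the oldest entry automatically. This is an alternative sketch, not the original code; DISPLAY_LINES and add_result are hypothetical names, and the sample results are made up.

```python
from collections import deque

DISPLAY_LINES = 14  # same limit as the original loop

def add_result(lines, result):
    """Split a result at its last colon and append both parts to the buffer."""
    timestamp, _, ping_text = result.rstrip("\n").rpartition(":")
    if timestamp:
        lines.append(timestamp)
        lines.append(ping_text)
    else:
        # No colon found: keep the whole line as a single entry
        lines.append(ping_text)

# Usage: the deque never grows beyond DISPLAY_LINES entries
lines = deque(maxlen=DISPLAY_LINES)
for second in range(10):
    add_result(lines, f"2024-02-02 10:00:{second:02d}: Ping 8.8.8.8 failed\n")
for line in lines:
    print(line)
```

With maxlen set, the explicit length checks and pop(0) calls are no longer needed.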

The if __name__ == "__main__": block at the end of the program ensures that the main() function is executed only if the script is run directly, rather than imported as a module.

if __name__ == "__main__":
    main()

Ping Configuration file

The file ping.conf contains the following configuration settings for ping_script.py.

ip_address=8.8.8.8
packet_size=68
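
Since a typo in ping.conf would only surface once the script is running, it may help to validate the values at startup. The following is a sketch under assumptions: validate_config is a hypothetical helper that is not part of ping_script.py, it accepts only literal IP addresses (not hostnames), and it reuses the 68-byte cap from ping_ip_address().

```python
import ipaddress

MAX_PACKET_SIZE = 68  # cap used by ping_ip_address() in this article

def validate_config(config):
    """Return (ip_address, packet_size) or raise ValueError on bad input."""
    ip_text = config.get("ip_address", "")
    # ipaddress.ip_address raises ValueError for anything that is not an IP
    ipaddress.ip_address(ip_text)
    # int() raises ValueError if packet_size is not a whole number
    packet_size = int(config.get("packet_size", MAX_PACKET_SIZE))
    if packet_size < 0:
        raise ValueError("packet_size must not be negative")
    return ip_text, min(packet_size, MAX_PACKET_SIZE)

if __name__ == "__main__":
    print(validate_config({"ip_address": "8.8.8.8", "packet_size": "68"}))
```

The dictionary passed in has the same shape as the one returned by read_configuration().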

The ping results for each day are stored in a separate file. For example, the results from February 2nd, 2024 are saved in a text file named “ping_results_2024-02-02.txt”, with one line per ping holding the timestamp followed by the ping result. A text file containing a complete day’s ping records is approximately 5.3 MB, so a 60-day test needs roughly 320 MB of space on the SD card.

Ping Analysis

I have also written a Python program called “ping_analysis.py” that analyzes all the ping result files and writes a per-day summary to a single file named “ping_stat.txt”. To run it, open a terminal and enter the following command:

python3 ping_analysis.py

The following is an example of “ping_stat.txt”:

date,ping_count,success_count,failure_count
2024-02-02,4689,4689,0
2024-02-03,14,14,0
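
To make the columns concrete, here is one way such a per-day row could be computed from the lines of a results file. This is an illustrative sketch only, not the code from ping_analysis.py: summarize_lines is a hypothetical function, the sample lines are made up, and it assumes a failed ping is any line containing “failed”, as written by ping_ip_address().

```python
def summarize_lines(date, lines):
    """Return a CSV row: date,ping_count,success_count,failure_count."""
    # Ignore blank lines so a trailing newline does not count as a ping
    records = [line for line in lines if line.strip()]
    failure_count = sum(1 for line in records if "failed" in line)
    ping_count = len(records)
    success_count = ping_count - failure_count
    return f"{date},{ping_count},{success_count},{failure_count}"

if __name__ == "__main__":
    sample = [
        "2024-02-03 09:00:00: 76 bytes from 8.8.8.8 TTL=117 time=35.2 ms\n",
        "2024-02-03 09:00:01: Ping 8.8.8.8 failed\n",
    ]
    print(summarize_lines("2024-02-03", sample))
```

In practice the lines argument could be an open file object, since iterating over a file yields one line at a time.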

I will add an explanation of the code in ping_analysis.py when time permits.
