This project involves activating a remote device using three buttons located in the office, leveraging the existing LAN infrastructure. I have developed a simple circuit to test the functionality of my code.
Three buttons, designated as Button 1, Button 2, and Button 3, are connected to the GPIO pins of a Raspberry Pi 4B. This Raspberry Pi is linked to another Raspberry Pi 4B via a LAN cable. The second Raspberry Pi is connected to three LEDs, labeled LED1, LED2, and LED3, through its GPIO pins. When any of the buttons is pressed, the corresponding LED illuminates for one second. The two Raspberry Pis communicate over TCP/IP sockets.
My supervisor has outlined several key requirements for this project:
- The program must automatically start when the Raspberry Pi is powered on.
- Button bounce must be filtered out (debouncing).
- The system should ensure that only a single press is registered, even if the button is held down.
- The system must exhibit robustness, ensuring that the communication between the two Raspberry Pis is automatically re-established following any network disconnection or reboot of either device.
- A logging system must be implemented to facilitate troubleshooting and debugging.
The rest of this blog is structured as follows:
- (A) Hardware Setup
- (B) Network Configuration
- (C) Code Implementation
- (D) Test the System
Let’s start now!
(A) Hardware Setup
Button GPIO Pins (Button Pi)
- Button 1 → GPIO 17 (Pin 11 on the GPIO header)
- Button 2 → GPIO 27 (Pin 13 on the GPIO header)
- Button 3 → GPIO 22 (Pin 15 on the GPIO header)
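Wire each button between its GPIO pin and ground. The code enables the Pi's internal pull-up resistors (GPIO.PUD_UP), so each pin reads HIGH when the button is idle and LOW when it is pressed, and no external pull-down resistor is needed.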
LED GPIO Pins (LED Pi)
- LED 1 → GPIO 5 (Pin 29 on the GPIO header)
- LED 2 → GPIO 6 (Pin 31 on the GPIO header)
- LED 3 → GPIO 13 (Pin 33 on the GPIO header)
Connect each LED in series with a 330Ω resistor between its GPIO pin and ground.
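As a rough sanity check of the 330Ω value, the sketch below applies Ohm's law to estimate the LED current. The 3.3 V figure is the Raspberry Pi GPIO output level; the 2.0 V forward voltage is only an assumed value for a typical red LED, so check the datasheet of the LEDs you actually use. The function name led_current_ma is mine, not part of the project code.

```python
def led_current_ma(supply_v: float, forward_v: float, resistor_ohm: float) -> float:
    """Estimate LED current in milliamps using I = (Vsupply - Vforward) / R."""
    if resistor_ohm <= 0:
        raise ValueError("resistor_ohm must be positive")
    # If the supply is below the forward voltage, the LED does not conduct.
    voltage_across_resistor = max(supply_v - forward_v, 0.0)
    return voltage_across_resistor / resistor_ohm * 1000.0


# 3.3 V GPIO output, assumed 2.0 V forward voltage, 330 ohm resistor
current = led_current_ma(supply_v=3.3, forward_v=2.0, resistor_ohm=330)
print(f"Estimated LED current: {current:.1f} mA")
```

Under these assumptions the current comes out at roughly 4 mA, which is a gentle load for a GPIO pin.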
GPIO Pinout Reference: Ensure the Raspberry Pi GPIO numbering is set to BCM mode in the code.
(B) Network Configuration
- Connect the two Raspberry Pi devices using a LAN cable.
- Assign static IPs to both devices or ensure they are on the same subnet.
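If you are not sure whether two addresses fall in the same subnet, Python's standard ipaddress module can check it. This is only a small sketch: 192.168.21.238 is the server address used later in this blog, while 192.168.21.10 and the /24 prefix length are assumed values for illustration.

```python
import ipaddress


def same_subnet(ip_a: str, ip_b: str, prefix_len: int = 24) -> bool:
    """Return True if both IPv4 addresses belong to the same network."""
    net_a = ipaddress.ip_interface(f"{ip_a}/{prefix_len}").network
    net_b = ipaddress.ip_interface(f"{ip_b}/{prefix_len}").network
    return net_a == net_b


# LED Pi address from this blog; the Button Pi address is hypothetical
print(same_subnet("192.168.21.238", "192.168.21.10"))
```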
(C) Code Implementation
Installing the operating system on the Raspberry Pi and installing the required packages are not covered here.
The solution has three components:
(1) First Python file:
A custom logger, created through a getLogger function, that writes log messages to a file and also displays them on the console (terminal). It is simple to use: import logger in your own Python file, create a logger with log = logger.getLogger('name'), and call log.info("message") to write the message to the log file and the console at the same time. I use this logging file in my other projects as well.
import logging
import os
from logging.handlers import TimedRotatingFileHandler

def getLogger(name):
    logger = logging.getLogger(name)
    # Reuse an already configured logger so handlers are not added twice
    if logger.handlers:
        return logger
    # Create logs directory if it doesn't exist
    os.makedirs('logs', exist_ok=True)
    # specifying the log file's name as "logs/logfile" and rotating the log file at midnight each day.
    logHandler = TimedRotatingFileHandler(filename="logs/logfile", when="midnight")
    logFormatter = logging.Formatter('[%(asctime)s][%(name)-14s][%(levelname)-8s] %(message)s')
    logHandler.setFormatter(logFormatter)
    # display the log messages on the console.
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    # there are two handlers: logHandler and consoleHandler
    logger.addHandler(logHandler)
    logger.addHandler(consoleHandler)
    logger.setLevel(logging.INFO)
    return logger
The log messages use a specific format:
%(asctime)s: This placeholder is used to insert the timestamp of the log record in a human-readable format.
The asctime attribute is replaced with the actual timestamp when the log message is formatted.
%(name)-14s: This placeholder is used to insert the name of the logger. It is left-aligned with a width of 14 characters.
If the logger name is shorter than 14 characters, it will be padded with spaces on the right.
%(levelname)-8s: This placeholder is used to insert the log level name. It is left-aligned with a width of 8 characters.
If the log level name is shorter than 8 characters, it will be padded with spaces on the right.
%(message)s: This placeholder is used to insert the log message itself.
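To see the padding in action without touching any log files, the sketch below formats a single hand-built log record with the same format string. The helper name format_example and the pathname "example.py" are placeholders of mine.

```python
import logging

LOG_FORMAT = '[%(asctime)s][%(name)-14s][%(levelname)-8s] %(message)s'


def format_example(name: str, level: int, message: str) -> str:
    """Format one log record with LOG_FORMAT and return the resulting line."""
    formatter = logging.Formatter(LOG_FORMAT)
    record = logging.LogRecord(
        name=name,
        level=level,
        pathname="example.py",  # placeholder, not used by this format
        lineno=0,
        msg=message,
        args=(),
        exc_info=None,
    )
    return formatter.format(record)


# The logger name is padded to 14 characters and the level name to 8
print(format_example("client", logging.INFO, "Button 1 pressed."))
```

The printed line starts with the current timestamp, followed by the padded logger name, the padded level name, and the message.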
(2) Client Python Code (Button Pi): Detects button presses, debounces them, and sends the signal to the server.
import socket
import RPi.GPIO as GPIO
from time import sleep
import threading
import subprocess
import logger

# Create a logger instance
log = logger.getLogger('client')

# GPIO Pin Configuration
BUTTON1 = 17  # GPIO Pin for Button 1
BUTTON2 = 27  # GPIO Pin for Button 2
BUTTON3 = 22  # GPIO Pin for Button 3

# GPIO Setup
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON1, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(BUTTON2, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(BUTTON3, GPIO.IN, pull_up_down=GPIO.PUD_UP)

# TCP/IP Client Configuration
SERVER_IP = '192.168.21.238'  # Replace with the IP address of the LED Raspberry Pi
PORT = 12345

# Global client socket
client_socket = None

# Function to connect to the server
def connect_to_server():
    while True:
        try:
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.connect((SERVER_IP, PORT))
            log.info("Connected to the server.")
            return client_socket
        except Exception as e:
            log.info("Failed to connect to the server. Retrying in 5 seconds...")
            sleep(5)

client_socket = connect_to_server()

# Mapping of button GPIO pins to button labels
button_labels = {
    BUTTON1: "Button 1",
    BUTTON2: "Button 2",
    BUTTON3: "Button 3"
}

# Button state tracking to avoid multiple presses when held
button_states = {BUTTON1: False, BUTTON2: False, BUTTON3: False}

# Function to send button signals to the server
def send_button_signal(button_number):
    try:
        client_socket.send(str(button_number).encode('utf-8'))
        log.info(f"Sent button {button_number} signal to the server.")
    except Exception as e:
        log.info(f"Error sending signal: {e}")

# Function to ping the server. Calls reconnect_to_server() after 4 consecutive failed pings.
def ping_server():
    global client_socket
    ping_fail_count = 0
    max_failures = 4
    while True:
        try:
            response = subprocess.run(
                ["ping", "-c", "1", SERVER_IP],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if response.returncode == 0:
                print("Ping successful.")
                ping_fail_count = 0
            else:
                print("Ping failed.")
                ping_fail_count += 1
            if ping_fail_count >= max_failures:
                print("Cannot find the server.")
                reconnect_to_server()
                ping_fail_count = 0
        except Exception as e:
            print(f"Error during ping: {e}")
            ping_fail_count += 1
        sleep(2)

# Function to send periodic heartbeats to the server
def send_heartbeats():
    global client_socket
    while True:
        try:
            client_socket.sendall("HEARTBEAT".encode('utf-8'))
            sleep(2)  # Send a heartbeat every 2 seconds
        except BrokenPipeError:
            print("BrokenPipeError: The connection to the server is broken.")
            reconnect_to_server()
        except OSError as e:
            print(f"OSError: Connection issue detected: {e}")
            reconnect_to_server()

# Function to reconnect to server
def reconnect_to_server():
    global client_socket
    try:
        client_socket.close()
    except OSError:
        pass  # Ignore errors during close
    log.info("Attempting to reconnect...")
    client_socket = connect_to_server()

# Start a separate thread for sending heartbeats
heartbeat_thread = threading.Thread(target=send_heartbeats, daemon=True)
heartbeat_thread.start()

# Start the ping thread
ping_thread = threading.Thread(target=ping_server, daemon=True)
ping_thread.start()

try:
    while True:
        # Check the state of each button
        for button, state in button_states.items():
            if GPIO.input(button) == GPIO.LOW:  # Button is pressed
                if not state:  # If it wasn't already pressed
                    button_states[button] = True  # Mark as pressed
                    log.info(f"{button_labels[button]} pressed.")
                    if button == BUTTON1:
                        send_button_signal(1)
                    elif button == BUTTON2:
                        send_button_signal(2)
                    elif button == BUTTON3:
                        send_button_signal(3)
            else:  # Button is released
                if button_states[button]:  # Only log when it changes from pressed to released
                    button_states[button] = False  # Reset the state
                    log.info(f"{button_labels[button]} released.")
        sleep(0.05)
finally:
    client_socket.close()
    GPIO.cleanup()
    log.info("Socket closed and GPIO cleaned up.")
Code Explanation
GPIO Button Configuration:
BUTTON1 = 17 # GPIO Pin for Button 1
BUTTON2 = 27 # GPIO Pin for Button 2
BUTTON3 = 22 # GPIO Pin for Button 3
GPIO.setmode(GPIO.BCM)
GPIO.setup(BUTTON1, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(BUTTON2, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(BUTTON3, GPIO.IN, pull_up_down=GPIO.PUD_UP)
Three buttons are connected to GPIO pins 17, 27, and 22. Each button is configured as an input pin with a pull-up resistor (GPIO.PUD_UP), ensuring the default state is HIGH when the button is not pressed.
TCP Client Configuration:
def connect_to_server():
    while True:
        try:
            client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            client_socket.connect((SERVER_IP, PORT))
            log.info("Connected to the server.")
            return client_socket
        except Exception as e:
            log.info("Failed to connect to the server. Retrying in 5 seconds...")
            sleep(5)
A persistent socket connection using static IP and port is maintained to send button press signals and heartbeats. If the connection to the server fails, the client retries every 5 seconds until it successfully connects.
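The code above waits a fixed 5 seconds between attempts. As a variation (not what the project code does), the sketch below retries with a delay that doubles after each failure up to a cap. The names retry_forever, first_delay, max_delay, and sleep_fn are my own; sleep_fn is a parameter only so the logic can be tried without real waiting.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_forever(
    action: Callable[[], T],
    first_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep_fn: Callable[[float], None] = time.sleep,
) -> T:
    """Call action() until it succeeds, doubling the wait after each OSError."""
    delay = first_delay
    while True:
        try:
            return action()
        except OSError:
            # Connection errors such as ConnectionRefusedError derive from OSError
            sleep_fn(delay)
            delay = min(delay * 2, max_delay)
```

In the client, action would be a function that creates the socket, connects to (SERVER_IP, PORT), and returns the connected socket.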
Button Monitoring:
button_states = {BUTTON1: False, BUTTON2: False, BUTTON3: False}
The script keeps track of each button’s state (True for pressed, False for released) to prevent multiple signals being sent when a button is held down.
while True:
    for button, state in button_states.items():
        if GPIO.input(button) == GPIO.LOW:  # Button is pressed
            if not state:  # If it wasn't already pressed
                button_states[button] = True  # Mark as pressed
                log.info(f"{button_labels[button]} pressed.")
                if button == BUTTON1:
                    send_button_signal(1)
                elif button == BUTTON2:
                    send_button_signal(2)
                elif button == BUTTON3:
                    send_button_signal(3)
        else:  # Button is released
            if button_states[button]:  # Only log when it changes from pressed to released
                button_states[button] = False  # Reset the state
                log.info(f"{button_labels[button]} released.")
    sleep(0.05)
The script continuously checks the state of each button by reading the GPIO pins (GPIO.input(button)). If a button is pressed, it sends the corresponding signal (1, 2, or 3) to the server using send_button_signal(). Because the loop samples the buttons only once every 50 ms (sleep(0.05)), contact bounce that settles between two samples is never seen, which reduces the effects of button bounce. The button_states dictionary ensures that holding down a button triggers only one signal.
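If a single 50 ms sample turns out not to be enough for a noisy button, one common alternative is to accept a state change only after several consecutive samples agree. The sketch below shows that idea in plain Python so it can be tried without GPIO hardware. The class name PressDetector and the stable_samples parameter are my own and are not part of the client code above.

```python
class PressDetector:
    """Turn periodic samples of one button into single press events."""

    def __init__(self, stable_samples: int = 2):
        self.stable_samples = stable_samples
        self._count = 0        # consecutive samples that disagree with the current state
        self._pressed = False  # debounced state of the button

    def update(self, is_low: bool) -> bool:
        """Feed one sample (True = pin reads LOW); return True once per new press."""
        if is_low == self._pressed:
            self._count = 0  # sample agrees with the debounced state
            return False
        self._count += 1
        if self._count < self.stable_samples:
            return False     # not stable long enough yet, could be bounce
        self._pressed = is_low
        self._count = 0
        return is_low        # report only the released-to-pressed transition
```

In the polling loop, each button would get its own PressDetector, and the value passed to update() would be GPIO.input(button) == GPIO.LOW.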
Sending Button Signals:
def send_button_signal(button_number):
    try:
        client_socket.send(str(button_number).encode('utf-8'))
        log.info(f"Sent button {button_number} signal to the server.")
    except Exception as e:
        log.info(f"Error sending signal: {e}")
When a button is pressed, the client sends the corresponding number (1, 2, or 3) as a UTF-8 encoded message.
Heartbeat and Ping Mechanism:
def send_heartbeats():
    while True:
        try:
            client_socket.sendall("HEARTBEAT".encode('utf-8'))
            sleep(2)  # Send a heartbeat every 2 seconds
        except BrokenPipeError:
            reconnect_to_server()
        except OSError as e:
            reconnect_to_server()
The client sends a “HEARTBEAT” message every 2 seconds so the server knows the connection is still active. If a send fails, the client attempts to reconnect. In my tests, however, it took a long time for the client to detect a broken connection this way, most likely because sends to an unreachable peer can keep succeeding locally until TCP gives up retransmitting. So I added a ping function that pings the server every 2 seconds. After 4 consecutive failed pings, the client reconnects by calling the reconnect_to_server() function.
def ping_server():
    global client_socket
    ping_fail_count = 0
    max_failures = 4
    while True:
        try:
            response = subprocess.run(
                ["ping", "-c", "1", SERVER_IP],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if response.returncode == 0:
                print("Ping successful.")
                ping_fail_count = 0
            else:
                print("Ping failed.")
                ping_fail_count += 1
            if ping_fail_count >= max_failures:
                print("Cannot find the server.")
                reconnect_to_server()
                ping_fail_count = 0
        except Exception as e:
            print(f"Error during ping: {e}")
            ping_fail_count += 1
        sleep(2)
The heartbeat and ping mechanisms each run in their own thread:
# Start a separate thread for sending heartbeats
heartbeat_thread = threading.Thread(target=send_heartbeats, daemon=True)
heartbeat_thread.start()
# Start the ping thread
ping_thread = threading.Thread(target=ping_server, daemon=True)
ping_thread.start()
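Since both threads can call reconnect_to_server() at about the same time, it may be worth making sure only one of them actually reconnects. The sketch below shows one way to do that with a lock and a generation counter. Reconnector, generation, and seen_generation are names I made up for illustration; they do not appear in the client code.

```python
import threading
from typing import Any, Callable


class Reconnector:
    """Let several threads request a reconnect while only one performs it."""

    def __init__(self, connect: Callable[[], Any]):
        self._connect = connect
        self._lock = threading.Lock()
        self.generation = 0     # incremented after every successful reconnect
        self.connection = None

    def reconnect(self, seen_generation: int) -> None:
        """Reconnect unless another thread already replaced the connection."""
        with self._lock:
            if seen_generation != self.generation:
                return  # someone else reconnected since the caller saw the failure
            self.connection = self._connect()
            self.generation += 1
```

Each thread would read the generation value before using the connection and pass that value to reconnect() when it detects a failure.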
(3) Server Python Code (LED Pi): Listens for button press signals and lights up the corresponding LED for 1 second.
import socket
import RPi.GPIO as GPIO
from time import sleep
import threading
import logger
# Create a logger instance
log = logger.getLogger('server')
# GPIO Pin Configuration
LED1 = 5 # GPIO Pin for LED 1
LED2 = 6 # GPIO Pin for LED 2
LED3 = 13 # GPIO Pin for LED 3
# GPIO Setup
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED1, GPIO.OUT)
GPIO.setup(LED2, GPIO.OUT)
GPIO.setup(LED3, GPIO.OUT)
# TCP/IP Server Configuration
HOST = '0.0.0.0' # Listen on all interfaces
PORT = 12345
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Enable TCP Keep-Alive
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10) # Idle time before keep-alive probes (in seconds)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 3) # Interval between keep-alive probes (in seconds)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5) # Number of keep-alive probes before failure
server_socket.bind((HOST, PORT))
server_socket.listen(1)
log.info("Server is listening for connections...")
# Function to control the LED
def control_led(led_pin):
    GPIO.output(led_pin, GPIO.HIGH)
    log.info(f"Turning LED at GPIO {led_pin} ON.")
    sleep(1)
    GPIO.output(led_pin, GPIO.LOW)
    log.info(f"Turning LED at GPIO {led_pin} OFF.")

# Function to handle a single client connection
def handle_client(conn, addr):
    log.info(f"Connection established with {addr}")
    conn.settimeout(5)  # Set a timeout for receiving data (5 seconds)
    try:
        while True:
            try:
                data = conn.recv(1024).decode('utf-8').strip()
                if not data:  # Client disconnected
                    log.info(f"Connection lost with {addr}.")
                    break
                if data == "HEARTBEAT":
                    print(f"Heartbeat received from {addr}.")
                elif data == '1':
                    control_led(LED1)
                elif data == '2':
                    control_led(LED2)
                elif data == '3':
                    control_led(LED3)
                else:
                    log.info(f"Unknown command received: {data}")
            except socket.timeout:
                log.info(f"No heartbeat from {addr}. Connection timed out.")
                break
            except (ConnectionResetError, BrokenPipeError):
                log.info(f"Connection with {addr} was forcibly closed.")
                break
    except Exception as e:
        log.info(f"Error with client {addr}: {e}")
    finally:
        conn.close()
        log.info(f"Connection with {addr} closed.")

try:
    while True:
        log.info("Waiting for a connection...")
        conn, addr = server_socket.accept()
        client_thread = threading.Thread(target=handle_client, args=(conn, addr), daemon=True)
        client_thread.start()
finally:
    server_socket.close()
    GPIO.cleanup()
    log.info("Server socket closed and GPIO cleaned up.")
Code Explanation
GPIO LED Configuration
LED1 = 5 # GPIO Pin for LED 1
LED2 = 6 # GPIO Pin for LED 2
LED3 = 13 # GPIO Pin for LED 3
GPIO.setmode(GPIO.BCM)
GPIO.setup(LED1, GPIO.OUT)
GPIO.setup(LED2, GPIO.OUT)
GPIO.setup(LED3, GPIO.OUT)
Three LEDs are connected to GPIO pins 5, 6, and 13. Each LED is configured as an output pin.
TCP Server Configuration
HOST = '0.0.0.0' # Listen on all interfaces
PORT = 12345
The server listens on all interfaces (0.0.0.0) at port 12345.
Keep-Alive Mechanism:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 3)
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
The server enables TCP Keep-Alive to detect dead connections. With these settings, probing starts after the connection has been idle for 10 seconds, a probe is sent every 3 seconds, and the connection is dropped after 5 unanswered probes. However, I found that the Keep-Alive mechanism alone did not work well, so I added code to make the client send a heartbeat every two seconds. As described in the “Handling Client Connections” explanation below, if no data (heartbeat) is received for five seconds, the server closes the connection with that client. You can try deleting the Keep-Alive code if you like; I think the remote control system will still work fine.
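To compare the two detection mechanisms, it helps to work out roughly how long Keep-Alive needs with the values used above. The sketch below is plain arithmetic and only an approximation of what the kernel does; the function name keepalive_detection_seconds is mine.

```python
def keepalive_detection_seconds(idle: int, interval: int, count: int) -> int:
    """Approximate time until an idle, dead connection is dropped by Keep-Alive."""
    # idle: seconds of silence before the first probe (TCP_KEEPIDLE)
    # interval: seconds between probes (TCP_KEEPINTVL)
    # count: unanswered probes before the connection is dropped (TCP_KEEPCNT)
    return idle + interval * count


# Values from the server code: 10 s idle, 3 s interval, 5 probes
print(keepalive_detection_seconds(idle=10, interval=3, count=5))
```

That gives about 25 seconds, compared with the 5 second receive timeout that the heartbeat approach relies on.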
Controlling LEDs:
def control_led(led_pin):
    GPIO.output(led_pin, GPIO.HIGH)
    log.info(f"Turning LED at GPIO {led_pin} ON.")
    sleep(1)
    GPIO.output(led_pin, GPIO.LOW)
    log.info(f"Turning LED at GPIO {led_pin} OFF.")
When a signal is received, the server lights up the corresponding LED for 1 second.
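Note that sleep(1) keeps the handler busy for the whole second, so a second button press that arrives during that time waits its turn. If that ever matters, one option is to switch the LED off from a timer instead. The sketch below shows the idea with threading.Timer and a generic set_output callback so it runs without GPIO; pulse and set_output are hypothetical names, and this is not how the server code above works.

```python
import threading
from typing import Callable


def pulse(set_output: Callable[[bool], None], seconds: float = 1.0) -> threading.Timer:
    """Switch an output on now and schedule it to switch off after `seconds`."""
    set_output(True)
    timer = threading.Timer(seconds, set_output, args=(False,))
    timer.daemon = True  # do not keep the program alive just for the timer
    timer.start()
    return timer
```

On the LED Pi, set_output could be a small function that calls GPIO.output(led_pin, GPIO.HIGH) for True and GPIO.output(led_pin, GPIO.LOW) for False.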
Handling Client Connections:
def handle_client(conn, addr):
    conn.settimeout(5)  # Set a timeout for receiving data (5 seconds)
    try:
        while True:
            data = conn.recv(1024).decode('utf-8').strip()
            if not data:  # Client disconnected
                break
            if data == "HEARTBEAT":
                print(f"Heartbeat received from {addr}.")
            elif data == '1':
                control_led(LED1)
            elif data == '2':
                control_led(LED2)
            elif data == '3':
                control_led(LED3)
            else:
                log.info(f"Unknown command received: {data}")
    finally:
        conn.close()
        log.info(f"Connection with {addr} closed.")
The server listens for data from the client. If a button signal (1, 2, or 3) is received, the corresponding LED is activated.
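One thing to keep in mind is that TCP is a byte stream, so a heartbeat and a button signal sent close together can arrive in a single recv() call (for example as "HEARTBEAT1"), which the comparison above would report as an unknown command. A common remedy is to end every message with a newline and split the received bytes on it. The sketch below assumes the client is changed to append "\n" to each message, which the client code in this blog does not do; LineBuffer and feed are hypothetical names.

```python
from typing import List


class LineBuffer:
    """Collect received bytes and hand back only complete newline-terminated messages."""

    def __init__(self) -> None:
        self._pending = b""

    def feed(self, chunk: bytes) -> List[str]:
        """Add a chunk from recv() and return the messages completed so far."""
        self._pending += chunk
        # Everything before the last newline is complete; the rest waits for more data
        *complete, self._pending = self._pending.split(b"\n")
        return [line.decode("utf-8").strip() for line in complete if line.strip()]
```

The handler would then call feed() with each chunk returned by conn.recv(1024) and loop over the returned messages.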
Accepting Client Connections:
while True:
    conn, addr = server_socket.accept()
    client_thread = threading.Thread(target=handle_client, args=(conn, addr), daemon=True)
    client_thread.start()
The server continuously listens for new client connections. Each connection is handled in a separate thread.
(D) Test the System
Run the Server Code (LED Pi) by executing the server script:
python3 server.py
Run the Client Code (Button Pi) by executing the client script:
python3 client.py
Press Button 1, 2, or 3 on the Button Pi. The corresponding LED (LED1, LED2, or LED3) on the LED Pi should light up for 1 second. Pressing a button sends only one signal, even if the button is held down.
Temporarily disconnect the LAN cable between the two Raspberry Pis and make sure they reconnect once it is plugged back in. Reboot the client Pi and the server Pi one at a time and make sure they reconnect. Finally, create a service on each Pi so that the remote control system starts automatically when the Raspberry Pi is powered on. Please follow the instructions in the blog “Auto startup on Raspberry Pi” to create the auto-start service.