How to Integrate alwaysAI with External Applications Using TCP Sockets

Sockets are endpoints for inter-process communication over a network and are supported by most platforms. Using sockets with the alwaysAI platform allows an application to communicate with external applications running locally or remotely, including applications written in other programming languages. There are many methods for inter-process communication, but cross-platform communication is handled best by sockets.

(Source code for this tutorial can be found in the alwaysAI GitHub repository.)

Sockets

Sockets are defined by two parameters: address family and socket type. The former defines the network layer such as IPv4, IPv6, etc., and the latter defines the transport layer such as TCP, UDP, etc. The network layer handles the routing of packets from one network address to another. The most commonly used network protocol, IPv4, doesn't guarantee delivery of packets, and it also doesn’t check for errors. To guarantee the delivery of packets, the Transmission Control Protocol (TCP) is often paired with IPv4. TCP handles the delivery of packets, as well as the reassembly of packets at their destination. TCP also defines a three-way handshake that has to be made between a server and client before a connection is established, which is good for reliability, but can also add unwanted latency in your application. Once a connection is made, data can be retrieved from sockets and sent to the destination address via the socket. An application that does not require reliable data streams should use the User Datagram Protocol (UDP), which is a connectionless protocol that focuses on speed rather than reliability.
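Both choices are made when the socket is created. In Python, for example, the address family and socket type are the two arguments to the socket constructor; a minimal sketch using only the standard library:

```python
import socket

# AF_INET selects IPv4; SOCK_STREAM selects TCP (a reliable byte stream)
tcp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# AF_INET again, but SOCK_DGRAM selects UDP (connectionless datagrams)
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

print(tcp_sock.type, udp_sock.type)

tcp_sock.close()
udp_sock.close()
```

Everything else in this tutorial uses the first form, AF_INET with SOCK_STREAM.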

In this tutorial, we’ll build an object detection server which receives images and returns the marked-up image. We’ll start by building a simple echo server/client in order to showcase sockets in action, using TCP sockets on top of IPv4. 

Echo Server

The server is responsible for accepting connections, so it first has to bind to an address and listen for incoming connections.

import socket

def main():
    # An empty host string will signal the app to
    # listen on all network interfaces
    HOST = ''
    # Low number ports are reserved for well-known services
    # High number ports are suggested for development
    PORT = 6000

    # socket.AF_INET specifies that we will be in the IPv4 domain
    # socket.SOCK_STREAM specifies this as a TCP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Bind to the address to start listening for connections
        sock.bind((HOST, PORT))

        # Allow up to 5 unaccepted connections
        sock.listen(5)

        # Enter main loop
        while True:
            # Accept incoming connections and assign
            # the new socket to clientsock
            clientsock, address = sock.accept()
            # The with block closes the client socket
            # when communication is over
            with clientsock:
                # Read up to 1024 bytes from the socket
                data = clientsock.recv(1024)
                print(data)
                # Echo data back to client
                clientsock.sendall(data)


if __name__ == '__main__':
    main()
 

When a new connection is accepted, a new socket is generated to handle communication with the client. In the call to recv(), we read a fixed maximum of 1024 bytes. This is inefficient (especially when reading large byte streams) and will need to be optimized: depending on network load and other variables, not all of the data will arrive at once. Lastly, it is important to close the client socket when communication is over to signal the end of the connection; here, the with block does that for us automatically.
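One common remedy, and the approach the inferencing server takes later in this tutorial, is to loop over recv() until the expected number of bytes has arrived. A minimal sketch of such a helper (the name recv_exactly is our own):

```python
def recv_exactly(sock, n):
    """Read exactly n bytes from sock, looping over recv().

    Returns fewer bytes only if the peer closes the connection early.
    """
    chunks = []
    received = 0
    while received < n:
        # Never ask for more than 4096 bytes at a time
        chunk = sock.recv(min(n - received, 4096))
        if chunk == b'':  # peer closed the connection
            break
        chunks.append(chunk)
        received += len(chunk)
    return b''.join(chunks)
```

The caller still has to know how many bytes to expect, which is why the protocol later in this tutorial prefixes every message with its length.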

Echo Client

Client code is generally simpler than server code, because it merely needs to send a request for a connection to the server. It is usually good practice to keep the socket open only for the duration of one data exchange.

import socket

def main():
    # localhost is fine if you're running both the server and client
    # on the same machine. Otherwise provide the IP address of the server
    HOST = 'localhost'

    # This port should be the same as the port the server
    # is listening on for new connections
    PORT = 6000

    # Here we are using the same protocols as the server
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Send a request for connection at the server's address
        sock.connect((HOST, PORT))

        # Send 'hello world' to the server
        sock.sendall(b'hello world')

        # Receive the echoed data from the server
        data = sock.recv(1024)
        print(data)


if __name__ == '__main__':
    main()
 

Run the server code first, and then run the client code from another terminal. You should see that “hello world” is sent to the server and then echoed back to the client.


Now that we have gone over the hello world of network programming, let's build an alwaysAI application using sockets. One limitation of the alwaysAI platform is that all of the data needed by the application must be copied onto the device before running it. If you want to run inferencing on a lot of images, you would need to copy all of the images to the device, which can quickly become expensive in terms of time and memory. One way to get around this limitation is to create an inferencing server on the device that accepts images over a TCP connection.


Inferencing Server

We could build the server using plain TCP sockets like in the example above, but Python provides a useful module for building servers called socketserver, which abstracts away the low-level concepts of sockets so that the user can instead focus on the way the data is handled.

import edgeiq
import socketserver

class InferenceServer(socketserver.TCPServer):
    def __init__(self, host, port, handler):
        print(f'Serving on {host}:{port}')
        # The constructor for socketserver.TCPServer requires the
        # server address as well as a RequestHandler class
        super().__init__((host, port), handler)

The socketserver module has a pre-made class for building TCP servers, so we will be subclassing that. The constructor expects two parameters: the server_address and the RequestHandlerClass. The server_address specifies the address on which to listen for connections. The RequestHandlerClass is the class that holds the handle method, which is called whenever a new connection is established.
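As a standalone illustration of this pattern (separate from the inferencing server we are building), the echo server from earlier can be rewritten with socketserver in a few lines. This sketch runs the server in a background thread on an OS-assigned port so the whole round trip fits in one script:

```python
import socket
import socketserver
import threading

class EchoHandler(socketserver.BaseRequestHandler):
    # handle() is called once per accepted connection;
    # self.request is the socket connected to the client
    def handle(self):
        data = self.request.recv(1024)
        self.request.sendall(data)

# Port 0 lets the OS pick a free port for this demo
server = socketserver.TCPServer(('localhost', 0), EchoHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Exercise the server with a plain client socket
with socket.create_connection(server.server_address) as client:
    client.sendall(b'hello world')
    response = client.recv(1024)
print(response)

server.shutdown()
server.server_close()
```

Notice that all of the accept/loop boilerplate from the plain-socket version is gone; we only supply the handler.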

class InferenceServer(socketserver.TCPServer):
    def __init__(self, host, port, handler):
        self.object_dect = edgeiq.ObjectDetection('alwaysai/ssd_mobilenet_v1_coco_2018_01_28')
        self.object_dect.load(edgeiq.Engine.DNN)
        print('Model Loaded')

        ...

We can initialize the model in the constructor of our custom class. The RequestHandlerClass that we pass in will have a reference to the server object so we can access the model from within InferenceHandler.

import socketserver
import numpy as np
import cv2

class InferenceHandler(socketserver.BaseRequestHandler):
    def pack_msg_length(self, msg: bytes):
        return len(msg).to_bytes(4, byteorder='little')

    def unpack_msg_length(self, msg: bytes):
        return int.from_bytes(msg, byteorder='little')

    def unpack_image(self, data: bytes):
        # Convert buffer into numpy array
        img_buffer = np.frombuffer(data, dtype=np.uint8)
        # Reshape to the shape cv2.imdecode expects
        img_buffer = img_buffer.reshape(img_buffer.shape[0], 1)
        # return the unpacked image
        return cv2.imdecode(img_buffer, cv2.IMREAD_COLOR)

    def pack_image(self, image):
        ret, img_packed = cv2.imencode('.jpg', image)
        if ret:
            return img_packed
        print('Failed to encode image')
        return None

    def read(self):
        msg_length = self.unpack_msg_length(self.request.recv(4))
        byte_chunks = []
        bytes_recvd = 0
        while bytes_recvd < msg_length:
            byte_chunk = self.request.recv(min(msg_length - bytes_recvd, 4092))
            if byte_chunk == b'':
                break
            byte_chunks.append(byte_chunk)
            bytes_recvd += len(byte_chunk)

        return b''.join(byte_chunks)

    def send(self, msg: bytes):
        try:
            msg_len = self.pack_msg_length(msg)
            print(f'sending data of length {len(msg)}')
            self.request.sendall(msg_len)
            self.request.sendall(msg)
            return True
        except Exception as e:
            print(f'Failed to send msg to client with error {e}')

        return False
  • pack_msg_length

    • Returns the length of the message packed into 4 bytes.

  • unpack_msg_length

    • Returns the int value of the length of the message.

  • read

    • Reads the 4-byte message length first, then loops until that many bytes have been received from the socket. Returns the message received.

  • send

    • Sends the length of the message in the first four bytes, then the rest of the message. Returns a bool indicating success.

  • unpack_image

    • Takes the byte stream data and converts it into an image. Returns a NumPy array containing the image data.

  • pack_image

    • Takes the image and encodes it in the .jpg format. Returns a NumPy array containing the encoded image data.

Here we have defined a simple protocol for communication that needs to be followed by both the server and the client. When sending a message, the length of the message is packed into the first four bytes. That value can then be used to read the rest of the message more efficiently. The images themselves are encoded and decoded with OpenCV in the JPEG format.
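The framing itself only takes a couple of standard-library calls. A minimal sketch of the protocol in isolation:

```python
msg = b'hello'

# Sender side: pack the 4-byte little-endian length header
header = len(msg).to_bytes(4, byteorder='little')
framed = header + msg

# Receiver side: the first four bytes tell us
# exactly how many more bytes to read
length = int.from_bytes(framed[:4], byteorder='little')
body = framed[4:4 + length]
print(length, body)  # 5 b'hello'
```

A 4-byte header caps messages at about 4 GB, which is far more than any JPEG we will send here.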

import edgeiq
import socketserver

class InferenceHandler(socketserver.BaseRequestHandler):
    ...

    def handle(self):
        data = self.read()
        image = self.unpack_image(data)
        results = self.server.object_dect.detect_objects(image, confidence_level=.5)
        final = edgeiq.markup_image(image, results.predictions, colors=self.server.object_dect.colors)
        image = self.pack_image(final)
        if image is None:
            self.send(b'Failed to encode image')
        else:
            self.send(image.tobytes())


if __name__ == '__main__':
    host = ''
    port = 6000
    with InferenceServer(host, port, InferenceHandler) as server:
        # Handle requests until shutdown() is explicitly called
        server.serve_forever()

InferenceHandler.handle() is pretty straightforward. This is the method that the InferenceServer will call to handle a new connection between the client and the server. We first receive the byte data from the client, which we then decode into an image. Next, we run object detection on the image and mark it up. Lastly, we encode the marked-up image and send it back to the client.
 
Updated Client

The directory for the client in my example contains the client script alongside an images directory holding the input images and an inferenced directory for the output images.
import socket
import cv2
import numpy as np
import os

def pack_msg_length(msg: bytes):
    return len(msg).to_bytes(4, byteorder='little')

def unpack_msg_length(msg: bytes):
    return int.from_bytes(msg, byteorder='little')

def read(sock):
    msg_length = unpack_msg_length(sock.recv(4))
    byte_chunks = []
    bytes_recvd = 0
    while bytes_recvd < msg_length:
        byte_chunk = sock.recv(min(msg_length - bytes_recvd, 4092))
        if byte_chunk == b'':
            break
        byte_chunks.append(byte_chunk)
        bytes_recvd += len(byte_chunk)

    return b''.join(byte_chunks)

def send(sock, data: bytes):
    try:
        msg_length = pack_msg_length(data)
        sock.sendall(msg_length)
        sock.sendall(data)
        return True
    except Exception as e:
        print(f'Failed to send data with exception {e}')

    return False

def unpack_image(data: bytes):
    # Convert buffer into numpy array
    img_buffer = np.frombuffer(data, dtype=np.uint8)
    # Reshape to the shape cv2.imdecode expects
    img_buffer = img_buffer.reshape(img_buffer.shape[0], 1)
    # return the unpacked image
    return cv2.imdecode(img_buffer, cv2.IMREAD_COLOR)

def pack_image(image, image_format):
    ret, img_packed = cv2.imencode(image_format, image)
    if ret:
        return img_packed
    print('Failed to encode image')
    return None

if __name__ == '__main__':
    # Change this value to the IP address of the server
    # if not running both the client and server on the same machine
    host = ''
    # This should match the port the server is listening on
    port = 6000

    # Make a list of string paths for each image in the images directory
    images = list(map(lambda path: os.path.join('images', path), os.listdir('images')))

    for index, img_path in enumerate(images):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((host, port))

            image = cv2.imread(img_path)
            # Get image type from the path string
            image_format = img_path[img_path.rfind('.'):]

            image_data = pack_image(image, image_format)
            if image_data is not None:
                # Convert the encoded image to raw bytes before sending
                send(sock, image_data.tobytes())

                recvd_data = read(sock)
                image_final = unpack_image(recvd_data)
                cv2.imwrite(f'inferenced/{index}.jpg', image_final)

The helper functions are exactly the same as those in the InferenceHandler. Here, we iterate through each image in the ./images directory and send its data to the server. The image we receive back from the server is the marked-up image, and we save those in the ./inferenced directory.

Next, deploy the inference server to a device and start the application. Inside the client code, change the host to point to the IP address of the device where the server is running, then run the client.

We can verify the results by checking some of the images in the inferenced folder.

Conclusion

This tutorial covered the basics of socket programming, and demonstrated how to define a communication protocol between the server and client. Most programming languages offer a wrapper around sockets, which makes them great for inter-process communication. Leveraging socket programming allows an alwaysAI application to be integrated into cross-platform, multi-language environments. We can’t wait to see what you build with alwaysAI!
