Prakarsh is currently a senior at Ajay Kumar Garg Engineering College, majoring in Electronics and Instrumentation Engineering. His research interests include Reinforcement Learning, Deep Learning, Autonomous Systems, and Embodied AI.

Week 9 - Spreading the multiprocessing backend and bug fixes

Week 9

  • Lastly, a different model, iris, was chosen for optimisation, and the statistics for multiprocessing and the model change were shown in rotors. The PR and issue are in this post, and the updates are in this week’s post.

How did I contribute this week?

I extended the multiprocessing backend to the remaining drone-based exercises. Each of them needed its own changes to the general template, such as the way variables are shared, and some bugs came up during implementation.

  • The associated PR and issue are this, which can be referred to for all the commits.

Bugs and confusions faced, along with some debugging methods learnt:

  • In the beginning I thought rotors_driver needed the namespace as a parameter, which led to tricky code changes. I later realised that launching the nodes in one group automatically provides the namespace, even without passing the param to the rotors_driver node.

  • Usually, while running the docker container, I couldn’t see the printed messages that would tell me where the code breaks, because multiple processes were being launched from manager.py and only the messages from the manager were printed. The workaround I found was a try and except block, i.e., putting a statement that is sure to raise an error at the line you think is breaking the code.

  • Below is only the changed value.py script from the shared folder, used for variable sharing in position control:

import numpy as np
import mmap
from posix_ipc import Semaphore, O_CREX, ExistentialError, O_CREAT, SharedMemory, unlink_shared_memory
from ctypes import sizeof, memmove, addressof, create_string_buffer, c_float
import struct

class SharedValue:
    def __init__(self, name):
        # Initialize variables for memory regions, buffers, and the Semaphore
        self.shm_buf = None; self.shm_region = None
        self.value_lock = None

        self.shm_name = name; self.value_lock_name = name

        # Initialize shared memory buffer
        # try:
        #     self.shm_region = SharedMemory(self.shm_name)
        #     self.shm_buf = mmap.mmap(self.shm_region.fd, sizeof(c_float))
        #     self.shm_region.close_fd()
        # except ExistentialError:
        #     self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=sizeof(c_float))
        #     self.shm_buf = mmap.mmap(self.shm_region.fd, self.shm_region.size)
        #     self.shm_region.close_fd()

        # Initialize or retrieve Semaphore
        try:
            self.value_lock = Semaphore(self.value_lock_name, O_CREX)
        except ExistentialError:
            value_lock = Semaphore(self.value_lock_name, O_CREAT)
            value_lock.unlink()
            self.value_lock = Semaphore(self.value_lock_name, O_CREX)

        self.value_lock.release()

    # Get the shared value
    def get(self, type_name= "value"):
        # Retrieve the data from buffer
        if type_name=="value":
            try:
                self.shm_region = SharedMemory(self.shm_name)
                self.shm_buf = mmap.mmap(self.shm_region.fd, sizeof(c_float))
                self.shm_region.close_fd()
            except ExistentialError:
                self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=sizeof(c_float))
                self.shm_buf = mmap.mmap(self.shm_region.fd, self.shm_region.size)
                self.shm_region.close_fd()
            self.value_lock.acquire()
            value = struct.unpack('f', self.shm_buf)[0]
            self.value_lock.release()

            return value
        elif type_name == "list":
            self.shm_region = SharedMemory(self.shm_name)
            # Map the whole region: six float32 values need 24 bytes, not sizeof(c_float)
            self.shm_buf = mmap.mmap(self.shm_region.fd, self.shm_region.size)
            self.shm_region.close_fd()
            self.value_lock.acquire()
            array_val = np.ndarray(shape=(6,),
                                dtype='float32', buffer=self.shm_buf)
            self.value_lock.release()

            return array_val
        
        else:
            print("missing argument for return type")
     

    # Add the shared value
    def add(self, value, type_name= "value"):
        # Send the data to shared regions
        if type_name=="value":
            try:
                self.shm_region = SharedMemory(self.shm_name)
                self.shm_buf = mmap.mmap(self.shm_region.fd, sizeof(c_float))
                self.shm_region.close_fd()
            except ExistentialError:
                self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=sizeof(c_float))
                self.shm_buf = mmap.mmap(self.shm_region.fd, self.shm_region.size)
                self.shm_region.close_fd()

            self.value_lock.acquire()
            self.shm_buf[:] = struct.pack('f', value)
            self.value_lock.release()
        elif  type_name=="list":
            byte_size = value.nbytes
            self.shm_region = SharedMemory(self.shm_name, O_CREAT, size=byte_size)
            self.shm_buf = mmap.mmap(self.shm_region.fd, byte_size)
            self.shm_region.close_fd()
            self.value_lock.acquire()
            self.shm_buf[:] = value.tobytes()
            self.value_lock.release()

    # Destructor function to unlink and disconnect
    def close(self):
        self.value_lock.acquire()
        # shm_buf stays None until get() or add() has been called
        if self.shm_buf is not None:
            self.shm_buf.close()

        try:
            unlink_shared_memory(self.shm_name)
        except ExistentialError:
            pass

        self.value_lock.release()
        self.value_lock.close()
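
To make the variable sharing in value.py easier to follow, here is a minimal sketch of the same pack/unpack round trip. It is not the posix_ipc code above: it swaps in the standard library's `multiprocessing.shared_memory` so it runs without extra packages, and it leaves out the semaphore locking. The helper names `write_values`, `read_values` and `round_trip` are hypothetical and exist only for this illustration.

```python
import struct
from multiprocessing import shared_memory


def write_values(name, fmt, values):
    """Attach to an existing shared region by name and pack the values into it."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        struct.pack_into(fmt, shm.buf, 0, *values)
    finally:
        shm.close()


def read_values(name, fmt):
    """Attach to an existing shared region by name and unpack the values from it."""
    shm = shared_memory.SharedMemory(name=name)
    try:
        return struct.unpack_from(fmt, shm.buf, 0)
    finally:
        shm.close()


def round_trip(fmt, values):
    """Create a region sized for fmt, write through one handle, read through another."""
    owner = shared_memory.SharedMemory(create=True, size=struct.calcsize(fmt))
    try:
        write_values(owner.name, fmt, values)
        return read_values(owner.name, fmt)
    finally:
        owner.close()
        owner.unlink()  # remove the region once nobody needs it
```

A single float uses the format "f" (4 bytes), while a six-element float32 list uses "6f" (24 bytes), so the region and the mapping have to be sized for whichever one is being shared.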
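
For the debugging problem mentioned earlier, where only the manager's messages were printed, one possible variant of the try and except workaround is to write the traceback to a file that can be inspected from inside the container. This is a sketch under assumptions, not what the exercises actually do: the decorator `log_exceptions`, the path `LOG_PATH` and the function `fragile_step` are hypothetical names.

```python
import functools
import os
import tempfile
import traceback


def log_exceptions(log_path):
    """Decorator: append the traceback of any exception to log_path, then re-raise."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                # Printed output from this process may not be visible, so log to a file instead
                with open(log_path, "a") as log_file:
                    log_file.write(f"[pid {os.getpid()}] {func.__name__} failed:\n")
                    log_file.write(traceback.format_exc())
                raise
        return wrapper
    return decorator


# Hypothetical log location; any path writable inside the container would do
LOG_PATH = os.path.join(tempfile.gettempdir(), "exercise_errors.log")


@log_exceptions(LOG_PATH)
def fragile_step(divisor):
    # Stand-in for the line suspected of breaking the code
    return 1.0 / divisor
```

Calling `fragile_step(0)` still raises, but the traceback and the process id end up in the log file, which shows which process failed and on which line.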