Jelajahi Sumber

Add prob_map

Zhilong Li 4 tahun lalu
induk
melakukan
c0a4e91d89
1 mengubah file dengan 338 tambahan dan 0 penghapusan
  1. 338 0
      ProbMap.py

+ 338 - 0
ProbMap.py

@@ -0,0 +1,338 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import numpy as np
+import logging
+
+"""
+Tracking implementation for the perimeter monitoring problem
+
+Implementations
+--------------- 
+1. ProbMap := Probability map for estimating targets position
+
+References
+----------
+[1] Hu, J.; Xie, L.; Lum, K.Y.; Xu, J. Multiagent Information Fusion and 
+    Cooperative Control in Target Search. IEEE Trans. Control Syst. Technol.
+    2013, 21, 1223–1235.
+
+"""
+
+
+class ProbMap:
+
+    def __init__(self, width_meter, height_meter, resolution,
+                 center_x, center_y, init_val=0.01, false_alarm_prob=0.05):
+        """Generate a probability map
+
+        Args:
+            width_meter (int): width of the area [m]
+            height_meter (int): height of the area [m]
+            resolution (float): grid resolution [m]
+            center_x (float): center x position  [m]
+            center_y (float): center y position  [m]
+            init_val (float, optional): Initial value for all cells. Defaults to 0.01.
+            false_alarm_prob (float, optional): False alarm probability of the detector. Defaults to 0.05.
+        """
+        # TODO make this grid map unlimited, deprecate the width and height params
+        # number of cells for width
+        self.width = int(np.ceil(width_meter / resolution))
+        # number of cells for height
+        self.height = int(np.ceil(height_meter / resolution))
+        self.resolution = resolution
+        self.center_x = center_x
+        self.center_y = center_y
+        self.init_val = init_val
+        self.false_alarm_prob = false_alarm_prob
+
+        self._left_lower_x = self.center_x - self.width / 2.0 * self.resolution
+        self._left_lower_y = self.center_y - self.height / 2.0 * self.resolution
+
+        self.ndata = self.width * self.height
+        # this stores all data, {grid_inx: grid_value}
+        self.non_empty_cell = dict()
+
+    def _calc_xy_index_from_pos(self, pos, lower_pos, max_index):
+        """Calculate the grid index by position
+        """
+        ind = int(np.floor((pos - lower_pos) / self.resolution))
+        if not 0 <= ind <= max_index:
+            # XXX may not need this warning
+            logging.warning("Position not within the area")
+        return ind
+
+    def _calc_pos_from_xy_index(self, ind, lower_pos, _max_index):
+        """Calculate the position by grid index
+        """
+        pos = ind*self.resolution+lower_pos+self.resolution/2.0
+        return pos
+
+    def get_value_from_xy_index(self, index):
+        # type: (tuple) -> None
+        """Get the value from given cell
+        """
+        return self.non_empty_cell[index]
+
+    def get_xy_index_from_xy_pos(self, x_pos, y_pos):
+        """Get grid index from position
+
+        Args:
+            x_pos ([type]): x position [m]
+            y_pos ([type]): y position [m]
+
+        Returns:
+            tuple: the grid index of self.non_empty_cell
+        """
+        x_ind = self._calc_xy_index_from_pos(
+            x_pos, self._left_lower_x, self.width)
+        y_ind = self._calc_xy_index_from_pos(
+            y_pos, self._left_lower_y, self.height)
+        return tuple([int(x_ind), int(y_ind)])
+
+    def get_xy_pos_from_xy_index(self, x_ind, y_ind):
+        """get_xy_pos_from_xy_index
+        """
+        x_pos = self._calc_pos_from_xy_index(
+            x_ind, self._left_lower_x, self.width)
+        y_pos = self._calc_pos_from_xy_index(
+            y_ind, self._left_lower_y, self.height)
+        return tuple([x_pos, y_pos])
+
+    def get_value_from_xy_pos(self, x_pos, y_pos):
+        cell_ind = self.get_xy_index_from_xy_pos(x_pos, y_pos)
+        return self.get_value_from_xy_index(cell_ind)
+
+    def set_value_from_xy_index(self, index, val):
+        """Stores the value in grid map
+
+        Args:
+            index (tuple): 2D tuple of x, y coordinates.
+            val (float): Value that needs to be stored.
+        """
+        # If Q value after update is small enough to make the probability be zero,
+        # it's safe to delete the cell for a better memory usage
+        if val == 700.0:
+            self.delete_value_from_xy_index(index)
+        else:
+            self.non_empty_cell[index] = val
+
+    def delete_value_from_xy_index(self, index):
+        """Delete the item from grid map
+
+        Args:
+            index (tuple): 2D tuple of x, y coordinates.
+        """
+        try:
+            del self.non_empty_cell[index]
+        except KeyError:
+            pass
+
+    def generate_shareable_v(self, local_measurement):
+        # type: (dict) -> dict
+        """Generate the shareable information from local detection
+
+        Args:
+            local_measurement (dict): local detections
+
+        Returns:
+            dict: converted shareable detection info
+        """
+        meas_index = dict()
+        for _target_id, meas in local_measurement.items():
+            x_pos, y_pos, meas_confidence = meas
+            point_ind = tuple(
+                self.get_xy_index_from_xy_pos(x_pos, y_pos))
+            # meas_index[point_ind] = meas_confidence
+            meas_index[point_ind] = np.log(
+                self.false_alarm_prob/meas_confidence)
+        return meas_index
+
+    def generate_zero_meas(self):
+        def cut(x): return 1e-6 if x <= 1e-6 else 1 - \
+            1e-6 if x >= 1-1e-6 else x
+        meas_confidence = cut(np.random.normal(0.85, 0.1))
+        x = np.log((1-self.false_alarm_prob)/(1-meas_confidence))
+        return x
+
+    def map_update(self, local_measurement, neighbor_measurement, N, d):
+        """Update the probability map using measurements from local and neighbors
+
+        Args:
+            local_measurement (dict): Contains local detections like {id1:[x1, y1, confidence1], id2:[x2, y2, confidence2]}
+            neighbor_measurement (dict): Contains neighbors' detections
+            N (int): Number of all trackers (working on the same perimeter)
+            d (int): Number of all neighbors
+        """
+
+        def bound_Q(Q):
+            # 700 is big enough to make 1/(1+exp(700)) -> 0 and 1/(1+exp(-700)) -> 1
+            return max(min(Q, 700), -700)
+
+        # Get the weight of measurements
+        weight_local = 1. - (d-1.)/N
+        weight_neighbor = 1./N
+
+        # Time decaying factor
+        # NOTE Fine tune this param to get a good performance
+        alpha = 5
+        T = 0.1
+        decay_factor = np.exp(-alpha*T)
+        # The diagram below shows the composition of the information for each update
+        # ┌─────────────────────────────────────────────────────┐
+        # │ Whole area                  .─────────.             │
+        # │                          ,─'   Local   '─.          │
+        # │             .─────────.,'   measurement   `.        │
+        # │          ,─' Existing ╱'─.                  ╲       │
+        # │        ,'      Cell  ;    `.                 :      │
+        # │      ,'              │  2   `.      5        │      │
+        # │     ;                │        :              │      │
+        # │     ;                :       .─────────.     ;      │
+        # │    ;                  ╲   ,─'  :        '─. ╱       │
+        # │    │                   ╲,'  4  │       6   `.       │
+        # │    │        1          ╱`.     │         ,'  ╲      │
+        # │    :                  ;   '─.  ;      ,─'     :     │
+        # │     :                 │      `───────'        │     │
+        # │     :                 │  3    ;               │     │
+        # │      ╲                :      ╱           7    ;     │
+        # │       `.               ╲   ,'                ╱      │
+        # │         `.              ╲,'   Neighbor      ╱       │
+        # │           '─.         ,─'`.  measurement  ,'        │
+        # │              `───────'     '─.         ,─'          │
+        # │                               `───────'             │
+        # └─────────────────────────────────────────────────────┘
+
+        # update all existing grids (Area 1,2,3,4)
+        for cell_ind in list(self.non_empty_cell):
+            # Check if it's in area 2 or 4
+            if cell_ind in local_measurement:
+                v_local = local_measurement[cell_ind]
+                del local_measurement[cell_ind]
+            else:
+                # If not, we believe there is no targets in that grid
+                v_local = self.generate_zero_meas()
+
+            if cell_ind in neighbor_measurement:
+                v_neighbors = neighbor_measurement[cell_ind]
+                del neighbor_measurement[cell_ind]
+            else:
+                v_neighbors = sum(
+                    [self.generate_zero_meas() for i in range(d)])
+
+            Q = weight_local*(self.non_empty_cell[cell_ind] + v_local) + weight_neighbor * (
+                d*self.non_empty_cell[cell_ind]+v_neighbors)
+            self.set_value_from_xy_index(cell_ind, bound_Q(decay_factor * Q))
+
+        # If got measurement for a new grid (Grids in area 5, 6, 7)
+        else:
+            # get the union set of all remaining measurements (Union of area 5, 6, 7)
+            all_meas = set(list(local_measurement) +
+                           list(neighbor_measurement))
+            for cell_ind in all_meas:
+                try:
+                    v_local = local_measurement[cell_ind]
+                except KeyError:
+                    v_local = self.generate_zero_meas()
+                try:
+                    v_neighbors = neighbor_measurement[cell_ind]
+                except KeyError:
+                    v_neighbors = sum(
+                        [self.generate_zero_meas() for i in range(d)])
+                Q = weight_local*(self.init_val + v_local) + weight_neighbor * (
+                    d*self.init_val+v_neighbors)
+                self.set_value_from_xy_index(
+                    cell_ind, bound_Q(decay_factor * Q))
+
+    def consensus(self, neighbors_map):
+        # type: (dict) -> None
+        """Merge neighbors map into local map and make a consensus
+
+        Args:
+            neighbors_map (dict): Contains all values from neighbors and have a count of it. Format: {(x, y):[value, count]}
+        """
+        for cell_ind, value in self.non_empty_cell.items():
+            if cell_ind in neighbors_map.keys():
+                # Calculate the average value of Q
+                Q = (neighbors_map[cell_ind][0]+value) / \
+                    (neighbors_map[cell_ind][1]+1)
+                self.set_value_from_xy_index(cell_ind, Q)
+                del neighbors_map[cell_ind]
+        else:
+            for cell_ind, value_and_count in neighbors_map.items():
+                Q = value_and_count[0]/value_and_count[1]
+                self.set_value_from_xy_index(cell_ind, Q)
+
+    def convert_to_prob_map(self, threshold, normalization=False):
+        """Convert log value to probability value [0~1]
+
+        Args:
+            threshold (float): Values higher than this will be returned
+        """
+        lower_threshold = 0.2
+        if threshold < 0.5:
+            # shrink the lower threshold value
+            lower_threshold *= threshold
+            logging.warning(
+                "Got probability threshold smaller than 0.5, it's not recommended.")
+        self.prob_map = dict()
+
+        max_prob = lower_threshold
+        if normalization:
+            # Generate the full prob map
+            for cell_ind in list(self.non_empty_cell):
+                value = self.non_empty_cell[cell_ind]
+                # Decode the probability value
+                prob = 1./(1.+np.exp(value))
+                if prob > max_prob:
+                    max_prob = prob
+                self.prob_map[cell_ind] = prob
+            # Normalize the whole map and delete data which is small enough
+            for cell_ind in list(self.prob_map):
+                factor = 1/max_prob
+                normed_prob = factor*self.prob_map[cell_ind]
+                if normed_prob >= threshold:
+                    self.prob_map[cell_ind] = normed_prob
+                else:
+                    del self.prob_map[cell_ind]
+
+                if normed_prob < lower_threshold:
+                    # keep some uncertainty between the lower and upper thresholds
+                    self.delete_value_from_xy_index(cell_ind)
+        else:
+            for cell_ind in list(self.non_empty_cell):
+                value = self.non_empty_cell[cell_ind]
+                # Decode the probability value
+                prob = 1./(1.+np.exp(value))
+                if prob >= threshold:
+                    self.prob_map[cell_ind] = prob
+                elif prob < lower_threshold:
+                    # keep some uncertainty between the lower and upper thresholds
+                    self.delete_value_from_xy_index(cell_ind)
+
+    def get_target_est(self, threshold, normalization=False):
+        """Get all targets' estimated position
+
+        Args:
+            threshold (float): Probability threshold value to filter out the targets
+
+        Returns:
+            list: Targets' position
+        """
+        # if normalization:
+        #     logging.warning(
+        #         "Using normalization for PorbMap, the real probability will be hidden.")
+        self.convert_to_prob_map(threshold, normalization)
+        targets_est = list(self.prob_map.keys())
+        for i in range(len(targets_est)):
+            # XXX since we don't need z-data, I put a placeholder here
+            x, y = self.get_xy_pos_from_xy_index(
+                targets_est[i][0], targets_est[i][1])
+            targets_est[i] = [x, y, 150]
+        return targets_est
+
+
+class ProbMapData:
+    def __init__(self):
+        self.myid = -1
+        self.type = 'n'
+        self.grid_ind = list()
+        self.values = list()