| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- import logging
- from collections import deque
- import matplotlib.pyplot as plt
- import numpy as np
- from matplotlib import cm
- from matplotlib.ticker import LinearLocator
- from target import Target
- from tracker import Tracker
- import sys
- import numpy
- numpy.set_printoptions(threshold=sys.maxsize)
- class Simsim:
- class Topics:
- def __init__(self) -> None:
- self.topics = dict()
- def add_topic(self, topic_name):
- self.topics[topic_name] = deque(maxlen=20)
- def del_topic(self, topic_name):
- try:
- del self.topics[topic_name]
- except:
- pass
- def publish(self, topic_name, msg):
- self.topics[topic_name].append(msg)
- def subs(self, topic_name):
- return self.topics[topic_name]
- def __init__(self) -> None:
- self.trackers = []
- self.edges = []
- self.targets = []
- self.map_size = [1000, 1000]
- self.rate = 30
- self.topics = self.Topics()
- plt.ion()
- # self.fig, (self.plt_sim, self.plt_pm) = plt.subplots(
- # 1, 2, figsize=(10, 5), dpi=160)
- fig = plt.figure(figsize=(20, 8))
- self.plt_sim = fig.add_subplot(121)
- # plt.axis('equal')
- self.plt_pm = fig.add_subplot(122, projection='3d')
- # plt.axis('equal')
- self.plt_sim.axis('equal')
- def add_tracker(self, name, position, sensor_rad):
- tracker = Tracker(self, name, len(self.trackers),
- position, sensor_rad)
- self.trackers.append(tracker)
- def add_edges(self, edges):
- self.edges.extend(edges)
- for e in edges:
- self.trackers[e[0]].neighbor.add(e[1])
- self.trackers[e[1]].neighbor.add(e[0])
- def add_target(self, name, position):
- target = Target(self, name, len(self.targets), position)
- self.targets.append(target)
- def _update_all(self):
- """Simulate once, update all trackers and targets
- """
- for tracker in self.trackers:
- tracker.job()
- for target in self.targets:
- target.job()
- def run(self, log_lvl=logging.WARN, ground_truth=False):
- logging.basicConfig(format='%(asctime)s.%(msecs)03d %(levelname)s: %(message)s',
- datefmt='%m/%d/%Y %H:%M:%S', level=log_lvl)
- while 1:
- self._update_all()
- self.plt_sim.cla()
- self.plt_pm.cla()
- self.plt_sim.set_xlim(0, self.map_size[0])
- self.plt_sim.set_ylim(0, self.map_size[1])
- self.plt_pm.set_xlim(1000, 2000)
- self.plt_pm.set_ylim(1000, 2000)
- if ground_truth:
- for target in self.targets:
- self.plt_sim.scatter(
- target.position[0], target.position[1], marker='x', s=2, color='r')
- for e in self.edges:
- t0 = self.trackers[e[0]]
- t1 = self.trackers[e[1]]
- self.plt_sim.plot([t0.position[0], t1.position[0]],
- [t0.position[1], t1.position[1]],
- linewidth=1, color='g', alpha=0.5)
- Z = np.zeros([2000, 2000])
- for t in self.trackers:
- # draw the trackers
- self.plt_sim.scatter(t.position[0], t.position[1],
- marker='s', s=20, c='b')
- self.plt_sim.annotate(t.id, (t.position[0], t.position[1]+10))
- # draw the camera coverage
- circle = plt.Circle(
- (t.position), t.sensor.coverage_radius, fill=False, color='grey', alpha=0.3)
- self.plt_sim.add_patch(circle)
- for est in t.target_estimates:
- det_pos = est[0:2]
- # det_abs_pos = det_pos+t.position
- self.plt_sim.scatter(det_pos[0], det_pos[1],
- marker='^', s=2)
- # t = self.trackers[2]
- for ind in t.prob_map.prob_map:
- Z[ind] = t.prob_map.prob_map[ind]
- for dx in [-2, -1, 0, 1, 2]:
- for dy in [-2, -1, 0, 1, 2]:
- if Z[ind[0]+dx, ind[1]+dy] <= t.prob_map.prob_map[ind]:
- Z[ind[0]+dx, ind[1] +
- dy] = t.prob_map.prob_map[ind]
- # print(Z)
- X = np.arange(1000, 2000, 1)
- Y = np.arange(1000, 2000, 1)
- X, Y = np.meshgrid(X, Y)
- self.plt_pm.set_zlim(0, 1.01)
- # print(Z)
- self.plt_pm.plot_surface(
- X, Y, Z[1000:, 1000:], cmap='RdBu_r', rcount=150, ccount=150, antialiased=True)
- self.plt_pm.view_init(elev=35., azim=0)
- plt.gca().invert_yaxis()
- # self.plt_pm.plot_surface(X, Y, Z, cmap=cm.bwr, linewidth=5, antialiased=True)
- self.plt_pm.zaxis.set_major_locator(LinearLocator(10))
- plt.pause(1/self.rate)
|