overhaul wifi location code

This commit is contained in:
Laura Klünder 2018-12-24 18:25:26 +01:00
parent 3adb6b662e
commit 3944ea5765

View file

@ -29,8 +29,9 @@ class Locator:
spaces = {}
for space in Space.objects.prefetch_related('wifi_measurements'):
new_space = LocatorSpace(
LocatorPoint.from_measurement(measurement, stations)
for measurement in space.wifi_measurements.all()
pk=space.pk,
points=(LocatorPoint.from_measurement(measurement, stations)
for measurement in space.wifi_measurements.all())
)
if new_space.points:
spaces[space.pk] = new_space
@ -67,43 +68,25 @@ class Locator:
scan = LocatorPoint.clean_scan(scan, ignore_invalid_stations=True)
scan_values = LocatorPoint.convert_scan(scan, self.stations, create=False)
station_ids = frozenset(scan_values.keys())
if not scan_values:
return None
# convert scan values
scan_values = {station_id: value**3 for station_id, value in scan_values.items()}
# get visible spaces
spaces = tuple((pk, space, station_ids & space.stations_set)
for pk, space in self.spaces.items()
if pk not in restrictions.spaces)
spaces = tuple(space for pk, space in self.spaces.items() if pk not in restrictions.spaces)
# get relevant spaces (they should contain the best station at least once)
# find best point
best_station_id = max(scan_values.items(), key=operator.itemgetter(1))[0]
spaces = tuple((pk, space, station_ids)
for pk, space, station_ids in spaces
if station_ids and best_station_id in station_ids)
if not spaces:
return None
# get good spaces
good_spaces = tuple((pk, space, space_station_ids)
for pk, space, space_station_ids in spaces
if len(space_station_ids) >= 3)
if not good_spaces:
for station_id in station_ids:
scan_values[station_id] = 0
good_spaces = spaces
best_location = None
best_score = float('inf')
for pk, space, station_ids in good_spaces:
point, score = space.get_best_point(scan_values, station_ids, needed_station_id=best_station_id)
for space in spaces:
point, score = space.get_best_point(scan_values, needed_station_id=best_station_id)
if point is None:
continue
if score < best_score:
location = CustomLocation(router.spaces[pk].level, point.x, point.y,
location = CustomLocation(router.spaces[space.pk].level, point.x, point.y,
permissions=permissions, icon='my_location')
best_location = location
best_score = score
@ -130,7 +113,10 @@ class LocatorStations:
class LocatorSpace:
def __init__(self, points):
no_signal = int(-90)**3
def __init__(self, pk, points):
self.pk = pk
self.points = tuple(points)
self.stations_set = reduce(operator.or_, (frozenset(point.values.keys()) for point in self.points), frozenset())
self.stations = tuple(self.stations_set)
@ -141,20 +127,35 @@ class LocatorSpace:
for station_id, value in point.values.items():
self.levels[i][self.stations_lookup[station_id]] = int(value)**3
def get_best_point(self, scan_values, station_ids, needed_station_id=None):
stations = tuple(self.stations_lookup[station_id] for station_id in station_ids)
values = np.array(tuple(scan_values[station_id]**3 for station_id in station_ids), dtype=np.int64)
acceptable_points = tuple(
np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] > int(-90)**3).ravel()
)
if not acceptable_points:
def get_best_point(self, scan_values, needed_station_id=None):
# check if this space knows the needed station id, otherwise no results here
if needed_station_id not in self.stations_set:
return None, None
scores = np.sum((self.levels[np.array(acceptable_points, dtype=np.uint32).reshape((-1, 1)),
stations]-values)**2, axis=1) / len(stations)
best_acceptable_point = np.argmin(scores).ravel()[0]
best_point = acceptable_points[best_acceptable_point]
return self.points[best_point], scores[best_acceptable_point]
# stations that this space knows
station_ids = frozenset(scan_values.keys()) & self.stations_set
penalty = 0
for station_id, value in scan_values.items():
if station_id not in self.stations_set:
penalty += (value - self.no_signal)**2
stations = tuple(self.stations_lookup[station_id] for station_id in station_ids)
values = np.array(tuple(scan_values[station_id] for station_id in station_ids), dtype=np.int64)
# acceptable points need to have a value for the needed_station_id
points = tuple(
np.argwhere(self.levels[:, self.stations_lookup[needed_station_id]] > self.no_signal).ravel()
)
if not points:
return None, None
scores = np.sum(
(self.levels[np.array(points, dtype=np.uint32).reshape((-1, 1)), stations] - values)**2,
axis=1
) / len(stations)
best_point_i = np.argmin(scores).ravel()[0]
best_point = points[best_point_i]
return self.points[best_point], scores[best_point_i]+penalty
class LocatorPoint(namedtuple('LocatorPoint', ('x', 'y', 'values'))):