272 lines
10 KiB
Python
272 lines
10 KiB
Python
|
|
# -------------------------------------------------------------------
|
|
# ORTHOGRAPHIC
|
|
# Your personal aerial satellite. Always on. At any altitude.*
|
|
# Developed by MarStrMind
|
|
# License: Open Software License 3.0
|
|
# Up to date version always on marstr.online
|
|
# -------------------------------------------------------------------
|
|
# osmxml.py
|
|
# Performs calls to Overpass API to acquire XML files which we can
|
|
# then store into a much faster SQLite3 database.
|
|
# -------------------------------------------------------------------
|
|
|
|
|
|
import xml.dom.minidom
|
|
from pyexpat import ExpatError
|
|
|
|
import requests
|
|
import os
|
|
from defines import *
|
|
from log import *
|
|
import time
|
|
|
|
class mstr_osmxml:
    """Fetch OSM XML for a bounding box and answer queries on the parsed data.

    Typical use: call adjust_bbox() to choose the area, acquire_osm() to
    download and parse the XML, then any of the acquire_* / find_* methods.
    All query methods read the document stored by acquire_osm() and therefore
    require that acquire_osm() has completed beforehand.
    """

    def __init__(self):
        # Parsed xml.dom.minidom document; None until acquire_osm() succeeds.
        self._xmldata = None
        # Raw response buffer; always reset to "" once parsing has finished.
        self._xmlcontent = ""
        # Bounding box values, set (rounded to 4 decimals) by adjust_bbox().
        self._lat = 0
        self._lng = 0
        self._curB_lat = 0
        self._curB_lng = 0

    def adjust_bbox(self, lat, lng, lat_e, lng_e):
        """Point this (persistent) instance at a different bounding box.

        Args:
            lat, lng: First corner of the box.
            lat_e, lng_e: Second corner of the box.

        All four values are rounded to 4 decimal places before being stored.
        """
        self._lat = round(lat, 4)
        self._lng = round(lng, 4)
        self._curB_lat = round(lat_e, 4)
        self._curB_lng = round(lng_e, 4)

    def _bbox_label(self):
        """Return the log message describing the current bounding box."""
        return "Acquiring OSM data for {},{} - {},{}".format(
            self._lat, self._lng, self._curB_lat, self._curB_lng
        )

    def _request_payload(self, v, h):
        """Build the JSON body shared by generate_osm() and acquire_osm().

        Every value is sent as a string. `v` and `h` are forwarded as
        "square_lat" / "square_lng".
        """
        return {
            "bbox": {
                "lat": str(self._lat),
                "lng": str(self._lng),
                "lat_b": str(self._curB_lat),
                "lng_b": str(self._curB_lng),
            },
            "tile_lat": str(self._lat),
            "tile_lng": str(self._lng),
            "square_lat": str(v),
            "square_lng": str(h),
        }

    def generate_osm(self, v, h, asobject=False):
        """Post a request for the current bbox with "as_pbf" set to "true".

        The response is not inspected and nothing is returned or stored.

        Args:
            v, h: Sent as "square_lat" / "square_lng".
            asobject: Currently unused; kept so existing callers keep working.
        """
        mstr_msg("osmxml", self._bbox_label())

        # We will use our self-hosted API for this.
        data = self._request_payload(v, h)
        data["as_pbf"] = "true"
        requests.post(mstr_osm_endpoint, json=data)

    def acquire_osm(self, v, h):
        """Download the XML for the current bbox and keep the parsed document.

        The request is repeated, one second apart, until the response parses
        as XML; the parsed document is then stored for the query methods.
        Errors raised by the HTTP request itself are not caught here and
        propagate to the caller.

        Args:
            v, h: Sent as "square_lat" / "square_lng".
        """
        mstr_msg("osmxml", self._bbox_label())

        # We will use our self-hosted API for this. The payload does not
        # change between attempts, so build it once.
        data = self._request_payload(v, h)

        while True:
            r = requests.post(mstr_osm_endpoint, json=data)
            try:
                # Parse once and keep the result instead of parsing twice.
                dom = xml.dom.minidom.parseString(r.content)
            except ExpatError as e:
                mstr_msg("osmxml", "Response was not valid XML (" + str(e) + "), retrying")
            except Exception as e:
                mstr_msg("osmxml", "Could not parse response (" + str(e) + "), retrying")
            else:
                if dom.documentElement:
                    self._xmldata = dom
                    self._xmlcontent = ""  # Nothing is kept in the raw buffer
                    return
            # Every failed attempt waits, so the loop can never spin hot.
            time.sleep(1)

    def acquire_nodes(self):
        """Return one (id, lat, lon) tuple of strings per <node> element."""
        return [
            (node.getAttribute("id"), node.getAttribute("lat"), node.getAttribute("lon"))
            for node in self._xmldata.getElementsByTagName("node")
        ]

    def acquire_waypoint_data(self):
        """Return way data as a flat list of 4-tuples.

        First come the node references of all ways, as
        (way_id, node_ref, "", ""); after those the tags of all ways, as
        (way_id, "NULL", key, value). Commas and single quotes are removed
        from tag values.
        """
        ways = self._xmldata.getElementsByTagName("way")
        rows = []

        # Pass 1: node references of every way
        for way in ways:
            way_id = way.getAttribute("id")
            for nd in way.getElementsByTagName("nd"):
                rows.append((way_id, nd.getAttribute("ref"), "", ""))

        # Pass 2: tags of every way (kept after all references, as before)
        for way in ways:
            way_id = way.getAttribute("id")
            for tag in way.getElementsByTagName("tag"):
                value = tag.getAttribute("v").replace(",", "").replace("'", "")
                rows.append((way_id, "NULL", tag.getAttribute("k"), value))

        return rows

    def find_icao_codes(self):
        """Return the value of every "icao" tag found on a way.

        The list is empty when the data chunk contains no such tag.
        """
        icao = []
        for way in self._xmldata.getElementsByTagName("way"):
            for tag in way.getElementsByTagName("tag"):
                if tag.getAttribute("k") == "icao":
                    icao.append(tag.getAttribute("v"))
        return icao

    def find_runway_surface(self):
        """Return the "surface" tag of a runway in the current data chunk.

        A way counts as a runway when it carries the tag aeroway=runway.
        If the chunk holds several runways, the last one in document order
        decides the result (this matches the previous behaviour). Returns ""
        when there is no runway or the deciding runway has no surface tag.
        """
        surface = ""
        for way in self._xmldata.getElementsByTagName("way"):
            pairs = [
                (tag.getAttribute("k"), tag.getAttribute("v"))
                for tag in way.getElementsByTagName("tag")
            ]
            if ("aeroway", "runway") not in pairs:
                continue
            # A later runway replaces whatever an earlier one reported.
            surface = ""
            for key, value in pairs:
                if key == "surface":
                    surface = value
        return surface

    def _way_tag_value(self, way_id, key):
        """Return the value of tag `key` on the way `way_id`, or None.

        `way_id` is compared with the XML "id" attribute, which is a string,
        so a non-string id never matches.
        """
        for way in self._xmldata.getElementsByTagName("way"):
            if way.getAttribute("id") != way_id:
                continue
            for tag in way.getElementsByTagName("tag"):
                if tag.getAttribute("k") == key:
                    return tag.getAttribute("v")
        return None

    @staticmethod
    def _as_level(raw, default):
        """Convert a tag value to a whole number of levels.

        Fractional values such as "1.5" are truncated. A missing (None) or
        non-numeric value yields `default` instead of raising.
        """
        if raw is None:
            return default
        try:
            return int(float(raw))
        except (ValueError, OverflowError):
            return default

    def find_building_levels(self, way_id):
        """Return "building:levels" of the way (for shadow rendering); default 2."""
        return self._as_level(self._way_tag_value(way_id, "building:levels"), 2)

    def find_building_minlevel(self, way_id):
        """Return "building:min_level" of the way; default 0."""
        return self._as_level(self._way_tag_value(way_id, "building:min_level"), 0)

    def find_building_roof_shape(self, way_id):
        """Return "roof:shape" of the way; default "flat"."""
        shape = self._way_tag_value(way_id, "roof:shape")
        return "flat" if shape is None else shape

    def find_building_roof_levels(self, way_id):
        """Return "roof:levels" of the way; default 0.

        Parsed like the other level lookups, so "1.5" gives 1 rather than
        raising ValueError.
        """
        return self._as_level(self._way_tag_value(way_id, "roof:levels"), 0)

    def acquire_relations(self):
        """Return the inner/outer members of every relation with its tags.

        Some features (forests, for example) are only described in the
        relations section, so their member ways have to be collected here.

        Returns:
            A list of (member_ref, tags) tuples, one per member whose role is
            "inner" or "outer". `tags` is a flat list
            [key1, value1, key2, value2, ...] of the relation's tags; every
            tuple gets its own list object.
        """
        relations = []
        for relation in self._xmldata.getElementsByTagName("relation"):
            # The tags belong to the relation, so collect them only once.
            flat_tags = []
            for tag in relation.getElementsByTagName("tag"):
                flat_tags.append(tag.getAttribute("k"))
                flat_tags.append(tag.getAttribute("v"))

            for member in relation.getElementsByTagName("member"):
                if member.getAttribute("role") in ("inner", "outer"):
                    relations.append((member.getAttribute("ref"), list(flat_tags)))
        return relations
|
|
|