领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

Elasticsearch(GEO)数据写入和空间检索

nixiaole 2024-11-24 19:57:24 知识剖析 29 ℃

Elasticsearch简介

什么是 Elasticsearch?

Elasticsearch 是一个开源的分布式 RESTful搜索和分析引擎,能够解决越来越多不同的应用场景。

本文内容

本文主要介绍 ES GEO 数据的写入和空间检索,使用的 ES 版本为 7.3.1。

数据准备

使用 QGIS 的渔网工具对范围进行切割,得到网格的 GeoJSON。

新建索引设置映射

def set_mapping(es, index_name="content_engine", doc_type_name="en", my_mapping=None):
    """Recreate ``index_name`` and install a mapping on it.

    The index is deleted first (a missing index is tolerated), then created
    again, and finally the mapping is applied for ``doc_type_name``.

    Args:
        es: Elasticsearch client exposing ``indices.delete``,
            ``indices.create`` and ``indices.put_mapping``.
        index_name: Name of the index to drop and recreate.
        doc_type_name: Document type the mapping is registered under.
        my_mapping: Mapping body to install. When omitted or empty, a default
            mapping with a ``geo_shape`` field ``location`` and a ``long``
            field ``id`` is used.
    """
    # Honour a caller-supplied mapping; fall back to the default only when
    # nothing (or an empty dict) was passed. A ``None`` default avoids sharing
    # one mutable dict between calls.
    if not my_mapping:
        my_mapping = {
            "properties": {
                "location": {"type": "geo_shape"},
                "id": {"type": "long"},
            }
        }

    # ignore 404 and 400 so that deleting a non-existent index is not an error
    es.indices.delete(index=index_name, ignore=[400, 404])
    print("delete_index")

    # The index was just deleted, so creation is not expected to collide
    # with an existing index.
    create_index = es.indices.create(index=index_name)
    mapping_index = es.indices.put_mapping(
        index=index_name,
        doc_type=doc_type_name,
        body=my_mapping,
        include_type_name=True,
    )
    print("create_index")

    if create_index["acknowledged"] is not True or mapping_index["acknowledged"] is not True:
        print("Index creation failed...")

数据插入

使用 multiprocessing 和 elasticsearch.helpers.bulk 进行数据写入:每 9500 条(约一万条)为一组,剩余不足一组的数据单独成组,然后用多进程并行写入。点数据和面数据各写入 4731254 条。写入时利用多核、SSD 以及合适的批量大小可以有效加快写入速度,通过这些手段可以在三分钟左右写入四百多万条点或面数据。

def mp_worker(features):
    """Bulk-index one batch of actions and return the number that succeeded.

    A fresh Elasticsearch client is created on every call. ``ip`` and
    ``index_name`` are not parameters; they are looked up from the
    surrounding scope.

    Args:
        features: Iterable of bulk actions handed to ``bulk``.

    Returns:
        The success count reported by ``bulk``.
    """
    client = Elasticsearch(hosts=[ip], timeout=5000)
    # raise_on_error=True: a failed action raises instead of being reported
    # in the second element of the returned tuple.
    indexed, _errors = bulk(client, features, index=index_name, raise_on_error=True)
    return indexed

def _build_batches(features, index_name, doc_type_name, batch_size):
    """Convert GeoJSON features to bulk actions, grouped into lists of at most ``batch_size``."""
    batches = []
    current = []
    for feature in features:
        current.append({
            "_index": index_name,
            "_type": doc_type_name,
            "_source": {
                "id": feature["properties"]["id"],
                "location": {
                    # The geometry type is fixed to "polygon"; only the
                    # coordinates are taken from the feature.
                    "type": "polygon",
                    "coordinates": feature["geometry"]["coordinates"]
                }
            }
        })
        if len(current) == batch_size:
            batches.append(current)
            current = []
    # Whatever is left over forms a final, shorter batch.
    if current:
        batches.append(current)
    return batches


def mp_handler(input_file, index_name, doc_type_name="en", batch_size=9500, processes=4):
    """Load a GeoJSON file and index its features in parallel batches.

    The file's ``features`` list is turned into bulk actions, split into
    batches, and each batch is handed to ``mp_worker`` in a process pool.
    The number of features read and the number reported written are printed.

    Args:
        input_file: Path to a GeoJSON file with a top-level ``features`` list.
        index_name: Target index written into every action's ``_index``.
        doc_type_name: Value written into every action's ``_type``.
        batch_size: Maximum number of actions per batch (default 9500).
        processes: Number of worker processes in the pool (default 4).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    with open(input_file, 'rb') as f:
        data = json.load(f)
    features = data["features"]
    del data  # drop the parsed document early; only the features are needed

    act = _build_batches(features, index_name, doc_type_name, batch_size)
    count = sum(len(batch) for batch in act)
    del features
    print('read all %s data ' % count)

    written = 0
    p = multiprocessing.Pool(processes)
    try:
        for result in p.imap(mp_worker, act):
            written += result
        p.close()
    except BaseException:
        # Stop outstanding work instead of leaving worker processes behind.
        p.terminate()
        raise
    finally:
        p.join()
    print('write all %s data ' % written)

GEO(point)查询距离nkm附近的点和范围选择

from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock timer for the whole run (client creation, query and printing).
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Select points within n km of a centre point.
_center = {
    "lat": 18.1098857850465471,
    "lon": 109.1271036098896730
}
_distance_filter = {
    "geo_distance": {
        "distance": "9km",
        "location": _center
    }
}
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": _distance_filter
        }
    }
}

# Alternative: bounding-box selection.
# NOTE(review): bottom_right lon (105.12...) lies west of top_left lon
# (109.00...); possibly a typo for 109.12... - confirm before using.
# _body = {
#     "query": {
#         "geo_bounding_box": {
#             "location": {
#                 "top_left": {
#                     "lat": 18.4748659238899933,
#                     "lon": 109.0007435371629470
#                 },
#                 "bottom_right": {
#                     "lat": 18.1098857850465471,
#                     "lon": 105.1271036098896730
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)

# Iterate over every hit via the scan helper and print each one.
scanResp = scan(es, query=_body, scroll="10m", index=_index, timeout="10m")
for resp in scanResp:
    print(resp)

endtime = time.time()
print(endtime - starttime)

GEO(shape)范围选择

from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock timer for the whole run.
starttime = time.time()

_index = "gis"
_doc_type = "20190823"
ip = "127.0.0.1:9200"

# envelope format, [[minlon,maxlat],[maxlon,minlat]]
_envelope = {
    "type": "envelope",
    "coordinates": [[108.987103609889, 18.474865923889993], [109.003537162947, 18.40988578504]]
}
_shape_filter = {
    "geo_shape": {
        "location": {
            "shape": _envelope,
            "relation": "within"
        }
    }
}
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": _shape_filter
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)

# Print every hit returned by the scan helper.
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")
for resp in scanResp:
    print(resp)

endtime = time.time()
print(endtime - starttime)

GEO(point)距离聚合

from elasticsearch import Elasticsearch

import time

# Wall-clock timer for the whole run.
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Distance aggregation: three rings around the origin, split at 100000 and 300000.
_rings = [
    { "to" : 100000 },
    { "from" : 100000, "to" : 300000 },
    { "from" : 300000 }
]
_body = {
    "aggs" : {
        "rings_around_amsterdam" : {
            "geo_distance" : {
                "field" : "location",
                "origin" : "18.1098857850465471,109.1271036098896730",
                "ranges" : _rings
            }
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search( body=_body, index=_index)

# One bucket per configured range.
for i in scanResp['aggregations']['rings_around_amsterdam']['buckets']:
    print(i)

endtime = time.time()
print(endtime - starttime)

中心点聚合

# Centroid aggregation over the `location` field.
# `ip`, `_index` and `Elasticsearch` are used here but not defined in this snippet.
_centroid_agg = {
    "geo_centroid" : {
        "field" : "location"
    }
}
_body = {
    "aggs" : {
        "centroid" : _centroid_agg
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search( body=_body, index=_index)
print(scanResp['aggregations'])

范围聚合

# Bounds aggregation over the `location` field, published as "viewport".
# `ip`, `_index` and `Elasticsearch` are used here but not defined in this snippet.
_bounds_agg = {
    "geo_bounds": {
        "field": "location"
    }
}
_body = {
    "aggs": {
        "viewport": _bounds_agg
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)
print(scanResp['aggregations']['viewport'])

geohash聚合

# Low-precision aggregation; `precision` is the geohash length.
# `ip`, `_index` and `Elasticsearch` are used here but not defined in this snippet.
_coarse_grid = {
    "geohash_grid": {
        "field": "location",
        "precision": 3
    }
}
_body = {
    "aggregations": {
        "large-grid": _coarse_grid
    }
}

# Alternative: high-precision aggregation - a bounding-box filter combined
# with a geohash grid sub-aggregation.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9971036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geohash_grid": {
#                         "field": "location",
#                         "precision": 7
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)

# Bucket path to use with the high-precision alternative above:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)

切片聚合

# Low-precision tile aggregation; `precision` is the tile level.
# `ip`, `_index` and `Elasticsearch` are used here but not defined in this snippet.
_coarse_tiles = {
    "geotile_grid": {
        "field": "location",
        "precision": 8
    }
}
_body = {
    "aggregations": {
        "large-grid": _coarse_tiles
    }
}

# Alternative: high-precision tile aggregation - a bounding-box filter
# combined with a geotile grid sub-aggregation.
# _body = {
#     "aggregations": {
#         "zoomed-in": {
#             "filter": {
#                 "geo_bounding_box": {
#                     "location": {
#                         "top_left": "18.4748659238899933,109.0007435371629470",
#                         "bottom_right": "18.4698857850465471,108.9991036098896730"
#                     }
#                 }
#             },
#             "aggregations": {
#                 "zoom1": {
#                     "geotile_grid": {
#                         "field": "location",
#                         "precision": 18
#                     }
#                 }
#             }
#         }
#     }
# }

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

for i in scanResp['aggregations']['large-grid']['buckets']:
    print(i)

# Bucket path to use with the high-precision alternative above:
# for i in scanResp['aggregations']['zoomed-in']['zoom1']['buckets']:
#     print(i)

Elasticsearch和PostGIS相同功能对比

PostGIS最近点查询

-- Nearest-point lookup: order rows by the <-> distance operator against the
-- query point and keep the first one, also returning its ST_DistanceSphere value.
SELECT
    id,
    geom,
    ST_DistanceSphere(
        geom,
        'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
    )
FROM h5
ORDER BY
    geom <-> 'SRID=4326;POINT(109.1681036098896730 18.1299957850465471)'::geometry
LIMIT 1

Elasticsearch最近点查询

from elasticsearch import Elasticsearch

import time

# Wall-clock timer for the whole run.
starttime = time.time()

_index = "gis_point"
_doc_type = "20190824"
ip = "127.0.0.1:9200"

# Sort ascending by distance from the query point ([lon, lat]) so that the
# first hit is the closest document.
_nearest_first = {
    "_geo_distance": {
        "unit": "m",
        "order": "asc",
        "location": [
            109.1681036098896730,
            18.1299957850465471
        ],
        "distance_type": "arc",
        "mode": "min",
        "ignore_unmapped": True
    }
}
_body = {
    "sort": [_nearest_first],
    # Only the single closest hit is requested.
    "from": 0,
    "size": 1,
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            }
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = es.search(body=_body, index=_index)

# Only the elapsed time is printed; the response itself is not shown.
endtime = time.time()
print(endtime - starttime)

PostGIS范围查询

-- Range query: rows whose geometry intersects the given envelope
-- (xmin, ymin, xmax, ymax, SRID 4326).
SELECT id, geom, fid
FROM public."California"
WHERE ST_Intersects(
    geom,
    ST_MakeEnvelope(-117.987103609889, 33.40988578504, -117.003537162947, 33.494865923889993, 4326)
) = true

对应的 Elasticsearch envelope 坐标([[minlon, maxlat], [maxlon, minlat]])为:[[-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]]

Elasticsearch范围查询

from elasticsearch import Elasticsearch

from elasticsearch.helpers import scan

import time

# Wall-clock timer for the whole run.
starttime = time.time()

_index = "gis_california"
ip = "127.0.0.1:9200"

# envelope format, [[minlon,maxlat],[maxlon,minlat]]
_envelope = {
    "type": "envelope",
    "coordinates": [[-117.987103609889, 33.494865923889993], [-117.003537162947, 33.40988578504]]
}
_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            },
            "filter": {
                "geo_shape": {
                    "geom": {
                        "shape": _envelope,
                        "relation": "INTERSECTS"
                    }
                }
            }
        }
    }
}

es = Elasticsearch(hosts=[ip], timeout=5000)
scanResp = scan(es, query=_body, scroll="1m", index=_index, timeout="1m")

# Walk every hit so the scan is fully consumed; `i` ends up as the hit count
# and `a` as the last hit seen.
i = 0
for i, a in enumerate(scanResp, 1):
    pass
print(i)

endtime = time.time()
print(endtime - starttime)

两种场景中PostGIS的性能更好


参考资料:

1.Elasticsearch(GEO)空间检索查询

2.Elasticsearch官网

3.PostGIS拆分LineString为segment,point

4.亿级“附近的人”,打通“特殊服务”通道

5.PostGIS教程二十二:最近邻域搜索

最近发表
标签列表