Elasticserch与Elasticsearch

    xiaoxiao2022-07-13  172

    Elasticserch与Elasticsearch_dsl用法

    Elasticsearch_dsl::https://elasticsearch-dsl.readthedocs.io/en/latest/search_dsl.html

    Elasticserch:https://elasticsearch-py.readthedocs.io/en/master/api.html

    1. 连接

    from elasticsearch import Elasticsearch es = Elasticsearch(hosts="127.0.0.1:9200") # connections可以设置多个集群 from elasticsearch_dsl import connections es = connections.create_connection(hosts=['localhost:9200'], timeout=60)

    2. 查询

    from elasticsearch import Elasticsearch from elasticsearch_dsl import Search client = Elasticsearch() s = Search(using=client)

    API是可链接的,允许在一个语句中组合多个方法调用:

    s = Search().using(client).query("match", title="python")

    要将请求发送到Elasticsearch:

    response = s.params(size=10000).execute() # params(size) 为指定搜索结果返回条数,默认size是10,最大size为10000

    如果只想迭代搜索返回的命中,可以遍历该Search对象:

    for hit in s: print(hit.title)

    搜索结果将被缓存。后续调用execute或尝试迭代已执行的Search对象不会触发发送到Elasticsearch的其他请求。强制请求指定 ignore_cache=True何时调用execute。

    出于调试目的,可以将Search对象序列化为dict 显式:

    s = Search().using(client).query("match", title="python") print(s.to_dict())
    按查询删除

    可以通过调用对象来删除与搜索匹配的文档delete,Search而不是 execute像这样:

    s = Search(index='i').query("match", title="python") response = s.delete()
    2.1 Q
    from elasticsearch_dsl import Q Q("multi_match", query='python django', fields=['title', 'body']) Q({"multi_match": {"query": "python django", "fields": ["title", "body"]}})

    要将查询添加到Search对象,使用.query()方法:

    q = Q("multi_match", query='python django', fields=['title', 'body']) s = Search().query(q)

    该方法还接受所有参数作为Q快捷方式:

    s = s.query("multi_match", query='python django', fields=['title', 'body'])

    如果已有查询对象或dict表示查询对象,则可以覆盖Search对象中使用的查询:

    s.query = Q('bool', must=[Q('match', title='python'), Q('match', body='best')])
    2.2 虚线字段

    有时想要引用另一个字段中的字段,可以是多字段(title.keyword),也可以是结构化json文档address.city。为了更方便的Q快捷方式(还有 query,filter以及exclude对方法Search的类),可以使用__(双下划线)代替关键字参数点的:

    s = Search() s = s.filter('term', category__keyword='Python') s = s.query('match', address__city='prague')

    或者:

    s = Search() s = s.filter('term', **{'category.keyword': 'Python'}) s = s.query('match', **{'address.city': 'prague'})
    2.3 查询组合

    可以使用逻辑运算符组合查询对象:

    Q("match", title='python') | Q("match", title='django') # {"bool": {"should": [...]}} Q("match", title='python') & Q("match", title='django') # {"bool": {"must": [...]}} ~Q("match", title="python") # {"bool": {"must_not": [...]}}

    当.query()多次调用该方法时,&将在内部使用该运算符:

    s = s.query().query() print(s.to_dict()) # {"query": {"bool": {...}}}

    如果要对查询表单进行精确控制,使用Q快捷方式直接构造组合查询:

    q = Q('bool', must=[Q('match', title='python')], should=[Q(...), Q(...)], minimum_should_match=1 ) s = Search().query(q) # start_time为开始时间,end_time为结束时间 q = Q('bool', must=[Q('range', start_time={"gte": start_time}), Q('range', end_time={"lte": end_time})]) s = Search().query(q)
    2.4 过滤器

    如果要在Search之后添加查询, 可以使用该filter()方法使事情变得更容易:

    s = Search() s = s.filter('terms', tags=['search', 'python']) # 过滤,在此为范围过滤,range是方法,timestamp是所要查询的field的名字,gte意为大于等于,lt意为小于,根据需要设定即可(似乎过滤只能接受数字形式的内容,如果是文本就会返回空) # 关于term和match的区别,term是精确匹配,match会模糊化,会进行分词,返回匹配度分数,(term查询字符串之接受小写字母,如果有大写会返回空即没有命中,match则是不区分大小写都可以进行查询,返回结果也一样)1:范围查询 s = s.filter("range", timestamp={"gte": 0, "lt": time.time()}).query("match", country="in")2:普通过滤 res_3 = s.filter("terms", balance_num=["39225", "5686"]).execute()

    在幕后,这将生成一个Bool查询并将指定的 terms查询放入其filter分支,使其等效于:

    s = Search() s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])

    如果要将post_filter元素用于分面导航,请使用该 .post_filter()方法。

    exclude()可以查询查询中的项目,如下所示:

    s = Search() s = s.exclude('terms', tags=['search', 'python'])

    这是简写: s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])

    2.5 聚合

    聚合可以放在查询,过滤等操作的后面叠加,需要加aggs

    要定义聚合,可以使用A快捷方式:

    from elasticsearch_dsl import A A('terms', field='tags') # {"terms": {"field": "tags"}}

    也可以使用.bucket(),.metric()和 .pipeline()方法

    bucket即为分组,其中第一个参数是分组的名字,自己指定即可,第二个参数是方法,第三个是指定的field。

    metric也是同样,metric的方法有sum、avg、max、min等等,但是需要指出的是有两个方法可以一次性返回这些值,stats和extended_stats,后者还可以返回方差等值。

    a = A('terms', field='category') # {'terms': {'field': 'category'}} a.metric('clicks_per_category', 'sum', field='clicks').bucket('tags_per_category', 'terms', field='tags') # { # 'terms': {'field': 'category'}, # 'aggs': { # 'clicks_per_category': {'sum': {'field': 'clicks'}}, # 'tags_per_category': {'terms': {'field': 'tags'}} # } # } # 例1 s.aggs.bucket("per_country", "terms", field="timestamp").metric("sum_click", "stats", field="click").metric("sum_request", "stats", field="request") # 例2 s.aggs.bucket("per_age", "terms", field="click.keyword").metric("sum_click", "stats", field="click") # 例3 s.aggs.metric("sum_age", "extended_stats", field="age") # 例4 s.aggs.bucket("per_age", "terms", field="country.keyword") # 最后依然是要execute,此处注意s.aggs的操作不能用变量接收(如res=s.aggs的操作就是错误的),聚合的结果会在res中显示 # 例5 a = A("range", field="account_number", ranges=[{"to": 10}, {"from": 11, "to": 21}]) res = s.execute()

    要向Search对象添加聚合,请使用.aggs属性作为顶级聚合:

    s = Search() a = A('terms', field='category') s.aggs.bucket('category_terms', a) # { # 'aggs': { # 'category_terms': { # 'terms': { # 'field': 'category' # } # } # } # }

    要么

    s = Search() s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day')\ .metric('clicks_per_day', 'sum', field='clicks')\ .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day')\ .bucket('tags_per_day', 'terms', field='tags') s.to_dict() # { # "aggs": { # "articles_per_day": { # "date_histogram": { "interval": "day", "field": "publish_date" }, # "aggs": { # "clicks_per_day": { "sum": { "field": "clicks" } }, # "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } }, # "tags_per_day": { "terms": { "field": "tags" } } # } # } # } # }

    您可以按名称访问现有的key(类似字典的方式):

    s = Search() s.aggs.bucket('per_category', 'terms', field='category') s.aggs['per_category'].metric('clicks_per_category', 'sum', field='clicks') s.aggs['per_category'].bucket('tags_per_category', 'terms', field='tags')

    3.删除

    es.delete(index='xxx', doc_type='xxx', id='xxx')

    4.更新

    body = {'doc': {}} body['doc']['filed_name'] = '新的值' es.update(index=_index, doc_type='alarm', id='xxx', body=body) es.update(index='xxx', doc_type='xxx', id='xxx', body={待更新字段})
    最新回复(0)