elasticsearch 学习笔记

2021/06/12 技术

elasticsearch 学习笔记

1. 客户端

1.1 python 客户端

安装依赖: elasticsearch-dsl==7.3.0 elasticsearch==7.12.0 (使用 AsyncElasticsearch 还需要 aiohttp, 可通过 pip install "elasticsearch[async]==7.12.0" 一并安装)

客户端连接:


from elasticsearch import AsyncElasticsearch

# Address of the Elasticsearch node; left empty in these notes, so fill it in before connecting.
ES_HOST: str = ""
# Value handed to the client's `timeout` argument below.
ES_TIMEOUT: int = 60
# Shared async client; the later Python examples in these notes call it as `es_client`.
es_client = AsyncElasticsearch(hosts=[ES_HOST], timeout=ES_TIMEOUT)

2. 创建索引

2.1 创建索引

示例:

# Create demo_index with 3 shards, 0 replicas and an explicit mapping; "tags" is a nested field.
PUT demo_index
{
  "settings": {
    "index": {
      "number_of_shards": "3",
      "number_of_replicas": "0"
    }
  },
  "mappings": {
    "properties": {
      "mid": {
        "type": "long"
      },
      "status": {
        "type": "integer"
      },
      "userName": {
        "fields": {
          "keyword": {
            "type": "keyword"
          }
        },
        "type": "text"
      },
      "tags": {
        "properties": {
          "tagId": {
            "type": "long"
          },
          "tagName": {
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "tagType": {
            "fields": {
              "keyword": {
                "type": "keyword"
              }
            },
            "type": "text"
          },
          "updateTime": {
            "type": "long"
          }
        },
        "type": "nested"
      }
    }
  }
}

# Fetch the current mapping of demo_index
GET /demo_index/_mapping

2.2 字段类型定义

python 环境中定义 模型:


import typing  # noqa

from elasticsearch_dsl import Document, Keyword, Long, Text, Integer, Nested, InnerDoc


class Tag(InnerDoc):
    """Inner document for one entry of the ``tags`` field.

    The fields mirror the ``tags`` properties of the ``demo_index`` mapping.
    """

    tagId = Long()
    # Text fields that also declare a ``keyword`` sub-field.
    tagName = Text(fields={'keyword': Keyword()})
    tagType = Text(fields={'keyword': Keyword()})
    updateTime = Long()


class Model(Document):
    """Document model whose fields mirror the ``demo_index`` mapping."""

    mid = Long()
    status = Integer()
    # Text field that also declares a ``keyword`` sub-field.
    userName = Text(fields={'keyword': Keyword()})
    # Nested field holding ``Tag`` inner documents.
    tags = Nested(Tag, multi=True)


python 中 可以利用 Document.init 方法, 直接在代码中创建索引.

更合适的方法是, 通过 Kibana 显式地使用 2.1 创建索引 中的 api 创建索引.

一个根据 模型(Document) 创建 api 配置的方法:


def build_index_config(model: typing.Type[Document], index_name: str, number_of_shards=3) -> dict:
    """Build an index configuration dict from a ``Document`` model.

    A subclass of ``model`` is created on the fly with an inner ``Index``
    class that carries ``index_name`` and the shard count; the configuration
    is then read from that subclass's index object.

    Args:
        model: Document class whose declared fields describe the index.
        index_name: Name assigned to the generated index definition.
        number_of_shards: Value used for the ``number_of_shards`` setting.

    Returns:
        The dict produced by ``to_dict()`` on the subclass's index object.
    """
    # Built with ``type`` (not a class statement) so the attribute values can
    # come straight from the function arguments.
    index_meta = type("Index", (), {
        "name": index_name,
        "settings": {"number_of_shards": number_of_shards},
    })

    # The subclass keeps the original class name and only adds the Index options.
    bound_model: typing.Type[Document] = type(model.__name__, (model,), {
        "Index": index_meta,
    })

    return bound_model._index.to_dict()


# Print the config generated for Model, for comparison with the request body in section 2.1.
print(build_index_config(Model, index_name='demo_index', number_of_shards=3))

2.3 新增字段

# Add an integer field "count" to the mapping of demo_index
PUT /demo_index/_mapping
{
    "properties": {
      "count": {
        "type": "integer"
      }
    }
}

3. 基本的CRUD

3.1 搜索示例

# Match all documents; return only mid and userName, at most 100 hits
GET /demo_index/_search
{
  "query": {
    "match_all": {}
  },
  "_source": ["mid", "userName"],
  "size": 100
}

# Nested query on tags: match on the tags.tagName.keyword sub-field
GET /demo_index/_search
{
  "query": {
    "nested": {
      "path": "tags",
      "query": {
        "terms": {
          "tags.tagName.keyword": [
            "商品"
          ]
        }
      }
    }
  }
}

# Range query: count greater than 0
GET /demo_index/_search
{
  "query": {
    "range": {"count":{"gt": 0} }
  }
}

# Term query on the userName.keyword sub-field
GET /demo_index/_search
{
  "query": {
    "term": {"userName.keyword": "admin" }
  }
}

# Leave some fields out of the returned _source ("excludes" is the documented key name)
GET demo_index/_search
{
  "query": {
    "match_all": {}
  },
  "_source":{
    "excludes": ["tags"]
  }
}

3.2 更新操作示例


# Set count to 0 on every document (the script body goes under "source"; "inline" is the legacy name)
POST /demo_index/_update_by_query
{
  "script": {
    "source": "ctx._source.count = 0",
    "lang": "painless"
  },
  "query": {
    "match_all": {}
  }
}


# Reset tags to an empty list on every document
POST /demo_index/_update_by_query
{
  "script": {
    "source": "ctx._source.tags = []",
    "lang": "painless"
  },
  "query": {
    "match_all": {}
  }
}

3.3 删除

# Delete every document matched by match_all in demo_index
POST /demo_index/_delete_by_query
{
  "query": {
    "match_all": {}
  }
}

4. painless 脚本

5. python使用实践

5.1 基本使用

搜索 es.search:


# Match everything, but ask for a single hit carrying only the `tags` and `mid` source fields.
query_dict = {
    "query": {
        "match_all": {}
    },
    "size": 1,
    "_source": ["tags", "mid"]
}

data = await es_client.search(index="demo_index", body=query_dict, )

# Wrap each hit's _source in a Model.
# NOTE(review): `await`/`yield` at this level mean the snippet presumably sits inside an async generator function.
for doc in data["hits"]["hits"]:
    yield Model(**doc["_source"])

更新 es.update_by_query:


# Painless script that sets `count` to 0, applied to every document matched by the query.
body = {
    "script": {
        "source": "ctx._source.count = 0",
        "params": {},
        "lang": "painless"
    },
    "query": {
        "match_all": {}

    }
}

# conflicts="proceed" is forwarded to the update-by-query request.
await es_client.update_by_query(body=body, index="demo_index", conflicts="proceed")

删除 es.delete_by_query:


# Delete the documents of demo_index whose `mid` term equals 10; refresh=True is sent with the request.
await es_client.delete_by_query(
    index="demo_index",
    body={
        "query": {
            "bool": {
                "must": [
                    {"term": {"mid": 10}}
                ]
            }
        }
    },
    refresh=True,
)

5.2 遍历操作

使用 async_scan 实现:


res_list = []
async for doc in async_scan(
        client=es_client,
        query={
            "query": {
                "match_all": {}
            }
        },
        index="demo_index"
):
    res_list.append(Model(**doc["_source"]))

使用 search_after 实现:


query_dict = {
    "query": {
        "match_all": {}
    }
}
sort_list = ["mid:desc"]  # 需要提供一个 unique 字段

raw_query_dict = dict(query_dict)

_sort = None
max_size = 200
query_dict["size"] = max_size
all_count = 0
mid_seen = set()

while True:
    _query_dict = dict(query_dict)
    if _sort:
        _query_dict["search_after"] = _sort

    data = await es_client.search(index="demo_index", body=_query_dict, sort=sort_list)

    _count = 0
    for doc in data["hits"]["hits"]:
        yield Model(**doc["_source"])
        _sort = doc['sort']
        _count += 1    
    
    if _count < max_size:
        break

count_info = await es_client.count(index="demo_index", body=raw_query_dict)

if count_info.get("count") != all_count:
    logging.warning(f"es.count {count_info.get('count')} != yield_count {all_count}!")

5.3 批量操作

Search

    Table of Contents