
Microservices - ES Aggregations, Pinyin Autocomplete, MySQL Sync via MQ, Clustering

Day07 - ES Aggregations, Pinyin Autocomplete, MySQL Sync via MQ, Clustering

Chapter summary:

  1. Synchronization

Tips:

  • CTRL+ALT+B: jump from an interface to its implementation class
  • i

[TOC]

No.01 - ES Aggregations

1. DSL aggregations: grouping, sorting, filtering, and metric calculations (max, min, avg, etc.)

Official documentation link

  • Simple aggregations
    • Bucket counts
    • Bucket counts, sorted
    • Bucket counts, sorted and filtered
    • Metric aggregations (max, min, etc.)
  • Multi-field aggregations

# Aggregation: bucket counts per brand
# select brand, count(*) as cc from hotel group by brand
GET /hotel/_search
{
  "size": 0,  
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 5 
      }
    }
  }
}


# Aggregation: bucket counts, sorted by count
# select brand, count(*) as cc from hotel group by brand order by cc asc
GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "order": {
          "_count": "asc" 
        },
        "size": 20
      }
    }
  }
}

# Aggregation: bucket counts with a filter
# select brand, count(*) as cc from hotel where price <= 200 group by brand
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      }
    }
  }
}

# Multi-field aggregation: bucket counts with a filter
# select brand, city, count(*) as cc from hotel where price <= 200 group by brand, city
GET /hotel/_search
{
  "query": {
    "range": {
      "price": {
        "lte": 200 
      }
    }
  }, 
  "size": 0, 
  "aggs": {
    "brandAgg": {
      "terms": {
        "field": "brand",
        "size": 20
      },
      "aggs": {
                "cityGroup": {
                    "terms": {
                        "field": "city",
                        "min_doc_count": 1,
                        "shard_min_doc_count": 0,
                        "show_term_doc_count_error": false
                    }
                }
            }
    }
  }
}


# Aggregation: bucket counts plus max, min, avg and sum of the score field
# select brand, max(score), min(score), avg(score), sum(score) from hotel group by brand
GET /hotel/_search
{
  "size": 0, 
  "aggs": {
    "brandAgg": { 
      "terms": { 
        "field": "brand", 
        "size": 20
      },
      "aggs": { 
        "score_stats": { 
          "stats": { 
            "field": "score" 
          }
        }
      }
    }
  }
}

2. Aggregations with the RestClient

  • Simple aggregations

    • Bucket counts

      
        @Test
        void testAggsTerm() throws IOException {
          // 1. Create the search request
          SearchRequest request = new SearchRequest("hotel");
      
          // 2. Build the aggregation DSL
          request.source().aggregation(AggregationBuilders.terms("brand_aggs").field("brand").size(10));
      
          // 3. Execute the request
          SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      
          // 4. Parse the results
          Aggregations aggregations = response.getAggregations();
          Terms brandTerms = aggregations.get("brand_aggs");
          List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
          for (Terms.Bucket bucket : buckets) {
            String brandName = bucket.getKeyAsString();
            System.out.println(brandName + "\t" + bucket.getDocCount());
          }
        }
      
    • Bucket counts, sorted

      
        @Test
        void testAggsTermOrder() throws IOException {
          // 1. Create the search request
          SearchRequest request = new SearchRequest("hotel");
      
          // 2. Build the aggregation DSL
          request
              .source()
              .aggregation(
                  AggregationBuilders.terms("brand_aggs")
                      .field("brand")
                      .size(10)
                      .order(BucketOrder.count(true)));
      
          // 3. Execute the request
          SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      
          // 4. Parse the results
          Aggregations aggregations = response.getAggregations();
          Terms brandTerms = aggregations.get("brand_aggs");
          List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
          for (Terms.Bucket bucket : buckets) {
            String brandName = bucket.getKeyAsString();
            System.out.println(brandName + "\t" + bucket.getDocCount());
          }
        }
      
    • Bucket counts, sorted and filtered

      
        @Test
        void testAggsTermQuery() throws IOException {
          // 1. Create the search request
          SearchRequest request = new SearchRequest("hotel");
      
          // 2. Build the aggregation DSL
          request
              .source().query(QueryBuilders.rangeQuery("price").lte(200))
              .aggregation(
                  AggregationBuilders.terms("brand_aggs")
                      .field("brand")
                      .size(10)
                      .order(BucketOrder.count(true)));
      
          // 3. Execute the request
          SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      
          // 4. Parse the results
          Aggregations aggregations = response.getAggregations();
          Terms brandTerms = aggregations.get("brand_aggs");
          List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
          for (Terms.Bucket bucket : buckets) {
            String brandName = bucket.getKeyAsString();
            System.out.println(brandName + "\t" + bucket.getDocCount());
          }
        }
      
    • Metric aggregations (max, min, etc.), combined with a second terms sub-aggregation

        @Test
        void testAggsTermStats() throws IOException {
          // 1. Create the search request
          SearchRequest request = new SearchRequest("hotel");
      
          // 2. Build the aggregation DSL
          TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("brand_aggs").field("brand").size(10).order(BucketOrder.count(true));
          TermsAggregationBuilder termsAggregationBuilder2 = AggregationBuilders.terms("city_aggs").field("city").size(10).order(BucketOrder.count(true));
          MaxAggregationBuilder maxAggregationBuilder = AggregationBuilders.max("price_max").field("price");
          MinAggregationBuilder minAggregationBuilder = AggregationBuilders.min("price_min").field("price");
          AvgAggregationBuilder avgAggregationBuilder = AggregationBuilders.avg("price_avg").field("price");
          SumAggregationBuilder sumAggregationBuilder = AggregationBuilders.sum("price_sum").field("price");
      
          request
                  .source().query(QueryBuilders.rangeQuery("price").lte(200))
                  .aggregation(termsAggregationBuilder
                          .subAggregation(termsAggregationBuilder2)
                          .subAggregation(minAggregationBuilder)
                          .subAggregation(maxAggregationBuilder)
                          .subAggregation(avgAggregationBuilder)
                          .subAggregation(sumAggregationBuilder)
                  );
      
          // 3. Execute the request
          SearchResponse response = client.search(request, RequestOptions.DEFAULT);
      
          // 4. Parse the results
          Aggregations aggregations = response.getAggregations();
          Terms brandTerms = aggregations.get("brand_aggs");
          List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
          for (Terms.Bucket bucket : buckets) {
            Aggregations ags = bucket.getAggregations();
            String brandName = bucket.getKeyAsString();
            List<? extends Terms.Bucket> cityBuckets = ((Terms) bucket.getAggregations().get("city_aggs")).getBuckets();
            for (Terms.Bucket cityBucket : cityBuckets) {
      
              String cityName = cityBucket.getKeyAsString();
              // note: the metric sub-aggregations were added to the brand terms agg, so these values are per brand and repeat for every city row
              Max price_max = bucket.getAggregations().get("price_max");
              Min price_min = bucket.getAggregations().get("price_min");
              Avg price_avg = bucket.getAggregations().get("price_avg");
              Sum price_sum = bucket.getAggregations().get("price_sum");
              System.out.println(brandName + "\t" + cityName
                      + "\t" + price_max.getValue()
                      + "\t" + price_min.getValue()
                      + "\t" + price_avg.getValue()
                      + "\t" + price_sum.getValue());
            }
          }
        }
      
  • Multi-field aggregation

      @Test
      void testAggsTermGroup() throws IOException {
        // 1. Create the search request
        SearchRequest request = new SearchRequest("hotel");
    
        // 2. Build the aggregation DSL
        TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("brand_aggs").field("brand").size(10).order(BucketOrder.count(true));
        TermsAggregationBuilder termsAggregationBuilder2 = AggregationBuilders.terms("city_aggs").field("city").size(10).order(BucketOrder.count(true));
    
        request
                .source().query(QueryBuilders.rangeQuery("price").lte(200))
                .aggregation(termsAggregationBuilder
                        .subAggregation(termsAggregationBuilder2)
                );
    
        // 3. Execute the request
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    
        // 4. Parse the results
        Aggregations aggregations = response.getAggregations();
        Terms brandTerms = aggregations.get("brand_aggs");
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        for (Terms.Bucket bucket : buckets) {
          Aggregations ags = bucket.getAggregations();
          String brandName = bucket.getKeyAsString();
          List<? extends Terms.Bucket> cityBuckets = ((Terms) bucket.getAggregations().get("city_aggs")).getBuckets();
          for (Terms.Bucket cityBucket : cityBuckets) {
    
            String cityName = cityBucket.getKeyAsString();
            System.out.println(brandName + "\t" + cityName
                    + "\t" + bucket.getDocCount()
                    + "\t" + cityBucket.getDocCount());
          }
        }
      }
    

No.02 - Pinyin Autocomplete

Goal: type the full pinyin or just the initial letters and automatically get matching suggestions

  • The pinyin analyzer converts Chinese text to pinyin and extracts the initial letters, but on its own its tokenization is poor
  • To use it effectively, tokenize with ik first and then apply the pinyin analyzer as a token filter

Full autocomplete = tokenization + pinyin (initial-letter form and full-pinyin form) + a multi-entry suggestion array

1. Installing the pinyin analyzer plugin

Official download page and API documentation

# 1. Find the ES plugin mount directory (ES was installed with an externally mounted volume, so just inspect the volume)
[root@docker config]# docker volume ls
DRIVER    VOLUME NAME
local     es-plugins
[root@docker config]# docker inspect es-plugins
[
    {
        "CreatedAt": "2021-10-19T16:58:02+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/es-plugins/_data",
        "Name": "es-plugins",
        "Options": null,
        "Scope": "local"
    }
]

# 2. Enter the directory, create a folder, upload the zip from the official site, and unzip it
[root@docker config]# cd /var/lib/docker/volumes/es-plugins/_data
[root@docker _data]# mkdir pinyin
[root@docker _data]# cd pinyin
[root@docker pinyin]# rz
rz waiting to receive.
Starting zmodem transfer.  Press Ctrl+C to cancel.
Transferring elasticsearch-analysis-pinyin-7.14.2.zip...
  100%    7880 KB    2626 KB/sec    00:00:03       0 Errors  

[root@docker pinyin]# ll
total 7884
-rw-r--r-- 1 root root 8069546 Oct 22 15:21 elasticsearch-analysis-pinyin-7.14.2.zip
[root@docker pinyin]# unzip elasticsearch-analysis-pinyin-7.14.2.zip 
Archive:  elasticsearch-analysis-pinyin-7.14.2.zip
  inflating: elasticsearch-analysis-pinyin-7.14.2.jar  
  inflating: nlp-lang-1.7.jar        
  inflating: plugin-descriptor.properties  
[root@docker pinyin]# ll
total 15820
-rw-r--r-- 1 root root   27435 Sep 24 14:42 elasticsearch-analysis-pinyin-7.14.2.jar
-rw-r--r-- 1 root root 8069546 Oct 22 15:21 elasticsearch-analysis-pinyin-7.14.2.zip
-rw-r--r-- 1 root root 8091448 Sep 24 14:42 nlp-lang-1.7.jar
-rw-r--r-- 1 root root    1824 Sep 24 14:42 plugin-descriptor.properties


# 3. Restart the ES container
[root@docker pinyin]# docker restart es

  • Test the analyzers

    POST /_analyze
    {
      "text": ["如家这酒店还不错"],
      "analyzer": "ik_smart"
    }
    
    POST /_analyze
    {
      "text": ["如家这酒店还不错"],
      "analyzer": "ik_max_word"
    }
    
    POST /_analyze
    {
      "text": ["如家这酒店还不错"],
      "analyzer": "pinyin"
    }
    


2. Custom analyzer (ik tokenization + pinyin token filter)

# 1. Define a custom analyzer on the test index
DELETE /test
PUT /test
{
  "settings": {
    "analysis": {
      "analyzer": { 
        "my_analyzer": {  
          "tokenizer": "ik_max_word",
          "filter": "py"
        }
      },
      "filter": { 
        "py": { 
          "type": "pinyin", 
          "keep_full_pinyin": false,
          "keep_joined_full_pinyin": true,
          "keep_original": true,
          "limit_first_letter_length": 16,
          "remove_duplicated_term": true,
          "none_chinese_pinyin_tokenize": false
        }
      }
    }
  },
  "mappings": {
    "properties": {
      "name":{
        "type": "text",
        "analyzer": "my_analyzer",
        "search_analyzer": "ik_smart"
      }
    }
  }
}

# 2. Test the custom analyzer on the test index
POST /test/_analyze
{
  "text": ["如家这酒店还不错"],
  "analyzer": "my_analyzer"
}


# 3. Insert test documents
POST /test/_doc/1
{
  "id": 1,
  "name": "狮子"
}
POST /test/_doc/2
{
  "id": 2,
  "name": "虱子"
}

# 4. Correct behaviour: both "sz" and "shizi" return the two documents
GET /test/_search
{
  "query": {
    "match": {
      "name": "sz"
    }
  }
}

# 5. Pitfall demo: 狮子 should only be matched exactly; specifying search_analyzer in the mapping keeps query text out of the pinyin filter
GET /test/_search
{
  "query": {
    "match": {
      "name": "掉入狮子笼咋办"
    }
  }
}

Comparing the plain pinyin analyzer with the custom one, the difference is obvious.


3. Autocomplete queries (completion suggester)

Requirement: the field must be of type completion and its value should be an array, so suggestions can match on multiple terms

# Example

# 1. Create the index
DELETE /test
PUT test
{
  "mappings": {
    "properties": {
      "title":{
        "type": "completion"
      }
    }
  }
}

# 2. Insert test data

POST test/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test/_doc
{
  "title": ["Nintendo", "switch"]
}

# 3. Completion query: the prefix "s" matches Sony and SK-II from the first array entries, and switch from the second
GET /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s", 
      "completion": {
        "field": "title", 
        "skip_duplicates": true, 
        "size": 10 
      }
    }
  }
}

4. Updating the hotel test index

  • Changes to the hotel index, explained below:

    • A settings node is added, defining two custom analyzers, text_anlyzer and completion_analyzer
      • text_anlyzer: ik tokenization with a pinyin filter, used on ordinary text fields
      • completion_analyzer: keyword tokenizer (no word splitting) with a pinyin filter, used mainly on the full-pinyin suggestion field
    • The custom analyzers (with the pinyin filter) apply at index time, i.e. on insert and update operations
    • At query time, to avoid a flood of meaningless pinyin matches, a separate search analyzer is configured; see the name field below
    DELETE /hotel
    
    PUT /hotel
    {
      "settings": {
        "analysis": {
          "analyzer": {
            "text_anlyzer": {
              "tokenizer": "ik_max_word",
              "filter": "py"
            },
            "completion_analyzer": {
              "tokenizer": "keyword",
              "filter": "py"
            }
          },
          "filter": {
            "py": {
              "type": "pinyin",
              "keep_full_pinyin": false,
              "keep_joined_full_pinyin": true,
              "keep_original": true,
              "limit_first_letter_length": 16,
              "remove_duplicated_term": true,
              "none_chinese_pinyin_tokenize": false
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "id":{
            "type": "keyword"
          },
          "name":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart",
            "copy_to": "all"
          },
          "address":{
            "type": "keyword",
            "index": false
          },
          "price":{
            "type": "integer"
          },
          "score":{
            "type": "integer"
          },
          "brand":{
            "type": "keyword",
            "copy_to": "all"
          },
          "city":{
            "type": "keyword"
          },
          "starName":{
            "type": "keyword"
          },
          "business":{
            "type": "keyword",
            "copy_to": "all"
          },
          "location":{
            "type": "geo_point"
          },
          "pic":{
            "type": "keyword",
            "index": false
          },
          "all":{
            "type": "text",
            "analyzer": "text_anlyzer",
            "search_analyzer": "ik_smart"
          },
          "suggestion":{
              "type": "completion",
              "analyzer": "completion_analyzer"
          }
        }
      }
    }
    
  • Update the POJO and re-run the bulk import (a bulk-import sketch follows after this list)

    package com.iyyxx.hotel.pojo;
    
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.util.Arrays;
    import java.util.List;
    
    @Data
    @NoArgsConstructor
    public class HotelDoc {
        private Long id;
        private String name;
        private String address;
        private Integer price;
        private Integer score;
        private String brand;
        private String city;
        private String starName;
        private String business;
        private String location;
        private String pic;
        private Object distance;
        private Boolean isAD;
        private List<String> suggestion;
    
        public HotelDoc(Hotel hotel) {
            this.id = hotel.getId();
            this.name = hotel.getName();
            this.address = hotel.getAddress();
            this.price = hotel.getPrice();
            this.score = hotel.getScore();
            this.brand = hotel.getBrand();
            this.city = hotel.getCity();
            this.starName = hotel.getStarName();
            this.business = hotel.getBusiness();
            this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
            this.pic = hotel.getPic();
            this.suggestion = Arrays.asList(brand, name);
        }
    }
    
  • Test query (array matching)

    GET /hotel/_search
    {
      "suggest": {
        "ttt": {
          "text": "shxz",
          "completion": {
            "field": "suggestion",
            "skip_duplicates": true, 
            "size": 10 
          }
        }
      }
    }

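The bulk re-import itself is not shown above; the following is only a minimal sketch. It reuses the RestHighLevelClient `client` from the aggregation tests, and assumes an IHotelService (a MyBatis-Plus service over the hotel table) and fastjson's JSON.toJSONString, neither of which is code from this chapter.

@Test
void testBulkImport() throws IOException {
    // read all hotels from MySQL (assumed MyBatis-Plus service)
    List<Hotel> hotels = hotelService.list();

    // build one BulkRequest containing an IndexRequest per document
    BulkRequest bulkRequest = new BulkRequest();
    for (Hotel hotel : hotels) {
        HotelDoc hotelDoc = new HotelDoc(hotel);   // the constructor fills the suggestion array
        bulkRequest.add(new IndexRequest("hotel")
                .id(hotelDoc.getId().toString())
                .source(JSON.toJSONString(hotelDoc), XContentType.JSON));
    }

    // send the whole batch in one request
    client.bulk(bulkRequest, RequestOptions.DEFAULT);
}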

5. Implementing the site upgrade with the RestAPI

Part of this code comes with the front-end/back-end scaffolding provided by the course, so only the pieces worth reviewing or looking up later are kept here.
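
As one possible sketch of the server-side suggestion call (not the exact course code): it assumes the same RestHighLevelClient `client` as in the tests above, and the suggestion name "hotelSuggest" and prefix "sh" are made up for illustration.

@Test
void testSuggest() throws IOException {
    // 1. Create the search request
    SearchRequest request = new SearchRequest("hotel");

    // 2. Build a completion suggestion on the "suggestion" field for a pinyin prefix
    request.source().suggest(new SuggestBuilder().addSuggestion(
            "hotelSuggest",
            SuggestBuilders.completionSuggestion("suggestion")
                    .prefix("sh")
                    .skipDuplicates(true)
                    .size(10)));

    // 3. Execute the request
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);

    // 4. Parse the suggestion options
    CompletionSuggestion suggestion = response.getSuggest().getSuggestion("hotelSuggest");
    for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
        System.out.println(option.getText().toString());
    }
}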

No.03 - Syncing MySQL to ES via Asynchronous MQ Messaging

1. Query all
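
The notes for this section are still empty. As a rough sketch of the idea only: a Spring AMQP listener receives the id of a hotel changed in MySQL and re-indexes (or deletes) the matching ES document. The queue names, the IHotelService, and the use of fastjson here are assumptions, not confirmed course code.

@Component
public class HotelMqListener {

    @Autowired
    private RestHighLevelClient client;

    @Autowired
    private IHotelService hotelService; // assumed MyBatis-Plus service

    // A hotel was inserted or updated in MySQL: reload it and index it into ES
    @RabbitListener(queues = "hotel.insert.queue") // assumed queue name
    public void listenHotelInsertOrUpdate(Long id) throws IOException {
        Hotel hotel = hotelService.getById(id);
        HotelDoc hotelDoc = new HotelDoc(hotel);
        IndexRequest request = new IndexRequest("hotel").id(id.toString());
        request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
        client.index(request, RequestOptions.DEFAULT);
    }

    // A hotel was deleted in MySQL: remove it from ES
    @RabbitListener(queues = "hotel.delete.queue") // assumed queue name
    public void listenHotelDelete(Long id) throws IOException {
        client.delete(new DeleteRequest("hotel", id.toString()), RequestOptions.DEFAULT);
    }
}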

No.04 - ES Clustering

1. Query all
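
These notes are also unfilled so far. As a minimal sketch only (assuming the same RestHighLevelClient `client` as above), the cluster status can at least be inspected from code:

@Test
void testClusterHealth() throws IOException {
    // Ask the cluster for its health summary (status, node count, shard allocation)
    ClusterHealthResponse health = client.cluster()
            .health(new ClusterHealthRequest(), RequestOptions.DEFAULT);

    System.out.println(health.getClusterName()
            + "\t" + health.getStatus()          // GREEN / YELLOW / RED
            + "\t nodes=" + health.getNumberOfNodes()
            + "\t activeShards=" + health.getActiveShards());
}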
