首页 > 编程知识 正文

fingerpass教学,code discipline principle

时间:2023-05-06 14:02:53 阅读:161557 作者:3100

要求:修复和扩展写入的数据

在Tags字段中,以逗号分隔的文本必须是数组,而不是字符串要求。 稍后需要Tags聚合统计信息

输入节点

Elasticsearch 5.0或更高版本中引入的新节点类型。 缺省情况下,每个节点都具有在Ingest Node中预处理数据的能力,并且可以阻止索引或Bulck API请求

将数据转换回索引和Bluck API

无需Logstash即可进行数据预处理,例如在某个字段中设置默认值; 更改字段的字段名称; 对字段值的剥离操作

支持Painless脚本的设置,对数据进行更复杂的加工

pipeline处理器

Pipeline -管道按顺序加工通过的数据(文档)

Processor - Elasticsearch抽象地包装了一些加工行为,Elasticsearch有很多内置的Processors。 也支持通过插件实现自己的Processsor

用Pipeline分隔字符串

剥离标签测试

POST _ingest/pipeline/_simulate

{

' pipeline': {

“说明”:“tosplitblogtags”,

' processors': [

{

' split': {

“field': 'tags”,

' separator': ','

}

}

]。

(,

' docs': [

{

' _index': 'index ',

' _id': 'id ',

' _source': {

' title ' : ' introducingbigdata……

' tags': 'hadoop,elasticsearch,spark ',

' content': 'You konw,for big data '

}

(,

{

' _index': 'index ',

' _id': 'idxx ',

' _source': {

“title ' : ' introducingcloudcomputering ',

' tags': 'openstack,k8s ',

' content': 'You konw,for cloud '

}

}

]。

}

在文档中添加字段

#在文档中添加字段。 日志显示量

POST _ingest/pipeline/_simulate

{

' pipeline': {

“说明”:“tosplitblogtags”,

' processors': [

{

' split': {

“field': 'tags”,

' separator': ','

}

(,

{

' set': {

“field': 'views”,

' value': 0

}

}

]。

(,

' docs': [

{

' _index': 'index ',

' _id': 'id ',

' _source': {

' title ' : ' introducingbigdata……

' tags': 'hadoop,elasticsearch,spark ',

' content': 'You konw,for big data '

}

(,

{

' _index': 'index ',

' _id': 'idxx ',

' _source': {

“title ' : ' introducingcloudcomputering ',

' tags': 'openstack,k8s ',

' content': 'You konw,for cloud '

}

}

]。

}

Pipeline API

添加Pipeline

并测试

# 为ES添加一个 Pipeline

PUT _ingest/pipeline/blog_pipeline

{

"description": "a blog pipeline",

"processors": [

{

"split": {

"field": "tags",

"separator": ","

}

},

{

"set": {

"field": "views",

"value": 0

}

}

]

}

#测试pipeline

POST _ingest/pipeline/blog_pipeline/_simulate

{

"docs": [

{

"_source": {

"title": "Introducing cloud computering",

"tags": "openstack,k8s",

"content": "You konw, for cloud"

}

}

]

}

Index & Update By Query

#不使用pipeline更新数据

PUT tech_blogs/_doc/1

{

"title":"Introducing big data......",

"tags":"hadoop,elasticsearch,spark",

"content":"You konw, for big data"

}

#使用pipeline更新数据

PUT tech_blogs/_doc/2?pipeline=blog_pipeline

{

"title": "Introducing cloud computering",

"tags": "openstack,k8s",

"content": "You konw, for cloud"

}

#查看两条数据,一条被处理,一条未被处理

POST tech_blogs/_search

{}

#update_by_query 会导致错误

POST tech_blogs/_update_by_query?pipeline=blog_pipeline

{

}

#增加update_by_query的条件

POST tech_blogs/_update_by_query?pipeline=blog_pipeline

{

"query": {

"bool": {

"must_not": {

"exists": {

"field": "views"

}

}

}

}

}

一些内置的 Processors

https://www.elastic.co/guide/en/elasticsea...Split Processor (例如:将给定字段分成一个数组)

Remove / Rename Processor (移除一个重命名字段)

Append(为商品增加一个新的标签)

Convert (将商品价格,从字符串转换成 float 类型)

Date / JSON (日期格式转换,字符串转 JSON 对象)

Date Index Name Processor (将通过该处理器的文档,分配到指定时间格式的索引中)

Fail Processor (一旦出现异常,该 Pipeline 指定的错误信息能返回给用户)

Foreach Process (数组字段,数组的每个元素都会使用到一个相同的处理器)

Grok Processor (日志的日志格式切割)

Gsub / Join / Split (字符串替换、数组转字符串、字符串转数组)

Lowercase / Upcase(大小写转换)

Ingest Node v.s Logstash

|| Logstash| Ingest Node|

|–|–|

|数据输入与输出|支持从不同的数据源读取,并写入不同的数据源|支持从ES REST API 获取数据,并且写入ES|

|数据源缓冲| 实现了简单的数据队列,支持重写| 不支持缓冲|

|数据处理| 支持大量的的插件,也支持定制开发|内置的插件,可以开发 Plugin 进行扩展(Plugin 更新需要重启)|

|配置和使用| 增加了一定的架构复杂度| 无需额外部署|

https://www.elastic.co/cn/blog/should-i-us...

Painless 简介

自 ES 5.x 后引入,专门为 ES 设置,扩展了 Java 的语法

6.0 开始,ES 只支持 Painless。Grooby ,JavaScript 和 Python 都不在支持

Painless 支持所有的 Java 的数据类型及 Java API 子集

Painless Script 具备以下特性高性能 、 安全

支持显示类型或者动态定义类型

Painless 的用途

可以对文档字段进行加工处理更新或者删除字段,处理数据聚合操作

Script Field: 对返回的字段提前进行计算

Function Score:对文档的算分进行处理

在Ingest Pipeline 中执行脚本

在Reindex API,Update By Query 时,对数据进行处理

通过 Painless 脚本访问字段

上线文

语法Ingestion

ctx.field_name

Update

ctx._source.field_name

Search & Aggregation

doc{“field_name”]

案例1:Script Processsor

# 增加一个 Script Prcessor

POST _ingest/pipeline/_simulate

{

"pipeline": {

"description": "to split blog tags",

"processors": [

{

"split": {

"field": "tags",

"separator": ","

}

},

{

"script": {

"source": """

if(ctx.containsKey("content")){

ctx.content_length = ctx.content.length();

}else{

ctx.content_length=0;

}

"""

}

},

{

"set": {

"field": "views",

"value": 0

}

}

]

},

"docs": [

{

"_index": "index",

"_id": "id",

"_source": {

"title": "Introducing big data......",

"tags": "hadoop,elasticsearch,spark",

"content": "You konw, for big data"

}

},

{

"_index": "index",

"_id": "idxx",

"_source": {

"title": "Introducing cloud computering",

"tags": "openstack,k8s",

"content": "You konw, for cloud"

}

}

]

}

案例2:文档更新计数

DELETE tech_blogs

PUT tech_blogs/_doc/1

{

"title":"Introducing big data......",

"tags":"hadoop,elasticsearch,spark",

"content":"You konw, for big data",

"views":0

}

POST tech_blogs/_update/1

{

"script": {

"source": "ctx._source.views += params.new_views",

"params": {

"new_views":100

}

}

}

# 查看views计数

POST tech_blogs/_search

案例3:搜索时的Script 字段

GET tech_blogs/_search

{

"script_fields": {

"rnd_views": {

"script": {

"lang": "painless",

"source": """

java.util.Random rnd = new Random();

doc['views'].value+rnd.nextInt(1000);

"""

}

}

},

"query": {

"match_all": {}

}

}

Script :Inline v.s Stored

#保存脚本在 Cluster State

POST _scripts/update_views

{

"script":{

"lang": "painless",

"source": "ctx._source.views += params.new_views"

}

}

POST tech_blogs/_update/1

{

"script": {

"id": "update_views",

"params": {

"new_views":1000

}

}

}

脚本缓存

编译的开销相较大

Elasticsearch 会将甲苯编译后缓存在 Cache 中Inline scripts 和 Stored Scripts 都会被缓存

默认缓存 100个脚本

本节知识点

概念讲解:Ingest Node,Pipeline 与 Processor

Ingest Node 与 Logstash 的⽐较

Pipeline 的 相关操作 / 内置 Processor 讲解与演示

Painless 脚本与Ingestion (Pipeline)

Update

Search & Aggregation

本作品采用《CC 协议》,转载必须注明作者和本文链接

快乐就是解决一个又一个的问题!

版权声明:该文观点仅代表作者本人。处理文章:请发送邮件至 三1五14八八95#扣扣.com 举报,一经查实,本站将立刻删除。