Golang 中 Elasticsearch 的sdk使用

在现代数据驱动的应用开发中,Elasticsearch 以其强大的搜索和分析能力成为众多开发者的首选。Golang 凭借其高效的性能和简洁的语法,与 Elasticsearch 的结合能为应用提供强大的数据处理能力。

本文将基于 Golang 1.18 版本,深入探讨 Golang 对 Elasticsearch 的各类调用场景,涵盖数据的增删改查、聚合分析等操作。

一、环境准备与依赖获取

在开始编写代码前,需要确保开发环境已安装 Golang 1.18 版本。可以通过以下命令检查 Golang 版本:

go version

若版本不符合要求,请按照官方文档指引进行安装或升级。

接下来,使用go mod进行依赖管理。在项目根目录执行以下命令初始化go.mod文件:

go mod init your_project_name

将your_project_name替换为实际项目名称。

然后,获取与 Elasticsearch 交互的 Go 客户端库elastic:

go get github.com/olivere/elastic/v7

该客户端库基于 Elasticsearch 7.x 版本 API 设计,提供了丰富的功能接口,方便我们在 Golang 中操作 Elasticsearch。获取依赖后,go.mod文件会记录项目的依赖信息,go.sum文件则保存各依赖包的哈希值,用于确保依赖的一致性和安全性。

二、连接 Elasticsearch 集群

在进行数据操作前,首先需要建立与 Elasticsearch 集群的连接。以下是创建连接的示例代码:

package main
​
import (
    "context"
    "fmt"
    "github.com/olivere/elastic/v7"
)
​
func connectElasticsearch() (*elastic.Client, error) {
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
    if err != nil {
        return nil, err
    }
    fmt.Println("Connected to Elasticsearch successfully!")
    return client, nil
}

上述代码中,elastic.NewClient函数用于创建 Elasticsearch 客户端实例,通过elastic.SetURL指定 Elasticsearch 集群的地址。如果集群设置了用户名和密码,可以使用elastic.SetBasicAuth方法进行认证:

client, err := elastic.NewClient(
    elastic.SetURL("http://localhost:9200"),
    elastic.SetBasicAuth("username", "password"),
)

三、数据操作场景

3.1 创建索引与文档

创建索引是在 Elasticsearch 中存储数据的第一步,以下是创建一个名为products的索引,并向其中插入文档的示例:

func createIndexAndDocument(client *elastic.Client) error {
    // 创建索引
    createIndex, err := client.CreateIndex("products").Do(context.TODO())
    if err != nil {
        return err
    }
    if!createIndex.Acknowledged {
        return fmt.Errorf("Index creation not acknowledged")
    }
​
    // 定义文档数据
    product := struct {
        Name  string `json:"name"`
        Price float64 `json:"price"`
    }{
        Name:  "T-shirt",
        Price: 19.99,
    }
​
    // 插入文档
    put1, err := client.Index().
        Index("products").
        BodyJson(product).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Indexed product to index %s, type %s, id %s\n", put1.Index, put1.Type, put1.Id)
    return nil
}

在上述代码中,首先使用client.CreateIndex方法创建products索引,然后定义一个包含产品名称和价格的结构体作为文档数据,通过client.Index将文档插入到products索引中。

3.2 查询文档

Elasticsearch 提供了强大的查询功能,以下展示几种常见的查询方式:

3.2.1 按 ID 查询

func getDocumentByID(client *elastic.Client) error {
    get1, err := client.Get().
        Index("products").
        Id("1").
        Do(context.TODO())
    if err != nil {
        return err
    }
    if get1.Found {
        fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
    } else {
        fmt.Println("Document not found")
    }
    return nil
}

通过client.Get方法,指定索引和文档 ID,即可获取对应的文档。

3.2.2 全文搜索

func searchDocuments(client *elastic.Client) error {
    query := elastic.NewMatchQuery("name", "T-shirt")
    searchResult, err := client.Search().
        Index("products").
        Query(query).
        Pretty(true).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Found a total of %d products\n", searchResult.Hits.TotalHits.Value)
    for _, hit := range searchResult.Hits.Hits {
        var product struct {
            Name  string `json:"name"`
            Price float64 `json:"price"`
        }
        err := hit.Source.Unmarshal(&product)
        if err != nil {
            return err
        }
        fmt.Printf("Name: %s, Price: %.2f\n", product.Name, product.Price)
    }
    return nil
}

这里使用elastic.NewMatchQuery创建一个全文搜索查询,搜索name字段中包含T-shirt的文档,并通过client.Search执行查询操作。

3.3 更新文档

更新文档有多种方式,以下是通过脚本更新文档字段的示例:

func updateDocument(client *elastic.Client) error {
    script := elastic.NewScript(`ctx._source.price = params.newPrice`).
        Param("newPrice", 24.99)
    update, err := client.Update().
        Index("products").
        Id("1").
        Script(script).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Updated document with version %d\n", update.Version)
    return nil
}

通过elastic.NewScript定义更新脚本,使用client.Update方法指定索引、文档 ID 和更新脚本,实现文档字段的更新。

3.4 删除文档与索引

删除文档和索引的示例代码如下:

3.4.1 删除文档

func deleteDocument(client *elastic.Client) error {
    result, err := client.Delete().
        Index("products").
        Id("1").
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Deleted document with version %d\n", result.Version)
    return nil
}

使用client.Delete方法,指定索引和文档 ID,即可删除对应的文档。

3.4.2 删除索引

func deleteIndex(client *elastic.Client) error {
    deleteIndex, err := client.DeleteIndex("products").Do(context.TODO())
    if err != nil {
        return err
    }
    if!deleteIndex.Acknowledged {
        return fmt.Errorf("Index deletion not acknowledged")
    }
    fmt.Println("Index deleted successfully")
    return nil
}

通过client.DeleteIndex方法删除指定的索引。

四、聚合分析

聚合分析是 Elasticsearch 的一大特色功能,能帮助我们从大量数据中提取有价值的信息。以下是一个计算产品价格平均值的聚合分析示例:

func aggregateData(client *elastic.Client) error {
    avgAggregation := elastic.NewAvgAggregation().Field("price")
    searchResult, err := client.Search().
        Index("products").
        Aggregation("avg_price", avgAggregation).
        Pretty(true).
        Do(context.TODO())
    if err != nil {
        return err
    }
    avgPrice, exists := searchResult.Aggregations.Avg("avg_price")
    if exists {
        fmt.Printf("Average price of products: %.2f\n", avgPrice.Value)
    } else {
        fmt.Println("Aggregation result not found")
    }
    return nil
}

上述代码中,使用elastic.NewAvgAggregation创建一个计算平均值的聚合操作,通过client.Search执行聚合查询,并从结果中获取计算得到的平均值。

五、完整示例与运行

将上述各功能模块整合到一个完整的示例中:

package main
​
import (
    "context"
    "fmt"
    "github.com/olivere/elastic/v7"
)
​
func connectElasticsearch() (*elastic.Client, error) {
    client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
    if err != nil {
        return nil, err
    }
    fmt.Println("Connected to Elasticsearch successfully!")
    return client, nil
}
​
func createIndexAndDocument(client *elastic.Client) error {
    // 创建索引
    createIndex, err := client.CreateIndex("products").Do(context.TODO())
    if err != nil {
        return err
    }
    if!createIndex.Acknowledged {
        return fmt.Errorf("Index creation not acknowledged")
    }
​
    // 定义文档数据
    product := struct {
        Name  string `json:"name"`
        Price float64 `json:"price"`
    }{
        Name:  "T-shirt",
        Price: 19.99,
    }
​
    // 插入文档
    put1, err := client.Index().
        Index("products").
        BodyJson(product).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Indexed product to index %s, type %s, id %s\n", put1.Index, put1.Type, put1.Id)
    return nil
}
​
func getDocumentByID(client *elastic.Client) error {
    get1, err := client.Get().
        Index("products").
        Id("1").
        Do(context.TODO())
    if err != nil {
        return err
    }
    if get1.Found {
        fmt.Printf("Got document %s in version %d from index %s, type %s\n", get1.Id, get1.Version, get1.Index, get1.Type)
    } else {
        fmt.Println("Document not found")
    }
    return nil
}
​
func searchDocuments(client *elastic.Client) error {
    query := elastic.NewMatchQuery("name", "T-shirt")
    searchResult, err := client.Search().
        Index("products").
        Query(query).
        Pretty(true).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Found a total of %d products\n", searchResult.Hits.TotalHits.Value)
    for _, hit := range searchResult.Hits.Hits {
        var product struct {
            Name  string `json:"name"`
            Price float64 `json:"price"`
        }
        err := hit.Source.Unmarshal(&product)
        if err != nil {
            return err
        }
        fmt.Printf("Name: %s, Price: %.2f\n", product.Name, product.Price)
    }
    return nil
}
​
func updateDocument(client *elastic.Client) error {
    script := elastic.NewScript(`ctx._source.price = params.newPrice`).
        Param("newPrice", 24.99)
    update, err := client.Update().
        Index("products").
        Id("1").
        Script(script).
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Updated document with version %d\n", update.Version)
    return nil
}
​
func deleteDocument(client *elastic.Client) error {
    result, err := client.Delete().
        Index("products").
        Id("1").
        Do(context.TODO())
    if err != nil {
        return err
    }
    fmt.Printf("Deleted document with version %d\n", result.Version)
    return nil
}
​
func deleteIndex(client *elastic.Client) error {
    deleteIndex, err := client.DeleteIndex("products").Do(context.TODO())
    if err != nil {
        return err
    }
    if!deleteIndex.Acknowledged {
        return fmt.Errorf("Index deletion not acknowledged")
    }
    fmt.Println("Index deleted successfully")
    return nil
}
​
func aggregateData(client *elastic.Client) error {
    avgAggregation := elastic.NewAvgAggregation().Field("price")
    searchResult, err := client.Search().
        Index("products").
        Aggregation("avg_price", avgAggregation).
        Pretty(true).
        Do(context.TODO())
    if err != nil {
        return err
    }
    avgPrice, exists := searchResult.Aggregations.Avg("avg_price")
    if exists {
        fmt.Printf("Average price of products: %.2f\n", avgPrice.Value)
    } else {
        fmt.Println("Aggregation result not found")
    }
    return nil
}
​
func main() {
    client, err := connectElasticsearch()
    if err != nil {
        fmt.Println("Failed to connect to Elasticsearch:", err)
        return
    }
​
    // 依次调用各功能函数
    if err := createIndexAndDocument(client); err != nil {
        fmt.Println("Failed to create index and document:", err)
        return
    }
    if err := getDocumentByID(client); err != nil {
        fmt.Println("Failed to get document by ID:", err)
        return
    }
    if err := searchDocuments(client); err != nil {
        fmt.Println("Failed to search documents:", err)
        return
    }
    if err := updateDocument(client); err != nil {
        fmt.Println("Failed to update document:", err)
        return
    }
    if err := aggregateData(client); err != nil {
        fmt.Println("Failed to aggregate data:", err)
        return
    }
    if err := deleteDocument(client); err != nil {
        fmt.Println("Failed to delete document:", err)
        return
    }
    if err := deleteIndex(client); err != nil {
        fmt.Println("Failed to delete index:", err)
        return
    }
​
    fmt.Println("All operations completed successfully!")
}

在运行上述代码前,请确保 Elasticsearch 集群已启动并正常运行。运行go run main.go即可执行各功能操作。

通过以上内容,我们详细介绍了 Golang 1.18 版本下对 Elasticsearch 的各类调用场景,从环境搭建到具体功能实现,涵盖了数据的全生命周期操作和聚合分析。掌握这些知识,开发者能够在 Golang 项目中高效地利用 Elasticsearch 的强大功能,满足复杂的数据处理需求。