手机站:/m

大带宽_阿里云盘正式版_免费申请

时间:2020-11-03 01:32编辑:淘客樊里来源:淘客樊里当前位置:主页 > 云存储 >

在20分钟内构建实时流式ETL管道

最近有很多关于传统ETL已死的说法。在传统的ETL范式中,数据仓库是王者,ETL作业是批处理驱动的,所有的东西都在与其他事物对话,可伸缩性的限制非常普遍。当人们喃喃自语地说造成混乱的后果是"做生意的代价"时,人们勉强容忍了混乱的管道然而,ETL并没有死。开发人员越来越倾向于使用分布式系统和事件驱动应用程序的新ETL范式,在这种模式下,企业可以实时、大规模地处理数据。仍然需要"提取"、"转换"和"加载",但现在的区别是将数据作为一级公民处理。企业不再希望将数据交给批处理,而批处理通常仅限于离线处理,每天一次。他们有更多的数据源和不同的类型,并希望摆脱混乱的点到点连接。我们可以将流处理直接嵌入到每个服务中,核心业务应用程序可以依赖流式平台来分发和处理事件。本博客文章的重点是演示如何在Apache Kafka®中轻松实现这些流式ETL管道。Kafka是一个分布式流媒体平台,是现代企业架构的核心。它提供了在Kafka Connect框架中运行的Kafka连接器,从不同的源提取数据,丰富的Kafka Streams API从核心应用程序中执行复杂的转换和分析,以及更多的Kafka连接器来将转换后的数据加载到另一个系统。您可以部署汇合架构注册表来集中管理架构、验证兼容性,并在数据不符合架构时提供警告。(不明白为什么您需要一个用于关键数据的模式注册表?阅读此博客文章。)端到端参考体系结构如下:让我们考虑一个使用Kafka Streams API进行实时有状态流处理的应用程序。我们将通过一个端到端参考体系结构的具体示例,向您展示如何:运行Kafka源连接器从另一个系统(SQLite3数据库)读取数据,然后在将数据写入Kafka集群之前,使用单消息转换(SMTs)修改正在运行的数据使用Kafka Streams API处理和丰富Java应用程序中的数据(例如count和sum)运行Kafka sink连接器将数据从Kafka集群写入另一个系统(AWS S3)此示例的工作流如下:如果您想跟随并在您的环境中尝试这一点,请使用快速入门指南来设置Kafka集群并下载完整的源代码。将数据提取到卡夫卡中首先,我们必须将数据放入客户机应用程序。为了在Kafka和其他系统之间复制数据,用户可以从各种现成的连接器中选择Kafka连接器。Kafka源连接器将数据从另一个系统导入Kafka,Kafka接收器连接器将数据从Kafka导出到另一个系统。对于我们的示例,我们希望从SQLite3数据库中提取数据,该数据库保存在/usr/local/lib中/零售数据库. 数据库有一个名为locations的表,它有三个列id、name和sale,其中包含示例内容: 位置身份证件名称销售1罗利3002杜塞尔多夫1001罗利600三莫斯科8004悉尼2002杜塞尔多夫4005金奈400……… 我们希望从该表创建一个数据流,其中流中的每个消息都是一个键/值对。你会问,关键是什么,价值是什么?好吧,让我们来解决这个问题。为了将表数据提取到一个Kafka主题中,我们使用了绑定在Confluent Platform中的JDBC连接器。注意,默认情况下,JDBC连接器不向消息添加密钥。由于消息键对于组织和分组消息很有用,所以我们将使用smt设置密钥。如果我们使用默认配置设置,数据将被写入一个Kafka主题,配置如下:默认配置源数据库测试数据库卡夫卡主题创建测试sqlite jdbc位置存在消息密钥有效空值(带有空值的JSON模式)消息值架构JSON格式数据治理没有 相反,我们需要以下目标配置:目标配置源数据库/usr/本地/lib/零售数据库卡夫卡主题创建零售地点存在消息密钥是的,使用Kafka Connect的单消息转换功能插入密钥消息值架构阿芙罗数据治理是,使用架构注册表为了实现目标配置,我们修改了JDBC源连接器属性文件source quickstart-sqlite.properties:#数据库连接信息connection.url=jdbc:sqlite:/usr/local/lib/零售数据库#主题名的前缀;表名将附加到topic.prefix=零售-查看原始jdbc-1型托管❤ 通过GitHub然后,我们将这些配置行添加到JDBC源连接器属性文件中,以利用单消息转换(SMT)函数,该函数在将表行写入Kafka主题之前对从表行中提取的数据进行操作。我们使用ValueToKey和ExtractField SMT函数将空键替换为从消息值派生的字段。#使用简单的消息转换将"id"字段添加为键transforms=InsertKey,ExtractId#`ValueToKey`:将一个列字段(`id`)的对象推送到键中transforms.InsertKey.type=组织.apache.kafka.connect.transforms.ValueToKeytransforms.InsertKey.fields=id#"ExtractField":将键从对象转换为纯字段transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$键transforms.ExtractId.field=id查看原始jdbc-2型托管❤ 通过GitHub最后,我们将这些配置行添加到JDBC源连接器属性文件中,以将密钥转换器配置为String(可以像JSON或Avro一样容易序列化),将值转换器配置为schema'd Avro。合流架构注册表正在运行:8081。#密钥转换器:字符串(只是一个裸字段)key.converter=组织.apache.kafka.connect.存储.StringConverterkey.converter.schemas.enable=假#值转换器:指向架构注册表的schema'd Avrovalue.converter=io.汇合.connect.avro.AvroConverter公司value.converter.schemas.enable=真value.converter.schema.registry.url=http://schemaregistry1:8081查看原始jdbc-3型托管❤ 通过GitHub为了简单起见,我们在Kafka Connect独立模式下运行JDBC源连接器。在生产中,您应该始终使用分布式模式来实现可伸缩性和容错性,并使用合流的控制中心进行集中管理。$bin/connect独立连接-独立的.properties源代码快速启动-sqlite.properties查看原始连接-1托管❤ 通过GitHub此时,Kafka Connect运行JDBC连接器并将表的每一行作为键/值对写入Kafka topic retail位置。对该表的状态感兴趣的应用程序将从此主题中读取。当行被添加到SQLite3数据库的源表中时,Kafka Connect会自动将它们作为消息写入Kafka主题,然后它们将自动在KStream对象中供您的客户机应用程序使用。因此,我们实现了将数据作为一个无界的、连续的实时流来获取。这个数据流就是我们所说的"流"每个表行被序列化为一个Avro记录。我们可以从主题零售位置在Confluent schema Registry中查找消息值的模式。#查找主题"retail locations"的消息值模式$curl-X获取:8081/主题/零售地点价值/版本/1 | jq{"subject":"零售地点价值","版本":1,"身份证":2,以"模式"为基础的"模式",以"模式"为"模式",以"只有"的"记录\"的",\"名称\"的"地点"的"地点"的"地点"为"的"的"地点"的"地点"的"基金"的"\ \ \ \ \ \ \ \ \"金"的"基金\"起,\"长"只要只要只要"基"的"基\"\ \ \ \ \"基"的"基型"的"基金"的"基金"的"只有"基"的"基"基"的"基"基"的"基"基"的"基"基"的"基\"基"的"基金"只有"基金基的基金基金基金基金基金基金基金基的"基"基"基"基"基"基"long\"],\"默认值\":null}],\"connect.name连接\":\"位置\"}"}查看原始架构注册表命令托管❤ 通过GitHub用Kafka Streams API转换数据既然源数据被写入了一个Kafka主题,那么任何数量的应用程序都可以从该主题中读取并使用模式注册表对消息值(Avro记录)进行反序列化。一个简单的应用程序可以是kafka avro控制台消费者:$kafka avro控制台消费者--引导服务器broker1:9092--主题零售位置--f

上一篇中间件_宁波建设网站_9元

下一篇分布式存储_cdn网络_免费领

云市场知识本月排行

云市场知识精选