原文转载自 「解道JDON」 (https://www.jdon.com/53430)预计阅读时间 0 分钟(共 0 个字, 0 张图片, 0 个链接)
Kafka Streams是一个功能强大的库,用于在Apache Kafka之上构建复杂的流应用程序。随着时间的推移,以及经过多个项目之后,我们发现自己正在编写相同的代码来在生产环境中运行和与Kafka Streams应用程序交互。
我们坚信,基于Kafka Streams的简单微服务的开发应该花费几天而不是几周的时间,而最少的功能才能安全地投入生产。
因此,我们决定构建自己的框架,以简化Kafka Streams应用程序的开发和操作。
今天,我们很高兴宣布推出Azkarra Streams,这是一个新的开源微型Java框架,可让您集中精力编写Kafka Streams拓扑代码,而不是执行它们所需的样板代码。
主要特征
Azkarra Streams提供了一组功能,可以快速调试和构建可用于生产环境的Kafka Streams应用程序。其中包括:
为什么选择Azkarra
在编写第一个Azkarra应用程序之前,让我们花一些时间描述标准Kafka Streams应用程序的不同部分,以便更好地了解Azkarra的好处。
为此,我们将使用著名的单词计数示例,该示例可在Kafka Streams官方文档中找到。
首先,我们必须声明并构建一个Topology。
StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder .stream("streams-plaintext-input"); textLines.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")) ) .groupBy((key, value) -> value) .count(Materialized.as("WordCount")) .toStream() .to( "streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()) ); Topology topology = builder.build(); |
定义应用配置:
Properties props = new Properties(); props.put(APPLICATION_ID_CONFIG, "streams-word-count"); props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); |
创建KafkaStreams 实例:
final KafkaStreams streams = new KafkaStreams(topology, props); |
最后,我们必须管理Kafka Streams应用程序的运行部分。这意味着,启动KafkaStreams实例并使用关闭挂钩管理应用程序的干净关闭。
final CountDownLatch latch = new CountDownLatch(1); Runtime.getRuntime().addShutdownHook( new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); |
这是完整的代码:GitHub Gist。
当然,流应用程序很少那么简单。例如,您将需要添加错误处理并监视Kafka Streams实例的状态。此外,您可能希望使用交互式查询等查询内部商店。
最后但并非最不重要的一点是,有时您必须处理一些问题(例如:https : //issues.apache.org/jira/browse/KAFKA-7380)。
但是现实是,作为开发人员,您应该始终将开发工作重点放在拓扑的定义和优化上。原因很简单-这是为您的业务创造价值的部分。
Azkarra的第一步
我们希望解决的第一个方面是将构建Topology与执行之间的关注点分离。确实,我们认为创建和启动新KafkaStreams实例不应由开发人员直接管理。
因此,让我们使用Azkarra API重写WordCount示例。
首先,我们将使用Azkarra Streams Maven原型创建一个简单的项目结构。您可以运行以下命令:
$ mvn archetype:generate -DarchetypeGroupId=io.streamthoughts \ -DarchetypeArtifactId=azkarra-quickstart-java \ -DarchetypeVersion=0.3 \ -DgroupId=azkarra.streams \ -DartifactId=azkarra-getting-started \ -Dversion=1.0-SNAPSHOT \ -Dpackage=azkarra \ -DinteractiveMode=false |
在pom.xml已经包含Azkarra流和卡夫卡流的依赖关系:
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.3.0</version> </dependency> <dependency> <groupId>io.streamthoughts</groupId> <artifactId>azkarra-streams</artifactId> <version>0.3</version> </dependency> </dependencies> |
使用您喜欢的IDE或编辑器,打开Maven项目。创建一个新文件src/main/java/azkarra,该文件具有基本的java main(String[] args)方法和如下的拓扑定义:
package azkarra; @AzkarraStreamsApplication public class SimpleStreamsApp { public static void main(String[] args) { AzkarraApplication.run(SimpleStreamsApp.class, args); } @Component public static class WordCountTopology implements TopologyProvider { @Override public Topology get() { StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream("streams-plaintext-input"); textLines .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")) ) .groupBy((key, value) -> value) .count(Materialized.as("WordCount")) .toStream() .to( "streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()) ); return builder.build(); } @Override public String version() { return "1.0"; } } } |
如您所见,我们仅实现了一个名为的接口以TopologyProvider构建Topology。Azkarra Streams负责自动配置和启动所谓的KafkaStreams实例。
接下来,我们必须配置我们的应用程序。我们将创建一个名为一个简单的文件application.conf中src/main/resources/application.conf包含以下代码:
注意:您也可以将文件保留在项目中。
azkarra { context { streams { bootstrap.servers = "localhost:9092" default.key.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde" default.value.serde = "org.apache.kafka.common.serialization.Serdes$StringSerde" } } } |
恭喜你!您使用Azkarra编了写第一个流应用程序。
在Docker上运行您的应用
为了执行我们的应用程序,我们必须首先启动一个Kafka集群。为此,我们将使用Confluent.Inc维护的官方Kafka Docker映像。
要启动单节点Kafka群集,请运行docker-compose.yml项目中包含的文件。
$ cd azkarra-getting-started $ docker -compose up -d |
然后,创建拓扑使用的两个主题(源,接收器)。为此,您可以运行提供的脚本:
$ chmod u + x quickstart-create-wordcount-topics.sh $ ./ quickstart-create-wordcount-topics.sh |
最后,我们将打包并运行Maven项目:
$ mvn clean package && java -jar target / azkarra-quickstart-java-0.3.jar |
要验证,您的流应用程序正在运行,请检查运行状况终结点:
$ curl -sX GET'http :// localhost:8080 / health' | grep'UP' |
最后,让我们向Kafka主题发送一些消息streams-plaintext-input:
$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-producer --topic streams-plaintext-input --broker-list kafka:9092 Azkarra Streams WordCount I Heart Logs Kafka Streams Making Sense of Stream Processing |
消费 |
$ docker exec -it azkarra-cp-broker /usr/bin/kafka-console-consumer --from-beginning --property print.key=true --property key.separator="-" --topic streams-wordcount-output --bootstrap-server kafka:9092 |
嵌入式HTTP服务器
Azkarra的主要功能之一是嵌入式Web服务器,它公开了用于管理和监视本地流应用程序的端点
例如,您可以列出本地运行的流实例(即:在JVM中执行的实例)。
$ curl -sX GET http://localhost:8080/api/v1/streams | jq . <p>[ "word-count-topology-1–0" ] |
获得指定流信息:
$ curl -sX GET http://localhost:8080/api/v1/streams/word-count-topology-1-0/ | jq . { "id": "word-count-topology-1–0", "since": "2019–11–26T13:48:17.35+01:00[Europe/Paris]", "name": "WordCountTopology", "version": "1.0", "state": { "state": "RUNNING", "since": "2019–11–26T13:48:18.772+01:00[Europe/Paris]" } } |
最后输出流应用到Prometheus格式:
$ curl -sX GET ‘http://localhost:8080/api/v1/streams/word-count-topology-1-0/metrics?format=prometheus' # HELP streams_incoming_byte_rate The number of incoming bytes per second # TYPE streams_incoming_byte_rate counter streams_incoming_byte_rate{group=”admin-client-node-metrics”,id=”word-count-topology-1–0",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node — 1",} 0.0 # HELP streams_incoming_byte_total The total number of incoming bytes # TYPE streams_incoming_byte_total counter streams_incoming_byte_total{group=”admin-client-node-metrics”,id=”word-count-topology-1–0",client_id=”word-count-topology-1–0-e04c076a-7d46–4bdc-9bc2–14d181370762-admin”,node_id=”node-1",} 1066.0 |
Azkarra WebUI
Azkarra Streams的另一个很酷的功能是:http:// localhost:8080/ui上提供了默认的嵌入式Web UI,可用于管理流应用程序。
Azkarra WebUI最初是为了促进开发而设计的,但很快发展成为一个小型管理界面。
例如,您可以停止,使用“可用操作”按钮重新启动流应用程序,浏览指标,配置等。
Azkarra WebUI还附带了用于流拓扑的简单DAG表示。
互动查询
最后,Kafka Streams具有强大的机制来查询stream应用程序实现的状态。通常,作为开发人员,我们构建HTTP端点以使用公共Kafka Streams API公开这些状态。
Azkarra Streams为此提供了一个默认终结点,可以通过Azkarra WebUI直接访问该终结点。