《Kafka Streams实战》配套资源

  • 更新时间:
  • 5897人关注
  • 点击下载

给大家带来的是《Kafka Streams实战》配套资源,介绍了关于Kafka、Streams、Kafka方面的内容,本书是由人民邮电出版社出版,已被342人关注,由热心网友宁从南 提供,目前本书在Kafka类综合评分为:7.2分

资源详情相关推荐
《《Kafka Streams实战》配套资源》封面
  • 出版社:人民邮电出版社
  • 作者:[美]小威廉·P.贝杰克(William
  • 大小:44.45 MB
  • 类别:Kafka
  • 热度:987
  • Apache Kafka源码剖析
  • Apache Kafka实战
  • Kafka技术内幕:图文详解Kafka源码设计与实现
  • Kafka入门与实践
  • 企业大数据处理:Spark、Druid、Flume与Kafka应用实践
  • 编辑推荐

    并非所有基于流的应用程序都需要处理集群,轻量级、简单易用的Kafka Streams库提供了微服务和实时事件处理中进行消息处理所需的强大功能。使用Kafka Streams API,只需使用Kafka及相应的流式应用程序可以对数据流进行过滤和转换。
    n
    本书教读者在Kafka平台上实现流式处理。在这本易于理解的书中,读者将通过实际的例子来收集、转换和聚合数据,使用多个处理器,处理实时事件,可以使用KSQL 深入研究流式SQL。本书还讲解了Kafka Streams应用程序的测试和运维方面的内容(如监控和调试)。
    n
    本书主要内容
    ● 使用KStream API。
    ● 过滤、转换和拆分数据。
    ● 使用处理器API。
    ● 与外部系统集成。
    n
    如果读者具备分布式系统的一些经验,那么Kafka或流式应用程序的知识并不是必需的。

    内容简介

    Kafka Streams是Kafka提供的一个用于构建流式处理程序的Java库,它与Storm、Spark等流式处理框架不同,是一个仅依赖于Kafka的Java库,而不是一个流式处理框架。除Kafka之外,Kafka Streams不需要额外的流式处理集群,提供了轻量级、易用的流式处理API。
    本书包括4部分,共9章,从基础API到复杂拓扑的应用,通过具体示例由浅入深地详细介绍了Kafka Streams基础知识及使用方法。本书的主要内容包含流式处理发展历程和Kafka Streams工作原理的介绍,Kafka基础知识的介绍,使用Kafka Streams实现一个具体流式处理应用程序,讨论状态存储及其使用方法,讨论表和流的二元性及使用场景,介绍Kafka Streams应用程序的监控及测试方法,介绍使用Kafka Connect将现有数据源集成到Kafka Streams中,使用KSQL进行交互式查询等。
    本书适合使用Kafka Streams实现流式处理应用的开发人员阅读。

    作者简介

    小威廉·P. 贝杰克(William P. Bejeck Jr.,本名Bill Bejeck)是Kafka Streams源码贡献者,在Conflument公司的Kafka Streams团队工作,有15年以上的软件开发经验,其中8年专注于后端开发,特别是处理大量数据,在数据提炼团队中,使用Kafka来改善下游客户的数据流。

    目录

    第 一部分 开启Kafka Streams之旅
    第 1章 欢迎来到Kafka Streams 3
    1.1 大数据的发展以及它是如何改变程序设计方式的 3
    1.1.1 大数据起源 4
    1.1.2 MapReduce中的重要概念 5
    1.1.3 批处理还不够 7
    1.2 流式处理简介 8
    1.3 处理购买交易 9
    1.3.1 权衡流式处理的选择 9
    1.3.2 将需求解构为图表 10
    1.4 改变看待购买交易的视角 10
    1.4.1 源节点 11
    1.4.2 信用卡屏蔽节点 11
    1.4.3 模式节点 11
    1.4.4 奖励节点 12
    1.4.5 存储节点 13
    1.5 Kafka Streams在购买处理节点图中的应用 13
    1.6 Kafka Streams在购买交易流中的应用 14
    1.6.1 定义源 15
    1.6.2 第 一个处理器:屏蔽信用卡号码 15
    1.6.3 第二个处理器:购买模式 16
    1.6.4 第三个处理器:客户奖励 17
    1.6.5 第四个处理器:写入购买记录 18
    1.7 小结 18
    第 2章 Kafka快速指南 20
    2.1 数据问题 20
    2.2 使用Kafka处理数据 21
    2.2.1 ZMart原始的数据平台 21
    2.2.2 一个Kafka销售交易数据中心 22
    2.3 Kafka架构 23
    2.3.1 Kafka是一个消息代理 23
    2.3.2 Kafka是一个日志 24
    2.3.3 Kafka日志工作原理 25
    2.3.4 Kafka和分区 25
    2.3.5 分区按键对数据进行分组 26
    2.3.6 编写自定义分区器 27
    2.3.7 指定一个自定义分区器 28
    2.3.8 确定恰当的分区数 29
    2.3.9 分布式日志 29
    2.3.10 ZooKeeper:领导者、追随者和副本 30
    2.3.11 Apache ZooKeeper 31
    2.3.12 选择一个控制器 31
    2.3.13 副本 31
    2.3.14 控制器的职责 32
    2.3.15 日志管理 33
    2.3.16 日志删除 33
    2.3.17 日志压缩 35
    2.4 生产者发送消息 36
    2.4.1 生产者属性 38
    2.4.2 指定分区和时间戳 39
    2.4.3 指定分区 39
    2.4.4 Kafka中的时间戳 40
    2.5 消费者读取消息 40
    2.5.1 管理偏移量 41
    2.5.2 自动提交偏移量 42
    2.5.3 手动提交偏移量 42
    2.5.4 创建消费者 43
    2.5.5 消费者和分区 43
    2.5.6 再平衡 43
    2.5.7 更细粒度的消费者分配 44
    2.5.8 消费者示例 44
    2.6 安装和运行Kafka 45
    2.6.1 Kafka本地配置 45
    2.6.2 运行Kafka 46
    2.6.3 发送第 一条消息 47
    2.7 小结 49
    第二部分 Kafka Streams开发篇
    第3章 开发Kafka Streams 53
    3.1 流式处理器API 53
    3.2 Kafka Streams的Hello World 54
    3.2.1 构建“Yelling App”的拓扑 55
    3.2.2 Kafka Streams配置 58
    3.2.3 Serde的创建 59
    3.3 处理客户数据 60
    3.3.1 构建一个拓扑 61
    3.3.2 创建一个自定义的Serde 67
    3.4 交互式开发 69
    3.5 下一步 71
    3.5.1 新需求 71
    3.5.2 将记录写入Kafka之外 76
    3.6 小结 78
    第4章 流和状态 79
    4.1 事件的思考 79
    4.2 将状态操作应用到Kafka Stream 81
    4.2.1 值转换处理器 82
    4.2.2 有状态的客户奖励 82
    4.2.3 初始化值转换器 84
    4.2.4 使用状态将Purchase对象映射为Reward Accumulator 84
    4.2.5 更新奖励处理器 88
    4.3 使用状态存储查找和记录以前看到的数据 89
    4.3.1 数据本地化 90
    4.3.2 故障恢复和容错 91
    4.3.3 Kafka Streams使用状态存储 91
    4.3.4 其他键/值存储供应者 92
    4.3.5 状态存储容错 93
    4.3.6 配置变更日志主题 93
    4.4 连接流以增加洞察力 94
    4.4.1 设置数据 95
    4.4.2 生成包含客户ID的键来执行连接 96
    4.4.3 构建连接 98
    4.4.4 其他连接选项 102
    4.5 Kafka Streams中的时间戳 104
    4.5.1 自带的时间戳提取器实现类 105
    4.5.2 WallclockTimestampExtractor 106
    4.5.3 自定义时间戳提取器 106
    4.5.4 指定一个时间戳提取器 107
    4.6 小结 108
    第5章 KTable API 109
    5.1 流和表之间的关系 110
    5.1.1 记录流 110
    5.1.2 更新记录或变更日志 111
    5.1.3 事件流与更新流对比 113
    5.2 记录更新和KTable配置 115
    5.2.1 设置缓存缓冲大小 115
    5.2.2 设置提交间隔 116
    5.3 聚合和开窗操作 117
    5.3.1 按行业汇总股票成交量 118
    5.3.2 开窗操作 122
    5.3.3 连接KStream和KTable 128
    5.3.4 GlobalKTable 130
    5.3.5 可查询的状态 133
    5.4 小结 133
    第6章 处理器API 135
    6.1 更高阶抽象与更多控制的权衡 135
    6.2 使用源、处理器和接收器创建一个拓扑 136
    6.2.1 添加一个源节点 136
    6.2.2 添加一个处理器节点 137
    6.2.3 增加一个接收器节点 140
    6.3 通过股票分析处理器深入研究处理器API 141
    6.3.1 股票表现处理器应用程序 142
    6.3.2 process()方法 145
    6.3.3 punctuator执行 147
    6.4 组合处理器 148
    6.5 集成处理器API和Kafka Streams API 158
    6.6 小结 159
    第三部分 管理Kafka Streams
    第7章 监控和性能 163
    7.1 Kafka基本监控 163
    7.1.1 测评消费者和生产者性能 164
    7.1.2 检查消费滞后 165
    7.1.3 拦截生产者和消费者 166
    7.2 应用程序指标 169
    7.2.1 指标配置 171
    7.2.2 如何连接到收集到的指标 172
    7.2.3 使用JMX 172
    7.2.4 查看指标 176
    7.3 更多Kafka Streams调试技术 177
    7.3.1 查看应用程序的表现形式 177
    7.3.2 获取应用程序各种状态的通知 178
    7.3.3 使用状态监听器 179
    7.3.4 状态恢复监听器 181
    7.3.5 未捕获的异常处理器 184
    7.4 小结 184
    第8章 测试Kafka Streams应用程序 185
    8.1 测试拓扑 186
    8.1.1 构建测试用例 188
    8.1.2 测试拓扑中的状态存储 190
    8.1.3 测试处理器和转换器 191
    8.2 集成测试 193
    8.3 小结 199
    第四部分 Kafka Streams进阶
    第9章 Kafka Streams的高级应用 203
    9.1 将Kafka与其他数据源集成 204
    9.1.1 使用Kafka Connect集成数据 205
    9.1.2 配置Kafka Connect 205
    9.1.3 转换数据 207
    9.2 替代数据库 211
    9.2.1 交互式查询的工作原理 213
    9.2.2 分配状态存储 213
    9.2.3 创建和查找分布式状态存储 215
    9.2.4 编写交互式查询 216
    9.2.5 查询服务器内部 218
    9.3 KSQL 221
    9.3.1 KSQL流和表 222
    9.3.2 KSQL架构 222
    9.3.3 安装和运行KSQL 224
    9.3.4 创建一个KSQL流 224
    9.3.5 编写KSQL查询 226
    9.3.6 创建一张KSQL表 227
    9.3.7 配置KSQL 227
    9.4 小结 228
    附录A 额外的配置信息 229
    附录B 精确一次处理语义 234

    展开阅读
    精选笔记1:KOA+egg.js集成kafka消息队列的示例

    1小时32分钟前回答

    Egg.js : 基于KOA2的企业级框架

    Kafka:高吞吐量的分布式发布订阅消息系统

    本文章将集成egg + kafka + mysql 的日志系统例子

    系统要求:日志记录,通过kafka进行消息队列控制

    思路图:

    KOA+egg.js集成kafka消息队列的示例

    这里消费者和生产者都由日志系统提供

    λ.1 环境准备

    ①Kafka

    官网下载kafka后,解压

    启动zookeeper:

    bin/zookeeper-server-start.sh config/zookeeper.properties

    启动Kafka server

    这里config/server.properties中将num.partitions=5,我们设置5个partitions

    bin/kafka-server-start.sh config/server.properties

    ② egg + mysql

    根据脚手架搭建好egg,再多安装kafka-node,egg-mysql

    mysql 用户名root 密码123456

    λ.2 集成

    1、根目录新建app.js,这个文件在每次项目加载时候都会运作

    'use strict';
     
    const kafka = require('kafka-node');
     
    module.exports = app => {
     app.beforeStart(async () => {
     const ctx = app.createAnonymousContext();
     
     const Producer = kafka.Producer;
     const client = new kafka.KafkaClient({ kafkaHost: app.config.kafkaHost });
     const producer = new Producer(client, app.config.producerConfig);
     
     producer.on('error', function(err) {
      console.error('ERROR: [Producer] ' + err);
     });
     
     app.producer = producer;
     
     const consumer = new kafka.Consumer(client, app.config.consumerTopics, {
      autoCommit: false,
     });
     
     consumer.on('message', async function(message) {
      try {
      await ctx.service.log.insert(JSON.parse(message.value));
      consumer.commit(true, (err, data) => {
       console.error('commit:', err, data);
      });
      } catch (error) {
      console.error('ERROR: [GetMessage] ', message, error);
      }
     });
     
     consumer.on('error', function(err) {
      console.error('ERROR: [Consumer] ' + err);
     });
     });
    };

    上述代码新建了生产者、消费者。

    生产者新建后加载进app全局对象。我们将在请求时候生产消息。这里只是先新建实例

    消费者获取消息将访问service层的insert方法(数据库插入数据)。

    具体参数可以参考kafka-node官方API,往下看会有生产者和消费者的配置参数。

    2、controller · log.js

    这里获取到了producer,并传往service层

    'use strict';
     
    const Controller = require('egg').Controller;
     
    class LogController extends Controller {
     /**
     * @description Kafka控制日志信息流
     * @host /log/notice
     * @method POST
     * @param {Log} log 日志信息
     */
     async notice() {
     const producer = this.ctx.app.producer;
     const Response = new this.ctx.app.Response();
     
     const requestBody = this.ctx.request.body;
     const backInfo = await this.ctx.service.log.send(producer, requestBody);
     this.ctx.body = Response.success(backInfo);
     }
    }
     
    module.exports = LogController;

    3、service · log.js

    这里有一个send方法,这里调用了producer.send ,进行生产者生产

    insert方法则是数据库插入数据

    'use strict';
     
    const Service = require('egg').Service;
    const uuidv1 = require('uuid/v1');
     
    class LogService extends Service {
     async send(producer, params) {
     const payloads = [
      {
      topic: this.ctx.app.config.topic,
      messages: JSON.stringify(params),
      },
     ];
     
     producer.send(payloads, function(err, data) {
      console.log('send : ', data);
     });
     
     return 'success';
     }
     async insert(message) {
     try {
      const logDB = this.ctx.app.mysql.get('log');
      const ip = this.ctx.ip;
     
      const Logs = this.ctx.model.Log.build({
      id: uuidv1(),
      type: message.type || '',
      level: message.level || 0,
      operator: message.operator || '',
      content: message.content || '',
      ip,
      user_agent: message.user_agent || '',
      error_stack: message.error_stack || '',
      url: message.url || '',
      request: message.request || '',
      response: message.response || '',
      created_at: new Date(),
      updated_at: new Date(),
      });
     
      const result = await logDB.insert('logs', Logs.dataValues);
     
      if (result.affectedRows === 1) {
      console.log(`SUCEESS: [Insert ${message.type}]`);
      } else console.error('ERROR: [Insert DB] ', result);
     } catch (error) {
      console.error('ERROR: [Insert] ', message, error);
     }
     }
    }
     
    module.exports = LogService;

    4、config · config.default.js

    一些上述代码用到的配置参数具体在这里,注这里开了5个partition。

    'use strict';
     
    module.exports = appInfo => {
     const config = (exports = {});
     
     const topic = 'logAction_p5';
     
     // add your config here
     config.middleware = [];
     
     config.security = {
     csrf: {
      enable: false,
     },
     };
     
     // mysql database configuration
     config.mysql = {
     clients: {
      basic: {
      host: 'localhost',
      port: '3306',
      user: 'root',
      password: '123456',
      database: 'merchants_basic',
      },
      log: {
      host: 'localhost',
      port: '3306',
      user: 'root',
      password: '123456',
      database: 'merchants_log',
      },
     },
     default: {},
     app: true,
     agent: false,
     };
     
     // sequelize config
     config.sequelize = {
     dialect: 'mysql',
     database: 'merchants_log',
     host: 'localhost',
     port: '3306',
     username: 'root',
     password: '123456',
     dialectOptions: {
      requestTimeout: 999999,
     },
     pool: {
      acquire: 999999,
     },
     };
     
     // kafka config
     config.kafkaHost = 'localhost:9092';
     
     config.topic = topic;
     
     config.producerConfig = {
     // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
     partitionerType: 1,
     };
     
     config.consumerTopics = [
     { topic, partition: 0 },
     { topic, partition: 1 },
     { topic, partition: 2 },
     { topic, partition: 3 },
     { topic, partition: 4 },
     ];
     
     return config;
    };

    5、实体类:

    mode · log.js

    这里使用了 Sequelize

    'use strict';
     
    module.exports = app => {
     const { STRING, INTEGER, DATE, TEXT } = app.Sequelize;
     
     const Log = app.model.define('log', {
     /**
      * UUID
      */
     id: { type: STRING(36), primaryKey: true },
     /**
      * 日志类型
      */
     type: STRING(100),
     /**
      * 优先等级(数字越高,优先级越高)
      */
     level: INTEGER,
     /**
      * 操作者
      */
     operator: STRING(50),
     /**
      * 日志内容
      */
     content: TEXT,
     /**
      * IP
      */
     ip: STRING(36),
     /**
      * 当前用户代理信息
      */
     user_agent: STRING(150),
     /**
      * 错误堆栈
      */
     error_stack: TEXT,
     /**
      * URL
      */
     url: STRING(255),
     /**
      * 请求对象
      */
     request: TEXT,
     /**
      * 响应对象
      */
     response: TEXT,
     /**
      * 创建时间
      */
     created_at: DATE,
     /**
      * 更新时间
      */
     updated_at: DATE,
     });
     
     return Log;
    };
    

    6、测试Python脚本:

    import requests
     
    from multiprocessing import Pool
    from threading import Thread
     
    from multiprocessing import Process
     
     
    def loop():
     t = 1000
     while t:
      url = "http://localhost:7001/log/notice"
     
      payload = "{\n\t\"type\": \"ERROR\",\n\t\"level\": 1,\n\t\"content\": \"URL send ERROR\",\n\t\"operator\": \"Knove\"\n}"
      headers = {
      'Content-Type': "application/json",
      'Cache-Control': "no-cache"
      }
     
      response = requests.request("POST", url, data=payload, headers=headers)
     
      print(response.text)
     
    if __name__ == '__main__':
     for i in range(10):
      t = Thread(target=loop)
      t.start()

    7、建表语句:

    SET NAMES utf8mb4;
    SET FOREIGN_KEY_CHECKS = 0;
     
    -- ----------------------------
    -- Table structure for logs
    -- ----------------------------
    DROP TABLE IF EXISTS `logs`;
    CREATE TABLE `logs` (
     `id` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
     `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL COMMENT '日志类型',
     `level` int(11) NULL DEFAULT NULL COMMENT '优先等级(数字越高,优先级越高)',
     `operator` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '操作人',
     `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '日志信息',
     `ip` varchar(36) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT 'IP\r\nIP',
     `user_agent` varchar(150) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前用户代理信息',
     `error_stack` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '错误堆栈',
     `url` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL COMMENT '当前URL',
     `request` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '请求对象',
     `response` text CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL COMMENT '响应对象',
     `created_at` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
     `updated_at` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
     PRIMARY KEY (`id`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;
     
    SET FOREIGN_KEY_CHECKS = 1;

    λ.3 后话

    网上类似资料甚少,啃各种文档,探寻技术实现方式

    以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持码农之家。

    展开阅读

    Kafka相关资源

    • Kafka并不难学

      Kafka并不难学

      本书基于Kafka 0.10.2.0以上版本,采用“理论 实践”的形式编写。全书共68个实例。

      大小:78.8 MBKafka

      立即下载
    • 构建Apache Kafka流数据应用

      构建Apache Kafka流数据应用

      大小:144.5 MBKafka

      立即下载
    • Kafka源码解析与实战

      Kafka源码解析与实战

      本书从LinkedIn(领英)公司内部大数据架构讲起,引申出消息队列Kafka,接着讲解Kafka的基本架构,然后着重分析Kafka内部的各模块实现细节,感兴趣的可以了解一下

      大小:63.1 MBKafka

      立即下载
    • Kafka权威指南

      Kafka权威指南

      本书是关于Kafka的全面教程, 详细介绍了如何部署Kafka集群、开发可靠的基于事件驱动的微服务,以及基于Kafka平台构建可伸缩的流式应用程序,感兴趣的可以下载学习

      大小:125.2 MBKafka

      立即下载
    • 流式架构:Kafka与MapR Streams数据流处理

      流式架构:Kafka与MapR Streams数据流处理

      所有连续的事件流都可以称为数据流。对连续数据流设计和构建流式数据架构,能够实现实时或近实时应用,提升整个组织的效率。《流式架构:Kafka与MapR Streams数据流处理》以Apache Kafka 和M

      大小:45.5 MB数据架构

      立即下载
    • 深入理解Kafka:核心设计与实践原理

      深入理解Kafka:核心设计与实践原理

      本书从Kafka的基本概念入手,主要从生产端、消费端、服务端等3个方面进行全面的陈述,主要内容包括Kafka的基本使用方式、生产者客户端的使用、消费者客户端的使用

      大小:246.3 MBKafka

      立即下载

    学习笔记

    7小时52分钟前回答

    在python环境下运用kafka对数据进行实时传输的方法

    背景: 为了满足各个平台间数据的传输,以及能确保历史性和实时性。先选用kafka作为不同平台数据传输的中转站,来满足我们对跨平台数据发送与接收的需要。 kafka简介: Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。 总之:kafka做为中转站有以下功能: 1.生产者(产生数据或者……

    13小时25分钟前回答

    Java使用kafka发送和生产消息的示例

    1. maven依赖包 dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version0.9.0.1/version /dependency 2. 生产者代码 package com.lnho.example.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "master:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer",……

    1小时3分钟前回答

    Java API方式调用Kafka各种协议的方法

    众所周知,Kafka自己实现了一套二进制协议(binary protocol)用于各种功能的实现,比如发送消息,获取消息,提交位移以及创建topic等。具体协议规范参见:Kafka协议 这套协议的具体使用流程为: 1.客户端创建对应协议的请求 2.客户端发送请求给对应的broker 3.broker处理请求,并发送response给客户端 虽然Kafka提供的大量的脚本工具用于各种功能的实现,但很多时候我们还是希望可以把某些功能以编程的方式嵌入到另一个系统中。这时使用Java API的方式就显得异常地灵活了。本文我将尝试给出Java API底层框架的一个范例,同时也会针对“创建topic”和“查看位移”这两个主要功能给出对应的例子。 需要提前说明的……