nginx lua集成kafka的实现方法

第一步:进入opresty目录

[root@node03 openresty]# cd /export/servers/openresty/
[root@node03 openresty]# ll
total 356
drwxr-xr-x 2 root root  4096 Jul 26 11:33 bin
drwxrwxr-x 44 1000 1000  4096 Jul 26 11:31 build
drwxrwxr-x 43 1000 1000  4096 Nov 13 2017 bundle
-rwxrwxr-x 1 1000 1000 45908 Nov 13 2017 configure
-rw-rw-r-- 1 1000 1000 22924 Nov 13 2017 COPYRIGHT
drwxr-xr-x 6 root root  4096 Jul 26 11:33 luajit
drwxr-xr-x 6 root root  4096 Aug 1 08:14 lualib
-rw-r--r-- 1 root root  5413 Jul 26 11:32 Makefile
drwxr-xr-x 11 root root  4096 Jul 26 11:35 nginx
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 patches
drwxr-xr-x 44 root root  4096 Jul 26 11:33 pod
-rw-rw-r-- 1 1000 1000  3689 Nov 13 2017 README.markdown
-rw-rw-r-- 1 1000 1000  8690 Nov 13 2017 README-win32.txt
-rw-r--r-- 1 root root 218352 Jul 26 11:33 resty.index
drwxr-xr-x 5 root root  4096 Jul 26 11:33 site
drwxr-xr-x 2 root root  4096 Aug 1 10:54 testlua
drwxrwxr-x 2 1000 1000  4096 Nov 13 2017 util
[root@node03 openresty]#

说明:接下来我们关注两个目录 lualib 和 nginx

1.lualib: 是存放opresty所需要的集成软件包的

2.nginx: 是nginx服务目录

接下来,我们进入lualib目录一看究竟:

[root@node03 openresty]# cd lualib/
[root@node03 lualib]# ll
total 116
-rwxr-xr-x 1 root root 101809 Jul 26 11:33 cjson.so
drwxr-xr-x 3 root root  4096 Jul 26 11:33 ngx
drwxr-xr-x 2 root root  4096 Jul 26 11:33 rds
drwxr-xr-x 2 root root  4096 Jul 26 11:33 redis
drwxr-xr-x 9 root root  4096 Aug 1 10:34 resty

这里我们看到了redis和ngx集成软件包,说明我们可以之间使用nginx和redis而无需导入任何依赖包!!!!

下面看看resty里面有些说明呢????

[root@node03 lualib]# cd resty/
[root@node03 resty]# ll
total 152
-rw-r--r-- 1 root root 6409 Jul 26 11:33 aes.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 core
-rw-r--r-- 1 root root  596 Jul 26 11:33 core.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 dns
drwxr-xr-x 2 root root 4096 Aug 1 10:42 kafka  #这是我们自己导入的
drwxr-xr-x 2 root root 4096 Jul 26 11:33 limit
-rw-r--r-- 1 root root 4616 Jul 26 11:33 lock.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 lrucache
-rw-r--r-- 1 root root 4620 Jul 26 11:33 lrucache.lua
-rw-r--r-- 1 root root 1211 Jul 26 11:33 md5.lua
-rw-r--r-- 1 root root 14544 Jul 26 11:33 memcached.lua
-rw-r--r-- 1 root root 21577 Jul 26 11:33 mysql.lua
-rw-r--r-- 1 root root  616 Jul 26 11:33 random.lua
-rw-r--r-- 1 root root 9227 Jul 26 11:33 redis.lua
-rw-r--r-- 1 root root 1192 Jul 26 11:33 sha1.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha224.lua
-rw-r--r-- 1 root root 1221 Jul 26 11:33 sha256.lua
-rw-r--r-- 1 root root 1045 Jul 26 11:33 sha384.lua
-rw-r--r-- 1 root root 1359 Jul 26 11:33 sha512.lua
-rw-r--r-- 1 root root  236 Jul 26 11:33 sha.lua
-rw-r--r-- 1 root root  698 Jul 26 11:33 string.lua
-rw-r--r-- 1 root root 5178 Jul 26 11:33 upload.lua
drwxr-xr-x 2 root root 4096 Jul 26 11:33 upstream
drwxr-xr-x 2 root root 406 Jul 26 11:33 websocket

这里我们看到了熟悉的mysql.lua和redis.lua,好了其他的先不要管

注意:这里的 kafka 这个包是没有的,说明opnresty么有集成kafka。此处我已经提前导入啦kafka集成包

我们看看kafka里面多有哪些包:

[root@node03 resty]# cd kafka
[root@node03 kafka]# ll
total 48
-rw-r--r-- 1 root root 1369 Aug 1 10:42 broker.lua
-rw-r--r-- 1 root root 5537 Aug 1 10:42 client.lua
-rw-r--r-- 1 root root  710 Aug 1 10:42 errors.lua
-rw-r--r-- 1 root root 10718 Aug 1 10:42 producer.lua
-rw-r--r-- 1 root root 4072 Aug 1 10:42 request.lua
-rw-r--r-- 1 root root 2118 Aug 1 10:42 response.lua
-rw-r--r-- 1 root root 1494 Aug 1 10:42 ringbuffer.lua
-rw-r--r-- 1 root root 4845 Aug 1 10:42 sendbuffer.lua

附上 kafka 集成包:kafka_jb51.rar

第二步:创建kafka测试lua文件

1.退回到openresty

[root@node03 kafka]# cd /export/servers/openresty/

2.创建测试文件

[root@node03 openresty]# mkdir -r testlua
#这里文件名自己取,文件位置自己定,但必须找得到

这里文件名自己取,文件位置自己定,但必须找得到!!!!!!!!!!!下面会用到!!!!!!!!!!

3.进入刚刚创建的文件夹并创建kafkalua.lua脚本文件

创建文件:vim kafkalua.lua或者touch kafkalua.lua

[root@node03 openresty]# cd testlua/
[root@node03 testlua]# ll
total 8
-rw-r--r-- 1 root root 3288 Aug 1 10:54 kafkalua.lua

kafkalua.lua:

--测试语句可以不用
ngx.say('hello kafka file configuration successful!!!!!!')

--数据采集阈值限制,如果lua采集超过阈值,则不采集
local DEFAULT_THRESHOLD = 100000
-- kafka分区数
local PARTITION_NUM = 6
-- kafka主题名称
local TOPIC = 'B2CDATA_COLLECTION1'
-- 轮询器共享变量KEY值
local POLLING_KEY = "POLLING_KEY"
-- kafka集群(定义kafka broker地址,ip需要和kafka的host.name配置一致)
local function partitioner(key, num, correlation_id)
  return tonumber(key)
end
--kafka broker列表
local BROKER_LIST = {{host="192.168.52.100",port=9092},{host="192.168.52.110",port=9092},{host="192.168.52.120",port=9092}}
--kafka参数,
local CONNECT_PARAMS = { producer_type = "async", socket_timeout = 30000, flush_time = 10000, request_timeout = 20000, partitioner = partitioner }
-- 共享内存计数器,用于kafka轮询使用
local shared_data = ngx.shared.shared_data
local pollingVal = shared_data:get(POLLING_KEY)
if not pollingVal then
  pollingVal = 1
  shared_data:set(POLLING_KEY, pollingVal)
end
--获取每一条消息的计数器,对PARTITION_NUM取余数,均衡分区
local partitions = '' .. (tonumber(pollingVal) % PARTITION_NUM)
shared_data:incr(POLLING_KEY, 1)

-- 并发控制
local isGone = true
--获取ngx.var.connections_active进行过载保护,即如果当前活跃连接数超过阈值进行限流保护
if tonumber(ngx.var.connections_active) > tonumber(DEFAULT_THRESHOLD) then
  isGone = false
end
-- 数据采集
if isGone then

  local time_local = ngx.var.time_local
  if time_local == nil then
    time_local = ""
  end

  local request = ngx.var.request
  if request == nil then
    request = ""
  end

  local request_method = ngx.var.request_method
  if request_method == nil then
    request_method = ""
  end

  local content_type = ngx.var.content_type
  if content_type == nil then
    content_type = ""
  end
  ngx.req.read_body()
  local request_body = ngx.var.request_body
  if request_body == nil then
    request_body = ""
  end

  local http_referer = ngx.var.http_referer
  if http_referer == nil then
    http_referer = ""
  end

  local remote_addr = ngx.var.remote_addr
  if remote_addr == nil then
    remote_addr = ""
  end

  local http_user_agent = ngx.var.http_user_agent
  if http_user_agent == nil then
    http_user_agent = ""
  end

  local time_iso8601 = ngx.var.time_iso8601
  if time_iso8601 == nil then
    time_iso8601 = ""
  end

  local server_addr = ngx.var.server_addr
  if server_addr == nil then
    server_addr = ""
  end

  local http_cookie = ngx.var.http_cookie
  if http_cookie == nil then
    http_cookie = ""
  end
--封装数据
  local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;
--引入kafka的producer
local producer = require "resty.kafka.producer"
--创建producer
local bp = producer:new(BROKER_LIST, CONNECT_PARAMS)
--发送数据
local ok, err = bp:send(TOPIC, partitions, message)
--打印错误日志
  if not ok then
    ngx.log(ngx.ERR, "kafka send err:", err)
    return
  end
end

第三步:修改nginx配置文件nginx.conf

1.进入ngin/conf目录

[root@node03 openresty]# cd /export/servers/openresty/nginx/conf/
[root@node03 conf]# ll
total 76
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf
-rw-r--r-- 1 root root 1077 Jul 26 11:33 fastcgi.conf.default
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params
-rw-r--r-- 1 root root 1007 Jul 26 11:33 fastcgi_params.default
-rw-r--r-- 1 root root 2837 Jul 26 11:33 koi-utf
-rw-r--r-- 1 root root 2223 Jul 26 11:33 koi-win
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types
-rw-r--r-- 1 root root 5170 Jul 26 11:33 mime.types.default
-rw-r--r-- 1 root root 3191 Aug 1 10:52 nginx.conf
-rw-r--r-- 1 root root 2656 Jul 26 11:33 nginx.conf.default
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params
-rw-r--r-- 1 root root 636 Jul 26 11:33 scgi_params.default
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params
-rw-r--r-- 1 root root 664 Jul 26 11:33 uwsgi_params.default
-rw-r--r-- 1 root root 3610 Jul 26 11:33 win-utf

2.修改nginx.conf

[root@node03 conf]# vim nginx.conf

    #1.说明找到第一个server
    #2.在server上面添加两行代码如下
    #3.在server里面添加kafka相关的代码如下

#------------------添加的代码---------------------------------------
 #开启共享字典,设置内存大小为10M,供每个nginx的线程消费
 lua_shared_dict shared_data 10m;
 #配置本地域名解析
 resolver 127.0.0.1;
#------------------添加的代码---------------------------------------

 server {
    listen    80;
    server_name localhost;

    #charset koi8-r;

    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }

    #------------------添加的代码---------------------------------------
    location /kafkalua { #这里的kafkalua就是工程名字,不加默认为空
      #开启nginx监控
      stub_status on;
      #加载lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }
    #------------------添加的代码---------------------------------------
}

说明:location /kafkalua{...}这里的kafkalua是工程名,可以随意取也可以不取,但是必须要记住!!!

看到我们上面配置了两个location,第一个为location /{...}第二个为location /kafkalua{...}那么他们有什么区别呢???先向下看,迷雾将会慢慢揭开。

第四步:启动nginx

1.进入nginx/sbin

[root@node03 sbin]# cd /export/servers/openresty/nginx/sbin/
[root@node03 sbin]# ll
total 16356
-rwxr-xr-x 1 root root 16745834 Jul 26 11:33 nginx

2.测试配置文件是否正确

[root@node03 sbin]# nginx -t
nginx: the configuration file /export/servers/openresty/nginx/conf/nginx.conf syntax is ok
nginx: configuration file /export/servers/openresty/nginx/conf/nginx.conf test is successful
#看到已经成功啦

3.启动nginx

[root@node03 sbin]# nginx
#不显示任何东西一般是成功啦

4.查看nginx是否启动成功

[root@node03 sbin]# ps -ef | grep nginx
root    3730   1 0 09:24 ?    00:00:00 nginx: master process nginx
nobody   3731  3730 0 09:24 ?    00:00:20 nginx: worker process is shutting down
nobody   5766  3730 0 12:17 ?    00:00:00 nginx: worker process
root    5824  3708 0 12:24 pts/1  00:00:00 grep nginx
#看到有两个nginx进程,表示成功le

5.浏览器访问nginx

在浏览器输入:node03/kafkalua

说明:如何么有配置hosts则输入openresty所在设备的地址如:192.168.52.120/kafkalua

在浏览器输入:node03/或者 192.168.52.120/

再在浏览器输入:node03:80/kafkalua 和 node03:80/试试 搬来nginx.conf来看看:

node03:80/kafkalua 这里的nide03是服务器的别名或者之间写文服务器地址,80是【listen 80;】配置的监听端口,80端口可以省略不写,如果这写成【listen 8088;】那么浏览器需输入 node03:8088/kafkalua (这里不能省略8088),kafkalua是工程名。

 server {
    listen    80;
    server_name localhost;

    #charset koi8-r;

    #access_log logs/host.access.log main;
    location / {
      root  html;
      index index.html index.htm;
    }

    #------------------添加的代码---------------------------------------
    location /kafkalua { #这里的kafkalua就是工程名字,不加默认为空
      #开启nginx监控
      stub_status on;
      #加载lua文件
      default_type text/html;
      #指定kafka的lua文件位置,就是我们刚才创建的kafkalua.lua(前面已经强调要记住的!!!!)
      content_by_lua_file /export/servers/openresty/testlua/kafkalua.lua;
    }

第五步:创建测试爬虫程序

1.创建maven工程导入依赖

 <dependencies>
    <dependency>
      <groupId>org.jsoup</groupId>
      <artifactId>jsoup</artifactId>
      <version>1.11.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.httpcomponents</groupId>
      <artifactId>httpclient</artifactId>
      <version>4.5.4</version>
    </dependency>
  </dependencies>

2.伪爬虫程序

public class SpiderGoAirCN {
  private static String basePath = "http://node03/kafkalua";
  public static void main(String[] args) throws Exception {
    for (int i = 0; i < 50000; i++) {
      // 请求查询信息
      spiderQueryao();
      // 请求html
      spiderHtml();
      // 请求js
      spiderJs();
      // 请求css
      spiderCss();
      // 请求png
      spiderPng();
      // 请求jpg
      spiderJpg();
      Thread.sleep(100);
    }
  }

  /**
   *
   * @throws Exception
   */
  public static void spiderQueryao() throws Exception {
    // 1.指定目标网站   ^.*/B2C40/query/jaxb/direct/query.ao.*$
    String url = basePath + "/B2C40/query/jaxb/direct/query.ao";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
          "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1="
            + getGoTime() + "&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.80");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "243.45.78.132");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D"
            + getGoTime()
            + "%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1("
            + getGoTime() + ")");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static void spiderHtml() throws Exception {
    // 1.指定目标网站     ^.*html.*$
    String url = basePath + "/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=CTU&d1=2018-01-17&at=1&ct=0&it=0";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    // httpPost.setEntity(new StringEntity(
    // "depcity=CAN&arrcity=WUH&flightdate=20180220&adultnum=1&childnum=0&infantnum=0&cabinorder=0&airline=1&flytype=0&international=0&action=0&segtype=1&cache=0&preUrl=&isMember="));
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static void spiderJs() throws Exception {

    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/modules/common/requireConfig.js";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static void spiderCss() throws Exception {

    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/css/flight.css";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader("Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static void spiderPng() throws Exception {

    // 1.指定目标网站
    String url =basePath + "/B2C40/dist/main/images/common.png";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static void spiderJpg() throws Exception {

    // 1.指定目标网站
    String url = basePath +"/B2C40/dist/main/images/loadingimg.jpg";
    // 2.发起请求
    HttpPost httpPost = new HttpPost(url);
    // 3. 设置请求参数
    httpPost.setHeader("Time-Local", getLocalDateTime());
    httpPost.setHeader("Requst",
        "POST /B2C40/query/jaxb/direct/query.ao HTTP/1.1");
    httpPost.setHeader("Request Method", "POST");
    httpPost.setHeader("Content-Type",
        "application/x-www-form-urlencoded; charset=UTF-8");
    httpPost.setHeader(
        "Referer",
        "http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=S&c1=CAN&c2=WUH&d1=2018-02-20&at=1&ct=0&it=0");
    httpPost.setHeader("Remote Address", "192.168.56.1");
    httpPost.setHeader(
        "User-Agent",
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36");
    httpPost.setHeader("Time-Iso8601", getISO8601Timestamp());
    httpPost.setHeader("Server Address", "192.168.56.80");
    httpPost.setHeader(
        "Cookie",
        "JSESSIONID=782121159357B98CA6112554CF44321E; sid=b5cc11e02e154ac5b0f3609332f86803; aid=8ae8768760927e280160bb348bef3e12; identifyStatus=N; userType4logCookie=M; userId4logCookie=13818791413; useridCookie=13818791413; userCodeCookie=13818791413; temp_zh=cou%3D0%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-13%3B%E5%B9%BF%E5%B7%9E-%E5%8C%97%E4%BA%AC%3B1%2C0%2C0%3B%26cou%3D1%3Bsegt%3D%E5%8D%95%E7%A8%8B%3Btime%3D2018-01-17%3B%E5%B9%BF%E5%B7%9E-%E6%88%90%E9%83%BD%3B1%2C0%2C0%3B%26; JSESSIONID=782121159357B98CA6112554CF44321E; WT-FPC=id=211.103.142.26-608782688.30635197:lv=1516170718655:ss=1516170709449:fs=1513243317440:pn=2:vn=10; language=zh_CN; WT.al_flight=WT.al_hctype(S)%3AWT.al_adultnum(1)%3AWT.al_childnum(0)%3AWT.al_infantnum(0)%3AWT.al_orgcity1(CAN)%3AWT.al_dstcity1(CTU)%3AWT.al_orgdate1(2018-01-17)");
    // 4.设置请求参数
    ArrayList<BasicNameValuePair> parameters = new ArrayList<BasicNameValuePair>();
    parameters
        .add(new BasicNameValuePair(
            "json",
            "{\"depcity\":\"CAN\", \"arrcity\":\"WUH\", \"flightdate\":\"20180220\", \"adultnum\":\"1\", \"childnum\":\"0\", \"infantnum\":\"0\", \"cabinorder\":\"0\", \"airline\":\"1\", \"flytype\":\"0\", \"international\":\"0\", \"action\":\"0\", \"segtype\":\"1\", \"cache\":\"0\", \"preUrl\":\"\", \"isMember\":\"\"}"));
    httpPost.setEntity(new UrlEncodedFormEntity(parameters));
    // 5. 发起请求
    CloseableHttpClient httpClient = HttpClients.createDefault();
    CloseableHttpResponse response = httpClient.execute(httpPost);
    // 6.获取返回值
    System.out.println(response != null);
  }

  public static String getLocalDateTime() {
    DateFormat df = new SimpleDateFormat("dd/MMM/yyyy'T'HH:mm:ss +08:00",
        Locale.ENGLISH);
    String nowAsISO = df.format(new Date());
    return nowAsISO;

  }

  public static String getISO8601Timestamp() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss+08:00");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }

  public static String getGoTime() {
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd");
    String nowAsISO = df.format(new Date());
    return nowAsISO;
  }

  public static String getBackTime() {
    Date date = new Date();// 取时间
    Calendar calendar = new GregorianCalendar();
    calendar.setTime(date);
    calendar.add(calendar.DATE, +1);// 把日期往前减少一天,若想把日期向后推一天则将负数改为正数
    date = calendar.getTime();
    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    String dateString = formatter.format(date);
    return dateString;
  }
}

第六步:启动kafka

1.创建主题topic

[root@node01 bin]# kafka-topics.sh --zookeeper node01:2181 --partitions 3
--replication-factor 3 --create --topic B2CDATA_COLLECTION1

2.开启kafka消费者

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092
--topic B2CDATA_COLLECTION1

第七步:开启爬虫程序并观察结果

1.启动爬虫程序

2.观察消费者窗口如下

第八步:启动kafka-manager观察

1.启动kafka-manager

[root@node01 conf]# cd /export/servers/kafka-manager-1.3.3.23/bin/
[root@node01 bin]# ll
total 36
-rwxr-xr-x 1 root root 13747 May 1 06:27 kafka-manager
-rw-r--r-- 1 root root 9975 May 1 06:27 kafka-manager.bat
-rwxr-xr-x 1 root root 1383 May 1 06:27 log-config
-rw-r--r-- 1 root root  105 May 1 06:27 log-config.bat
[root@node01 bin]# 

#启动
[root@node01 bin]# ./kafka-manager

启动后的窗口:

2.浏览器访问

浏览器输入:node01:9000

kafka manager使用不做讲解,观察B2CDATA_COLLECTION1主题消费情况:

​ 有三个分区,每个分区消费的消息差多说明成功啦,

​ 如果不一样,则是kafkalua.lua 脚本中没有配置分区策略,默认分区会导致 数据倾斜 我们需配置自己的分区策略!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 安装Nginx+Lua开发环境

    首先我们选择使用OpenResty,其是由Nginx核心加很多第三方模块组成,其最大的亮点是默认集成了Lua开发环境,使得Nginx可以作为一个Web Server使用.借助于Nginx的事件驱动模型和非阻塞IO,可以实现高性能的Web应用程序.而且OpenResty提供了大量组件如Mysql.Redis.Memcached等等,使在Nginx上开发Web应用更方便更简单.目前在京东如实时价格.秒杀.动态服务.单品页.列表页等都在使用Nginx+Lua架构,其他公司如淘宝.去哪儿网等. 安装环境

  • 通过lua来配置实现Nginx服务器的防盗链功能

    下载服务器时常被人盗链,时间久了导致服务器大量资源浪费,由于服务器使用nginx做为web服务器.nginx的防盗链方法有很多,可以使用现成的防盗链模块nginx-accesskey-2.0.3,编译ningx时添加此模块即可. 由于服务其他业务需要,所以nginx编译了lua模块,所以就想通过lua来实现下载服务器的防盗链功能(通过lua的Nginx模块lua_nginx_module.这里不再详细介绍配置过程),这样就可以免去了accesskey模块.原理就是生成经过处理过的下载链接,然后下

  • Nginx+lua 实现调用.so文件

    最近在和智能硬件部门一起,做一个室内定位的服务,该服务根据手机端传过来的beacon设备列表,根据一定的算法计算出具体的商场,并将商场ID和beason设备列表作为参数,调用.so文件中的计算方法,得出位置数据(坐标:x.y.z),返回给手机端. 因为服务对QPS要求比较高,并且都是纯查询操作,于是决定使用Nginx+lua+Redis的架构(该架构在公司内部已成主流,比较成熟).下面我将对lua调用.so文件的方式,做一下介绍. lua调用.so文件,主要有两种方式: 1.Lua直接调用动态链

  • 解决nginx+lua搭建文件上传下载服务问题

    导语 项目需要做一个文件上传下载服务,利用 nginx+lua 做一个代理服务,上传入口统一,分发到不同的机器存储,下载链接和物理存储隔离,支持添加 agent 的方式扩容,这里主要讲一下思路和搭建配置过程,大神勿喷. 主要逻辑 上传 前端请求 nginx 服务, nginx 调用 upload 脚本,脚本通过查找配置,找到对应的逻辑存储路径和物理存储机器的 agent 的 ip 和端口,通过 tcp 发包到对应 agent ,部署在对应机器的 agent 接受数据,并写到本地文件. 下载 ht

  • Nginx中使用Lua脚本配置示例

    先配置nginx 复制代码 代码如下: location ~ .*\.(php|php5)?$         {                 if ($request_uri ~ "one/test") {                         access_by_lua_file /opt/nginxrw/lua/limiturl.lua;                 }                 fastcgi_pass  127.0.0.1:9000;

  • nginx中使用lua脚本的方法

    Lua是一种跟JavaScript很像的语言,Ngix_Lua同样使用异步单线程,语法甚至比JS更加简单,之前的评测指出,Ngix_lua的性能几乎是Node.JS的一倍. Nginx 特点 1.流行的高性能HTTP服务器 2.事件驱动(异步)架构 3.少量且可测内存占用 4.声明性配置语言 5.基于C的可扩展模块 通过lua-nginx-module即可在nginx上启动lua脚本. 一个例子: 复制代码 代码如下: location / {     content_by_lua '     

  • 利用nginx+lua+redis实现反向代理方法教程

    前言 最近因为工作需要,要进行IVR的重构, 我们现在系统接了三家IVR服务商, N个业务, 由于IVR这玩意一般只能外网回调, 而开发环境又不允许外网随便访问, 着实烦人. 所有我们打算重构一把, 封装多家IVR, 对业务透明, 同时回调可以针对多家IVR服务商的不同callid直接转发到当时请求的同学的 开发域名去. 而不同的IVR服务商的callid参数是不同的,有的是在url里面(call_id), 有的则是直接post的json数据(callid), 所以太扯了. 直接用lua处理下,

  • Lua和Nginx结合使用的超级指南

     Nginx作为API代理 有很多原因说明你为什使用nginx作为API代理.首先因为他是开源的:其次,Nginx有大量的安装基础,他背后有一个强大的社区支持,在性能方面也表现的非常出色.对于我们来说,这是显而易见的,如果开源软件有相同的解决方案我们为啥还要用那些私有的软件. 另外一个极大的优势就是nginx对lua的支持,nginx+lua是一个非常好的组合,它允许使用一个高性能的脚本语言扩展nginx.nginx有很多方法是自带的,但是使用lua没有限制的. 原理很简单.有没有这样的情况你更

  • 使用nginx+lua实现信息访问量统计

    根据URI参数后去信息类型和信息ID,通过lua client for memcached插入memcached 复制代码 代码如下: require('Memcached') local args = ngx.req.get_uri_args()   if (ngx.var.remote_addr == '192.168.1.5') then         local key = args['k']         local class = args['c']         local

  • nginx lua集成kafka的实现方法

    第一步:进入opresty目录 [root@node03 openresty]# cd /export/servers/openresty/ [root@node03 openresty]# ll total 356 drwxr-xr-x 2 root root 4096 Jul 26 11:33 bin drwxrwxr-x 44 1000 1000 4096 Jul 26 11:31 build drwxrwxr-x 43 1000 1000 4096 Nov 13 2017 bundle

  • 在Debian11上安装Openresty服务(Nginx+Lua)的详细教程

    目录 Debian 更新系统 添加 openresty 仓库 查看安装的版本: 参考链接 OpenResty 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库.第三方模块以及大多数的依赖项.用于方便地搭建能够处理超高并发.扩展性极高的动态 Web 应用.Web 服务和动态网关. OpenResty 官方 APT 包仓库提供了 deb 包 (适用于 Ubuntu 和 Debian),可以很方便的安装,一起来看下具体的安装步骤吧. Debian 更新系统

  • Spring Boot集成Kafka的示例代码

    本文介绍了Spring Boot集成Kafka的示例代码,分享给大家,也给自己留个笔记 系统环境 使用远程服务器上搭建的kafka服务 Ubuntu 16.04 LTS kafka_2.12-0.11.0.0.tgz zookeeper-3.5.2-alpha.tar.gz 集成过程 1.创建spring boot工程,添加相关依赖: <?xml version="1.0" encoding="UTF-8"?> <project xmlns=&qu

  • Spring纯Java配置集成kafka代码实例

    这篇文章主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 KafkaConfig.java package com.niugang.config; import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache

  • SpringBoot集成Kafka的步骤

    SpringBoot集成Kafka 本篇主要讲解SpringBoot 如何集成Kafka ,并且简单的 编写了一个Demo 来测试 发送和消费功能 前言 选择的版本如下: springboot : 2.3.4.RELEASE spring-kafka : 2.5.6.RELEASE kafka : 2.5.1 zookeeper : 3.4.14 本Demo 使用的是 SpringBoot 比较高的版本 SpringBoot 2.3.4.RELEASE 它会引入 spring-kafka 2.5

  • SpringBoot集成kafka全面实战记录

    本文是SpringBoot+Kafka的实战讲解,如果对kafka的架构原理还不了解的读者,建议先看一下<大白话kafka架构原理>.<秒懂kafka HA(高可用)>两篇文章. 一.生产者实践 普通生产者 带回调的生产者 自定义分区器 kafka事务提交 二.消费者实践 简单消费 指定topic.partition.offset消费 批量消费 监听异常处理器 消息过滤器 消息转发 定时启动/停止监听器 一.前戏 1.在项目中连接kafka,因为是外网,首先要开放kafka配置文件

随机推荐