2015年02月12日

ISC DHCP サーバーの IP アドレス・リース履歴を DB 化するプロトタイプ的実装

通信事業者にとって DHCP を使って配布した IP アドレスとクライアントを指し示す物理アドレスを管理することは重要な意味があります。 例えば、不正アクセスがあればアクセス元 IP アドレスをスタート地点として、そのような行いをした個人・集団まで特定・追跡する必要があるからです。 ISC-DHCP でこのような要件を満たすシステムを構築するにはどんな方法があるのでしょうか? ここ数日、思考設計していたんですが今日は具体的なプロトタイプを実装してみますよ。

追跡

システム構成と概要


システム構成はこうです。

dhcp_address_management_network

DHCP サーバーでリース(DHCP ACK)成立、リリース(解放)、エクスパイア(期限切れ)の3つのイベントを検知し、これを Fluentd を使って Kafka(メッセージ・キュー)に転送します。 Fluentd だけでも色々と出来ますが(例えばデータを加工して MySQL に保存する)、ここには複雑なビジネス・ロジックを実装せず、一旦 Kafka に集めることでメッセージ伝送の堅牢性、再利用性を確保します(Kafka を使った理由は単純に僕の好奇心)。

また、Kafka にストアしただけではメッセージが一定量になった時点で消えてしまうので、これを受信して MongoDB(モデル)に保存します。 MongoDB を使った理由もただなんとなく。。。 本気で MongoDB を使うのならばトランザクション管理が完全では無いことを覚悟する必要がありますよ。


DHCP サーバーのセットアップ


DHCP サーバーの OS は CentOS 6 の前提で進めますね。 まずは rsyslog の設定。 local6 というログ・ファシリティで /var/log/dhcpd-lease.log を定義します。 このログは fluentd(td-agent)が監視するので全てのユーザーで read できるよう unmask、FileCreateMode も設定しています。

dhcpd $ sudo vi /etc/rsyslog.conf
... snip ...
$umask 0000
$FileCreateMode 0644
local6.*   /var/log/dhcpd-lease.log


このログ・ファイルは最も低レベルの DHCP トランザクション・ログになります。 万が一のことを考え、できるだけ長い期間保存しておく場合には weekly (or day), rotate をチューニング。※デフォルトでは weekly, rotate 4 つまり 1 週間でローテート x 4 世代分ファイルを保存=最大 1 ヶ月間保存します。

このログ・ファイルをローテートに登録しておきましょうね(このブロック内で個別に保存期間を設定することも可能)。

dhcpd $ sudo vi /etc/logrotate.d/syslog
... snip ...
/var/log/cron
/var/log/maillog
/var/log/messages
/var/log/secure
/var/log/spooler
/var/log/dhcpd-lease.log # + add
{
    missingok # + add
    sharedscripts
    postrotate
        /bin/kill -HUP `cat /var/run/syslogd.pid 2> /dev/null` 2> /dev/null || true
    endscript
}
※ compress するなら delaycompress とセットで。


再起動して動作確認してみましょう。 問題なければ logger を使ってロギングできるはずです。

dhcpd $ sudo /etc/init.d/rsyslog restart
システムロガーを停止中:                                    [  OK  ]
システムロガーを起動中:                                    [  OK  ]

dhcpd $ logger -p local6.info "Hello Logger"
dhcpd $ tail /var/log/dhcpd-lease.log
Feb 10 22:38:26 dhcpd vagrant: Hello Logger


さあ、準備が出来ました ISC DHCP サーバーをインストールしましょう。

dhcpd $ sudo yum install dhcp


設定ファイルを編集して、

dhcpd $ sudo vi /etc/dhcp/dhcpd.conf


こんな内容のファイルを作ります。

# DHCP Server Configuration file.
#   see /usr/share/doc/dhcp*/dhcpd.conf.sample
#   see 'man 5 dhcpd.conf'

option domain-name "example.org";
option domain-name-servers ns1.internal.example.org;

default-lease-time 600;
max-lease-time 7200;
log-facility local7;

on commit {
    set remote_circuit_id = "None";
    if exists agent.circuit-id {
        set remote_circuit_id = binary-to-ascii (10, 8, "/", suffix(option agent.circuit-id, 2));
    }
    set remote_agent_id = "None";
    if exists agent.remote-id {
        set remote_agent_id = binary-to-ascii(16, 8, ":", option agent.remote-id);
    }
    execute("logger", "-p", "local6.info",
         concat(
            "[commit] "
            , "- addr: ", binary-to-ascii(10, 8, ".", leased-address)
            , ", client: ", binary-to-ascii(16, 8, ":", substring(hardware, 1, 6))
            , ", lease-time: ", binary-to-ascii(10, 32, "", encode-int(lease-time, 32))
            , ", circuit-id: ", remote_circuit_id
            , ", remote-id: ", remote_agent_id
       )
    );
}

on release {
    set remote_circuit_id = "None";
    if exists agent.circuit-id {
        set remote_circuit_id = binary-to-ascii (10, 8, "/", suffix(option agent.circuit-id, 2));
    }
    set remote_agent_id = "None";
    if exists agent.remote-id {
        set remote_agent_id = binary-to-ascii(16, 8, ":", option agent.remote-id);
    }
    execute("logger", "-p", "local6.info",
         concat(
            "[release] "
            , "- addr: ", binary-to-ascii(10, 8, ".", leased-address)
            , ", client: ", binary-to-ascii(16, 8, ":", substring(hardware, 1, 6))
            , ", lease-time: ", binary-to-ascii(10, 32, "", encode-int(lease-time, 32))
            , ", circuit-id: ", remote_circuit_id
            , ", remote-id: ", remote_agent_id
         )
    );
}

on expiry {
    execute("logger", "-p", "local6.info",
         concat(
            "[expiry] "
            , "- addr: ", binary-to-ascii(10, 8, ".", leased-address)
            , ", client: ", "None"
            , ", lease-time: ", "0"
            , ", circuit-id: ", "None"
            , ", remote-id: ", "None"
        )
    );
}

# direct connected network.
subnet 172.16.0.0 netmask 255.255.255.0 {
}

# management network.
subnet 10.5.0.0 netmask 255.255.0.0 {
    range 10.5.0.100 10.5.0.200;
    option domain-name-servers 8.8.8.8, 8.8.4.4;
    option domain-name "internal.netbuffalo.net";
    option routers 10.5.0.1;
    default-lease-time 60;
    max-lease-time 60;
}


on commit(DHCP ACK 成立)、on release(解放)、on expiry(期限切れ)でアドレスの DHCP イベントを検知し、利用者追跡に役立つ情報を logger 出力しています。 組み込みの log 関数もあるのですが dhcpd 自体のログ(ここでは local7)とは出力先を分けたいので logger を execute しています。

※ client、つまり MAC アドレスは 1 byte ごとに string に変換して”:”で連結していますが、この byte string はゼロ・トリム(0e なら e)されることにも注意。

ちなみに、今回検証に利用する subnet は 10.5.0.0/16。 アドレスが再利用される状況を加速試験したいのでアドレス・プールは 100 個だけ、リース期間も 60 秒にしています。

さあ、DHCP サーバーを再起動して、
 
dhcpd $ sudo /etc/init.d/dhcpd restart
dhcpd を停止中:                                            [  OK  ]
dhcpd を起動中:                                            [  OK  ]

クライアントは実機を使うか「ISC、dhcpd やめるってよ - 次世代 ISC DHCP サーバー Kea 導入・設定ガイド」で紹介したエミューレーターを使って DHCP DISCOVER から DHCP ACK まで成立させてみて下さい。

こんなメッセージが出力されたら成功ですよ。

dhcpd $ tail -n2 /var/log/dhcpd-lease.log
Feb 10 22:42:08 dhcpd vagrant: [commit] - addr: 10.5.0.131, client: 69:95:7b:e5:21:a5, lease-time: 60, circuit-id: 1, remote-id: 4e:ca:2:e1:f1:61
Feb 10 22:43:08 dhcpd vagrant: [expiry] - addr: 10.5.0.131, client: None, lease-time: 0, circuit-id: None, remote-id: None


Fluentd - td-agent のセットアップ


Fluentd(http://www.fluentd.org/)を一言でいえば大規模な構成にまで対応したログ活用ツール。 次のコマンドでインストールします。

dhcpd $ curl -L http://toolbelt.treasuredata.com/sh/install-redhat.sh | sh


設定ファイルは td-agent.conf。 source タグで dhcpd-lease.log を登録、type tail でファイルへの行追加を監視し、format で指定したキャプチャ付き正規表現で行を分解・ラベル付け(time, host, role, state, addr, ...)しています。 match はパースした結果の出力先ですが、まだ Kafka の準備ができていないので stdout にしておきましょう。

dhcpd $ sudo vi /etc/td-agent/td-agent.conf
... snip ...
# dhcpd
<source>
    type tail
    # target file
    path /var/log/dhcpd-lease.log
    # save last position of the corsor in a path.
    pos_file /var/log/td-agent/dhcpd-lease.pos
    tag dhcpd.lease
    #format none
    format /^(?<time>[^ ]*[ ]+[^ ]* [^ ]*) (?<host>[^ ]*) (?<role>\w+): \[(?<state>[^\]]*)\] - addr: (?<a
ddr>[^\]]*), client: (?<client>[^\]]*), lease-time: (?<lease-time>[^\]]*), circuit-id: (?<circuit-id>[^\]
]*), remote-id: (?<remote-id>[^\]]*)$/
    time_format %b %d %H:%M:%S
</source>

<match **>
  type stdout
</match>


一度、td-agent を再起動して、

dhcpd $ sudo /etc/init.d/td-agent start
Starting td-agent:                                         [  OK  ]


再び DHCP トランザクションを発生させると、/var/log/td-agent/td-agent.log にパースした結果(json)が出力されるはずです。

dhcpd $ tail /var/log/td-agent/td-agent.log
2015-02-10 23:00:51 +0900 dhcpd.lease: {"host":"dhcpd","role":"vagrant","state":"commit","addr":"10.5.0.103","client":"fb:c8:d3:d9:d6:9a","lease-time":"60","circuit-id":"1","remote-id":"4e:ca:2:e1:f1:61"}

※ <time>ラベルは Fluentd 自体のイベントタイム、つまり上の例でいう左端のログ時刻になります。

ここまで動いたら、一度 Kafka のセットアップに移りましょうね。


Kafka メッセージ・キューのセットアップ


Apache Kafka(http://kafka.apache.org/)は publish-subscribe モデルに基づいたメッセージ指向のミドルウェア(メッセージ伝送に特化したミドルウエア)。 最近だと RabbitMQ をよく目にしますが、少し前に Kafka 知ってる?と聞かれた(が知らなかった)ので Kafka を使います。

まず、Java 実行環境をインストールして、

kafka $ wget --no-check-certificate --no-cookies - --header "Cookie: oraclelicense=accept-securebackup-cookie" http://download.oracle.com/otn-pub/java/jdk/8u31-b13/jdk-8u31-linux-x64.rpm
kafka $ sudo rpm -ivh jdk-8u31-linux-x64.rpm
kafka $ java -version
java version "1.8.0_31"
Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)


Kafka をダウンロード、起動します(zookeeper は Kafka server クラスタを管理するサーバー)。

kafka $ wget http://ftp.jaist.ac.jp/pub/apache/kafka/0.8.2.0/kafka_2.9.1-0.8.2.0.tgz
kafka $ tar xvzf kafka_2.9.1-0.8.2.0.tgz
# start zookeeper
kafka $ kafka_2.9.1-0.8.2.0/bin/zookeeper-server-start.sh kafka_2.9.1-0.8.2.0/config/zookeeper.properties
# start server
kafka $ kafka_2.9.1-0.8.2.0/bin/kafka-server-start.sh kafka_2.9.1-0.8.2.0/config/server.properties


詳細は割愛しますが、メッセージをストリームするには topic を作っておく必要があります。 ここでは dhcpd-lease トピックを作成。

kafka $ kafka_2.9.1-0.8.2.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dhcpd-lease
Created topic "dhcpd-lease".


注意したいのが Kafka に接続するサーバーは全てホスト名から Kafka サーバーのアドレスを解決できる必要があり、これが不足していると Fluentd ではこんなエラーが出ます。

emit transaction failed  error_class=SocketError error=#<SocketError: getaddrinfo: Name or service not known>


手っ取り早く動かすなら全てのサーバーの /etc/hosts に Kafka のホスト名とアドレス(ここでは 172.16.0.16)を登録しましょう。

kafka & dhcpd & db $ cat /etc/hosts
...
172.16.0.16 kafka


ここで dhcpd(Fluentd)サーバーに戻り、次の手順で Fluentd に Kafka プラグインをインストールします。

dhcpd $ sudo /usr/lib64/fluent/ruby/bin/fluent-gem update
dhcpd $ sudo yum install gcc patch
dhcpd $ sudo /usr/lib64/fluent/ruby/bin/fluent-gem install fluent-plugin-kafka


そして、とりあえず stdout しておいた td-agent.conf/match ディレクティブも変更します。

... snip ...
<match **>
#  type stdout
  type kafka

  # Brokers: you can choose either brokers or zookeeper.
  # brokers             <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>,.. # Set brokers directly
  zookeeper           172.16.0.16:2181

  default_topic       dhcpd-lease
  output_data_type    json
  #output_include_tag  (true|false) :default => false
  output_include_time true
  #max_send_retries    (integer)    :default => 3
  #required_acks       (integer)    :default => 0
  #ack_timeout_ms      (integer)    :default => 1500
</match>


td-agent を再起動して、DHCP トランザクションを発生させると /var/log/td-agent/td-agent.log には何も出力されなくなったはずです(出るとしたらエラー)。

kafka サーバーに戻り、kafka-console-consumer.sh で dhcpd-lease トピックを読んでみましょう。 

kafka $ kafka_2.9.1-0.8.2.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dhcpd-lease --from-beginning
{"host":"dhcpd","role":"vagrant","state":"commit","addr":"10.5.0.107","client":"41:24:9:fa:6f:ed","lease-time":"60","circuit-id":"1","remote-id":"4e:ca:2:e1:f1:61","time":1423757761}
{"host":"dhcpd","role":"vagrant","state":"expiry","addr":"10.5.0.105","client":"None","lease-time":"0","circuit-id":"None","remote-id":"None","time":1423757770}

購読できましたね!


MongoDB のセットアップとリース・アドレスの保存


MongoDB はドキュメント指向のデータベース(構造の定義よりもデータ自体を指向するデータベース)。 RDBMS を使い、時間を掛けてテーブル設計するほど本気では無いので MongoDB を使って 1 つのコレクションに IP アドレスのスナップショット(現在の状態)と過去のクライアント履歴を保存することにします。

まずは、次の手順でセットアップ。 mongo シェルにアクセスできればOKですよ。

db $ curl -O http://downloads.mongodb.org/linux/mongodb-linux-x86_64-2.6.7.tgz
db $ tar xvzf mongodb-linux-x86_64-2.6.7.tgz
db $ sudo mkdir -p /data/db
db $ sudo mongodb-linux-x86_64-2.6.7/bin/mongod
db $ mongodb-linux-x86_64-2.6.7/bin/mongo
MongoDB shell version: 2.6.7
> exit
bye


この DB サーバー上で Kafka コンシューマー(メッセージの購読者)兼 MongoDB へのモデル生成アプリケーションを動かします。

言語は Python (本気で作るなら僕は Java を選ぶかも)。 まずは MongoDB、Kafka ドライバーをインストールします(DB サーバーは Ubuntu Server にしますわ)。

db $ sudo apt-get install python-pymongo
db $ sudo apt-get install python-pip
db $ sudo pip install kafka-python


コードは例外、性能等は考慮せずシンプルに書いてみます。

from kafka import KafkaClient, SimpleConsumer
import json
import datetime
import pymongo

KAFKA_HOST = '172.16.0.16'
MONGO_HOST = '172.16.0.17'

kafka = KafkaClient(KAFKA_HOST + ':' + '9092')
consumer = SimpleConsumer(kafka, None, 'dhcpd-lease', True) # kafka, group, topic, auto_commit

print 'starting... kafka 2 mongodb'

for message in consumer:

    if 'addr' not in message.message.value:
        # fluentd shutting down message, etc
        print 'system: ' + message.message.value
        continue

    lease = json.loads(message.message.value)
    addr = lease['addr'] # IPv4
    state = lease['state'] # commit | expiry | release
    print 'connecting to mongo...'
    conn = pymongo.Connection(MONGO_HOST, 27017)
    db = conn.dhcpd

    # commit address.
    if state == 'commit':
        client = lease['client'] # dhcp-client addr
        remote_id = lease['remote-id'] #agent.remote-id
        st = lease['time'] # logged time (unix timestamp)
        lt = lease['lease-time'] # lease-time
        et = st + int(lt) # end time
        if db.ipv4leases.find({'addr': addr}).count() == 0:
            print 'inserting to mongo (%s : %s)...' % (addr, state)
            db.ipv4leases.insert(
                {
                    'addr': addr, 'client': client, 'state': state, 'starts': st,
                    'ends': et, 'remote-id': remote_id,
                    'clients': [
                        {'client': client, 'remote-id': remote_id, 'starts': st, 'lease-time': lt}
                    ]
                }
            )
        else:
            print 'updating mongo (%s : %s)...' % (addr, state)
            db.ipv4leases.update(
                {'addr': addr},
                {
                    '$set':
                    {
                         'client': client,
                         'state': state, 'starts': st,
                         'ends': et,
                         'remote-id': remote_id,
                    },
                    '$addToSet':
                    {
                        'clients':
                            {
                                'client': client,
                                'remote-id': remote_id,
                                'starts': st,
                                'lease-time': lt
                            }
                    }
                })
             
    # expire or release address.
    elif state in {'expiry', 'release'}:
        print 'updating mongo (%s : %s)...' % (addr, state)
        # update only state.
        db.ipv4leases.update({'addr': addr}, {'$set': {'state': state}})
    else:
        print 'unknown state:'
        print lease
  
    print 'disconnecting from mongodb...'
    conn.disconnect()

kafka.close()


このコードを実行すると Kafka からメッセージ受信し、MongoDB の dhcpd.ipv4leases コレクションに(MongoDB はスキーマ・レスなので事前に DDL する必要は無し)リース情報を保存します(consumer の for ループが終了することは無く、強制終了しない限り新しいメッセージを待機し続けます)。

現在のスナップショットを addr, starts, ends, state, client に保存しつつ、アドレスを利用したクライアント履歴を clients 配列に連想配列としても保存しています(リース開始、終了時刻は UNIX 時間)。

例えば、最後に 23:ce:90:61:95:fc クライアントにリースされ(現在はエクスパイア)、過去 82:e6:b:6e:77:7a クライアントにもリースされた 10.5.0.122 というアドレスのドキュメントは次のようになります。
 
mongo shell > use dhcpd
switched to db dhcpd
mongo shell > db.ipv4leases.findOne()
{
        "_id" : ObjectId("54dc2d36bbb2bd1616bd2bae"),
        "ends" : 1423776217,
        "addr" : "10.5.0.122",
        "starts" : 1423776157,
        "clients" : [
                {
                        "starts" : 1423775638,
                        "lease-time" : "60",
                        "client" : "82:e6:b:6e:77:7a",
                        "remote-id" : "4e:ca:2:e1:f1:61"
                },
                {
                        "starts" : 1423775860,
                        "lease-time" : "60",
                        "client" : "23:ce:90:61:95:fc",
                        "remote-id" : "4e:ca:2:e1:f1:61"
                }
        ],
        "state" : "expiry",
        "client" : "23:ce:90:61:95:fc",
        "remote-id" : "4e:ca:2:e1:f1:61"
}


このデータをクエリーするコードはこんな感じ。

import datetime
import pymongo

MONGO_HOST = '172.16.0.17'
conn = pymongo.Connection(MONGO_HOST, 27017)
db = conn.dhcpd

query_addr = '10.5.0.122'

entry = db.ipv4leases.find_one({"addr": query_addr})
addr = entry['addr']
# client mac address
client = entry['client']
# human starts time
starts = datetime.datetime.fromtimestamp(entry['starts']).strftime('%Y-%m-%d %H:%M:%S')
# human ends time
ends = datetime.datetime.fromtimestamp(entry['ends']).strftime('%Y-%m-%d %H:%M:%S')
# lease state
state = entry['state']
remote_id = entry['remote-id']

print "======================= %s =========================" % query_addr
print "[current] client: %s, state: %s, span: %s - %s, remote-id: %s" % (client, state, starts, ends, remote_id)

for client in entry['clients']:
    st = client['starts']
    et = st + int(client['lease-time'])
    print "[clients] client: %s, starts: %s, ends: %s, remote-id: %s" % (client['client'], st, et, remote_id)

conn.disconnect()


実行すると指定したアドレスの現在、過去のクライアント情報が出力されるはず。

db $ python query.py
======================= 10.5.0.122 =========================
[current] client: 23:ce:90:61:95:fc, state: expiry, span: 2015-02-13 01:42:37 - 2015-02-13 01:43:37, remote-id: 4e:ca:2:e1:f1:61
[clients] client: 82:e6:b:6e:77:7a, starts: 1423715638, ends: 1423715698, remote-id: 4e:ca:2:e1:f1:61
[clients] client: 88:28:3a:d0:78:83, starts: 1423715860, ends: 1423715920, remote-id: 4e:ca:2:e1:f1:61
[clients] client: 23:ce:90:61:95:fc, starts: 1423716157, ends: 1423716217, remote-id: 4e:ca:2:e1:f1:61


あとは、ビューを作ればプロトタイプの完成(僕は Tornado で作りましたがこのコードは割愛しますね)。

dhcpd lease address  search


IP アドレスから現在・過去のクライアント(物理的なデバイス)情報を確認できましたよ!

dhcpd lease address search result


clients の配列(履歴ハッシュ)は常に肥大していくので一定期間以上古いものは削除する(MongoDB では pull で抜き出す)必要があるでしょうね。

mongo shell > db.ipv4leases.update({}, {$pull: {"clients": {"starts": {$lte: ?}}}}) # ?: UNIX TIME


また、貴方が真面目に考えるならば Fluentd、Kafkaのクラスタ構成、ファイル・バッファの活用など耐障害性を考慮した設定・設計を行う必要があるかもしれません。

それでは、また今度。

サーバ/インフラエンジニア養成読本 ログ収集~可視化編 [現場主導のデータ分析環境を構築!] (Software Design plus)
鈴木 健太 吉田 健太郎 大谷 純 道井 俊介
技術評論社
売り上げランキング: 45,103

Apache Kafka入門
Apache Kafka入門
posted with amazlet at 15.02.12
リクルートテクノロジーズ (2014-04-01)
売り上げランキング: 8,252

Posted by netbuffalo at 22:30│Comments(0)TrackBack(0)ネットワーク | プログラミング


この記事へのトラックバックURL

http://trackback.blogsys.jp/livedoor/netbuffalo/4966206

コメントする

名前
URL
 
  絵文字