频道栏目
首页 > 资讯 > 云计算 > 正文

深度学习算法实践12---卷积神经网络(CNN)实现

16-09-07        来源:[db:作者]  
收藏   我要投稿

在搞清楚卷积神经网络(CNN)的原理之后,在本篇博文中,我们将讨论基于Theano的算法实现技术。我们还将以MNIST手写数字识别为例,创建卷积神经网络(CNN),训练该网络,使识别误差达到1%以内。

我们首先需要读入MNIST手写数字识别的训练样本集,为此我们定义了一个工具类:

 

from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T

class MnistLoader(object):
    def load_data(self, dataset):
        data_dir, data_file = os.path.split(dataset)
        if data_dir == "" and not os.path.isfile(dataset):
            new_path = os.path.join(
                os.path.split(__file__)[0],
                "..",
                "data",
                dataset
            )
            if os.path.isfile(new_path) or data_file == 'mnist.pkl.gz':
                dataset = new_path

        if (not os.path.isfile(dataset)) and data_file == 'mnist.pkl.gz':
            from six.moves import urllib
            origin = (
                'http://www.iro.umontreal.ca/~lisa/deep/data/mnist/mnist.pkl.gz'
            )
            print('Downloading data from %s' % origin)
            urllib.request.urlretrieve(origin, dataset)

        print('... loading data')
        # Load the dataset
        with gzip.open(dataset, 'rb') as f:
            try:
                train_set, valid_set, test_set = pickle.load(f, encoding='latin1')
            except:
                train_set, valid_set, test_set = pickle.load(f)
        def shared_dataset(data_xy, borrow=True):
            data_x, data_y = data_xy
            shared_x = theano.shared(numpy.asarray(data_x,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
            shared_y = theano.shared(numpy.asarray(data_y,
                                               dtype=theano.config.floatX),
                                 borrow=borrow)
            return shared_x, T.cast(shared_y, 'int32')

        test_set_x, test_set_y = shared_dataset(test_set)
        valid_set_x, valid_set_y = shared_dataset(valid_set)
        train_set_x, train_set_y = shared_dataset(train_set)

        rval = [(train_set_x, train_set_y), (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
        return rval

这个类在之前我们已经用过,在这里就不详细讲解了。之所以单独定义这个类,是因为如果我们将问题换为其他类型时,我们只需要修改这一个类,就可以实现训练数据的载入了,这样简化了程序修改工作量。

 

我们所采用的方法是将图像先接入卷积神经网络,之后再接入BP网络的隐藏层,然后再接入逻辑回归的输出层,因此我们需要先定义多层前向网络的隐藏层和逻辑回归输出层。隐藏层的定义如下所示:

 

from __future__ import print_function

__docformat__ = 'restructedtext en'


import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T


from logistic_regression import LogisticRegression

# start-snippet-1
class HiddenLayer(object):
    def __init__(self, rng, input, n_in, n_out, W=None, b=None,
                 activation=T.tanh):
        self.input = input
        if W is None:
            W_values = numpy.asarray(
                rng.uniform(
                    low=-numpy.sqrt(6. / (n_in + n_out)),
                    high=numpy.sqrt(6. / (n_in + n_out)),
                    size=(n_in, n_out)
                ),
                dtype=theano.config.floatX
            )
            if activation == theano.tensor.nnet.sigmoid:
                W_values *= 4

            W = theano.shared(value=W_values, name='W', borrow=True)

        if b is None:
            b_values = numpy.zeros((n_out,), dtype=theano.config.floatX)
            b = theano.shared(value=b_values, name='b', borrow=True)

        self.W = W
        self.b = b

        lin_output = T.dot(input, self.W) + self.b
        self.output = (
            lin_output if activation is None
            else activation(lin_output)
        )
        # parameters of the model
        self.params = [self.W, self.b]
接下来我们定义逻辑回归算法类:

 

 

from __future__ import print_function

__docformat__ = 'restructedtext en'

import six.moves.cPickle as pickle
import gzip
import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T

class LogisticRegression(object):  
    def __init__(self, input, n_in, n_out):  
        self.W = theano.shared(  
            value=numpy.zeros(  
                (n_in, n_out),  
                dtype=theano.config.floatX  
            ),  
            name='W',  
            borrow=True  
        )  
        self.b = theano.shared(  
            value=numpy.zeros(  
                (n_out,),  
                dtype=theano.config.floatX  
            ),  
            name='b',  
            borrow=True  
        )  
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)  
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)  
        self.params = [self.W, self.b]  
        self.input = input  
        print("Yantao: ***********************************")
  
    def negative_log_likelihood(self, y):  
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])  
  
    def errors(self, y):  
        if y.ndim != self.y_pred.ndim:  
            raise TypeError(  
                'y should have the same shape as self.y_pred',  
                ('y', y.type, 'y_pred', self.y_pred.type)  
            )  
        if y.dtype.startswith('int'):  
            return T.mean(T.neq(self.y_pred, y))  
        else:  
            raise NotImplementedError()  

 

这段代码在逻辑回归博文中已经详细讨论过了,这里就不再重复了,有兴趣的读者可以查看这篇博文(逻辑回归算法实现)。

做完上述准备工作之后,我们就可以开始卷积神经网络(CNN)实现了。

我们先来定义基于简化版Lenet5的卷积神经网络(CNN)的定义,代码如下所示:

 

from __future__ import print_function

import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T
from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d


class LeNetConvPoolLayer(object):
    def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2, 2)):
        assert image_shape[1] == filter_shape[1]
        self.input = input
        fan_in = numpy.prod(filter_shape[1:])
        fan_out = (filter_shape[0] * numpy.prod(filter_shape[2:]) //
                   numpy.prod(poolsize))
        W_bound = numpy.sqrt(6. / (fan_in + fan_out))
        self.W = theano.shared(
            numpy.asarray(
                rng.uniform(low=-W_bound, high=W_bound, size=filter_shape),
                dtype=theano.config.floatX
            ),
            borrow=True
        )
        b_values = numpy.zeros((filter_shape[0],), dtype=theano.config.floatX)
        self.b = theano.shared(value=b_values, borrow=True)
        conv_out = conv2d(
            input=input,
            filters=self.W,
            filter_shape=filter_shape,
            input_shape=image_shape
        )
        pooled_out = pool.pool_2d(
            input=conv_out,
            ds=poolsize,
            ignore_border=True
        )
        self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x'))
        self.params = [self.W, self.b]
        self.input = input
上面代码实现了对输入信号的卷积操作,并对结果进行最大化池化。

 

下面我们来看怎样初始化Lenet层,怎样将Lenet层输出信号转为MLP网络隐藏层的输入信号,具体代码如下所示:

 

layer0 = LeNetConvPoolLayer(
        rng,
        input=layer0_input,
        image_shape=(batch_size, 1, 28, 28),
        filter_shape=(nkerns[0], 1, 5, 5),
        poolsize=(2, 2)
    )
如上所示,我们的输入信号是28*28的黑白图像,而且我们采用的批量学习,因此输入图像就定义为(batch_size, 1, 28, 28),我们对图像进行5*5卷积操作,根据卷积操作定义,最终得到的卷积输出层为(28-5+1,28-5+1)=(24,24)的“图像”,我们采用2*2的最大池化操作,即取2*2区域像素的最大值作为新的像素点的值,则最终输出层得到12*12的输出信号。

 

接下来,我们将输出信号继续输入一个Lenet卷积池化层,代码如下所示:

 

    layer1 = LeNetConvPoolLayer(
        rng,
        input=layer0.output,
        image_shape=(batch_size, nkerns[0], 12, 12),
        filter_shape=(nkerns[1], nkerns[0], 5, 5),
        poolsize=(2, 2)
    )
如上所示,这时输入信号变化为12*12的图像,我们还使用5*5的卷积核,可以得到(12-5+1, 12-5+1)=(8,8)的图像,采用2*2最大池化操作后,得到(4,4)图像。可以通过调用layer1.output.flatten(2)将其变为一维信号,从而输入MLP的隐藏层。

 

下面我们定义Lenet引擎来实现装入数据,定义网络模型,训练网络工作,代码如下所示:

 

from __future__ import print_function

import os
import sys
import timeit

import numpy

import theano
import theano.tensor as T
from theano.tensor.signal import pool
from theano.tensor.nnet import conv2d

from mnist_loader import MnistLoader
from logistic_regression import LogisticRegression
from hidden_layer import HiddenLayer
from lenet_conv_pool_layer import LeNetConvPoolLayer

class LenetMnistEngine(object):
    def __init__(self):
        print("create LenetMnistEngine")

    def train_model(self):
        learning_rate = 0.1
        n_epochs = 200
        dataset = 'mnist.pkl.gz'
        nkerns = [20, 50]
        batch_size = 500
        (n_train_batches, n_test_batches, n_valid_batches, \
                    train_model, test_model, validate_model) = \
                    self.build_model(learning_rate, n_epochs, \
                        dataset, nkerns, batch_size)
        self.train(n_epochs, n_train_batches, n_test_batches, \
                    n_valid_batches, train_model, test_model, \
                    validate_model)

    def run(self):
        print("run the model")
        classifier = pickle.load(open('best_model.pkl', 'rb'))
        predict_model = theano.function(
            inputs=[classifier.input],
            outputs=classifier.logRegressionLayer.y_pred
        )
        dataset='mnist.pkl.gz'
        loader = MnistLoader()
        datasets = loader.load_data(dataset)
        test_set_x, test_set_y = datasets[2]
        test_set_x = test_set_x.get_value()
        predicted_values = predict_model(test_set_x[:10])
        print("Predicted values for the first 10 examples in test set:")
        print(predicted_values)

    def build_model(self, learning_rate=0.1, n_epochs=200,
                        dataset='mnist.pkl.gz',
                        nkerns=[20, 50], batch_size=500):
        rng = numpy.random.RandomState(23455)
        loader = MnistLoader()
        datasets = loader.load_data(dataset)
        train_set_x, train_set_y = datasets[0]
        valid_set_x, valid_set_y = datasets[1]
        test_set_x, test_set_y = datasets[2]
        n_train_batches = train_set_x.get_value(borrow=True).shape[0]
        n_valid_batches = valid_set_x.get_value(borrow=True).shape[0]
        n_test_batches = test_set_x.get_value(borrow=True).shape[0]
        n_train_batches //= batch_size
        n_valid_batches //= batch_size
        n_test_batches //= batch_size
        index = T.lscalar() 
        x = T.matrix('x')   
        y = T.ivector('y') 
        print('... building the model')
        layer0_input = x.reshape((batch_size, 1, 28, 28))
        layer0 = LeNetConvPoolLayer(
            rng,
            input=layer0_input,
            image_shape=(batch_size, 1, 28, 28),
            filter_shape=(nkerns[0], 1, 5, 5),
            poolsize=(2, 2)
        )
        layer1 = LeNetConvPoolLayer(
            rng,
            input=layer0.output,
            image_shape=(batch_size, nkerns[0], 12, 12),
            filter_shape=(nkerns[1], nkerns[0], 5, 5),
            poolsize=(2, 2)
        )
        layer2_input = layer1.output.flatten(2)
        layer2 = HiddenLayer(
            rng,
            input=layer2_input,
            n_in=nkerns[1] * 4 * 4,
            n_out=500,
            activation=T.tanh
        )
        layer3 = LogisticRegression(input=layer2.output, n_in=500, n_out=10)
        cost = layer3.negative_log_likelihood(y)
        test_model = theano.function(
            [index],
            layer3.errors(y),
            givens={
                x: test_set_x[index * batch_size: (index + 1) * batch_size],
                y: test_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        validate_model = theano.function(
            [index],
            layer3.errors(y),
            givens={
                x: valid_set_x[index * batch_size: (index + 1) * batch_size],
                y: valid_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        params = layer3.params + layer2.params + layer1.params + layer0.params
        grads = T.grad(cost, params)
        updates = [
            (param_i, param_i - learning_rate * grad_i)
            for param_i, grad_i in zip(params, grads)
        ]
        train_model = theano.function(
            [index],
            cost,
            updates=updates,
            givens={
                x: train_set_x[index * batch_size: (index + 1) * batch_size],
                y: train_set_y[index * batch_size: (index + 1) * batch_size]
            }
        )
        return (n_train_batches, n_test_batches, n_valid_batches, \
                    train_model, test_model, validate_model)

    def train(self, n_epochs, n_train_batches, n_test_batches, n_valid_batches, 
                train_model, test_model, validate_model):
        print('... training')
        patience = 10000
        patience_increase = 2
        improvement_threshold = 0.995
        validation_frequency = min(n_train_batches, patience // 2)
        best_validation_loss = numpy.inf
        best_iter = 0
        test_score = 0.
        start_time = timeit.default_timer()
        epoch = 0
        done_looping = False
        while (epoch < n_epochs) and (not done_looping):
            epoch = epoch + 1
            for minibatch_index in range(n_train_batches):
                iter = (epoch - 1) * n_train_batches + minibatch_index
                if iter % 100 == 0:
                    print('training @ iter = ', iter)
                cost_ij = train_model(minibatch_index)
                if (iter + 1) % validation_frequency == 0:
                    validation_losses = [validate_model(i) for i
                                         in range(n_valid_batches)]
                    this_validation_loss = numpy.mean(validation_losses)
                    print('epoch %i, minibatch %i/%i, validation error %f %%' %
                          (epoch, minibatch_index + 1, n_train_batches,
                           this_validation_loss * 100.))
                    if this_validation_loss < best_validation_loss:
                        if this_validation_loss < best_validation_loss *  \
                           improvement_threshold:
                            patience = max(patience, iter * patience_increase)
                        best_validation_loss = this_validation_loss
                        best_iter = iter
                        test_losses = [
                            test_model(i)
                            for i in range(n_test_batches)
                        ]
                        test_score = numpy.mean(test_losses)
                        with open('best_model.pkl', 'wb') as f:
                            pickle.dump(classifier, f)
                        print(('     epoch %i, minibatch %i/%i, test error of '
                               'best model %f %%') %
                              (epoch, minibatch_index + 1, n_train_batches,
                               test_score * 100.))
                if patience <= iter:
                    done_looping = True
                    break
        end_time = timeit.default_timer()
        print('Optimization complete.')
        print('Best validation score of %f %% obtained at iteration %i, '
              'with test performance %f %%' %
              (best_validation_loss * 100., best_iter + 1, test_score * 100.))
        print(('The code for file ' +
               os.path.split(__file__)[1] +
               ' ran for %.2fm' % ((end_time - start_time) / 60.)), file=sys.stderr)
上述代码与之前的MLP的训练代码类似,这里就不再讨论了。在我的Mac笔记本上,运行大约6个小时,会得到错误率小于1%的结果。
相关TAG标签
上一篇:【OVS2.5.0源码分析】ovsd进程运行机制分析(1)
下一篇:SparkStreaming源码解读之Job动态生成和深度思考
相关文章
图文推荐

关于我们 | 联系我们 | 广告服务 | 投资合作 | 版权申明 | 在线帮助 | 网站地图 | 作品发布 | Vip技术培训 | 举报中心

版权所有: 红黑联盟--致力于做实用的IT技术学习网站