[Perl多线程]自动上传FTP文件的多线程扩展

类别:软件工程 点击:0 评论:0 推荐:
Perl脚本多线程上传FTP文件

 

本脚本是对《[Perl]FTP自动上传文件的脚本以及配置文件》的多线程扩展,当然首先对方FTP站点允许同一个IP发起多个连接。

 

ithreads支持

Perl5.6.0已经加入了ithreads支持,我们通过

use threads;

导入threads多线程处理包;并且通过

use threads::shared;

使用线程间共享变量。

在定义全局并线程间共享的变量时,要这样写:

my $CurrentThreads: shared = 0;     #当前线程总数

因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。如果想让一个变量共享,须要显式地指定它才行。

 

当然我们还要限制并发活动线程数目,因为最多线程数是1-65。perl最多允许64个子线程,加上主线程因此最多65个线程,而且FTP站点允许多大连接数。这里$MaxThreads设置是从配置文件读取的。

配置文件的读取我们是采用

use Config::IniFiles;

库的。它的读取很简单:

my $cfg = Config::IniFiles->new( -file => "FTPUpload.config" );

 

等待线程的完全退出

通过不断地调用

my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);

启动了多干个线程后,我们这里一定要调用

push(@$self, \$thread);

因为,创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。

所以,在最后我们要等待这些线程的完全退出:

foreach my $thread (@$self)

{

print("Joining thread\n");

$$thread->join();

}

否则,threads库会在最后打印如下信息:

A thread exited while 2 threads were running.

 

 

程序大致的流程是:

      第一步,尝试用配置信息登陆ftp站点,这只是验证能够登陆而已;

       第二步,在指定文件夹A类下寻找符合条件的文件,并将对每一个A类文件创建一个线程上传。每个线程单独创建一个FTP连接,并不断地尝试上传这个文件直至成功;

       第三步,如果A类文件们全部上传成功,那么在指定文件夹B类下寻找指定文件,   并且上传到FTP指定目录下

        第四步,写成功/失败日志。

 

    最后,我们要写的成功/失败日志的格式如下所示: 

         成功: 生成一个名为“Upload_Succ_2005_01_04_17_23.log”的日志文件

       文件格式:输出上传时间,以及所有上传文件名及其大小和耗费的时间。

        失败: 生成一个名为“Upload_Fail_2005_01_04_17_23.log”的日志文件

       文件格式:输出上传时间,以及已经上传的文件名及其大小和耗费的时间,和失败的文件名及原因。

 

配置perl脚本运行有两个办法:

u       您可以在Windows计划任务中配置运行“Perl Upload.pl”的时间,这需要在Windows环境中配置ActivePerl 5.8.4.810;

u       您也可以利用Perl2Exe(p2x-8.40-Win32)来将perl脚本编译为一个exe可执行程序,在计划任务中运行这个exe(这需要PerlCRT.dll在系统路径下)。

[注意!]在运行之前,您必须修改“Upload.config”文件以配置所需的重要参数。

 

外部配置参数

 在和perl脚本同一目录下的“Upload.config”配置文件中,是事先配置的10个外部参数: 

#            参数1: ftp_server:

#                                        FTP服务器的IP地址。

#            参数2: ftp_dir:

#                                        指定的FTP上传目录路径;

#            参数3: ftp_uid:

#                                        FTP的登陆用户名;

#            参数4: ftp_pw:

#                                        FTP的登陆密码;

#                   参数5: src_dir_WAVFiles,这是一个数组:

?         指定A类文件夹,放置所有要上传的语音文件;

?         注意:这个目录下的子文件夹也会被上传。

#                   参数6: src_dir_NamesListFile,这是一个数组:

?         指定B类文件夹,放置B类文件. 注意:这个目录下的子文件夹也会被上传。

#            参数7: ftp_timeout:

#                                        FTP的访问超时时间,以秒为单位;

#            参数8: ftp_retrytimes:

#                                        FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误;

#            参数9: ftp_debug :

#                                 FTP的详细的调试信息输出

#            参数10: maxthreads:

#                                 启动同时访问FTP站点的线程最大数目

 

附录:
Upoad.pl内容:
#!/usr/bin/perl -w

####################################################################
#
# 工程项目: FTP自动上传两类文件
#
# 模块名称: FTPAutoUpload
#
# 模块任务: 按照指定的文件夹目录,自动将该文件夹下的所有文件上传到指定ftp站点的指定目录下
#
# 程序名称:     Upload.pl

# 程序员:   Uwe Keim
#                   郑昀
# 历史记录:
# 编号     日期       作者               备注
# 1       2000          Uwe Keim    创建
# 2       2005.1.5    郑昀              加入容错处理和读取外部配置文件部分

# 3    2005.1.14   郑昀       加入多线程支持
#
####################################################################

 

#use strict;

 

##================================================================##

## 引用的库声明 1

## 读取ini配置文件的库

use Config::IniFiles;

my $cfg = Config::IniFiles->new( -file => "Upload.config" );

## 启动同时访问FTP站点的线程最大数目 ##

$config_maxthreads          = $cfg->val('Threads', 'maxthreads') || 1;

##================================================================##

 

##================================================================##

## 引用的库声明 2

use File::Copy;

use File::stat;

use File::Find;

use Net::FTP;

use Date::Pcalc qw(Delta_DHMS);

use Date::Parse;

#!!use Win32::OLE; # 加入这个库会导致Perl执行线程时崩溃

#!!use Win32::OLE::Variant; # 加入这个库会导致Perl执行线程时崩溃

##================================================================##

 

##================================================================##

## 引用的库声明 3

#       Perl5.6.0已经加入了ithreads支持

use threads;                #导入threads多线程处理包

use threads::shared;        #使用线程间共享变量

my $MaxThreads = $config_maxthreads;    #最多线程数(1-65),perl最多允许64个子线程,加上主线程因此最多65个线程

                                        #此次设置是从配置文件读取的

 

# 在定义全局并线程间共享的变量时,要这样写

my $CurrentThreads: shared = 0;     #当前线程总数

##  $total_files是上传文件的数目

my $total_files: shared = 0;

 

##  $processed_files是已上传文件的数目

my $processed_files: shared = 0;

 

##  $skipped_files是跳过文件的数目

my $skipped_files: shared = 0;

 

##  FTP重试次数

my $ftp_retrytimes: shared = 0;

 

## $g_nIsAllWAVsFile_UploadSuccess代表是否已经完全将语音文件上传,-1为不是,1为是:

my $g_nIsAllWAVsFile_UploadSuccess: shared = 1;

 

# 因为在Perl的实现中,线程并不是像pthread那样共享变量,而是大家都分开,如同原来的进程一样。

# 如果想让一个变量共享,须要显式地指定它才行。

#

##================================================================##

 

 

##================================================================##

## 从配置文件读取外部参数 ##

##

## FTP服务器的IP地址 ##

my $ftp_server              = $cfg->val('FTPServer', 'ftp_server') || '';

 

## 指定的FTP上传目录路径 ##

#! 切记:文件夹最后不要加"/"符号 !#

my $ftp_dir             = $cfg->val('FTPServer', 'ftp_dir') || '';

 

## FTP的登陆用户名 ##

my $ftp_uid         = $cfg->val('FTPServer', 'ftp_uid') || '';

 

## FTP的登陆密码 ##

my $ftp_pw                  = $cfg->val('FTPServer', 'ftp_pw') || '';

 

## FTP的访问超时时间,以秒为单位,默认设置为30分钟 ##

my $ftp_timeout                 = $cfg->val('FTPServer', 'ftp_timeout') || 1800;

####$ftp_timeout                = 1800;

 

## FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 ##

$ftp_retrytimes                 = $cfg->val('FTPServer', 'ftp_retrytimes') || 3;

####$ftp_retrytimes             = 3;

 

## FTP的详细的调试信息输出 ##

$ftp_debug                  = $cfg->val('FTPServer', 'ftp_debug') || 0;

####$ftp_debug              = 0;

 

## 指定文件夹“语音文件”,放置所有要上传的语音文件 ##

#! 切记:文件夹最后不要加"\\"符号 !#

my @src_dir_WAVFiles    = $cfg->val('SrcDirectory', 'src_dir_WAVFiles');

 

## 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 ##

#! 切记:文件夹最后不要加"\\"符号 !#

my @src_dir_NamesListFile   = $cfg->val('SrcDirectory', 'src_dir_NamesListFile');

 

## 一个字符串集合,表明哪些类型的文件/文件夹将不被上传到服务器 ##

my @wc_exclude          = ("_vti",".log","\\bak","\\data","server.inc");

##================================================================##

 

 

##================================================================##

## 记录全部过程的日志文件准备

my $logfilename = 'upload.log';

 

my $log_cnt = 0;

 

my $span = 0;

 

LOG("");

LOG("郑昀(R) 自动上传两类文件 Version 0.1");

LOG("没有版权(C) Linktone 2005-2006。不保留所有权利。");

LOG("");

LOG("用法: Perl FTPUpload.MultiThread.pl");

LOG("");

##================================================================##

 

##================================================================##

##=== 程序执行的第一步:尝试登陆ftp站点 ==========================##

##  $start_date计算出当前开始的时间

my $start_date = timeString(time);

 

## $g_nUploadSuccess代表是否已经完全上传,-1为不是,1为是:

my $g_nUploadSuccess = 1;

 

## $g_strLastError代表上次错误原因:

my $g_strLastError = "";

 

LOG("正在链接至指定FTP服务器($ftp_server)...");

# 第二个参数用于指定超时时间

my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);

if($@)

{

    $g_strLastError = "不能连接到FTP服务器,错误原因:".$@;

    LOG("$g_strLastError@\n");

    $g_nUploadSuccess = -1;

}

else

{

    $ftp->login($ftp_uid, $ftp_pw);

    if($@)

    {

        $g_strLastError = "不能登陆FTP服务器,错误原因:".$@;

        LOG("$g_strLastError\n");

        $g_nUploadSuccess = -1;

    }

    else

    {

        LOG("链接FTP服务器成功!");

        ## 关闭连接

        $ftp->quit() or warn "咦,为什么不能够和FTP服务器断开连接呢?: $@\n";

##================================================================##

 

 

##================================================================##

##=== 程序执行的第二步,将指定文件夹“语音文件”下所有语音文件上传到FTP站点指定目录下 ===##

 

        #my %lookup;

        LOG("准备上传“语音文件”目录(@src_dir_WAVFiles)下的所有文件!");

        find(\&processFiles, @src_dir_WAVFiles);

        LOG("目录(@src_dir_WAVFiles)已经处理完毕,结果是:");

 

        foreach my $thread (@$self)

        {

            print("Joining thread\n");

            # join() does three things: it waits for a thread to exit,

            # cleans up after it, and returns any data the thread may

            # have produced.

            $$thread->join();

        }

       

##================================================================##

##=== 程序执行的第三步,将指定文件夹“命名对照列表文件TXT”下文件上传到FTP站点指定目录下 ===##

        if($g_nIsAllWAVsFile_UploadSuccess > 0)

        {

            LOG("+===============================+");

            LOG("准备上传“语音文件”目录(@src_dir_NamesListFile)下的所有文件!");

            @$self = {};

            find(\&processFiles, @src_dir_NamesListFile);

            foreach my $thread (@$self)

            {

                print("Joining thread\n");

                $$thread->join();

            }

            LOG("目录(@src_dir_NamesListFile)已经处理完毕,结果是:");

            LOG("-===============================-");

        }

        else

        {

            LOG("-===============================-");

            LOG("由于语音文件目录并没有完全上传,所以本命名对照文件不上传!");

            LOG("-===============================-");

        }

##================================================================##

 

##================================================================##

# 日志文件的最后是一个统计报告

        $span = calcDeltaSeconds($start_date,timeString(time));

        LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");

 

    }

    closeLogfile();

}

 

##================================================================##

 

##================================================================##

##=== 程序执行的第四步,写成功日志 ===============================##

if($g_nIsAllWAVsFile_UploadSuccess > 0 && $g_nUploadSuccess > 0)

{

    $logfilename = 'FTPUpload.MultiThread_Succ_'.shortTimeString(time).'.log';

 

    $log_cnt = 0;

 

    LOG("");

    LOG("郑昀(R)自动上传文件 Version 0.1");

    LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");

    LOG("");

    LOG("上传结果:成功。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");

    LOG("");

    closeLogfile();

}

##================================================================##

 

##================================================================##

##=== 程序执行的第四步,写失败日志 ===============================##

if($g_nIsAllWAVsFile_UploadSuccess < 0 || $g_nUploadSuccess < 0)

{

    $logfilename = 'FTPUpload.MultiThread_Fail_'.shortTimeString(time).'.log';

 

    $log_cnt = 0;

 

    LOG("");

    LOG("郑昀(R)自动上传文件 Version 0.1");

    LOG("没有版权 (C) Linktone 2005-2006。不保留所有权利。");

    LOG("");

    LOG("上传结果:失败。失败原因:$g_strLastError。\n花费:$span 秒, 总共处理了 $total_files 个文件, 其中 $processed_files 上传成功, 跳过了 $skipped_files 个文件。");

    LOG("");

    closeLogfile();

}

##================================================================##

 

 

####################################################################

 

## 以下是子函数体的定义

 

##-------------------------------------------------------------##

##

## 函数名称:processFiles

## 功能:   

##           得到指定文件夹下的所有文件以及子文件夹,然后依次处理它们。

##

## 程序员:  郑昀([email protected])

##

## 历史记录:

## 编号     日期          作者    备注

## 1       2005.1.4       郑昀   

##

##-------------------------------------------------------------##

sub processFiles

{

    my $srcdir = fsToBs($File::Find::dir);

    my $srcpath = fsToBs($File::Find::name);

    my $base = fsToBs($File::Find::topdir);

 

    foreach my $exclude (@wc_exclude) {

        if ( index($srcpath, $exclude)>-1 ) {

            $File::Find::prune = 1 if -d $srcpath;

            return;

        }

    }

 

    # no DIRECT processing of directories.

    if ( -d $srcpath ) {

        return;

    }

 

    my $dstdir = $srcdir;

    my $dstpath = $srcpath;

    $dstdir =~ s{\Q$base\E}{$ftp_dir}is;

    $dstpath =~ s{\Q$base\E}{$ftp_dir}is;

    $dstdir = bsToFs($dstdir);

    $dstpath = bsToFs($dstpath);

 

    #-------------------------------------

    # 早先版本,单线程逐次处理每一个文件

    #processFile($srcpath,$dstpath,$dstdir);

    #-------------------------------------

   

    #-------------------------------------

    # 20050114版本,多线程并发处理文件

    # 每次创建线程处理文件之前,先检查当前已启动的线程数目是否小于最大允许数目

    # 如果已经创建了足够多的线程,就不断地轮询

    while(1)

    {

        if($CurrentThreads < $MaxThreads)

        {

            # 创建线程,传入参数

            my $thread = threads->create('processFile', $srcpath, $dstpath, $dstdir);

            #my $thread = threads->create('processFile2',"zhengyun");

            # 创建一个thread以后要用join取得该thread的返回值,然后系统才会对thread进行清理,

            # 否则所有thread的信息都会保留下来,当然越积越多了。对返回值不关心的时候要用detach显式剥离该thread。

            push(@$self, \$thread);

            #$thread->detach();

            last; # 退出循环

        }

        else

        {

            # 静候一秒钟,注意这里sleep的参数是以秒为单位的:

            LOG("-sleep 1秒钟");

            sleep 1;

        }

    }

    #-------------------------------------

}

sub processFile2

{

    my ($prime) = @_;

    print "Thread started\n".$prime."\n";

}

##-------------------------------------------------------------##

##

## 函数名称:processFile

## 功能:   

##           对目标文件进行FTP上传,如果失败就重试若干次。

##

## 程序员:  郑昀([email protected])

##

## 历史记录:

## 编号     日期          作者    备注

## 1       2005.1.4       郑昀   

##

##-------------------------------------------------------------##

sub processFile

{

    ## 当前新建的线程数加一:

    {

        lock($CurrentThreads);

        $CurrentThreads++;

    }

    my ($srcThread, $dstThread, $dstdirThread) = @_;

 

    ## 当前处理的总文件数加一:

    {

        lock($total_files);

        $total_files++;

        LOG("正在处理文件 $total_files \"$srcThread\"...");

    }  

 

    # --------------------

    # check time.

 

    my $need_upload = 0;

    my $bPutResult = 0;

 

    # create time.

    my $t1 = $lookup{$srcThread};

    my $t2 = timeString(stat($srcThread)->mtime);

 

    if ( not defined $t1 ) {

        $lookup{$srcThread} = $t2;

        $need_upload = 1;

    } else {

        my $delta_sec = calcDeltaSeconds($t1,$t2);

        $need_upload = 1 if $delta_sec>5;                   # 5 seconds as tolerance.

    }

 

    # --------------------

 

    if ( $need_upload > 0 )

    {

        my $nProcessIndex = 1;

        for ($nProcessIndex = 1; $nProcessIndex <= $ftp_retrytimes; $nProcessIndex++)

        {

            my $ftp = Net::FTP->new($ftp_server, Debug=>$ftp_debug, Timeout=>$ftp_timeout);

            if($@)

            {

                $g_strLastError = "不能连接到FTP服务器,错误原因:".$@;

                LOG("$g_strLastError@\n");

            }

            else

            {

                $ftp->login($ftp_uid, $ftp_pw);

                if($@)

                {

                    $g_strLastError = "不能登陆FTP服务器,错误原因:".$@;

                    LOG("$g_strLastError\n");

                }

                else

                {

                    $ftp->binary;

                    LOG("第$nProcessIndex次尝试:正在上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\"...当前活动线程数:$CurrentThreads");

 

                    {

                        $bPutResult = 0;

                        $ftp->mkdir($dstdirThread, 1);

                        $ftp->put($srcThread, $dstThread) or $bPutResult = -1;

                    }

                    if($bPutResult < 0)

                    {

                        LOG("第$nProcessIndex次尝试:不能上传文件:从源 \"$srcThread\" 到目标 \"$dstThread\" (dst-dir: \"$dstdirThread\")。\n");

                        if($@)

                        {

                            LOG("第$nProcessIndex次尝试:错误原因是:$@\n");

                        }

                    }

                    else

                    {

                        LOG("第$nProcessIndex次尝试:上传文件成功:从源 \"$srcThread\" 到目标 \"$dstThread\"!\n");

                        {

                            lock($processed_files);

                            $processed_files++;

                        }

                        ## 关闭连接

                        $ftp->quit() if($ftp);

                        last; # 退出循环

                    }

                }

            }

           

            ## 关闭连接

            $ftp->quit() if($ftp);

           

        }#for

               

        if($bPutResult < 0)

        {

            # 尝试了这么多次之后还是不能够上传,报告错误罢了

            {

                lock($skipped_files);

                $skipped_files++;

                lock($g_nIsAllWAVsFile_UploadSuccess);

                $g_nIsAllWAVsFile_UploadSuccess = -1;

            }

        }

    }

    else

    {

        {

            lock($skipped_files);

            $skipped_files++;

        }

    }

   

    ## 当前活动的线程数减一:

    {

        lock($CurrentThreads);

        $CurrentThreads--;

    }

}

 

####################################################################

 

sub bsToFs {

    my ($s) = @_;

    $s =~ s/\\/\//gis;

    return $s;

}

 

sub fsToBs {

    my ($s) = @_;

    $s =~ s/\//\\/gis;

    return $s;

}

 

sub timeString {

    my ($tm) = @_;

    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);

    return sprintf("%04d-%02d-%02d %02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);

}

 

sub shortTimeString {

    my ($tm) = @_;

    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime($tm);

    return sprintf("%04d_%02d_%02d_%02d_%02d", $year+1900, $mon+1, $mday, $hour, $min);

}

 

# input dates as string "YYYY-MM-DD HH:MM:SS".

# earlier as first parameter, later as second.

sub calcDeltaSeconds {

    my ($t1,$t2) = @_;

 

    my ($year1,$month1,$day1,$hh1,$mm1,$ss1) = scanDate($t1);

    my ($year2,$month2,$day2,$hh2,$mm2,$ss2) = scanDate($t2);

 

    my ($days, $hours, $minutes, $seconds) = Delta_DHMS(

        $year1, $month1, $day1, $hh1, $mm1, $ss1,                   # earlier.

        $year2, $month2, $day2, $hh2, $mm2, $ss2);              # later.

 

    return $seconds + $minutes*60 + $hours*60*60 + $days*60*60*24.

}

 

sub removeFilename {

    my ($s) = @_;

    my $pos = rindex($s,'\\');

    return substr($s, 0, $pos);

}

 

# format: "2000-09-29 09:09:51".

sub scanDate {

    my ($date) = @_;

    my ($year, $month, $day, $hour, $minute, $seconds);

 

    $year           = substr($date, 0, 4);

    $month      = substr($date, 5, 2);

    $day            = substr($date, 8, 2);

    $hour           = substr($date, 11, 2);

    $minute     = substr($date, 14, 2);

    $seconds    = substr($date, 17, 2);

 

    return ($year, $month, $day, $hour, $minute, $seconds);

}

 

####################################################################

 

sub LOG {

    my ($text) = @_;

    my $time = timeString time;

 

    # log to stdout.

    print "[$time] $text\n";

 

    # log to logfile.

    my $LOG_STEP = 10;

    flushLogfile() if ($log_cnt % $LOG_STEP)==0 or $log_cnt==0;

    $log_cnt++;

    print HLOG "[$time] $text\n";

}

 

sub openLogfile {

    closeLogfile();

    open(HLOG,">>$logfilename") or die("打开日志文件出错:文件名为 $logfilename ;错误原因为: $!");

};

 

sub closeLogfile {

    close HLOG if defined HLOG;

}

 

sub flushLogfile {

    closeLogfile();

    openLogfile();

}

 

####################################################################

 

 

附录:
Upoad.config内容:
##================================================================##
## 配置的外部参数 ##
##
[FTPServer]
#- FTP服务器的IP地址 -#
ftp_server   =
#- 指定的FTP上传目录路径 -#
#! 切记:文件夹最后不要加"/"符号 !#
ftp_dir       =
#- FTP的登陆用户名 -#
ftp_uid    =
#- FTP的登陆密码 -#
ftp_pw    =
#- FTP的访问超时时间,以秒为单位 -#

ftp_timeout          =1800

#- FTP上传一个文件的重试次数,如果重试这么多次都失败之后,报告错误 -#

ftp_retrytimes           =10

#- FTP的详细的调试信息输出 -#

    ftp_debug            =1
##=====================================================##


##=====================================##
## 配置的外部参数 ##
##
[SrcDirectory]
#- 指定文件夹“语音文件”,放置所有要上传的语音文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_WAVFiles  =


#- 指定文件夹“命名对照列表文件TXT”,放置命名对照列表文件 -#
#! 切记:文件夹最后不要加"\"符号 !#
src_dir_NamesListFile =
##=========================================##

 

##========================================##

## 配置的外部参数3 ##

##

[Threads]

#- 启动同时访问FTP站点的线程最大数目 -#

maxthreads    =1

##==========================================##

 

本文地址:http://com.8s8s.com/it/it32915.htm