#--replace-me-with-perl-command--

=pod

=head1 NAME

mysql_replicate - realtime replication of a mysql database server

=head1 SYNOPSIS

mysql_replicate.pl --path I<path> --remote-server I<hostname>
[--resync-with-remote-server] [--force-resync I<database>]
[--exclude-databases I<database>] [--maintain-state-database]
[--dont-delete-old-update-logs]

=head1 PREREQUISITES

requires that DBI with the MySQL drivers be installed

=head1 DESCRIPTION

Replicates all changes to databases on a mysql server to a remote mysql server.

=head1 OPTIONS

=over 4

=item --path

Specifies the path to the update logs. Logfiles should be named like
"update.*". mysql_replicate will create a status directory called
replica_logs in this directory, so the user who is running
mysql_replicate will need the appropriate permissions to create that
directory.

=item --remote-server

The hostname of a remote server to replicate to. This option may be
specified many times, once for each host you'd like to replicate to.

=item --resync-with-remote-server

If a database doesn't exist on the remote server, recreate it.

=item --force-resync I<database>

Force a resync/recreation of a given database on the remote server.

=item --exclude-databases I<database>

Exclude a database from being mirrored on the remote server.

=item --maintain-state-database

Create and update a database on the local server called "replica".
This database has information that reflects the status of the remote
replica. If this item is specified, one can trigger force-resyncs
without having to restart the process by updating the 'force_resync'
field with a comma-separated list of databases to resync in the row for
the given remote server.

=item --dont-delete-old-update-logs

Don't delete the old update logs. Instead, rename them (e.g.
update.001 to old-update.001).

=back

=head1 USAGE

=over 4

=item Create a "replica" user on all servers involved.

This user should have "root" access to all databases on the server(s).
Edit the script, and change the variables $replicate_username and
$replicate_password as appropriate.

=item Set up the server for logging updates.

The mysql server should be running with the --log-update option
configured as follows: --log-update=I<path>/update, where I<path> is
what you should specify for the --path option to the script at runtime.
The logs will be rotated every thirty minutes. mysql_replicate will
create a status directory in I<path>, so the user who is running
mysql_replicate will need write access to that directory.

=item NOTE: logfiles that are no longer required will be I<deleted> (or renamed, with --dont-delete-old-update-logs)!

If multiple mysql_replicates are running to multiple servers, the lowest
needed update log will still be tracked amongst the running
mysql_replicates.

=back

If a remote server should disappear, the script will keep retrying the
connection/replication transaction until it is successful.

=head1 SEE ALSO

=head1 NOTES

=over 4

=item Should daemonize, and run as the mysql-user.

This implies parsing the my.cnf file.

=item Should write a utility script to help users set force-resyncs if running with the --maintain-state-database option.

=item On startup, --force-resync should force the transfer of the database immediately (without waiting for a statement in the update log).

=back

=head1 AUTHOR

=over 4

=item Andrew Elble elble@icculus.nsg.nwu.edu

=back

=head1 BUGS

=over 4

=item Remote servers get out of sync under certain (unknown) conditions.

=back

=head1 HISTORY

=over 4

=item 11/9/1999 - fixed problem with resyncing a database related to removing the database from the list of databases to be resynced before calling reload_database() (thanks go to Yingjie Yang for pointing this out)

=item 1/1/2000 - patch from Max Parke.
lots of things: setuid/gid, option to move instead of delete old update
logs, bug in write_log where force_write was being ignored, cleanup of
error message printouts, fixed bug where recovery from a lost connection
to a server was causing resyncs, fixed bug where the code to stop looking
for database to use would quit too late, fixed bug where a comment would
get appended to line to be committed to database, ignore INT, better
method for initial connection to database, we now keep a pidfile.

=back

=head1 DISCLAIMER

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.

=cut

use strict;
use DBI;
use FileHandle;
use Getopt::Long;

#
# global variables
#
# File-scoped lexicals shared by every routine in this script and by the
# per-server child processes forked at the bottom of it.
#
my $writelogint = 100;              # write_log() saves the position every N commits
my $logcommits = 0;
my $replicate_username = "";
my $replicate_password = "";
my $run_as_user = "";
my $run_as_group = "";
my $pid_file = "";
my $logflush_interval = (30*60);    # seconds between local "flush logs"
my $fileprefix = "update";
my %hold_commit;                    # prefix.extension of a log => database held until it
my @remote_servers;
my @exclude_databases;
my @force_resync;
my @childpids;
my ($pathname, $remote_server, $resync_with_remote_server,
    $maintain_state_database, $dont_delete_old_update_logs, $database,
    $default, $filename, $next_flush_time, $synced, $prefix, $extension,
    $curpos, $dbh, $localdbh);

#
# Get our options
#
GetOptions("path=s"                      => \$pathname,
           "remote-server=s"             => \@remote_servers,
           "resync-with-remote-server"   => \$resync_with_remote_server,
           "dont-delete-old-update-logs" => \$dont_delete_old_update_logs,
           "exclude-databases=s"         => \@exclude_databases,
           "force-resync=s"              => \@force_resync,
           "maintain-state-database"     => \$maintain_state_database);

if (!defined($pathname) || ($pathname eq "") || !@remote_servers) {
    # A usage error is a failure: report it with a non-zero exit status.
    print STDERR "you must specify a path and at least one remote server!\n";
    exit(1);
}

my $synclogdir = "$pathname/replica_logs";

#
# Optionally drop root privileges. Groups are changed first, while we still
# have the privilege to do so.
#
my ($uid, $gid);
if (($run_as_user ne "") && ($run_as_group ne "")) {
    # getpwnam/getgrnam return 0 for root, so test definedness, not truth.
    die "unknown user $run_as_user" unless defined($uid = getpwnam($run_as_user));
    die "unknown group $run_as_group" unless defined($gid = getgrnam($run_as_group));
    die "unable to change user to $run_as_user" if ($<);
    # Repeating the gid also replaces root's supplementary group list;
    # assigning a single number to $) would only change the effective gid.
    $) = "$gid $gid";
    $( = $gid;
    $> = $uid;
    $< = $uid;
    die "unable to change user to $run_as_user: $!"
        if (($< != $uid) || ($> != $uid));
}

# set log file to flush after each write
STDOUT->autoflush(1);

if ($pid_file ne "") {
    # write out PID (best effort: replication works without it)
    if (open(my $pidfh, '>', $pid_file)) {
        print $pidfh "$$\n";
        close($pidfh);
    }
    else {
        print STDERR "unable to write pid file $pid_file: $!\n";
    }
}

#
# ROUTINE
#   reload_database
#
# DESCRIPTION
#   replicates the entire database ($database) on the remote system.
#
#   The copy is only made when --resync-with-remote-server was given or
#   $database is queued in @force_resync. Otherwise the database is put on
#   the exclude list, so that no further updates are sent for it.
#
#   While copying, every local table is write-locked and the update logs
#   are flushed. The database then stays on the exclude list until
#   readfile() opens the newest log (recorded in %hold_commit).
#
# NOTE(review): membership in @force_resync / @exclude_databases is tested
#   with an unanchored match, so "foo" also matches "foobar". commit() and
#   readfile() use the same test, so change them together if this is fixed.
#
sub reload_database {
    my $forced = grep(/\Q$database\E/, @force_resync);

    if (!$resync_with_remote_server && !$forced) {
        print "$remote_server database $database doesn't exist on remote server!\n";
        if (!grep(/\Q$database\E/, @exclude_databases)) {
            print "$remote_server adding $database to exclude list!\n";
            push(@exclude_databases, $database);
        }
        return;
    }

    print "$remote_server: dumping $database to $remote_server\n";
    if ($forced) {
        # Dequeue before dumping: the commit() calls below check
        # @force_resync and would otherwise start another reload.
        @force_resync = grep(!/\Q$database\E/, @force_resync);
    }

    if ($maintain_state_database) {
        replica_table_check();
        $localdbh->do("use replica");
        $localdbh->do("SET SQL_LOG_UPDATE=0");
        $localdbh->do("update replica_status set resyncing = ? where hostname = ?",
                      undef, $database, $remote_server);
    }

    # Recreate the (empty) database on the remote side.
    $default = undef;
    commit("drop database $database");
    commit("create database $database");
    commit("use $database");
    $localdbh->do("use $database");
    $localdbh->do("SET SQL_LOG_UPDATE=0");

    # Write-lock every local table so the data can't change during the copy.
    my $query = $localdbh->prepare("show tables");
    $query->execute;
    my $table_rows = $query->fetchall_arrayref || [];
    $query->finish;
    my @tables = map { $_->[0] } @{$table_rows};
    if (@tables) {
        $localdbh->do("lock tables " . join(",", map { "$_ write" } @tables));
    }

    # Force a logflush: any change to this database made after the copy
    # can then only appear in the latest logfile.
    $next_flush_time = 0;
    flush_logs();

    # Find the highest numbered logfile, and block commits to this database
    # until readfile() reaches that logfile.
    my @logs;
    if (opendir(my $logdh, $pathname)) {
        @logs = sort { $b cmp $a }
                grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir($logdh);
        closedir($logdh);
    }
    my $tmpfilename = @logs ? "$pathname/$logs[0]" : "";
    if (($tmpfilename eq "") || !-f $tmpfilename) {
        die "unable to keep track of logfile!";
    }
    print "$remote_server: updates for $database will recommence with: $tmpfilename\n";
    # Anchor at the end so dots elsewhere in $pathname can't be mistaken for
    # the "prefix.number" suffix.
    my ($tmpprefix, $tmpextension) = ($tmpfilename =~ /(\w+)\.(\d+)$/);
    $hold_commit{$tmpprefix.$tmpextension} = $database;

    # Create the tables and dump the data!
    foreach my $tablename (@tables) {
        $dbh->do(_create_table_sql($tablename));
        _copy_table_rows($tablename);
    }

    # Exclude the database from updates 'till the selected log.
    push(@exclude_databases, $database);
    print "$remote_server excluding $database from updates until log: $tmpfilename\n";
    $localdbh->do("unlock tables");

    if ($maintain_state_database) {
        replica_table_check();
        $localdbh->do("use replica");
        $localdbh->do("SET SQL_LOG_UPDATE=0");
        $localdbh->do("update replica_status set resyncing = '' where hostname = ?",
                      undef, $remote_server);
    }
}

#
# _create_table_sql($tablename)
#   builds a CREATE TABLE statement for $tablename from the local server's
#   "show fields" output (columns, NOT NULL, defaults, extra attributes,
#   and PRIMARY/UNIQUE/plain keys). Returns the SQL string.
#
sub _create_table_sql {
    my ($tablename) = @_;

    my $fieldq = $localdbh->prepare("show fields from $tablename");
    $fieldq->execute;
    my $fields = $fieldq->fetchall_arrayref || [];
    $fieldq->finish;

    my (@columns, @primary, %unikey, %mulkey);
    foreach my $field (@{$fields}) {
        my ($name, $type, $null, $key, $fdefault, $extra) = @{$field};
        my $column = "$name $type";
        $column .= " NOT NULL" if ($null eq "");
        if ($key eq "PRI") {
            push(@primary, $name);
        }
        elsif ($key eq "UNI") {
            $unikey{$name} = 1;
        }
        elsif ($key eq "MUL") {
            $mulkey{$name} = 1;
        }
        if ($fdefault eq "NULL") {
            $column .= " DEFAULT ''";
        }
        elsif ($fdefault ne "") {
            # quote() escapes defaults that themselves contain quotes.
            $column .= " DEFAULT " . $localdbh->quote($fdefault);
        }
        $column .= " $extra" if ($extra ne "");
        push(@columns, $column);
    }

    my $sql = "CREATE TABLE $tablename (\n" . join(",\n", @columns);
    $sql .= ",\nPRIMARY KEY(" . join(",", @primary) . ")" if (@primary);
    $sql .= ",\nUNIQUE KEY($_)" foreach (keys %unikey);
    $sql .= ",\nKEY($_)" foreach (keys %mulkey);
    $sql .= ")\n\n";
    return $sql;
}

#
# _copy_table_rows($tablename)
#   copies every row of $tablename from the local server to the remote one
#   with one INSERT per row, holding a write lock on the remote table.
#   Empty and NULL values are sent as ''.
#
sub _copy_table_rows {
    my ($tablename) = @_;

    my $rowq = $localdbh->prepare("select * from $tablename");
    $rowq->execute;
    my $rows = $rowq->fetchall_arrayref || [];

    $dbh->do("lock tables $tablename write");
    foreach my $row (@{$rows}) {
        my @values = map { ($_ ne "") ? $localdbh->quote($_) : "''" } @{$row};
        $dbh->do("INSERT INTO $tablename VALUES (" . join(",", @values) . ")\n");
    }
    $dbh->do("unlock tables");
    $rowq->finish;
}

#
# ROUTINE
#   check_for_resyncs
#
# DESCRIPTION
#   checks to see if there are resync requests pending. With
#   --maintain-state-database, the databases listed in this server's
#   replica_status row (force_resync: new requests; resyncing: a dump
#   that was interrupted) are queued on @force_resync. Then force_resync
#   is cleared in the table.
#
sub check_for_resyncs {
    return unless ($maintain_state_database);

    $localdbh->do("use replica");
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    my $query = $localdbh->prepare("select force_resync, resyncing from replica_status where hostname = ?");
    return unless ($query && $query->execute($remote_server));
    my $rows = $query->fetchall_arrayref || [];
    $query->finish;

    foreach my $row (@{$rows}) {
        my ($requested, $interrupted) = @{$row};
        foreach my $db (_split_db_list($requested)) {
            next if (grep(/\Q$db\E/, @force_resync));
            print "$remote_server: acknowledging resync request for: ".$db."\n";
            push(@force_resync, $db);
        }
        foreach my $db (_split_db_list($interrupted)) {
            next if (grep(/\Q$db\E/, @force_resync));
            print "$remote_server: redoing interrupted resync for : ".$db."\n";
            push(@force_resync, $db);
        }
    }

    $localdbh->do("update replica_status set force_resync='' where hostname = ?",
                  undef, $remote_server);
}

#
# _split_db_list($list)
#   splits a comma-separated database list, trimming whitespace and
#   dropping empty entries. An empty name must never reach /\Q$db\E/,
#   because an empty pattern re-uses the last successful match.
#
sub _split_db_list {
    my ($list) = @_;
    return () if (!defined($list) || ($list eq ""));
    return grep { $_ ne "" } map { s/^\s+|\s+$//g; $_ } split(/,/, $list);
}

#
# ROUTINE
#   check_for_next_log_in_sequence
#
# DESCRIPTION
#   check to see if the next logfile is available
#
# RETURNS
#   the name of the next logfile (if next logfile is present)
#   undef otherwise
#
sub check_for_next_log_in_sequence {
    my $nextextension = $extension;
    # For a digit string used only as a string, ++ is Perl's magic string
    # increment, which keeps the zero padding ("009" -> "010").
    $nextextension++;
    my $candidate = $pathname."/".$prefix.".".$nextextension;
    if (-f $candidate) {
        return $candidate;
    }
    return;
}

#
# ROUTINE
#   flush_old_update_files
#
# DESCRIPTION
#   discover what update logs are no longer needed, and remove them (or
#   rename them to old-* with --dont-delete-old-update-logs).
#
#   Every file in $synclogdir holds "extension:position:database" lines
#   written by a replicator. The lowest extension found there (or our own
#   current $extension) is the oldest log still needed; every numbered log
#   below it is removed. If any status file can't be read, nothing is
#   removed, since another replicator may still need the old logs.
#
sub flush_old_update_files {
    if (!-e $synclogdir) {
        # Another child may create the directory at the same moment.
        if (!mkdir($synclogdir, 0755) && !-d $synclogdir) {
            die("can't create status directory\n");
        }
    }

    my $syncdh;
    if (!opendir($syncdh, $synclogdir)) {
        print "$remote_server: can't read $synclogdir ($!), not removing old update logs\n";
        return;
    }
    my @logs = sort { $a cmp $b } grep { -f "$synclogdir/$_" } readdir($syncdh);
    closedir($syncdh);

    my $lowestlognumber = $extension;
    foreach my $file (@logs) {
        my $statfh;
        if (!open($statfh, '<', "$synclogdir/$file")) {
            print "$remote_server: can't read $synclogdir/$file ($!), not removing old update logs\n";
            return;
        }
        while (my $entry = <$statfh>) {
            my ($num) = ($entry =~ /(\d+)\:(\w+)\:(\w+)/);
            if (defined($num) && ($num < $lowestlognumber)) {
                $lowestlognumber = $num;
            }
        }
        close($statfh);
    }
    print "$remote_server: lowest needed file# is $lowestlognumber\n";

    while ($lowestlognumber > 0) {
        $lowestlognumber--;
        # Keep $pathname out of the format string: a '%' in it would break sprintf.
        my $oldlog = sprintf("%s/%s.%3.3d", $pathname, $fileprefix, $lowestlognumber);
        next unless (-f $oldlog);
        if ($dont_delete_old_update_logs) {
            my $renamed = sprintf("%s/old-%s.%3.3d", $pathname, $fileprefix, $lowestlognumber);
            print "$remote_server: renaming $oldlog to $renamed\n";
            rename($oldlog, $renamed);
        }
        else {
            print "$remote_server: removing $oldlog\n";
            unlink($oldlog);
        }
    }
}

#
# ROUTINE
#   replica_table_check
#
# DESCRIPTION
#   check to see if the replica database and its replica_status table
#   exist; if not, create them. Then record $synced as this server's
#   in_sync value.
#
# RETURNS
#   1 if the table already existed, 0 if it was created, nothing when
#   --maintain-state-database is off.
#
sub replica_table_check {
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    return unless ($maintain_state_database);

    # Test for a missing database immediately: DBI resets errstr on the
    # next call, so checking it after another statement would never fire.
    if (!$localdbh->do("use replica")
        && ($localdbh->errstr =~ /Unknown database/)) {
        print "$remote_server: creating replica database\n";
        $localdbh->do("create database replica");
        $localdbh->do("use replica");
    }
    $localdbh->do("SET SQL_LOG_UPDATE=0");

    my $in_sync = $synced ? 1 : 0;
    $localdbh->do("select * from replica_status limit 1");
    if (!$localdbh->err) {
        # The insert fails harmlessly when the row already exists.
        $localdbh->do("insert into replica_status (hostname, in_sync) values (?, ?)",
                      undef, $remote_server, $in_sync);
        $localdbh->do("update replica_status set in_sync = ? where hostname = ?",
                      undef, $in_sync, $remote_server);
        return 1;
    }

    print "$remote_server: creating replica tables\n";
    $localdbh->do("CREATE table replica_status ( hostname char(255) NOT NULL, in_sync tinyint(1) default 0, resyncing char(255), force_resync char(255), PRIMARY KEY(hostname))");
    $localdbh->do("insert into replica_status (hostname, in_sync) values (?, ?)",
                  undef, $remote_server, $in_sync);
    return 0;
}

# lost_sync / attained_sync: record the sync state, publish it through
# replica_table_check(), and log the transition.
sub lost_sync {
    $synced = 0;
    replica_table_check();
    print "$remote_server: lost sync\n";
}

sub attained_sync {
    $synced = 1;
    replica_table_check();
    print "$remote_server: attained sync\n";
}

#
# ROUTINE
#   mfile
#
# DESCRIPTION
#   1.) finds the next logfile to work from.
#   2.) determines the correct position within the file to work from
#   3.) calls readfile() to open the file and process the updates.
#
#   On the first call ($filename empty) the saved position in
#   $synclogdir/$remote_server ("extension:position:database", written by
#   write_log) is used if that log still exists. Otherwise the
#   lowest-sorting update log is used, starting at offset 0. On later
#   calls, it switches to the next log in sequence if it has appeared,
#   and returns otherwise.
#
# NOTE(review): mfile() and readfile() call each other (readfile() calls
#   mfile() at the end of a log; mfile() calls readfile() for the next
#   one), so each log switch adds a level of recursion.
#
sub mfile {
    my ($num, $pos);

    if ($filename eq "") {
        lost_sync();
        my $statusfile = "$synclogdir/$remote_server";
        if (-f $statusfile) {
            if (open(my $statfh, '<', $statusfile)) {
                while (my $entry = <$statfh>) {
                    chomp($entry);
                    my ($tnum, $tpos, $db) = ($entry =~ /(\d+)\:(\w+)\:(\w+)/);
                    $num = $tnum if (defined($tnum));
                    $pos = $tpos if (defined($tpos));
                    if (defined($db) && ($db ne "")) {
                        $dbh->do("use ".$db);
                        $database = $db;
                    }
                }
                close($statfh);
            }
            $filename = "$pathname/$fileprefix.$num";
            if (-f $filename) {
                print "$remote_server: restarting with: $filename\n";
                # Anchored so a dot elsewhere in $pathname isn't taken as the suffix.
                ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)$/);
                $curpos = $pos;
                readfile();
            }
            else {
                # The saved log is gone; forget the stale position.
                unlink($statusfile);
                $filename = "";
            }
        }
        if ($filename eq "") {
            my @logs;
            if (opendir(my $logdh, $pathname)) {
                @logs = sort { $a cmp $b }
                        grep { /^\Q$fileprefix\E/ && -f "$pathname/$_" } readdir($logdh);
                closedir($logdh);
            }
            $filename = @logs ? $pathname."/".$logs[0] : "";
            if (($filename ne "") && -f $filename) {
                print "$remote_server: starting with: $filename\n";
                ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)$/);
                $curpos = 0;
                readfile();
            }
            else {
                die "i can't find a log to start syncing with!";
            }
        }
    }
    else {
        my $nextlog = check_for_next_log_in_sequence();
        return unless (defined($nextlog));
        close(IN);
        lost_sync();
        print "$remote_server: switching to log: $nextlog\n";
        $filename = $nextlog;
        ($prefix, $extension) = ($filename =~ /(\w+)\.(\d+)$/);
        $curpos = 0;
        readfile();
    }
}

#
# ROUTINE
#   write_log
#
# DESCRIPTION
#   saves "extension:position:database" to this server's status file
#   (the POSLOG handle opened by readfile()). This happens on every
#   $writelogint-th call, or on every call when $force_write is true, and
#   only once the extension, position and database are all known.
#
sub write_log {
    my $force_write = shift;
    $logcommits++;
    return if (($extension eq "") || ($curpos eq "") || ($database eq ""));
    return unless ($force_write || ($logcommits % $writelogint == 0));
    truncate(POSLOG, 0);
    seek(POSLOG, 0, 0);
    print POSLOG "$extension:$curpos:$database\n";
}

#
# ROUTINE
#   commit
#
# DESCRIPTION
#   calls $dbh->do in a safe manner, and only when the correct conditions are met!
#
# PARAMETERS
#   $line - one SQL statement assembled from the update log.
#
#   A "use db;" statement switches $database (connecting to the remote
#   server first if needed). Statements for excluded databases are skipped,
#   and while $default is "set" (startup, before the log has named a
#   database) only "use" statements are sent. Failing statements are retried
#   once a second, except:
#     - "Unknown database" / missing-table errors trigger reload_database();
#     - "server has gone away" triggers a reconnect and a retry;
#     - any other error is reported and the statement is dropped.
#
sub commit {
    my $line = shift;
    my $lasterr;
    my $retry_msg_issued = 0;

    # Probe the remote connection; if ping hangs for 2s, record lost sync.
    if (defined($dbh)) {
        alarm(0);
        local $SIG{ALRM} = sub { lost_sync(); };
        alarm(2);
        $dbh->ping;
        alarm(0);
    }

    return if ($line eq "");

    if ($line =~ /^use (\w+)\;$/i) {
        $database = $1;
        $dbh = _connect_remote(1) unless (defined($dbh));
        $dbh->do("SET SQL_LOG_UPDATE=0");
    }

    # NOTE(review): unanchored match, consistent with reload_database().
    if (grep(/\Q$database\E/, @force_resync)) {
        print "$remote_server: forced resync of $database\n";
        lost_sync();
        reload_database();
    }

    my $result;
    my $first = 1;
    my $dbherr = 1;
    my $dbherrstr;
    while (($dbherr ne "")
           && (!grep(/\Q$database\E/, @exclude_databases))
           && (($default ne "set") || ($line =~ /^use (\w+)/i))) {
        if (!$first) {
            if (!$retry_msg_issued) {
                print "$remote_server: invoking retry\n";
                $retry_msg_issued = 1;
            }
            lost_sync() if ($synced == 1);
            sleep(1);
        }
        $first = 0;

        $result = $dbh->do($line);
        $dbherr = $dbh->err;
        $dbherrstr = $dbh->errstr;

        # Report each distinct error once, not on every retry.
        if (($dbherr ne "") && ($dbherr ne $lasterr)) {
            print "commit: *** $remote_server: DBI error occurred processing SQL command: $line\n";
            print "commit: *** $remote_server: dbherr: $dbherr, dbherrstr: $dbherrstr\n";
            $lasterr = $dbherr;
        }
        if (($dbherr eq "") && ($line =~ /^use (\w+)/i)) {
            $default = "";
        }

        if (($dbherrstr =~ /Unknown database/)
            || ($dbherrstr =~ /Table \'.+\' doesn\'t exist/)) {
            # Either the dump or the exclude list ends the retry loop.
            reload_database();
            $default = "";
        }
        elsif ($dbherrstr ne "") {
            if ($dbherrstr =~ /server has gone away/i) {
                print "commit: lost connection to $remote_server, attempting reconnect\n";
                $dbh->disconnect();
                $dbh = _connect_remote(2);
                $dbh->do("SET SQL_LOG_UPDATE=0");
                print "commit: reconnected to $remote_server\n";
            }
            else {
                print "$remote_server($curpos): result: $result dberr: $dbherrstr ($dbherr)\n";
                print "$line\n";
                $dbherrstr = "";
                $dbherr = "";
            }
        }
    }
    write_log(0);
}

#
# _connect_remote($pause)
#   connects to $database on $remote_server, sleeping $pause seconds before
#   each attempt, until a handle is obtained. Returns the handle.
#
sub _connect_remote {
    my ($pause) = @_;
    my $handle;
    while (!defined($handle)) {
        sleep($pause);
        $handle = DBI->connect("DBI:mysql:$database:$remote_server",
                               $replicate_username, $replicate_password,
                               { PrintError => 0 });
    }
    return $handle;
}

#
# _connect_local()
#   connects to the local "mysql" database, retrying once a second until
#   it succeeds. Returns the handle.
#
sub _connect_local {
    my $handle;
    until (defined($handle = DBI->connect("DBI:mysql:mysql",
                                          $replicate_username, $replicate_password,
                                          { PrintError => 0 }))) {
        sleep(1);
    }
    return $handle;
}

#
# ROUTINE
#   readfile
#
# DESCRIPTION
#   1.) opens the update log, and seeks to the correct position.
#   2.) sends the updates from the log to the remote server.
#
#   Comment lines (starting with '#') are skipped. Lines are joined until
#   one ends with ';', and the joined statement is passed to commit().
#   At the end of the log it waits, re-checks for resyncs, and calls
#   mfile() to move to the next log when there is one.
#
# NOTE(review): never returns normally; mfile() re-enters it for each log.
#
sub readfile {
    # Remove the holddown for a database that has been recently loaded to
    # the remote server and whose updates resume in this log.
    my $held = $hold_commit{$prefix.$extension};
    if (defined($held)) {
        print "$remote_server: removing database $held from exclude list\n";
        @exclude_databases = grep(!/\Q$held\E/, @exclude_databases);
    }

    # The status directory may not exist yet on first run
    # (flush_old_update_files() only creates it after this open).
    if (!-d $synclogdir && !mkdir($synclogdir, 0755) && !-d $synclogdir) {
        die("can't create status directory\n");
    }
    open(POSLOG, '>', "$synclogdir/$remote_server")
        or die("$remote_server: can't write $synclogdir/$remote_server: $!\n");
    POSLOG->autoflush();

    while (!open(IN, '<', $filename)) {
        sleep(3);
    }
    check_for_resyncs();

    if ($curpos != 0) {
        if ($database eq "") {
            # Replay only the "use" statements before $curpos, so the
            # remote side is in the right database when we resume.
            my $pending;
            my $tmppos = tell(IN);
            while (defined(my $entry = <IN>)) {
                last if ($tmppos >= $curpos);
                chomp($entry);
                if ($entry !~ /^\#/) {
                    $pending .= $entry;
                    if ($entry =~ /\;$/) {
                        commit($pending) if ($pending =~ /^use (\w+)/i);
                        $pending = undef;
                    }
                }
                $tmppos = tell(IN);
            }
        }
        else {
            print "$remote_server: readfile: using database $database\n";
            commit("use $database;");
        }
        seek(IN, $curpos, 0);
    }

    flush_old_update_files();

    my $line;
    for (;;) {
        # $curpos always points at the start of the next unread line.
        $curpos = tell(IN);
        while (defined(my $entry = <IN>)) {
            chomp($entry);
            if ($entry !~ /^\#/) {
                $line .= $entry;
                if ($entry =~ /\;$/) {
                    commit("SET SQL_LOG_UPDATE=0");
                    commit($line);
                    $line = undef;
                }
            }
            $curpos = tell(IN);
        }

        sleep(1);
        check_for_resyncs();
        my $cur_log_size = (stat $filename)[7];
        mfile() if ($cur_log_size == $curpos);
        if ($synced == 0) {
            # "In sync" = less than 8k behind and no newer log waiting.
            if ((($cur_log_size - $curpos) < 8192)
                && !defined(check_for_next_log_in_sequence())) {
                attained_sync();
            }
            else {
                lost_sync();
            }
        }
        # Clears the EOF condition so new data appended to the log is seen.
        seek(IN, $curpos, 0);
    }
}

#
# ROUTINE
#   closelogs
#
# DESCRIPTION
#   SIGTERM handler for a child: saves the current position, closes the
#   log handles, marks the server out of sync and exits.
#
sub closelogs {
    write_log(1);
    close(POSLOG);
    close(IN);
    unlink($pid_file) if ($pid_file ne "");
    lost_sync();
    print "$remote_server: clean exit\n";
    exit(0);
}

#
# ROUTINE
#   flush_logs
#
# DESCRIPTION
#   issues "flush logs" on the local server at most once every
#   $logflush_interval seconds. Setting $next_flush_time to 0 forces the
#   next call to flush.
#
sub flush_logs {
    return if ($next_flush_time >= time());
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    $localdbh->do("flush logs");
    $next_flush_time = time() + $logflush_interval;
    return;
}

#
# Fork one replicator per remote server. Each child opens its own local and
# remote connections and stays in mfile()/readfile(). The parent stays
# behind to issue the periodic "flush logs".
#
foreach my $child (@remote_servers) {
    my $pid = fork();
    if (!defined($pid)) {
        # Don't leave a partial set of replicators running.
        kill('TERM', @childpids);
        die "unable to fork replicator for $child: $!\n";
    }
    if ($pid == 0) {
        # in child
        $remote_server = $child;
        $SIG{TERM} = \&closelogs;
        $SIG{INT} = 'IGNORE';

        print "connecting to local DB\n";
        $localdbh = _connect_local();
        print "connected to local DB\n";
        $localdbh->do("SET SQL_LOG_UPDATE=0");

        print "$remote_server: using default database\n";
        if ($database eq "") {
            $database = "mysql";
            $default = "set";
        }
        $dbh = _connect_remote(1);
        commit("SET SQL_LOG_UPDATE=0");
        mfile();
        exit(0);
    }
    print "forked child: $pid for $child\n";
    push(@childpids, $pid);
}

# The pidfile holds the parent's pid: forward TERM so killing it stops the
# replicators too instead of orphaning them.
$SIG{TERM} = sub {
    kill('TERM', @childpids);
    unlink($pid_file) if ($pid_file ne "");
    exit(0);
};

$localdbh = _connect_local();
$next_flush_time = 0;
while (1) {
    $localdbh->do("SET SQL_LOG_UPDATE=0");
    flush_logs();
    sleep(20);
}