1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 #include "config.h"
  27 
  28 #ifdef HAVE_LWPS
  29 #include <sys/lwp.h>
  30 #endif
  31 #include <fcntl.h>
  32 #include "filebench.h"
  33 #include "flowop.h"
  34 #include "stats.h"
  35 
  36 #ifdef LINUX_PORT
  37 #include <sys/types.h>
  38 #include <linux/unistd.h>
  39 #endif
  40 
/*
 * Forward declarations of file-local (static) functions.
 * flowop_define_common() is called by code that appears before its
 * definition further down in this file.
 */
static flowop_t *flowop_define_common(threadflow_t *threadflow, char *name,
    flowop_t *inherit, flowop_t **flowoplist_hdp, int instance, int type);
static int flowop_composite(threadflow_t *threadflow, flowop_t *flowop);
static int flowop_composite_init(flowop_t *flowop);
static void flowop_composite_destruct(flowop_t *flowop);
  46 
  47 /*
  48  * A collection of flowop support functions. The actual code that
  49  * implements the various flowops is in flowop_library.c.
  50  *
  51  * Routines for defining, creating, initializing and destroying
  52  * flowops, cyclically invoking the flowops on each threadflow's flowop
  53  * list, collecting statistics about flowop execution, and other
  54  * housekeeping duties are included in this file.
  55  *
  56  * User Defined Composite Flowops
  57  *    The ability to define new flowops as lists of built-in or previously
  58  * defined flowops has been added to Filebench. In a sense they are like
  59  * in-line subroutines, which can have default attributes set at definition
  60  * time and passed arguments at invocation time. Like other flowops (and
  61  * unlike conventional subroutines) you can invoke them with an iteration
  62  * count (the "iter" attribute), and they will loop through their associated
  63  * list of flowops for "iter" number of times each time they are encountered
  64  * in the thread or outer composite flowop which invokes them.
  65  *
  66  * Composite flowops are created with a "define" command, are given a name,
  67  * optional default attributes, and local variable definitions on the
  68  * "define" command line, followed by a brace enclosed list of flowops
  69  * to execute. The enclosed flowops may include attributes that reference
  70  * the local variables, as well as constants and global variables.
  71  *
  72  * Composite flowops are used pretty much like regular flowops, but you can
  73  * also set local variables to constants or global variables ($local_var =
  74  * [$var | $random_var | string | boolean | integer | double]) as part of
  75  * the invocation. Thus each invocation can pass customized values to its
  76  * inner flowops, greatly increasing their generality.
  77  *
  78  * All flowops are placed on a global, singly linked list, with fo_next
 * being the link pointer for this list. They are also placed on a private
  80  * list for the thread or composite flowop they are part of. The tf_thrd_fops
  81  * pointer in the thread will point to the list of top level flowops in the
  82  * thread, which are linked together by fo_exec_next. If any of these flowops
  83  * are composite flowops, they will have a list of second level flowops rooted
  84  * at the composite flowop's fo_comp_fops pointer. So, there is one big list
 * of all flowops, and an n-ary tree of threads, composite flowops, and
  86  * flowops, with composite flowops being the branch nodes in the tree.
  87  *
  88  * To illustrate, if we have three first level flowops, the first of which is
  89  * a composite flowop consisting of two other flowops, we get:
  90  *
  91  * Thread->tf_thrd_fops -> flowop->fo_exec_next -> flowop->fo_exec_next
  92  *                         flowop->fo_comp_fops                  |
  93  *                                  |                       V
  94  *                                  |                   flowop->fo_exec_next
  95  *                                  |
  96  *                                  V
  97  *                              flowop->fo_exec_next -> flowop->fo_exec_next
  98  *
  99  * And all five flowops (plus others from any other threads) are on a global
 100  * list linked with fo_next.
 101  */
 102 
 103 /*
 104  * Prints the name and instance number of each flowop in
 105  * the supplied list to the filebench log.
 106  */
 107 int
 108 flowop_printlist(flowop_t *list)
 109 {
 110         flowop_t *flowop = list;
 111 
 112         while (flowop) {
 113                 filebench_log(LOG_DEBUG_IMPL, "flowop-list %s-%d",
 114                     flowop->fo_name, flowop->fo_instance);
 115                 flowop = flowop->fo_exec_next;
 116         }
 117         return (0);
 118 }
 119 
 120 /*
 121  * Prints the name and instance number of all flowops on
 122  * the master flowop list to the console and the filebench log.
 123  */
 124 void
 125 flowop_printall(void)
 126 {
 127         flowop_t *flowop = filebench_shm->shm_flowoplist;
 128 
 129         while (flowop) {
 130                 filebench_log(LOG_VERBOSE, "flowop-list %s-%d",
 131                     flowop->fo_name, flowop->fo_instance);
 132                 flowop = flowop->fo_next;
 133         }
 134 }
 135 
 136 #define TIMESPEC_TO_HRTIME(s, e) (((e.tv_sec - s.tv_sec) * 1000000000LL) + \
 137                                         (e.tv_nsec - s.tv_nsec))
 138 /*
 139  * Puts current high resolution time in start time entry
 140  * for threadflow and may also calculate running filebench
 141  * overhead statistics.
 142  */
 143 void
 144 flowop_beginop(threadflow_t *threadflow, flowop_t *flowop)
 145 {
 146 #ifdef HAVE_PROCFS
 147         if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
 148                 if ((noproc == 0) && (threadflow->tf_lwpusagefd == 0)) {
 149                         char procname[128];
 150 
 151                         (void) snprintf(procname, sizeof (procname),
 152                             "/proc/%d/lwp/%d/lwpusage", my_pid, _lwp_self());
 153                         threadflow->tf_lwpusagefd = open(procname, O_RDONLY);
 154                 }
 155 
 156                 (void) pread(threadflow->tf_lwpusagefd,
 157                     &threadflow->tf_susage,
 158                     sizeof (struct prusage), 0);
 159 
 160                 /* Compute overhead time in this thread around op */
 161                 if (threadflow->tf_eusage.pr_stime.tv_nsec) {
 162                         flowop->fo_stats.fs_mstate[FLOW_MSTATE_OHEAD] +=
 163                             TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_utime,
 164                             threadflow->tf_susage.pr_utime) +
 165                             TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_ttime,
 166                             threadflow->tf_susage.pr_ttime) +
 167                             TIMESPEC_TO_HRTIME(threadflow->tf_eusage.pr_stime,
 168                             threadflow->tf_susage.pr_stime);
 169                 }
 170         }
 171 #endif
 172 
 173         /* Start of op for this thread */
 174         threadflow->tf_stime = gethrtime();
 175 }
 176 
/* Aggregate statistics accumulated by flowop_endop(). */
flowstat_t controlstats;
/* Held while controlstats is updated; initialized by flowop_init(). */
pthread_mutex_t controlstats_lock;
/* Set to 1 by flowop_start() once controlstats has been zeroed. */
static int controlstats_zeroed = 0;
 180 
 181 /*
 182  * Updates flowop's latency statistics, using saved start
 183  * time and current high resolution time. Updates flowop's
 184  * io count and transferred bytes statistics. Also updates
 185  * threadflow's and flowop's cumulative read or write byte
 186  * and io count statistics.
 187  */
 188 void
 189 flowop_endop(threadflow_t *threadflow, flowop_t *flowop, int64_t bytes)
 190 {
 191         hrtime_t t;
 192 
 193         flowop->fo_stats.fs_mstate[FLOW_MSTATE_LAT] +=
 194             (gethrtime() - threadflow->tf_stime);
 195 #ifdef HAVE_PROCFS
 196         if ((filebench_shm->shm_mmode & FILEBENCH_MODE_NOUSAGE) == 0) {
 197                 if ((pread(threadflow->tf_lwpusagefd, &threadflow->tf_eusage,
 198                     sizeof (struct prusage), 0)) != sizeof (struct prusage))
 199                         filebench_log(LOG_ERROR, "cannot read /proc");
 200 
 201                 t =
 202                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_utime,
 203                     threadflow->tf_eusage.pr_utime) +
 204                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_ttime,
 205                     threadflow->tf_eusage.pr_ttime) +
 206                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_stime,
 207                     threadflow->tf_eusage.pr_stime);
 208                 flowop->fo_stats.fs_mstate[FLOW_MSTATE_CPU] += t;
 209 
 210                 flowop->fo_stats.fs_mstate[FLOW_MSTATE_WAIT] +=
 211                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_tftime,
 212                     threadflow->tf_eusage.pr_tftime) +
 213                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_dftime,
 214                     threadflow->tf_eusage.pr_dftime) +
 215                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
 216                     threadflow->tf_eusage.pr_kftime) +
 217                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_kftime,
 218                     threadflow->tf_eusage.pr_kftime) +
 219                     TIMESPEC_TO_HRTIME(threadflow->tf_susage.pr_slptime,
 220                     threadflow->tf_eusage.pr_slptime);
 221         }
 222 #endif
 223 
 224         flowop->fo_stats.fs_count++;
 225         flowop->fo_stats.fs_bytes += bytes;
 226         (void) ipc_mutex_lock(&controlstats_lock);
 227         if ((flowop->fo_type & FLOW_TYPE_IO) ||
 228             (flowop->fo_type & FLOW_TYPE_AIO)) {
 229                 controlstats.fs_count++;
 230                 controlstats.fs_bytes += bytes;
 231         }
 232         if (flowop->fo_attrs & FLOW_ATTR_READ) {
 233                 threadflow->tf_stats.fs_rbytes += bytes;
 234                 threadflow->tf_stats.fs_rcount++;
 235                 flowop->fo_stats.fs_rcount++;
 236                 controlstats.fs_rbytes += bytes;
 237                 controlstats.fs_rcount++;
 238         } else if (flowop->fo_attrs & FLOW_ATTR_WRITE) {
 239                 threadflow->tf_stats.fs_wbytes += bytes;
 240                 threadflow->tf_stats.fs_wcount++;
 241                 flowop->fo_stats.fs_wcount++;
 242                 controlstats.fs_wbytes += bytes;
 243                 controlstats.fs_wcount++;
 244         }
 245         (void) ipc_mutex_unlock(&controlstats_lock);
 246 }
 247 
 248 /*
 249  * Calls the flowop's initialization function, pointed to by
 250  * flowop->fo_init.
 251  */
 252 static int
 253 flowop_initflow(flowop_t *flowop)
 254 {
 255         /*
 256          * save static copies of two items, in case they are supplied
 257          * from random variables
 258          */
 259         if (!AVD_IS_STRING(flowop->fo_value))
 260                 flowop->fo_constvalue = avd_get_int(flowop->fo_value);
 261 
 262         flowop->fo_constwss = avd_get_int(flowop->fo_wss);
 263 
 264         if ((*flowop->fo_init)(flowop) < 0) {
 265                 filebench_log(LOG_ERROR, "flowop %s-%d init failed",
 266                     flowop->fo_name, flowop->fo_instance);
 267                 return (-1);
 268         }
 269         return (0);
 270 }
 271 
 272 static int
 273 flowop_create_runtime_flowops(threadflow_t *threadflow, flowop_t **ops_list_ptr)
 274 {
 275         flowop_t *flowop = *ops_list_ptr;
 276 
 277         while (flowop) {
 278                 flowop_t *newflowop;
 279 
 280                 if (flowop == *ops_list_ptr)
 281                         *ops_list_ptr = NULL;
 282 
 283                 newflowop = flowop_define_common(threadflow, flowop->fo_name,
 284                     flowop, ops_list_ptr, 1, 0);
 285                 if (newflowop == NULL)
 286                         return (FILEBENCH_ERROR);
 287 
 288                 /* check for fo_filename attribute, and resolve if present */
 289                 if (flowop->fo_filename) {
 290                         char *name;
 291 
 292                         name = avd_get_str(flowop->fo_filename);
 293                         newflowop->fo_fileset = fileset_find(name);
 294 
 295                         if (newflowop->fo_fileset == NULL) {
 296                                 filebench_log(LOG_ERROR,
 297                                     "flowop %s: file %s not found",
 298                                     newflowop->fo_name, name);
 299                                 filebench_shutdown(1);
 300                         }
 301                 }
 302 
 303                 if (flowop_initflow(newflowop) < 0) {
 304                         filebench_log(LOG_ERROR, "Flowop init of %s failed",
 305                             newflowop->fo_name);
 306                 }
 307 
 308                 flowop = flowop->fo_exec_next;
 309         }
 310         return (FILEBENCH_OK);
 311 }
 312 
 313 /*
 314  * Calls the flowop's destruct function, pointed to by
 315  * flowop->fo_destruct.
 316  */
 317 static void
 318 flowop_destructflow(flowop_t *flowop)
 319 {
 320         (*flowop->fo_destruct)(flowop);
 321 }
 322 
 323 /*
 324  * call the destruct funtions of all the threadflow's flowops,
 325  * if it is still flagged as "running".
 326  */
 327 void
 328 flowop_destruct_all_flows(threadflow_t *threadflow)
 329 {
 330         flowop_t *flowop;
 331 
 332         /* wait a moment to give other threads a chance to stop too */
 333         (void) sleep(1);
 334 
 335         (void) ipc_mutex_lock(&threadflow->tf_lock);
 336 
 337         /* prepare to call destruct flow routines, if necessary */
 338         if (threadflow->tf_running == 0) {
 339 
 340                 /* allready destroyed */
 341                 (void) ipc_mutex_unlock(&threadflow->tf_lock);
 342                 return;
 343         }
 344 
 345         flowop = threadflow->tf_thrd_fops;
 346         threadflow->tf_running = 0;
 347         (void) ipc_mutex_unlock(&threadflow->tf_lock);
 348 
 349         while (flowop) {
 350                 flowop_destructflow(flowop);
 351                 flowop = flowop->fo_exec_next;
 352         }
 353 }
 354 
 355 /*
 356  * The final initialization and main execution loop for the
 357  * worker threads. Sets threadflow and flowop start times,
 358  * waits for all process to start, then creates the runtime
 359  * flowops from those defined by the F language workload
 360  * script. It does some more initialization, then enters a
 361  * loop to repeatedly execute the flowops on the flowop list
 362  * until an abort condition is detected, at which time it exits.
 363  * This is the starting routine for the new worker thread
 364  * created by threadflow_createthread(), and is not currently
 365  * called from anywhere else.
 366  */
 367 void
 368 flowop_start(threadflow_t *threadflow)
 369 {
 370         flowop_t *flowop;
 371         size_t memsize;
 372         int ret = FILEBENCH_OK;
 373 
 374 #ifdef HAVE_PROCFS
 375         if (noproc == 0) {
 376                 char procname[128];
 377                 long ctl[2] = {PCSET, PR_MSACCT};
 378                 int pfd;
 379 
 380                 (void) snprintf(procname, sizeof (procname),
 381                     "/proc/%d/lwp/%d/lwpctl", my_pid, _lwp_self());
 382                 pfd = open(procname, O_WRONLY);
 383                 (void) pwrite(pfd, &ctl, sizeof (ctl), 0);
 384                 (void) close(pfd);
 385         }
 386 #endif
 387 
 388         (void) ipc_mutex_lock(&controlstats_lock);
 389         if (!controlstats_zeroed) {
 390                 (void) memset(&controlstats, 0, sizeof (controlstats));
 391                 controlstats_zeroed = 1;
 392         }
 393         (void) ipc_mutex_unlock(&controlstats_lock);
 394 
 395         flowop = threadflow->tf_thrd_fops;
 396         threadflow->tf_stats.fs_stime = gethrtime();
 397         flowop->fo_stats.fs_stime = gethrtime();
 398 
 399         /* Hold the flowop find lock as reader to prevent lookups */
 400         (void) pthread_rwlock_rdlock(&filebench_shm->shm_flowop_find_lock);
 401 
 402         /*
 403          * Block until all processes have started, acting like
 404          * a barrier. The original filebench process initially
 405          * holds the run_lock as a reader, preventing any of the
 406          * threads from obtaining the writer lock, and hence
 407          * passing this point. Once all processes and threads
 408          * have been created, the original process unlocks
 409          * run_lock, allowing each waiting thread to lock
 410          * and then immediately unlock it, then begin running.
 411          */
 412         (void) pthread_rwlock_wrlock(&filebench_shm->shm_run_lock);
 413         (void) pthread_rwlock_unlock(&filebench_shm->shm_run_lock);
 414 
 415         /* Create the runtime flowops from those defined by the script */
 416         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 417         if (flowop_create_runtime_flowops(threadflow, &threadflow->tf_thrd_fops)
 418             != FILEBENCH_OK) {
 419                 (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 420                 filebench_shutdown(1);
 421                 return;
 422         }
 423         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 424 
 425         /* Release the find lock as reader to allow lookups */
 426         (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
 427 
 428         /* Set to the start of the new flowop list */
 429         flowop = threadflow->tf_thrd_fops;
 430 
 431         threadflow->tf_abort = 0;
 432         threadflow->tf_running = 1;
 433 
 434         memsize = (size_t)threadflow->tf_constmemsize;
 435 
 436         /* If we are going to use ISM, allocate later */
 437         if (threadflow->tf_attrs & THREADFLOW_USEISM) {
 438                 threadflow->tf_mem =
 439                     ipc_ismmalloc(memsize);
 440         } else {
 441                 threadflow->tf_mem =
 442                     malloc(memsize);
 443         }
 444 
 445         (void) memset(threadflow->tf_mem, 0, memsize);
 446         filebench_log(LOG_DEBUG_SCRIPT, "Thread allocated %d bytes", memsize);
 447 
 448 #ifdef HAVE_LWPS
 449         filebench_log(LOG_DEBUG_SCRIPT, "Thread %zx (%d) started",
 450             threadflow,
 451             _lwp_self());
 452 #endif
 453 
 454         /* Main filebench worker loop */
 455         while (ret == FILEBENCH_OK) {
 456                 int i, count;
 457 
 458                 /* Abort if asked */
 459                 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
 460                         break;
 461 
 462                 /* Be quiet while stats are gathered */
 463                 if (filebench_shm->shm_bequiet) {
 464                         (void) sleep(1);
 465                         continue;
 466                 }
 467 
 468                 /* Take it easy until everyone is ready to go */
 469                 if (!filebench_shm->shm_procs_running) {
 470                         (void) sleep(1);
 471                         continue;
 472                 }
 473 
 474                 if (flowop == NULL) {
 475                         filebench_log(LOG_ERROR, "flowop_read null flowop");
 476                         return;
 477                 }
 478 
 479                 if (flowop->fo_stats.fs_stime == 0)
 480                         flowop->fo_stats.fs_stime = gethrtime();
 481 
 482                 /* Execute the flowop for fo_iters times */
 483                 count = (int)avd_get_int(flowop->fo_iters);
 484                 for (i = 0; i < count; i++) {
 485 
 486                         filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
 487                             "%s-%d", threadflow->tf_name, flowop->fo_name,
 488                             flowop->fo_instance);
 489 
 490                         ret = (*flowop->fo_func)(threadflow, flowop);
 491 
 492                         /*
 493                          * Return value FILEBENCH_ERROR means "flowop
 494                          * failed, stop the filebench run"
 495                          */
 496                         if (ret == FILEBENCH_ERROR) {
 497                                 filebench_log(LOG_ERROR,
 498                                     "%s-%d: flowop %s-%d failed",
 499                                     threadflow->tf_name,
 500                                     threadflow->tf_instance,
 501                                     flowop->fo_name,
 502                                     flowop->fo_instance);
 503                                 (void) ipc_mutex_lock(&threadflow->tf_lock);
 504                                 threadflow->tf_abort = 1;
 505                                 filebench_shm->shm_f_abort =
 506                                     FILEBENCH_ABORT_ERROR;
 507                                 (void) ipc_mutex_unlock(&threadflow->tf_lock);
 508                                 break;
 509                         }
 510 
 511                         /*
 512                          * Return value of FILEBENCH_NORSC means "stop
 513                          * the filebench run" if in "end on no work mode",
 514                          * otherwise it indicates an error
 515                          */
 516                         if (ret == FILEBENCH_NORSC) {
 517                                 (void) ipc_mutex_lock(&threadflow->tf_lock);
 518                                 threadflow->tf_abort = FILEBENCH_DONE;
 519                                 if (filebench_shm->shm_rmode ==
 520                                     FILEBENCH_MODE_Q1STDONE) {
 521                                         filebench_shm->shm_f_abort =
 522                                             FILEBENCH_ABORT_RSRC;
 523                                 } else if (filebench_shm->shm_rmode !=
 524                                     FILEBENCH_MODE_QALLDONE) {
 525                                         filebench_log(LOG_ERROR1,
 526                                             "WARNING! Run stopped early:\n   "
 527                                             "             flowop %s-%d could "
 528                                             "not obtain a file. Please\n      "
 529                                             "          reduce runtime, "
 530                                             "increase fileset entries "
 531                                             "($nfiles), or switch modes.",
 532                                             flowop->fo_name,
 533                                             flowop->fo_instance);
 534                                         filebench_shm->shm_f_abort =
 535                                             FILEBENCH_ABORT_ERROR;
 536                                 }
 537                                 (void) ipc_mutex_unlock(&threadflow->tf_lock);
 538                                 break;
 539                         }
 540 
 541                         /*
 542                          * Return value of FILEBENCH_DONE means "stop
 543                          * the filebench run without error"
 544                          */
 545                         if (ret == FILEBENCH_DONE) {
 546                                 (void) ipc_mutex_lock(&threadflow->tf_lock);
 547                                 threadflow->tf_abort = FILEBENCH_DONE;
 548                                 filebench_shm->shm_f_abort =
 549                                     FILEBENCH_ABORT_DONE;
 550                                 (void) ipc_mutex_unlock(&threadflow->tf_lock);
 551                                 break;
 552                         }
 553 
 554                         /*
 555                          * If we get here and the return is something other
 556                          * than FILEBENCH_OK, it means a spurious code
 557                          * was returned, so treat as major error. This
 558                          * probably indicates a bug in the flowop.
 559                          */
 560                         if (ret != FILEBENCH_OK) {
 561                                 filebench_log(LOG_ERROR,
 562                                     "Flowop %s unexpected return value = %d\n",
 563                                     flowop->fo_name, ret);
 564                                 filebench_shm->shm_f_abort =
 565                                     FILEBENCH_ABORT_ERROR;
 566                                 break;
 567                         }
 568                 }
 569 
 570                 /* advance to next flowop */
 571                 flowop = flowop->fo_exec_next;
 572 
 573                 /* but if at end of list, start over from the beginning */
 574                 if (flowop == NULL) {
 575                         flowop = threadflow->tf_thrd_fops;
 576                         threadflow->tf_stats.fs_count++;
 577                 }
 578         }
 579 
 580 #ifdef HAVE_LWPS
 581         filebench_log(LOG_DEBUG_SCRIPT, "Thread %d exiting",
 582             _lwp_self());
 583 #endif
 584 
 585         /* Tell flowops to destroy locally acquired state */
 586         flowop_destruct_all_flows(threadflow);
 587 
 588         pthread_exit(&threadflow->tf_abort);
 589 }
 590 
/*
 * Prototypes for the flowop initialization routines called from
 * flowop_init() and flowop_plugin_flowinit() below.
 */
void flowoplib_flowinit(void);
void fb_lfs_flowinit(void);

/*
 * Initializes controlstats_lock using the IPC_MUTEX_NORMAL mutex
 * attributes, then calls flowoplib_flowinit().
 */
void
flowop_init(void)
{
        (void) pthread_mutex_init(&controlstats_lock,
            ipc_mutexattr(IPC_MUTEX_NORMAL));
        flowoplib_flowinit();
}
 601 
 602 static int plugin_flowinit_done = FALSE;
 603 
 604 /*
 605  * Initialize any "plug-in" flowops. Called when the first "create fileset"
 606  * command is encountered.
 607  */
 608 void
 609 flowop_plugin_flowinit(void)
 610 {
 611         if (plugin_flowinit_done)
 612                 return;
 613 
 614         plugin_flowinit_done = TRUE;
 615 
 616         switch (filebench_shm->shm_filesys_type) {
 617         case LOCAL_FS_PLUG:
 618                 fb_lfs_flowinit();
 619                 break;
 620 
 621         case NFS3_PLUG:
 622         case NFS4_PLUG:
 623         case CIFS_PLUG:
 624                 break;
 625         }
 626 }
 627 
 628 
 629 /*
 630  * Delete the designated flowop from the thread's flowop list.
 631  */
 632 static void
 633 flowop_delete(flowop_t **flowoplist, flowop_t *flowop)
 634 {
 635         flowop_t *entry = *flowoplist;
 636         int found = 0;
 637 
 638         filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
 639             flowop->fo_name,
 640             flowop->fo_instance);
 641 
 642         /* Delete from thread's flowop list */
 643         if (flowop == *flowoplist) {
 644                 /* First on list */
 645                 *flowoplist = flowop->fo_exec_next;
 646                 filebench_log(LOG_DEBUG_IMPL,
 647                     "Delete0 flowop: (%s-%d)",
 648                     flowop->fo_name,
 649                     flowop->fo_instance);
 650         } else {
 651                 while (entry->fo_exec_next) {
 652                         filebench_log(LOG_DEBUG_IMPL,
 653                             "Delete0 flowop: (%s-%d) == (%s-%d)",
 654                             entry->fo_exec_next->fo_name,
 655                             entry->fo_exec_next->fo_instance,
 656                             flowop->fo_name,
 657                             flowop->fo_instance);
 658 
 659                         if (flowop == entry->fo_exec_next) {
 660                                 /* Delete */
 661                                 filebench_log(LOG_DEBUG_IMPL,
 662                                     "Deleted0 flowop: (%s-%d)",
 663                                     entry->fo_exec_next->fo_name,
 664                                     entry->fo_exec_next->fo_instance);
 665                                 entry->fo_exec_next =
 666                                     entry->fo_exec_next->fo_exec_next;
 667                                 break;
 668                         }
 669                         entry = entry->fo_exec_next;
 670                 }
 671         }
 672 
 673 #ifdef HAVE_PROCFS
 674         /* Close /proc stats */
 675         if (flowop->fo_thread)
 676                 (void) close(flowop->fo_thread->tf_lwpusagefd);
 677 #endif
 678 
 679         /* Delete from global list */
 680         entry = filebench_shm->shm_flowoplist;
 681 
 682         if (flowop == filebench_shm->shm_flowoplist) {
 683                 /* First on list */
 684                 filebench_shm->shm_flowoplist = flowop->fo_next;
 685                 found = 1;
 686         } else {
 687                 while (entry->fo_next) {
 688                         filebench_log(LOG_DEBUG_IMPL,
 689                             "Delete flowop: (%s-%d) == (%s-%d)",
 690                             entry->fo_next->fo_name,
 691                             entry->fo_next->fo_instance,
 692                             flowop->fo_name,
 693                             flowop->fo_instance);
 694 
 695                         if (flowop == entry->fo_next) {
 696                                 /* Delete */
 697                                 entry->fo_next = entry->fo_next->fo_next;
 698                                 found = 1;
 699                                 break;
 700                         }
 701 
 702                         entry = entry->fo_next;
 703                 }
 704         }
 705         if (found) {
 706                 filebench_log(LOG_DEBUG_IMPL,
 707                     "Deleted flowop: (%s-%d)",
 708                     flowop->fo_name,
 709                     flowop->fo_instance);
 710                 ipc_free(FILEBENCH_FLOWOP, (char *)flowop);
 711         } else {
 712                 filebench_log(LOG_DEBUG_IMPL, "Flowop %s-%d not found!",
 713                     flowop->fo_name,
 714                     flowop->fo_instance);
 715         }
 716 }
 717 
 718 /*
 719  * Deletes all the flowops from a flowop list.
 720  */
 721 void
 722 flowop_delete_all(flowop_t **flowoplist)
 723 {
 724         flowop_t *flowop = *flowoplist;
 725         flowop_t *next_flowop;
 726 
 727         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 728 
 729         while (flowop) {
 730                 filebench_log(LOG_DEBUG_IMPL, "Deleting flowop (%s-%d)",
 731                     flowop->fo_name, flowop->fo_instance);
 732 
 733                 if (flowop->fo_instance &&
 734                     (flowop->fo_instance == FLOW_MASTER)) {
 735                         flowop = flowop->fo_exec_next;
 736                         continue;
 737                 }
 738                 next_flowop = flowop->fo_exec_next;
 739                 flowop_delete(flowoplist, flowop);
 740                 flowop = next_flowop;
 741         }
 742 
 743         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 744 }
 745 
 746 /*
 747  * Allocates a flowop entity and initializes it with inherited
 748  * contents from the "inherit" flowop, if it is supplied, or
 749  * with zeros otherwise. In either case the fo_next and fo_exec_next
 750  * pointers are set to NULL, and fo_thread is set to point to
 751  * the owning threadflow. The initialized flowop is placed at
 752  * the head of the global flowop list, and also placed on the
 753  * tail of the supplied local flowop list, which will either
 754  * be a threadflow's tf_thrd_fops list or a composite flowop's
 755  * fo_comp_fops list. The routine locks the flowop's fo_lock and
 756  * leaves it held on return. If successful, it returns a pointer
 757  * to the allocated and initialized flowop, otherwise it returns NULL.
 758  *
 759  * filebench_shm->shm_flowop_lock must be held by caller.
 760  */
 761 static flowop_t *
 762 flowop_define_common(threadflow_t *threadflow, char *name, flowop_t *inherit,
 763     flowop_t **flowoplist_hdp, int instance, int type)
 764 {
 765         flowop_t *flowop;
 766 
 767         if (name == NULL)
 768                 return (NULL);
 769 
 770         if ((flowop = (flowop_t *)ipc_malloc(FILEBENCH_FLOWOP)) == NULL) {
 771                 filebench_log(LOG_ERROR,
 772                     "flowop_define: Can't malloc flowop");
 773                 return (NULL);
 774         }
 775 
 776         filebench_log(LOG_DEBUG_IMPL, "defining flowops %s-%d, addr %zx",
 777             name, instance, flowop);
 778 
 779         if (flowop == NULL)
 780                 return (NULL);
 781 
 782         if (inherit) {
 783                 (void) memcpy(flowop, inherit, sizeof (flowop_t));
 784                 (void) pthread_mutex_init(&flowop->fo_lock,
 785                     ipc_mutexattr(IPC_MUTEX_PRI_ROB));
 786                 (void) ipc_mutex_lock(&flowop->fo_lock);
 787                 flowop->fo_next = NULL;
 788                 flowop->fo_exec_next = NULL;
 789                 filebench_log(LOG_DEBUG_IMPL,
 790                     "flowop %s-%d calling init", name, instance);
 791         } else {
 792                 (void) memset(flowop, 0, sizeof (flowop_t));
 793                 flowop->fo_iters = avd_int_alloc(1);
 794                 flowop->fo_type = type;
 795                 (void) pthread_mutex_init(&flowop->fo_lock,
 796                     ipc_mutexattr(IPC_MUTEX_PRI_ROB));
 797                 (void) ipc_mutex_lock(&flowop->fo_lock);
 798         }
 799 
 800         /* Create backpointer to thread */
 801         flowop->fo_thread = threadflow;
 802 
 803         /* Add flowop to global list */
 804         if (filebench_shm->shm_flowoplist == NULL) {
 805                 filebench_shm->shm_flowoplist = flowop;
 806                 flowop->fo_next = NULL;
 807         } else {
 808                 flowop->fo_next = filebench_shm->shm_flowoplist;
 809                 filebench_shm->shm_flowoplist = flowop;
 810         }
 811 
 812         (void) strcpy(flowop->fo_name, name);
 813         flowop->fo_instance = instance;
 814 
 815         if (flowoplist_hdp == NULL)
 816                 return (flowop);
 817 
 818         /* Add flowop to thread op list */
 819         if (*flowoplist_hdp == NULL) {
 820                 *flowoplist_hdp = flowop;
 821                 flowop->fo_exec_next = NULL;
 822         } else {
 823                 flowop_t *flowend;
 824 
 825                 /* Find the end of the thread list */
 826                 flowend = *flowoplist_hdp;
 827                 while (flowend->fo_exec_next != NULL)
 828                         flowend = flowend->fo_exec_next;
 829                 flowend->fo_exec_next = flowop;
 830                 flowop->fo_exec_next = NULL;
 831         }
 832 
 833         return (flowop);
 834 }
 835 
 836 /*
 837  * Calls flowop_define_common() to allocate and initialize a
 838  * flowop, and holds the shared flowop_lock during the call.
 839  * It releases the created flowop's fo_lock when done.
 840  */
 841 flowop_t *
 842 flowop_define(threadflow_t *threadflow, char *name, flowop_t *inherit,
 843     flowop_t **flowoplist_hdp, int instance, int type)
 844 {
 845         flowop_t        *flowop;
 846 
 847         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 848         flowop = flowop_define_common(threadflow, name,
 849             inherit, flowoplist_hdp, instance, type);
 850         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 851 
 852         if (flowop == NULL)
 853                 return (NULL);
 854 
 855         (void) ipc_mutex_unlock(&flowop->fo_lock);
 856 
 857         return (flowop);
 858 }
 859 
 860 /*
 861  * Calls flowop_define_common() to allocate and initialize a
 862  * composite flowop, and holds the shared flowop_lock during the call.
 863  * It releases the created flowop's fo_lock when done.
 864  */
 865 flowop_t *
 866 flowop_new_composite_define(char *name)
 867 {
 868         flowop_t *flowop;
 869 
 870         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 871         flowop = flowop_define_common(NULL, name,
 872             NULL, NULL, 0, FLOW_TYPE_COMPOSITE);
 873         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 874 
 875         if (flowop == NULL)
 876                 return (NULL);
 877 
 878         flowop->fo_func = flowop_composite;
 879         flowop->fo_init = flowop_composite_init;
 880         flowop->fo_destruct = flowop_composite_destruct;
 881         (void) ipc_mutex_unlock(&flowop->fo_lock);
 882 
 883         return (flowop);
 884 }
 885 
 886 /*
 887  * Attempts to take a write lock on the flowop_find_lock that is
 888  * defined in interprocess shared memory. Since each call to
 889  * flowop_start() holds a read lock on flowop_find_lock, this
 890  * routine effectively blocks until all instances of
 891  * flowop_start() have finished. The flowop_find() routine calls
 892  * this routine so that flowops won't be searched for until all
 893  * flowops have been created by flowop_start.
 894  */
 895 static void
 896 flowop_find_barrier(void)
 897 {
 898         /* Block on wrlock to ensure find waits for all creates */
 899         (void) pthread_rwlock_wrlock(&filebench_shm->shm_flowop_find_lock);
 900         (void) pthread_rwlock_unlock(&filebench_shm->shm_flowop_find_lock);
 901 }
 902 
 903 /*
 904  * Returns a list of flowops named "name" from the master
 905  * flowop list.
 906  */
 907 flowop_t *
 908 flowop_find(char *name)
 909 {
 910         flowop_t *flowop;
 911         flowop_t *result = NULL;
 912 
 913         flowop_find_barrier();
 914 
 915         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 916 
 917         flowop = filebench_shm->shm_flowoplist;
 918 
 919         while (flowop) {
 920                 if (strcmp(name, flowop->fo_name) == 0) {
 921 
 922                         /* Add flowop to result list */
 923                         if (result == NULL) {
 924                                 result = flowop;
 925                                 flowop->fo_resultnext = NULL;
 926                         } else {
 927                                 flowop->fo_resultnext = result;
 928                                 result = flowop;
 929                         }
 930                 }
 931                 flowop = flowop->fo_next;
 932         }
 933 
 934         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 935 
 936 
 937         return (result);
 938 }
 939 
 940 /*
 941  * Returns a pointer to the specified instance of flowop
 942  * "name" from the global list.
 943  */
 944 flowop_t *
 945 flowop_find_one(char *name, int instance)
 946 {
 947         flowop_t *test_flowop;
 948 
 949         flowop_find_barrier();
 950 
 951         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
 952 
 953         test_flowop = filebench_shm->shm_flowoplist;
 954 
 955         while (test_flowop) {
 956                 if ((strcmp(name, test_flowop->fo_name) == 0) &&
 957                     (instance == test_flowop->fo_instance))
 958                         break;
 959 
 960                 test_flowop = test_flowop->fo_next;
 961         }
 962 
 963         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
 964 
 965         return (test_flowop);
 966 }
 967 
 968 /*
 969  * recursively searches through lists of flowops on a given thread
 970  * and those on any included composite flowops for the named flowop.
 971  * either returns with a pointer to the named flowop or NULL if it
 972  * cannot be found.
 973  */
 974 static flowop_t *
 975 flowop_recurse_search(char *path, char *name, flowop_t *list)
 976 {
 977         flowop_t *test_flowop;
 978         char fullname[MAXPATHLEN];
 979 
 980         test_flowop = list;
 981 
 982         /*
 983          * when searching a list of inner flowops, "path" is the fullname
 984          * of the containing composite flowop. Use it to form the
 985          * full name of the inner flowop to search for.
 986          */
 987         if (path) {
 988                 if ((strlen(path) + strlen(name) + 1) > MAXPATHLEN) {
 989                         filebench_log(LOG_ERROR,
 990                             "composite flowop path name %s.%s too long",
 991                             path, name);
 992                         return (NULL);
 993                 }
 994 
 995                 /* create composite_name.name for recursive search */
 996                 (void) strcpy(fullname, path);
 997                 (void) strcat(fullname, ".");
 998                 (void) strcat(fullname, name);
 999         } else {
1000                 (void) strcpy(fullname, name);
1001         }
1002 
1003         /*
1004          * loop through all flowops on the supplied tf_thrd_fops (flowop)
1005          * list or fo_comp_fops (inner flowop) list.
1006          */
1007         while (test_flowop) {
1008                 if (strcmp(fullname, test_flowop->fo_name) == 0)
1009                         return (test_flowop);
1010 
1011                 if (test_flowop->fo_type == FLOW_TYPE_COMPOSITE) {
1012                         flowop_t *found_flowop;
1013 
1014                         found_flowop = flowop_recurse_search(
1015                             test_flowop->fo_name, name,
1016                             test_flowop->fo_comp_fops);
1017 
1018                         if (found_flowop)
1019                                 return (found_flowop);
1020                 }
1021                 test_flowop = test_flowop->fo_exec_next;
1022         }
1023 
1024         /* not found here or on any child lists */
1025         return (NULL);
1026 }
1027 
1028 /*
1029  * Returns a pointer to flowop named "name" from the supplied tf_thrd_fops
1030  * list of flowops. Returns the named flowop if found, or NULL.
1031  */
1032 flowop_t *
1033 flowop_find_from_list(char *name, flowop_t *list)
1034 {
1035         flowop_t *found_flowop;
1036 
1037         flowop_find_barrier();
1038 
1039         (void) ipc_mutex_lock(&filebench_shm->shm_flowop_lock);
1040 
1041         found_flowop = flowop_recurse_search(NULL, name, list);
1042 
1043         (void) ipc_mutex_unlock(&filebench_shm->shm_flowop_lock);
1044 
1045         return (found_flowop);
1046 }
1047 
1048 /*
1049  * Composite flowop method. Does one pass through its list of
1050  * inner flowops per iteration.
1051  */
1052 static int
1053 flowop_composite(threadflow_t *threadflow, flowop_t *flowop)
1054 {
1055         flowop_t        *inner_flowop;
1056 
1057         /* get the first flowop in the list */
1058         inner_flowop = flowop->fo_comp_fops;
1059 
1060         /* make a pass through the list of sub flowops */
1061         while (inner_flowop) {
1062                 int     i, count;
1063 
1064                 /* Abort if asked */
1065                 if (threadflow->tf_abort || filebench_shm->shm_f_abort)
1066                         return (FILEBENCH_DONE);
1067 
1068                 if (inner_flowop->fo_stats.fs_stime == 0)
1069                         inner_flowop->fo_stats.fs_stime = gethrtime();
1070 
1071                 /* Execute the flowop for fo_iters times */
1072                 count = (int)avd_get_int(inner_flowop->fo_iters);
1073                 for (i = 0; i < count; i++) {
1074 
1075                         filebench_log(LOG_DEBUG_SCRIPT, "%s: executing flowop "
1076                             "%s-%d", threadflow->tf_name,
1077                             inner_flowop->fo_name,
1078                             inner_flowop->fo_instance);
1079 
1080                         switch ((*inner_flowop->fo_func)(threadflow,
1081                             inner_flowop)) {
1082 
1083                         /* all done */
1084                         case FILEBENCH_DONE:
1085                                 return (FILEBENCH_DONE);
1086 
1087                         /* quit if inner flowop limit reached */
1088                         case FILEBENCH_NORSC:
1089                                 return (FILEBENCH_NORSC);
1090 
1091                         /* quit on inner flowop error */
1092                         case FILEBENCH_ERROR:
1093                                 filebench_log(LOG_ERROR,
1094                                     "inner flowop %s failed",
1095                                     inner_flowop->fo_name);
1096                                 return (FILEBENCH_ERROR);
1097 
1098                         /* otherwise keep going */
1099                         default:
1100                                 break;
1101                         }
1102 
1103                 }
1104 
1105                 /* advance to next flowop */
1106                 inner_flowop = inner_flowop->fo_exec_next;
1107         }
1108 
1109         /* finished with this pass */
1110         return (FILEBENCH_OK);
1111 }
1112 
1113 /*
1114  * Composite flowop initialization. Creates runtime inner flowops
1115  * from prototype inner flowops.
1116  */
1117 static int
1118 flowop_composite_init(flowop_t *flowop)
1119 {
1120         int err;
1121 
1122         err = flowop_create_runtime_flowops(flowop->fo_thread,
1123             &flowop->fo_comp_fops);
1124         if (err != FILEBENCH_OK)
1125                 return (err);
1126 
1127         (void) ipc_mutex_unlock(&flowop->fo_lock);
1128         return (0);
1129 }
1130 
1131 /*
1132  * clean up inner flowops
1133  */
1134 static void
1135 flowop_composite_destruct(flowop_t *flowop)
1136 {
1137         flowop_t *inner_flowop = flowop->fo_comp_fops;
1138 
1139         while (inner_flowop) {
1140                 filebench_log(LOG_DEBUG_IMPL, "Deleting inner flowop (%s-%d)",
1141                     inner_flowop->fo_name, inner_flowop->fo_instance);
1142 
1143                 if (inner_flowop->fo_instance &&
1144                     (inner_flowop->fo_instance == FLOW_MASTER)) {
1145                         inner_flowop = inner_flowop->fo_exec_next;
1146                         continue;
1147                 }
1148                 flowop_delete(&flowop->fo_comp_fops, inner_flowop);
1149                 inner_flowop = inner_flowop->fo_exec_next;
1150         }
1151 }
1152 
1153 /*
1154  * Support routines for libraries of flowops
1155  */
1156 
1157 int
1158 flowop_init_generic(flowop_t *flowop)
1159 {
1160         (void) ipc_mutex_unlock(&flowop->fo_lock);
1161         return (FILEBENCH_OK);
1162 }
1163 
1164 void
1165 flowop_destruct_generic(flowop_t *flowop)
1166 {
1167         char *buf;
1168 
1169         /* release any local resources held by the flowop */
1170         (void) ipc_mutex_lock(&flowop->fo_lock);
1171         buf = flowop->fo_buf;
1172         flowop->fo_buf = NULL;
1173         (void) ipc_mutex_unlock(&flowop->fo_lock);
1174 
1175         if (buf)
1176                 free(buf);
1177 }
1178 
1179 
1180 /*
1181  * Loops through the supplied list of flowops and creates and initializes
1182  * a flowop for each one by calling flowop_define. As a side effect of
1183  * calling flowop define, the created flowops are placed on the
1184  * master flowop list. All created flowops are set to instance "0".
1185  */
1186 void
1187 flowop_flow_init(flowop_proto_t *list, int nops)
1188 {
1189         int i;
1190 
1191         for (i = 0; i < nops; i++) {
1192                 flowop_t *flowop;
1193                 flowop_proto_t *fl;
1194 
1195                 fl = &(list[i]);
1196 
1197                 if ((flowop = flowop_define(NULL,
1198                     fl->fl_name, NULL, NULL, 0, fl->fl_type)) == 0) {
1199                         filebench_log(LOG_ERROR,
1200                             "failed to create flowop %s\n",
1201                             fl->fl_name);
1202                         filebench_shutdown(1);
1203                 }
1204 
1205                 flowop->fo_func = fl->fl_func;
1206                 flowop->fo_init = fl->fl_init;
1207                 flowop->fo_destruct = fl->fl_destruct;
1208                 flowop->fo_attrs = fl->fl_attrs;
1209         }
1210 }