1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  22 /*        All Rights Reserved   */
  23 
  24 /*
  25  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  26  * Use is subject to license terms.
  27  */
  28 
  29 #include <sys/types.h>
  30 #include <sys/param.h>
  31 #include <sys/thread.h>
  32 #include <sys/sysmacros.h>
  33 #include <sys/stropts.h>
  34 #include <sys/stream.h>
  35 #include <sys/strsubr.h>
  36 #include <sys/strsun.h>
  37 #include <sys/conf.h>
  38 #include <sys/debug.h>
  39 #include <sys/cmn_err.h>
  40 #include <sys/kmem.h>
  41 #include <sys/atomic.h>
  42 #include <sys/errno.h>
  43 #include <sys/vtrace.h>
  44 #include <sys/ftrace.h>
  45 #include <sys/ontrap.h>
  46 #include <sys/multidata.h>
  47 #include <sys/multidata_impl.h>
  48 #include <sys/sdt.h>
  49 #include <sys/strft.h>
  50 
  51 #ifdef DEBUG
  52 #include <sys/kmem_impl.h>
  53 #endif
  54 
  55 /*
  56  * This file contains all the STREAMS utility routines that may
  57  * be used by modules and drivers.
  58  */
  59 
  60 /*
  61  * STREAMS message allocator: principles of operation
  62  *
  63  * The streams message allocator consists of all the routines that
  64  * allocate, dup and free streams messages: allocb(), [d]esballoc[a],
  65  * dupb(), freeb() and freemsg().  What follows is a high-level view
  66  * of how the allocator works.
  67  *
  68  * Every streams message consists of one or more mblks, a dblk, and data.
  69  * All mblks for all types of messages come from a common mblk_cache.
  70  * The dblk and data come in several flavors, depending on how the
  71  * message is allocated:
  72  *
  73  * (1) mblks up to DBLK_MAX_CACHE size are allocated from a collection of
  74  *     fixed-size dblk/data caches. For message sizes that are multiples of
  75  *     PAGESIZE, dblks are allocated separately from the buffer.
  76  *     The associated buffer is allocated by the constructor using kmem_alloc().
  77  *     For all other message sizes, dblk and its associated data is allocated
  78  *     as a single contiguous chunk of memory.
  79  *     Objects in these caches consist of a dblk plus its associated data.
  80  *     allocb() determines the nearest-size cache by table lookup:
  81  *     the dblk_cache[] array provides the mapping from size to dblk cache.
  82  *
  83  * (2) Large messages (size > DBLK_MAX_CACHE) are constructed by
  84  *     kmem_alloc()'ing a buffer for the data and supplying that
  85  *     buffer to gesballoc(), described below.
  86  *
  87  * (3) The four flavors of [d]esballoc[a] are all implemented by a
  88  *     common routine, gesballoc() ("generic esballoc").  gesballoc()
  89  *     allocates a dblk from the global dblk_esb_cache and sets db_base,
  90  *     db_lim and db_frtnp to describe the caller-supplied buffer.
  91  *
  92  * While there are several routines to allocate messages, there is only
  93  * one routine to free messages: freeb().  freeb() simply invokes the
  94  * dblk's free method, dbp->db_free(), which is set at allocation time.
  95  *
  96  * dupb() creates a new reference to a message by allocating a new mblk,
  97  * incrementing the dblk reference count and setting the dblk's free
  98  * method to dblk_decref().  The dblk's original free method is retained
  99  * in db_lastfree.  dblk_decref() decrements the reference count on each
 100  * freeb().  If this is not the last reference it just frees the mblk;
 101  * if this *is* the last reference, it restores db_free to db_lastfree,
 102  * sets db_mblk to the current mblk (see below), and invokes db_lastfree.
 103  *
 104  * The implementation makes aggressive use of kmem object caching for
 105  * maximum performance.  This makes the code simple and compact, but
 106  * also a bit abstruse in some places.  The invariants that constitute a
 107  * message's constructed state, described below, are more subtle than usual.
 108  *
 109  * Every dblk has an "attached mblk" as part of its constructed state.
 110  * The mblk is allocated by the dblk's constructor and remains attached
 111  * until the message is either dup'ed or pulled up.  In the dupb() case
 112  * the mblk association doesn't matter until the last free, at which time
 113  * dblk_decref() attaches the last mblk to the dblk.  pullupmsg() affects
 114  * the mblk association because it swaps the leading mblks of two messages,
 115  * so it is responsible for swapping their db_mblk pointers accordingly.
 116  * From a constructed-state viewpoint it doesn't matter that a dblk's
 117  * attached mblk can change while the message is allocated; all that
 118  * matters is that the dblk has *some* attached mblk when it's freed.
 119  *
 120  * The sizes of the allocb() small-message caches are not magical.
 121  * They represent a good trade-off between internal and external
 122  * fragmentation for current workloads.  They should be reevaluated
 123  * periodically, especially if allocations larger than DBLK_MAX_CACHE
 124  * become common.  We use 64-byte alignment so that dblks don't
 125  * straddle cache lines unnecessarily.
 126  */
/*
 * Limits and granularity of the fixed-size dblk caches: DBLK_MAX_CACHE
 * is the largest message size served from the caches, DBLK_CACHE_ALIGN
 * keeps dblks from straddling cache lines, and DBLK_MIN_SIZE
 * (== 1 << DBLK_SIZE_SHIFT) is the size granularity used to index
 * the dblk_cache[] lookup table.
 */
#define DBLK_MAX_CACHE          73728
#define DBLK_CACHE_ALIGN        64
#define DBLK_MIN_SIZE           8
#define DBLK_SIZE_SHIFT         3

/*
 * db_ref, db_type, db_flags and db_struioflag are adjacent single-byte
 * fields of the dblk, so they can be read and written together as one
 * 32-bit word (DBLK_RTFU_WORD) for fast initialization and for atomic
 * reference-count updates.  DBLK_RTFU_SHIFT yields the bit offset of a
 * given field within that word, accounting for endianness.
 */
#ifdef _BIG_ENDIAN
#define DBLK_RTFU_SHIFT(field)  \
        (8 * (&((dblk_t *)0)->db_struioflag - &((dblk_t *)0)->field))
#else
#define DBLK_RTFU_SHIFT(field)  \
        (8 * (&((dblk_t *)0)->field - &((dblk_t *)0)->db_ref))
#endif

/*
 * DBLK_RTFU() composes the initial ref/type/flags/uioflag word for a
 * freshly allocated message.  Note that (ref - 1) is folded into the
 * flags byte; presumably this records the minimum reference count
 * compared against DBLK_REFMIN in dblk_decref() -- TODO confirm
 * against the definitions in sys/stream.h.
 */
#define DBLK_RTFU(ref, type, flags, uioflag)    \
        (((ref) << DBLK_RTFU_SHIFT(db_ref)) | \
        ((type) << DBLK_RTFU_SHIFT(db_type)) | \
        (((flags) | (ref - 1)) << DBLK_RTFU_SHIFT(db_flags)) | \
        ((uioflag) << DBLK_RTFU_SHIFT(db_struioflag)))
#define DBLK_RTFU_REF_MASK      (DBLK_REFMAX << DBLK_RTFU_SHIFT(db_ref))
#define DBLK_RTFU_WORD(dbp)     (*((uint32_t *)&(dbp)->db_ref))
/* The mblk fields starting at b_band are likewise accessed as one word. */
#define MBLK_BAND_FLAG_WORD(mp) (*((uint32_t *)&(mp)->b_band))

/*
 * The set of dblk/data cache sizes created by streams_msg_init().
 * Per the ASSERTs there, sub-page sizes are chosen so that
 * (size + sizeof (dblk_t)) is a multiple of DBLK_CACHE_ALIGN, while
 * page-multiple sizes are themselves DBLK_CACHE_ALIGN-aligned.
 * The list is zero-terminated.
 */
static size_t dblk_sizes[] = {
#ifdef _LP64
        16, 80, 144, 208, 272, 336, 528, 1040, 1488, 1936, 2576, 3856,
        8192, 12048, 16384, 20240, 24576, 28432, 32768, 36624,
        40960, 44816, 49152, 53008, 57344, 61200, 65536, 69392,
#else
        64, 128, 320, 576, 1088, 1536, 1984, 2624, 3904,
        8192, 12096, 16384, 20288, 24576, 28480, 32768, 36672,
        40960, 44864, 49152, 53056, 57344, 61248, 65536, 69440,
#endif
        DBLK_MAX_CACHE, 0
};

/* Size-to-cache lookup table, indexed by (size - 1) >> DBLK_SIZE_SHIFT. */
static struct kmem_cache *dblk_cache[DBLK_MAX_CACHE / DBLK_MIN_SIZE];
static struct kmem_cache *mblk_cache;
static struct kmem_cache *dblk_esb_cache;
static struct kmem_cache *fthdr_cache;
static struct kmem_cache *ftblk_cache;

static void dblk_lastfree(mblk_t *mp, dblk_t *dbp);
static mblk_t *allocb_oversize(size_t size, int flags);
static int allocb_tryhard_fails;
static void frnop_func(void *arg);
/* A no-op free routine (frnop_func does nothing; see its definition). */
frtn_t frnop = { frnop_func };
static void bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp);

static boolean_t rwnext_enter(queue_t *qp);
static void rwnext_exit(queue_t *qp);

/*
 * Patchable mblk/dblk kmem_cache flags.
 */
int dblk_kmem_flags = 0;
int mblk_kmem_flags = 0;
 183 
/*
 * kmem constructor for the fixed-size dblk caches.  cdrarg is the
 * message size served by the cache.  Allocates the dblk's permanently
 * attached mblk and, for sizes that are a multiple of PAGESIZE, a
 * separate data buffer; for all other sizes the data area follows the
 * dblk within the same cache object.  Returns 0 on success, -1 if any
 * allocation fails (with partial allocations undone).
 */
static int
dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;
        ssize_t msg_size = (ssize_t)cdrarg;
        size_t index;

        ASSERT(msg_size != 0);

        index = (msg_size - 1) >> DBLK_SIZE_SHIFT;

        ASSERT(index < (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT));

        /* Every dblk owns an attached mblk as part of constructed state. */
        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);
        if ((msg_size & PAGEOFFSET) == 0) {
                /* Page-multiple size: buffer is allocated separately. */
                dbp->db_base = kmem_alloc(msg_size, kmflags);
                if (dbp->db_base == NULL) {
                        kmem_cache_free(mblk_cache, dbp->db_mblk);
                        return (-1);
                }
        } else {
                /* Data lives immediately after the dblk in this object. */
                dbp->db_base = (unsigned char *)&dbp[1];
        }

        dbp->db_mblk->b_datap = dbp;
        dbp->db_cache = dblk_cache[index];
        dbp->db_lim = dbp->db_base + msg_size;
        dbp->db_free = dbp->db_lastfree = dblk_lastfree;
        dbp->db_frtnp = NULL;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 221 
/*
 * kmem constructor for dblk_esb_cache (esballoc'ed messages): allocates
 * only the attached mblk.  db_base, db_lim, db_free and db_frtnp are
 * filled in later by gesballoc() from the caller-supplied buffer.
 */
/*ARGSUSED*/
static int
dblk_esb_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;

        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);
        dbp->db_mblk->b_datap = dbp;
        dbp->db_cache = dblk_esb_cache;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 239 
/*
 * kmem constructor for a bcache dblk.  cdrarg is the owning bcache_t:
 * like dblk_constructor() but the data buffer comes from the bcache's
 * private buffer_cache, db_cache points back at the bcache, and the
 * free method is bcache_dblk_lastfree().  Returns 0 on success, -1 on
 * allocation failure (with partial allocations undone).
 */
static int
bcache_dblk_constructor(void *buf, void *cdrarg, int kmflags)
{
        dblk_t *dbp = buf;
        bcache_t *bcp = cdrarg;

        if ((dbp->db_mblk = kmem_cache_alloc(mblk_cache, kmflags)) == NULL)
                return (-1);

        dbp->db_base = kmem_cache_alloc(bcp->buffer_cache, kmflags);
        if (dbp->db_base == NULL) {
                kmem_cache_free(mblk_cache, dbp->db_mblk);
                return (-1);
        }

        dbp->db_mblk->b_datap = dbp;
        dbp->db_cache = (void *)bcp;
        dbp->db_lim = dbp->db_base + bcp->size;
        dbp->db_free = dbp->db_lastfree = bcache_dblk_lastfree;
        dbp->db_frtnp = NULL;
        dbp->db_fthdr = NULL;
        dbp->db_credp = NULL;
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;
        return (0);
}
 267 
/*
 * kmem destructor paired with dblk_constructor().  cdrarg is the
 * cache's message size; a separately kmem_alloc'ed data buffer exists
 * (and must be freed) only when that size is a multiple of PAGESIZE.
 * The attached mblk is always returned to mblk_cache.
 */
/*ARGSUSED*/
static void
dblk_destructor(void *buf, void *cdrarg)
{
        dblk_t *dbp = buf;
        ssize_t msg_size = (ssize_t)cdrarg;

        ASSERT(dbp->db_mblk->b_datap == dbp);
        ASSERT(msg_size != 0);
        ASSERT(dbp->db_struioflag == 0);
        ASSERT(dbp->db_struioun.cksum.flags == 0);

        if ((msg_size & PAGEOFFSET) == 0) {
                kmem_free(dbp->db_base, msg_size);
        }

        kmem_cache_free(mblk_cache, dbp->db_mblk);
}
 286 
/*
 * kmem destructor paired with bcache_dblk_constructor(): return the
 * data buffer to the bcache's buffer_cache and the attached mblk to
 * mblk_cache.
 */
static void
bcache_dblk_destructor(void *buf, void *cdrarg)
{
        dblk_t *dbp = buf;
        bcache_t *bcp = cdrarg;

        kmem_cache_free(bcp->buffer_cache, dbp->db_base);

        ASSERT(dbp->db_mblk->b_datap == dbp);
        ASSERT(dbp->db_struioflag == 0);
        ASSERT(dbp->db_struioun.cksum.flags == 0);

        kmem_cache_free(mblk_cache, dbp->db_mblk);
}
 301 
/*
 * kmem constructor for flow-trace event blocks.  When stack tracing
 * (str_ftstack) is enabled, pre-allocate one stack buffer per event
 * slot.  kmem_alloc() may fail under KM_NOSLEEP, leaving that slot's
 * stk pointer NULL; ftblk_destructor() checks for this.  Always
 * returns 0.
 */
/* ARGSUSED */
static int
ftblk_constructor(void *buf, void *cdrarg, int kmflags)
{
        ftblk_t *fbp = buf;
        int i;

        bzero(fbp, sizeof (ftblk_t));
        if (str_ftstack != 0) {
                for (i = 0; i < FTBLK_EVNTS; i++)
                        fbp->ev[i].stk = kmem_alloc(sizeof (ftstk_t), kmflags);
        }

        return (0);
}
 317 
/*
 * kmem destructor paired with ftblk_constructor(): free any per-event
 * stack buffers that were successfully allocated (slots may be NULL if
 * a KM_NOSLEEP allocation failed in the constructor).
 */
/* ARGSUSED */
static void
ftblk_destructor(void *buf, void *cdrarg)
{
        ftblk_t *fbp = buf;
        int i;

        if (str_ftstack != 0) {
                for (i = 0; i < FTBLK_EVNTS; i++) {
                        if (fbp->ev[i].stk != NULL) {
                                kmem_free(fbp->ev[i].stk, sizeof (ftstk_t));
                                fbp->ev[i].stk = NULL;
                        }
                }
        }
}
 334 
 335 static int
 336 fthdr_constructor(void *buf, void *cdrarg, int kmflags)
 337 {
 338         fthdr_t *fhp = buf;
 339 
 340         return (ftblk_constructor(&fhp->first, cdrarg, kmflags));
 341 }
 342 
 343 static void
 344 fthdr_destructor(void *buf, void *cdrarg)
 345 {
 346         fthdr_t *fhp = buf;
 347 
 348         ftblk_destructor(&fhp->first, cdrarg);
 349 }
 350 
/*
 * Create all of the kmem caches used by the STREAMS message allocator:
 * the mblk cache, one dblk/data cache per size in dblk_sizes[], the
 * esballoc dblk cache, and the flow-trace header/block caches.  Also
 * populates the size-to-cache lookup table dblk_cache[] and initializes
 * the Multidata caches and the esballoc throttling queue.  Must run
 * before the first STREAMS message allocation.
 */
void
streams_msg_init(void)
{
        char name[40];
        size_t size;
        size_t lastsize = DBLK_MIN_SIZE;
        size_t *sizep;
        struct kmem_cache *cp;
        size_t tot_size;
        int offset;

        mblk_cache = kmem_cache_create("streams_mblk", sizeof (mblk_t), 32,
            NULL, NULL, NULL, NULL, NULL, mblk_kmem_flags);

        for (sizep = dblk_sizes; (size = *sizep) != 0; sizep++) {

                if ((offset = (size & PAGEOFFSET)) != 0) {
                        /*
                         * We are in the middle of a page, dblk should
                         * be allocated on the same page
                         */
                        tot_size = size + sizeof (dblk_t);
                        ASSERT((offset + sizeof (dblk_t) + sizeof (kmem_slab_t))
                            < PAGESIZE);
                        ASSERT((tot_size & (DBLK_CACHE_ALIGN - 1)) == 0);

                } else {

                        /*
                         * buf size is multiple of page size, dblk and
                         * buffer are allocated separately.
                         */

                        ASSERT((size & (DBLK_CACHE_ALIGN - 1)) == 0);
                        tot_size = sizeof (dblk_t);
                }

                (void) sprintf(name, "streams_dblk_%ld", size);
                cp = kmem_cache_create(name, tot_size, DBLK_CACHE_ALIGN,
                    dblk_constructor, dblk_destructor, NULL, (void *)(size),
                    NULL, dblk_kmem_flags);

                /*
                 * Map every granule in (previous size, size] to this
                 * cache so allocb() finds the nearest cache by lookup.
                 */
                while (lastsize <= size) {
                        dblk_cache[(lastsize - 1) >> DBLK_SIZE_SHIFT] = cp;
                        lastsize += DBLK_MIN_SIZE;
                }
        }

        dblk_esb_cache = kmem_cache_create("streams_dblk_esb", sizeof (dblk_t),
            DBLK_CACHE_ALIGN, dblk_esb_constructor, dblk_destructor, NULL,
            (void *)sizeof (dblk_t), NULL, dblk_kmem_flags);
        fthdr_cache = kmem_cache_create("streams_fthdr", sizeof (fthdr_t), 32,
            fthdr_constructor, fthdr_destructor, NULL, NULL, NULL, 0);
        ftblk_cache = kmem_cache_create("streams_ftblk", sizeof (ftblk_t), 32,
            ftblk_constructor, ftblk_destructor, NULL, NULL, NULL, 0);

        /* Initialize Multidata caches */
        mmd_init();

        /* initialize throttling queue for esballoc */
        esballoc_queue_init();
}
 413 
 414 /*ARGSUSED*/
 415 mblk_t *
 416 allocb(size_t size, uint_t pri)
 417 {
 418         dblk_t *dbp;
 419         mblk_t *mp;
 420         size_t index;
 421 
 422         index =  (size - 1)  >> DBLK_SIZE_SHIFT;
 423 
 424         if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
 425                 if (size != 0) {
 426                         mp = allocb_oversize(size, KM_NOSLEEP);
 427                         goto out;
 428                 }
 429                 index = 0;
 430         }
 431 
 432         if ((dbp = kmem_cache_alloc(dblk_cache[index], KM_NOSLEEP)) == NULL) {
 433                 mp = NULL;
 434                 goto out;
 435         }
 436 
 437         mp = dbp->db_mblk;
 438         DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
 439         mp->b_next = mp->b_prev = mp->b_cont = NULL;
 440         mp->b_rptr = mp->b_wptr = dbp->db_base;
 441         mp->b_queue = NULL;
 442         MBLK_BAND_FLAG_WORD(mp) = 0;
 443         STR_FTALLOC(&dbp->db_fthdr, FTEV_ALLOCB, size);
 444 out:
 445         FTRACE_1("allocb(): mp=0x%p", (uintptr_t)mp);
 446 
 447         return (mp);
 448 }
 449 
/*
 * Allocate an mblk taking db_credp and db_cpid from the template.
 * Allow the cred to be NULL, in which case no cred is attached and
 * db_cpid is set to the NOPID value msg_getcred() returned.  The
 * template's db_type is also copied to the new message.
 */
mblk_t *
allocb_tmpl(size_t size, const mblk_t *tmpl)
{
        mblk_t *mp = allocb(size, 0);

        if (mp != NULL) {
                dblk_t *src = tmpl->b_datap;
                dblk_t *dst = mp->b_datap;
                cred_t *cr;
                pid_t cpid;

                cr = msg_getcred(tmpl, &cpid);
                if (cr != NULL)
                        crhold(dst->db_credp = cr);     /* hold for new ref */
                dst->db_cpid = cpid;
                dst->db_type = src->db_type;
        }
        return (mp);
}
 473 
 474 mblk_t *
 475 allocb_cred(size_t size, cred_t *cr, pid_t cpid)
 476 {
 477         mblk_t *mp = allocb(size, 0);
 478 
 479         ASSERT(cr != NULL);
 480         if (mp != NULL) {
 481                 dblk_t *dbp = mp->b_datap;
 482 
 483                 crhold(dbp->db_credp = cr);
 484                 dbp->db_cpid = cpid;
 485         }
 486         return (mp);
 487 }
 488 
 489 mblk_t *
 490 allocb_cred_wait(size_t size, uint_t flags, int *error, cred_t *cr, pid_t cpid)
 491 {
 492         mblk_t *mp = allocb_wait(size, 0, flags, error);
 493 
 494         ASSERT(cr != NULL);
 495         if (mp != NULL) {
 496                 dblk_t *dbp = mp->b_datap;
 497 
 498                 crhold(dbp->db_credp = cr);
 499                 dbp->db_cpid = cpid;
 500         }
 501 
 502         return (mp);
 503 }
 504 
/*
 * Extract the db_cred (and optionally db_cpid) from a message.
 * We find the first mblk which has a non-NULL db_cred and use that.
 * If none found we return NULL and, if cpidp is non-NULL, set *cpidp
 * to NOPID.
 * Does NOT get a hold on the cred.
 */
cred_t *
msg_getcred(const mblk_t *mp, pid_t *cpidp)
{
        cred_t *cr = NULL;
        cred_t *cr2;
        mblk_t *mp2;

        while (mp != NULL) {
                dblk_t *dbp = mp->b_datap;

                cr = dbp->db_credp;
                if (cr == NULL) {
                        mp = mp->b_cont;
                        continue;
                }
                if (cpidp != NULL)
                        *cpidp = dbp->db_cpid;

#ifdef DEBUG
                /*
                 * Normally there should be at most one db_credp in a
                 * message.  But if there are multiple (as in the case of
                 * some M_IOC* and some internal messages in TCP/IP bind
                 * logic) then they must be identical in the normal case.
                 * However, a socket can be shared between different uids
                 * in which case data queued in TCP would be from different
                 * creds. Thus we can only assert for the zoneid being the
                 * same. Due to Multi-Level Ports for TX, some
                 * cred_t can have a NULL cr_zone, and we skip the comparison
                 * in that case.
                 */
                mp2 = mp->b_cont;
                while (mp2 != NULL) {
                        cr2 = DB_CRED(mp2);
                        if (cr2 != NULL) {
                                DTRACE_PROBE2(msg__getcred,
                                    cred_t *, cr, cred_t *, cr2);
                                ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
                                    crgetzone(cr) == NULL ||
                                    crgetzone(cr2) == NULL);
                        }
                        mp2 = mp2->b_cont;
                }
#endif
                return (cr);
        }
        if (cpidp != NULL)
                *cpidp = NOPID;
        return (NULL);
}
 561 
/*
 * Variant of msg_getcred which, when a cred is found
 * 1. Returns with a hold on the cred
 * 2. Clears the first cred in the mblk.
 * This is more efficient to use than a msg_getcred() + crhold() when
 * the message is freed after the cred has been extracted.
 *
 * The caller is responsible for ensuring that there is no other reference
 * on the message since db_credp can not be cleared when there are other
 * references (enforced by the db_ref == 1 ASSERT below).
 *
 * Note: the "hold" is the one the dblk already had; the dblk's pointer
 * is cleared rather than a new hold taken.  If cpidp is non-NULL it is
 * only written when a cred is found.
 */
cred_t *
msg_extractcred(mblk_t *mp, pid_t *cpidp)
{
        cred_t *cr = NULL;
        cred_t *cr2;
        mblk_t *mp2;

        while (mp != NULL) {
                dblk_t *dbp = mp->b_datap;

                cr = dbp->db_credp;
                if (cr == NULL) {
                        mp = mp->b_cont;
                        continue;
                }
                ASSERT(dbp->db_ref == 1);
                dbp->db_credp = NULL;
                if (cpidp != NULL)
                        *cpidp = dbp->db_cpid;
#ifdef DEBUG
                /*
                 * Normally there should be at most one db_credp in a
                 * message.  But if there are multiple (as in the case of
                 * some M_IOC* and some internal messages in TCP/IP bind
                 * logic) then they must be identical in the normal case.
                 * However, a socket can be shared between different uids
                 * in which case data queued in TCP would be from different
                 * creds. Thus we can only assert for the zoneid being the
                 * same. Due to Multi-Level Ports for TX, some
                 * cred_t can have a NULL cr_zone, and we skip the comparison
                 * in that case.
                 */
                mp2 = mp->b_cont;
                while (mp2 != NULL) {
                        cr2 = DB_CRED(mp2);
                        if (cr2 != NULL) {
                                DTRACE_PROBE2(msg__extractcred,
                                    cred_t *, cr, cred_t *, cr2);
                                ASSERT(crgetzoneid(cr) == crgetzoneid(cr2) ||
                                    crgetzone(cr) == NULL ||
                                    crgetzone(cr2) == NULL);
                        }
                        mp2 = mp2->b_cont;
                }
#endif
                return (cr);
        }
        return (NULL);
}
 622 /*
 623  * Get the label for a message. Uses the first mblk in the message
 624  * which has a non-NULL db_credp.
 625  * Returns NULL if there is no credp.
 626  */
 627 extern struct ts_label_s *
 628 msg_getlabel(const mblk_t *mp)
 629 {
 630         cred_t *cr = msg_getcred(mp, NULL);
 631 
 632         if (cr == NULL)
 633                 return (NULL);
 634 
 635         return (crgetlabel(cr));
 636 }
 637 
/*
 * Free a single mblk: invoke the dblk's current free method (set at
 * allocation time, or dblk_decref() if the message was dup'ed).  The
 * mblk must not be on a queue (b_next/b_prev NULL).  b_cont is NOT
 * followed; use freemsg() to free an entire message.
 */
void
freeb(mblk_t *mp)
{
        dblk_t *dbp = mp->b_datap;

        ASSERT(dbp->db_ref > 0);
        ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
        FTRACE_1("freeb(): mp=0x%lx", (uintptr_t)mp);

        STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);

        dbp->db_free(mp, dbp);
}
 651 
 652 void
 653 freemsg(mblk_t *mp)
 654 {
 655         FTRACE_1("freemsg(): mp=0x%lx", (uintptr_t)mp);
 656         while (mp) {
 657                 dblk_t *dbp = mp->b_datap;
 658                 mblk_t *mp_cont = mp->b_cont;
 659 
 660                 ASSERT(dbp->db_ref > 0);
 661                 ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
 662 
 663                 STR_FTEVENT_MBLK(mp, caller(), FTEV_FREEB, dbp->db_ref);
 664 
 665                 dbp->db_free(mp, dbp);
 666                 mp = mp_cont;
 667         }
 668 }
 669 
/*
 * Reallocate a block for another use.  Try hard to use the old block.
 * If the old data is wanted (copy), leave b_wptr at the end of the data,
 * otherwise return b_wptr = b_rptr.
 *
 * This routine is private and unstable.
 */
mblk_t  *
reallocb(mblk_t *mp, size_t size, uint_t copy)
{
        mblk_t          *mp1;
        unsigned char   *old_rptr;
        ptrdiff_t       cur_size;

        /* A NULL mblk degenerates to a plain allocation. */
        if (mp == NULL)
                return (allocb(size, BPRI_HI));

        cur_size = mp->b_wptr - mp->b_rptr;
        old_rptr = mp->b_rptr;

        ASSERT(mp->b_datap->db_ref != 0);

        /* Reuse the block only if we hold the sole reference and it fits. */
        if (mp->b_datap->db_ref == 1 && MBLKSIZE(mp) >= size) {
                /*
                 * If the data is wanted and it will fit where it is, no
                 * work is required.
                 */
                if (copy && mp->b_datap->db_lim - mp->b_rptr >= size)
                        return (mp);

                /*
                 * Restart at db_base.  NOTE(review): when 'copy' is set,
                 * the bcopy() below may then copy between overlapping
                 * regions of the same buffer; presumably the kernel bcopy
                 * tolerates this -- verify.
                 */
                mp->b_wptr = mp->b_rptr = mp->b_datap->db_base;
                mp1 = mp;
        } else if ((mp1 = allocb_tmpl(size, mp)) != NULL) {
                /* XXX other mp state could be copied too, db_flags ... ? */
                mp1->b_cont = mp->b_cont;
        } else {
                return (NULL);
        }

        if (copy) {
                bcopy(old_rptr, mp1->b_rptr, cur_size);
                mp1->b_wptr = mp1->b_rptr + cur_size;
        }

        /* Drop the old block if a replacement was allocated. */
        if (mp != mp1)
                freeb(mp);

        return (mp1);
}
 719 
/*
 * Default last-free method for cache-allocated dblks: release any
 * flow-trace header and cred, reset the per-message fields back to
 * their constructed state, and return the dblk to its kmem cache.
 */
static void
dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
        ASSERT(dbp->db_mblk == mp);
        if (dbp->db_fthdr != NULL)
                str_ftfree(dbp);

        /* set credp and projid to be 'unspecified' before returning to cache */
        if (dbp->db_credp != NULL) {
                crfree(dbp->db_credp);
                dbp->db_credp = NULL;
        }
        dbp->db_cpid = -1;

        /* Reset the struioflag and the checksum flag fields */
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;

        /* and the COOKED and/or UIOA flag(s) */
        dbp->db_flags &= ~(DBLK_COOKED | DBLK_UIOA);

        kmem_cache_free(dbp->db_cache, dbp);
}
 743 
/*
 * Free method installed by dupb(): atomically drop one reference to
 * the dblk.  If other references remain, only this mblk is freed.
 * On the last reference the mblk is re-attached to the dblk (restoring
 * the constructed-state invariant), db_free is restored to the original
 * method saved in db_lastfree, and that method frees the whole dblk.
 */
static void
dblk_decref(mblk_t *mp, dblk_t *dbp)
{
        if (dbp->db_ref != 1) {
                uint32_t rtfu = atomic_add_32_nv(&DBLK_RTFU_WORD(dbp),
                    -(1 << DBLK_RTFU_SHIFT(db_ref)));
                /*
                 * atomic_add_32_nv() just decremented db_ref, so we no longer
                 * have a reference to the dblk, which means another thread
                 * could free it.  Therefore we cannot examine the dblk to
                 * determine whether ours was the last reference.  Instead,
                 * we extract the new and minimum reference counts from rtfu.
                 * Note that all we're really saying is "if (ref != refmin)".
                 */
                if (((rtfu >> DBLK_RTFU_SHIFT(db_ref)) & DBLK_REFMAX) !=
                    ((rtfu >> DBLK_RTFU_SHIFT(db_flags)) & DBLK_REFMIN)) {
                        kmem_cache_free(mblk_cache, mp);
                        return;
                }
        }
        dbp->db_mblk = mp;
        dbp->db_free = dbp->db_lastfree;
        dbp->db_lastfree(mp, dbp);
}
 768 
/*
 * Create a new reference to a message: allocate a new mblk sharing the
 * existing dblk and data, then raise db_ref with a compare-and-swap
 * loop over the whole rtfu word so the increment cannot race with
 * concurrent updates to the adjacent fields.  Returns NULL if the mblk
 * allocation fails or db_ref is already at its maximum.
 */
mblk_t *
dupb(mblk_t *mp)
{
        dblk_t *dbp = mp->b_datap;
        mblk_t *new_mp;
        uint32_t oldrtfu, newrtfu;

        if ((new_mp = kmem_cache_alloc(mblk_cache, KM_NOSLEEP)) == NULL)
                goto out;

        /* The duplicate shares the dblk but has its own chain linkage. */
        new_mp->b_next = new_mp->b_prev = new_mp->b_cont = NULL;
        new_mp->b_rptr = mp->b_rptr;
        new_mp->b_wptr = mp->b_wptr;
        new_mp->b_datap = dbp;
        new_mp->b_queue = NULL;
        MBLK_BAND_FLAG_WORD(new_mp) = MBLK_BAND_FLAG_WORD(mp);

        STR_FTEVENT_MBLK(mp, caller(), FTEV_DUPB, dbp->db_ref);

        /*
         * Every dup'ed dblk is freed through dblk_decref(), which
         * restores db_lastfree on the final reference.
         */
        dbp->db_free = dblk_decref;
        do {
                ASSERT(dbp->db_ref > 0);
                oldrtfu = DBLK_RTFU_WORD(dbp);
                newrtfu = oldrtfu + (1 << DBLK_RTFU_SHIFT(db_ref));
                /*
                 * If db_ref is maxed out we can't dup this message anymore.
                 */
                if ((oldrtfu & DBLK_RTFU_REF_MASK) == DBLK_RTFU_REF_MASK) {
                        kmem_cache_free(mblk_cache, new_mp);
                        new_mp = NULL;
                        goto out;
                }
        } while (atomic_cas_32(&DBLK_RTFU_WORD(dbp), oldrtfu, newrtfu) !=
            oldrtfu);

out:
        FTRACE_1("dupb(): new_mp=0x%lx", (uintptr_t)new_mp);
        return (new_mp);
}
 808 
/*
 * Last-free method for [d]esballoc'ed messages: invoke the caller's
 * free routine (db_frtnp) to release the external buffer, release any
 * flow-trace header and cred, reset the per-message fields, and return
 * the dblk to its cache.
 */
static void
dblk_lastfree_desb(mblk_t *mp, dblk_t *dbp)
{
        frtn_t *frp = dbp->db_frtnp;

        ASSERT(dbp->db_mblk == mp);
        frp->free_func(frp->free_arg);
        if (dbp->db_fthdr != NULL)
                str_ftfree(dbp);

        /* set credp and projid to be 'unspecified' before returning to cache */
        if (dbp->db_credp != NULL) {
                crfree(dbp->db_credp);
                dbp->db_credp = NULL;
        }
        dbp->db_cpid = -1;
        dbp->db_struioflag = 0;
        dbp->db_struioun.cksum.flags = 0;

        kmem_cache_free(dbp->db_cache, dbp);
}
 830 
/*
 * Deliberate no-op free routine; backs the global frnop descriptor.
 */
/*ARGSUSED*/
static void
frnop_func(void *arg)
{
}
 836 
/*
 * Generic esballoc used to implement the four flavors: [d]esballoc[a].
 * Wraps the caller-supplied buffer [base, base + size) in a dblk from
 * dblk_esb_cache.  frp describes the caller's free routine for the
 * buffer; 'lastfree' becomes the dblk's free method.  db_rtfu is the
 * pre-composed ref/type/flags/uioflag word (see DBLK_RTFU()).
 * Returns NULL if the dblk allocation fails.
 */
static mblk_t *
gesballoc(unsigned char *base, size_t size, uint32_t db_rtfu, frtn_t *frp,
        void (*lastfree)(mblk_t *, dblk_t *), int kmflags)
{
        dblk_t *dbp;
        mblk_t *mp;

        ASSERT(base != NULL && frp != NULL);

        if ((dbp = kmem_cache_alloc(dblk_esb_cache, kmflags)) == NULL) {
                mp = NULL;
                goto out;
        }

        /* Point the dblk at the external buffer (not cache-owned data). */
        mp = dbp->db_mblk;
        dbp->db_base = base;
        dbp->db_lim = base + size;
        dbp->db_free = dbp->db_lastfree = lastfree;
        dbp->db_frtnp = frp;
        DBLK_RTFU_WORD(dbp) = db_rtfu;
        mp->b_next = mp->b_prev = mp->b_cont = NULL;
        mp->b_rptr = mp->b_wptr = base;
        mp->b_queue = NULL;
        MBLK_BAND_FLAG_WORD(mp) = 0;

out:
        FTRACE_1("gesballoc(): mp=0x%lx", (uintptr_t)mp);
        return (mp);
}
 869 
 870 /*ARGSUSED*/
 871 mblk_t *
 872 esballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
 873 {
 874         mblk_t *mp;
 875 
 876         /*
 877          * Note that this is structured to allow the common case (i.e.
 878          * STREAMS flowtracing disabled) to call gesballoc() with tail
 879          * call optimization.
 880          */
 881         if (!str_ftnever) {
 882                 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 883                     frp, freebs_enqueue, KM_NOSLEEP);
 884 
 885                 if (mp != NULL)
 886                         STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
 887                 return (mp);
 888         }
 889 
 890         return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 891             frp, freebs_enqueue, KM_NOSLEEP));
 892 }
 893 
 894 /*
 895  * Same as esballoc() but sleeps waiting for memory.
 896  */
 897 /*ARGSUSED*/
 898 mblk_t *
 899 esballoc_wait(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
 900 {
 901         mblk_t *mp;
 902 
 903         /*
 904          * Note that this is structured to allow the common case (i.e.
 905          * STREAMS flowtracing disabled) to call gesballoc() with tail
 906          * call optimization.
 907          */
 908         if (!str_ftnever) {
 909                 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 910                     frp, freebs_enqueue, KM_SLEEP);
 911 
 912                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOC, size);
 913                 return (mp);
 914         }
 915 
 916         return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 917             frp, freebs_enqueue, KM_SLEEP));
 918 }
 919 
 920 /*ARGSUSED*/
 921 mblk_t *
 922 desballoc(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
 923 {
 924         mblk_t *mp;
 925 
 926         /*
 927          * Note that this is structured to allow the common case (i.e.
 928          * STREAMS flowtracing disabled) to call gesballoc() with tail
 929          * call optimization.
 930          */
 931         if (!str_ftnever) {
 932                 mp = gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 933                     frp, dblk_lastfree_desb, KM_NOSLEEP);
 934 
 935                 if (mp != NULL)
 936                         STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOC, size);
 937                 return (mp);
 938         }
 939 
 940         return (gesballoc(base, size, DBLK_RTFU(1, M_DATA, 0, 0),
 941             frp, dblk_lastfree_desb, KM_NOSLEEP));
 942 }
 943 
/*
 * Same as esballoc() except the dblk starts with a reference count of
 * 2 (DBLK_RTFU(2, ...)) rather than 1.
 */
/*ARGSUSED*/
mblk_t *
esballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, freebs_enqueue, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_ESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, freebs_enqueue, KM_NOSLEEP));
}
 967 
/*
 * Same as desballoc() except the dblk starts with a reference count of
 * 2 (DBLK_RTFU(2, ...)) rather than 1.
 */
/*ARGSUSED*/
mblk_t *
desballoca(unsigned char *base, size_t size, uint_t pri, frtn_t *frp)
{
	mblk_t *mp;

	/*
	 * Note that this is structured to allow the common case (i.e.
	 * STREAMS flowtracing disabled) to call gesballoc() with tail
	 * call optimization.
	 */
	if (!str_ftnever) {
		mp = gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
		    frp, dblk_lastfree_desb, KM_NOSLEEP);

		if (mp != NULL)
			STR_FTALLOC(&DB_FTHDR(mp), FTEV_DESBALLOCA, size);
		return (mp);
	}

	return (gesballoc(base, size, DBLK_RTFU(2, M_DATA, 0, 0),
	    frp, dblk_lastfree_desb, KM_NOSLEEP));
}
 991 
/*
 * Last-reference free routine for dblks allocated from a bcache.
 * Returns the dblk to the bcache's dblk cache and, if a
 * bcache_destroy() is pending and this was the last outstanding
 * allocation, finishes tearing the bcache down.
 */
static void
bcache_dblk_lastfree(mblk_t *mp, dblk_t *dbp)
{
	bcache_t *bcp = dbp->db_cache;

	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	mutex_enter(&bcp->mutex);
	kmem_cache_free(bcp->dblk_cache, dbp);
	bcp->alloc--;

	if (bcp->alloc == 0 && bcp->destroy != 0) {
		/*
		 * Deferred destroy: bcache_destroy() was called while
		 * allocations were outstanding and we were the last one
		 * out.  Note: exit the mutex before destroying it.
		 */
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		mutex_exit(&bcp->mutex);
	}
}
1024 
1025 bcache_t *
1026 bcache_create(char *name, size_t size, uint_t align)
1027 {
1028         bcache_t *bcp;
1029         char buffer[255];
1030 
1031         ASSERT((align & (align - 1)) == 0);
1032 
1033         if ((bcp = kmem_alloc(sizeof (bcache_t), KM_NOSLEEP)) == NULL)
1034                 return (NULL);
1035 
1036         bcp->size = size;
1037         bcp->align = align;
1038         bcp->alloc = 0;
1039         bcp->destroy = 0;
1040 
1041         mutex_init(&bcp->mutex, NULL, MUTEX_DRIVER, NULL);
1042 
1043         (void) sprintf(buffer, "%s_buffer_cache", name);
1044         bcp->buffer_cache = kmem_cache_create(buffer, size, align, NULL, NULL,
1045             NULL, NULL, NULL, 0);
1046         (void) sprintf(buffer, "%s_dblk_cache", name);
1047         bcp->dblk_cache = kmem_cache_create(buffer, sizeof (dblk_t),
1048             DBLK_CACHE_ALIGN, bcache_dblk_constructor, bcache_dblk_destructor,
1049             NULL, (void *)bcp, NULL, 0);
1050 
1051         return (bcp);
1052 }
1053 
/*
 * Destroy a bcache.  If allocations are still outstanding, teardown is
 * deferred: bc_destroy is marked and bcache_dblk_lastfree() completes
 * the destroy when the last dblk is freed.
 */
void
bcache_destroy(bcache_t *bcp)
{
	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	if (bcp->alloc == 0) {
		/* No outstanding allocations: tear down immediately. */
		kmem_cache_destroy(bcp->dblk_cache);
		kmem_cache_destroy(bcp->buffer_cache);
		mutex_exit(&bcp->mutex);
		mutex_destroy(&bcp->mutex);
		kmem_free(bcp, sizeof (bcache_t));
	} else {
		/* Defer to bcache_dblk_lastfree() of the last dblk. */
		bcp->destroy++;
		mutex_exit(&bcp->mutex);
	}
}
1071 
/*
 * Allocate an mblk/dblk pair from the given bcache.  Returns NULL if
 * the bcache is being destroyed or the dblk cache is exhausted.
 * pri is unused.
 */
/*ARGSUSED*/
mblk_t *
bcache_allocb(bcache_t *bcp, uint_t pri)
{
	dblk_t *dbp;
	mblk_t *mp = NULL;

	ASSERT(bcp != NULL);

	mutex_enter(&bcp->mutex);
	/* Refuse new allocations once a destroy is pending. */
	if (bcp->destroy != 0) {
		mutex_exit(&bcp->mutex);
		goto out;
	}

	if ((dbp = kmem_cache_alloc(bcp->dblk_cache, KM_NOSLEEP)) == NULL) {
		mutex_exit(&bcp->mutex);
		goto out;
	}
	/* Count outstanding allocations (drives deferred destroy). */
	bcp->alloc++;
	mutex_exit(&bcp->mutex);

	ASSERT(((uintptr_t)(dbp->db_base) & (bcp->align - 1)) == 0);

	/* Initialize the mblk/dblk pair to a fresh, empty M_DATA block. */
	mp = dbp->db_mblk;
	DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
	mp->b_next = mp->b_prev = mp->b_cont = NULL;
	mp->b_rptr = mp->b_wptr = dbp->db_base;
	mp->b_queue = NULL;
	MBLK_BAND_FLAG_WORD(mp) = 0;
	STR_FTALLOC(&dbp->db_fthdr, FTEV_BCALLOCB, bcp->size);
out:
	FTRACE_1("bcache_allocb(): mp=0x%p", (uintptr_t)mp);

	return (mp);
}
1108 
/*
 * Last-reference free routine for oversize dblks (see
 * allocb_oversize()): frees the separately kmem_alloc'd data buffer
 * and then the dblk itself.
 */
static void
dblk_lastfree_oversize(mblk_t *mp, dblk_t *dbp)
{
	ASSERT(dbp->db_mblk == mp);
	if (dbp->db_fthdr != NULL)
		str_ftfree(dbp);

	/* set credp and projid to be 'unspecified' before returning to cache */
	if (dbp->db_credp != NULL) {
		crfree(dbp->db_credp);
		dbp->db_credp = NULL;
	}
	dbp->db_cpid = -1;
	dbp->db_struioflag = 0;
	dbp->db_struioun.cksum.flags = 0;

	/* The data buffer was kmem_alloc'd (not from a cache); free it. */
	kmem_free(dbp->db_base, dbp->db_lim - dbp->db_base);
	kmem_cache_free(dbp->db_cache, dbp);
}
1128 
1129 static mblk_t *
1130 allocb_oversize(size_t size, int kmflags)
1131 {
1132         mblk_t *mp;
1133         void *buf;
1134 
1135         size = P2ROUNDUP(size, DBLK_CACHE_ALIGN);
1136         if ((buf = kmem_alloc(size, kmflags)) == NULL)
1137                 return (NULL);
1138         if ((mp = gesballoc(buf, size, DBLK_RTFU(1, M_DATA, 0, 0),
1139             &frnop, dblk_lastfree_oversize, kmflags)) == NULL)
1140                 kmem_free(buf, size);
1141 
1142         if (mp != NULL)
1143                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBIG, size);
1144 
1145         return (mp);
1146 }
1147 
1148 mblk_t *
1149 allocb_tryhard(size_t target_size)
1150 {
1151         size_t size;
1152         mblk_t *bp;
1153 
1154         for (size = target_size; size < target_size + 512;
1155             size += DBLK_CACHE_ALIGN)
1156                 if ((bp = allocb(size, BPRI_HI)) != NULL)
1157                         return (bp);
1158         allocb_tryhard_fails++;
1159         return (NULL);
1160 }
1161 
1162 /*
1163  * This routine is consolidation private for STREAMS internal use
1164  * This routine may only be called from sync routines (i.e., not
1165  * from put or service procedures).  It is located here (rather
1166  * than strsubr.c) so that we don't have to expose all of the
1167  * allocb() implementation details in header files.
1168  */
1169 mblk_t *
1170 allocb_wait(size_t size, uint_t pri, uint_t flags, int *error)
1171 {
1172         dblk_t *dbp;
1173         mblk_t *mp;
1174         size_t index;
1175 
1176         index = (size -1) >> DBLK_SIZE_SHIFT;
1177 
1178         if (flags & STR_NOSIG) {
1179                 if (index >= (DBLK_MAX_CACHE >> DBLK_SIZE_SHIFT)) {
1180                         if (size != 0) {
1181                                 mp = allocb_oversize(size, KM_SLEEP);
1182                                 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx",
1183                                     (uintptr_t)mp);
1184                                 return (mp);
1185                         }
1186                         index = 0;
1187                 }
1188 
1189                 dbp = kmem_cache_alloc(dblk_cache[index], KM_SLEEP);
1190                 mp = dbp->db_mblk;
1191                 DBLK_RTFU_WORD(dbp) = DBLK_RTFU(1, M_DATA, 0, 0);
1192                 mp->b_next = mp->b_prev = mp->b_cont = NULL;
1193                 mp->b_rptr = mp->b_wptr = dbp->db_base;
1194                 mp->b_queue = NULL;
1195                 MBLK_BAND_FLAG_WORD(mp) = 0;
1196                 STR_FTALLOC(&DB_FTHDR(mp), FTEV_ALLOCBW, size);
1197 
1198                 FTRACE_1("allocb_wait (NOSIG): mp=0x%lx", (uintptr_t)mp);
1199 
1200         } else {
1201                 while ((mp = allocb(size, pri)) == NULL) {
1202                         if ((*error = strwaitbuf(size, BPRI_HI)) != 0)
1203                                 return (NULL);
1204                 }
1205         }
1206 
1207         return (mp);
1208 }
1209 
1210 /*
1211  * Call function 'func' with 'arg' when a class zero block can
1212  * be allocated with priority 'pri'.
1213  */
1214 bufcall_id_t
1215 esbbcall(uint_t pri, void (*func)(void *), void *arg)
1216 {
1217         return (bufcall(1, pri, func, arg));
1218 }
1219 
1220 /*
1221  * Allocates an iocblk (M_IOCTL) block. Properly sets the credentials
1222  * ioc_id, rval and error of the struct ioctl to set up an ioctl call.
1223  * This provides consistency for all internal allocators of ioctl.
1224  */
1225 mblk_t *
1226 mkiocb(uint_t cmd)
1227 {
1228         struct iocblk   *ioc;
1229         mblk_t          *mp;
1230 
1231         /*
1232          * Allocate enough space for any of the ioctl related messages.
1233          */
1234         if ((mp = allocb(sizeof (union ioctypes), BPRI_MED)) == NULL)
1235                 return (NULL);
1236 
1237         bzero(mp->b_rptr, sizeof (union ioctypes));
1238 
1239         /*
1240          * Set the mblk_t information and ptrs correctly.
1241          */
1242         mp->b_wptr += sizeof (struct iocblk);
1243         mp->b_datap->db_type = M_IOCTL;
1244 
1245         /*
1246          * Fill in the fields.
1247          */
1248         ioc             = (struct iocblk *)mp->b_rptr;
1249         ioc->ioc_cmd = cmd;
1250         ioc->ioc_cr  = kcred;
1251         ioc->ioc_id  = getiocseqno();
1252         ioc->ioc_flag        = IOC_NATIVE;
1253         return (mp);
1254 }
1255 
1256 /*
1257  * test if block of given size can be allocated with a request of
1258  * the given priority.
1259  * 'pri' is no longer used, but is retained for compatibility.
1260  */
1261 /* ARGSUSED */
1262 int
1263 testb(size_t size, uint_t pri)
1264 {
1265         return ((size + sizeof (dblk_t)) <= kmem_avail());
1266 }
1267 
1268 /*
1269  * Call function 'func' with argument 'arg' when there is a reasonably
1270  * good chance that a block of size 'size' can be allocated.
1271  * 'pri' is no longer used, but is retained for compatibility.
1272  */
1273 /* ARGSUSED */
1274 bufcall_id_t
1275 bufcall(size_t size, uint_t pri, void (*func)(void *), void *arg)
1276 {
1277         static long bid = 1;    /* always odd to save checking for zero */
1278         bufcall_id_t bc_id;
1279         struct strbufcall *bcp;
1280 
1281         if ((bcp = kmem_alloc(sizeof (strbufcall_t), KM_NOSLEEP)) == NULL)
1282                 return (0);
1283 
1284         bcp->bc_func = func;
1285         bcp->bc_arg = arg;
1286         bcp->bc_size = size;
1287         bcp->bc_next = NULL;
1288         bcp->bc_executor = NULL;
1289 
1290         mutex_enter(&strbcall_lock);
1291         /*
1292          * After bcp is linked into strbcalls and strbcall_lock is dropped there
1293          * should be no references to bcp since it may be freed by
1294          * runbufcalls(). Since bcp_id field is returned, we save its value in
1295          * the local var.
1296          */
1297         bc_id = bcp->bc_id = (bufcall_id_t)(bid += 2);       /* keep it odd */
1298 
1299         /*
1300          * add newly allocated stream event to existing
1301          * linked list of events.
1302          */
1303         if (strbcalls.bc_head == NULL) {
1304                 strbcalls.bc_head = strbcalls.bc_tail = bcp;
1305         } else {
1306                 strbcalls.bc_tail->bc_next = bcp;
1307                 strbcalls.bc_tail = bcp;
1308         }
1309 
1310         cv_signal(&strbcall_cv);
1311         mutex_exit(&strbcall_lock);
1312         return (bc_id);
1313 }
1314 
1315 /*
1316  * Cancel a bufcall request.
1317  */
1318 void
1319 unbufcall(bufcall_id_t id)
1320 {
1321         strbufcall_t *bcp, *pbcp;
1322 
1323         mutex_enter(&strbcall_lock);
1324 again:
1325         pbcp = NULL;
1326         for (bcp = strbcalls.bc_head; bcp; bcp = bcp->bc_next) {
1327                 if (id == bcp->bc_id)
1328                         break;
1329                 pbcp = bcp;
1330         }
1331         if (bcp) {
1332                 if (bcp->bc_executor != NULL) {
1333                         if (bcp->bc_executor != curthread) {
1334                                 cv_wait(&bcall_cv, &strbcall_lock);
1335                                 goto again;
1336                         }
1337                 } else {
1338                         if (pbcp)
1339                                 pbcp->bc_next = bcp->bc_next;
1340                         else
1341                                 strbcalls.bc_head = bcp->bc_next;
1342                         if (bcp == strbcalls.bc_tail)
1343                                 strbcalls.bc_tail = pbcp;
1344                         kmem_free(bcp, sizeof (strbufcall_t));
1345                 }
1346         }
1347         mutex_exit(&strbcall_lock);
1348 }
1349 
1350 /*
1351  * Duplicate a message block by block (uses dupb), returning
1352  * a pointer to the duplicate message.
1353  * Returns a non-NULL value only if the entire message
1354  * was dup'd.
1355  */
1356 mblk_t *
1357 dupmsg(mblk_t *bp)
1358 {
1359         mblk_t *head, *nbp;
1360 
1361         if (!bp || !(nbp = head = dupb(bp)))
1362                 return (NULL);
1363 
1364         while (bp->b_cont) {
1365                 if (!(nbp->b_cont = dupb(bp->b_cont))) {
1366                         freemsg(head);
1367                         return (NULL);
1368                 }
1369                 nbp = nbp->b_cont;
1370                 bp = bp->b_cont;
1371         }
1372         return (head);
1373 }
1374 
/*
 * Duplicate one block: copyb() for blocks marked STRUIO_ZC (zero-copy),
 * dupb() otherwise, so the duplicate never shares a zero-copy buffer.
 */
#define	DUPB_NOLOAN(bp) \
	((((bp)->b_datap->db_struioflag & STRUIO_ZC) != 0) ? \
	copyb((bp)) : dupb((bp)))

/*
 * Like dupmsg(), but only for M_DATA messages, and any block marked
 * STRUIO_ZC is copied rather than dup'd (see DUPB_NOLOAN above).
 * Returns NULL on failure or if the message is not M_DATA.
 */
mblk_t *
dupmsg_noloan(mblk_t *bp)
{
	mblk_t *head, *nbp;

	if (bp == NULL || DB_TYPE(bp) != M_DATA ||
	    ((nbp = head = DUPB_NOLOAN(bp)) == NULL))
		return (NULL);

	while (bp->b_cont) {
		if ((nbp->b_cont = DUPB_NOLOAN(bp->b_cont)) == NULL) {
			/* Partial failure: free what was built so far. */
			freemsg(head);
			return (NULL);
		}
		nbp = nbp->b_cont;
		bp = bp->b_cont;
	}
	return (head);
}
1398 
1399 /*
1400  * Copy data from message and data block to newly allocated message and
1401  * data block. Returns new message block pointer, or NULL if error.
1402  * The alignment of rptr (w.r.t. word alignment) will be the same in the copy
1403  * as in the original even when db_base is not word aligned. (bug 1052877)
1404  */
1405 mblk_t *
1406 copyb(mblk_t *bp)
1407 {
1408         mblk_t  *nbp;
1409         dblk_t  *dp, *ndp;
1410         uchar_t *base;
1411         size_t  size;
1412         size_t  unaligned;
1413 
1414         ASSERT(bp->b_wptr >= bp->b_rptr);
1415 
1416         dp = bp->b_datap;
1417         if (dp->db_fthdr != NULL)
1418                 STR_FTEVENT_MBLK(bp, caller(), FTEV_COPYB, 0);
1419 
1420         /*
1421          * Special handling for Multidata message; this should be
1422          * removed once a copy-callback routine is made available.
1423          */
1424         if (dp->db_type == M_MULTIDATA) {
1425                 cred_t *cr;
1426 
1427                 if ((nbp = mmd_copy(bp, KM_NOSLEEP)) == NULL)
1428                         return (NULL);
1429 
1430                 nbp->b_flag = bp->b_flag;
1431                 nbp->b_band = bp->b_band;
1432                 ndp = nbp->b_datap;
1433 
1434                 /* See comments below on potential issues. */
1435                 STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1436 
1437                 ASSERT(ndp->db_type == dp->db_type);
1438                 cr = dp->db_credp;
1439                 if (cr != NULL)
1440                         crhold(ndp->db_credp = cr);
1441                 ndp->db_cpid = dp->db_cpid;
1442                 return (nbp);
1443         }
1444 
1445         size = dp->db_lim - dp->db_base;
1446         unaligned = P2PHASE((uintptr_t)dp->db_base, sizeof (uint_t));
1447         if ((nbp = allocb_tmpl(size + unaligned, bp)) == NULL)
1448                 return (NULL);
1449         nbp->b_flag = bp->b_flag;
1450         nbp->b_band = bp->b_band;
1451         ndp = nbp->b_datap;
1452 
1453         /*
1454          * Well, here is a potential issue.  If we are trying to
1455          * trace a flow, and we copy the message, we might lose
1456          * information about where this message might have been.
1457          * So we should inherit the FT data.  On the other hand,
1458          * a user might be interested only in alloc to free data.
1459          * So I guess the real answer is to provide a tunable.
1460          */
1461         STR_FTEVENT_MBLK(nbp, caller(), FTEV_COPYB, 1);
1462 
1463         base = ndp->db_base + unaligned;
1464         bcopy(dp->db_base, ndp->db_base + unaligned, size);
1465 
1466         nbp->b_rptr = base + (bp->b_rptr - dp->db_base);
1467         nbp->b_wptr = nbp->b_rptr + MBLKL(bp);
1468 
1469         return (nbp);
1470 }
1471 
1472 /*
1473  * Copy data from message to newly allocated message using new
1474  * data blocks.  Returns a pointer to the new message, or NULL if error.
1475  */
1476 mblk_t *
1477 copymsg(mblk_t *bp)
1478 {
1479         mblk_t *head, *nbp;
1480 
1481         if (!bp || !(nbp = head = copyb(bp)))
1482                 return (NULL);
1483 
1484         while (bp->b_cont) {
1485                 if (!(nbp->b_cont = copyb(bp->b_cont))) {
1486                         freemsg(head);
1487                         return (NULL);
1488                 }
1489                 nbp = nbp->b_cont;
1490                 bp = bp->b_cont;
1491         }
1492         return (head);
1493 }
1494 
1495 /*
1496  * link a message block to tail of message
1497  */
1498 void
1499 linkb(mblk_t *mp, mblk_t *bp)
1500 {
1501         ASSERT(mp && bp);
1502 
1503         for (; mp->b_cont; mp = mp->b_cont)
1504                 ;
1505         mp->b_cont = bp;
1506 }
1507 
1508 /*
1509  * unlink a message block from head of message
1510  * return pointer to new message.
1511  * NULL if message becomes empty.
1512  */
1513 mblk_t *
1514 unlinkb(mblk_t *bp)
1515 {
1516         mblk_t *bp1;
1517 
1518         bp1 = bp->b_cont;
1519         bp->b_cont = NULL;
1520         return (bp1);
1521 }
1522 
1523 /*
1524  * remove a message block "bp" from message "mp"
1525  *
1526  * Return pointer to new message or NULL if no message remains.
1527  * Return -1 if bp is not found in message.
1528  */
1529 mblk_t *
1530 rmvb(mblk_t *mp, mblk_t *bp)
1531 {
1532         mblk_t *tmp;
1533         mblk_t *lastp = NULL;
1534 
1535         ASSERT(mp && bp);
1536         for (tmp = mp; tmp; tmp = tmp->b_cont) {
1537                 if (tmp == bp) {
1538                         if (lastp)
1539                                 lastp->b_cont = tmp->b_cont;
1540                         else
1541                                 mp = tmp->b_cont;
1542                         tmp->b_cont = NULL;
1543                         return (mp);
1544                 }
1545                 lastp = tmp;
1546         }
1547         return ((mblk_t *)-1);
1548 }
1549 
1550 /*
1551  * Concatenate and align first len bytes of common
1552  * message type.  Len == -1, means concat everything.
1553  * Returns 1 on success, 0 on failure
1554  * After the pullup, mp points to the pulled up data.
1555  */
1556 int
1557 pullupmsg(mblk_t *mp, ssize_t len)
1558 {
1559         mblk_t *bp, *b_cont;
1560         dblk_t *dbp;
1561         ssize_t n;
1562 
1563         ASSERT(mp->b_datap->db_ref > 0);
1564         ASSERT(mp->b_next == NULL && mp->b_prev == NULL);
1565 
1566         /*
1567          * We won't handle Multidata message, since it contains
1568          * metadata which this function has no knowledge of; we
1569          * assert on DEBUG, and return failure otherwise.
1570          */
1571         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1572         if (mp->b_datap->db_type == M_MULTIDATA)
1573                 return (0);
1574 
1575         if (len == -1) {
1576                 if (mp->b_cont == NULL && str_aligned(mp->b_rptr))
1577                         return (1);
1578                 len = xmsgsize(mp);
1579         } else {
1580                 ssize_t first_mblk_len = mp->b_wptr - mp->b_rptr;
1581                 ASSERT(first_mblk_len >= 0);
1582                 /*
1583                  * If the length is less than that of the first mblk,
1584                  * we want to pull up the message into an aligned mblk.
1585                  * Though not part of the spec, some callers assume it.
1586                  */
1587                 if (len <= first_mblk_len) {
1588                         if (str_aligned(mp->b_rptr))
1589                                 return (1);
1590                         len = first_mblk_len;
1591                 } else if (xmsgsize(mp) < len)
1592                         return (0);
1593         }
1594 
1595         if ((bp = allocb_tmpl(len, mp)) == NULL)
1596                 return (0);
1597 
1598         dbp = bp->b_datap;
1599         *bp = *mp;              /* swap mblks so bp heads the old msg... */
1600         mp->b_datap = dbp;   /* ... and mp heads the new message */
1601         mp->b_datap->db_mblk = mp;
1602         bp->b_datap->db_mblk = bp;
1603         mp->b_rptr = mp->b_wptr = dbp->db_base;
1604 
1605         do {
1606                 ASSERT(bp->b_datap->db_ref > 0);
1607                 ASSERT(bp->b_wptr >= bp->b_rptr);
1608                 n = MIN(bp->b_wptr - bp->b_rptr, len);
1609                 ASSERT(n >= 0);              /* allow zero-length mblk_t's */
1610                 if (n > 0)
1611                         bcopy(bp->b_rptr, mp->b_wptr, (size_t)n);
1612                 mp->b_wptr += n;
1613                 bp->b_rptr += n;
1614                 len -= n;
1615                 if (bp->b_rptr != bp->b_wptr)
1616                         break;
1617                 b_cont = bp->b_cont;
1618                 freeb(bp);
1619                 bp = b_cont;
1620         } while (len && bp);
1621 
1622         mp->b_cont = bp;     /* tack on whatever wasn't pulled up */
1623 
1624         return (1);
1625 }
1626 
1627 /*
1628  * Concatenate and align at least the first len bytes of common message
1629  * type.  Len == -1 means concatenate everything.  The original message is
1630  * unaltered.  Returns a pointer to a new message on success, otherwise
1631  * returns NULL.
1632  */
1633 mblk_t *
1634 msgpullup(mblk_t *mp, ssize_t len)
1635 {
1636         mblk_t  *newmp;
1637         ssize_t totlen;
1638         ssize_t n;
1639 
1640         /*
1641          * We won't handle Multidata message, since it contains
1642          * metadata which this function has no knowledge of; we
1643          * assert on DEBUG, and return failure otherwise.
1644          */
1645         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1646         if (mp->b_datap->db_type == M_MULTIDATA)
1647                 return (NULL);
1648 
1649         totlen = xmsgsize(mp);
1650 
1651         if ((len > 0) && (len > totlen))
1652                 return (NULL);
1653 
1654         /*
1655          * Copy all of the first msg type into one new mblk, then dupmsg
1656          * and link the rest onto this.
1657          */
1658 
1659         len = totlen;
1660 
1661         if ((newmp = allocb_tmpl(len, mp)) == NULL)
1662                 return (NULL);
1663 
1664         newmp->b_flag = mp->b_flag;
1665         newmp->b_band = mp->b_band;
1666 
1667         while (len > 0) {
1668                 n = mp->b_wptr - mp->b_rptr;
1669                 ASSERT(n >= 0);              /* allow zero-length mblk_t's */
1670                 if (n > 0)
1671                         bcopy(mp->b_rptr, newmp->b_wptr, n);
1672                 newmp->b_wptr += n;
1673                 len -= n;
1674                 mp = mp->b_cont;
1675         }
1676 
1677         if (mp != NULL) {
1678                 newmp->b_cont = dupmsg(mp);
1679                 if (newmp->b_cont == NULL) {
1680                         freemsg(newmp);
1681                         return (NULL);
1682                 }
1683         }
1684 
1685         return (newmp);
1686 }
1687 
1688 /*
1689  * Trim bytes from message
1690  *  len > 0, trim from head
1691  *  len < 0, trim from tail
1692  * Returns 1 on success, 0 on failure.
1693  */
1694 int
1695 adjmsg(mblk_t *mp, ssize_t len)
1696 {
1697         mblk_t *bp;
1698         mblk_t *save_bp = NULL;
1699         mblk_t *prev_bp;
1700         mblk_t *bcont;
1701         unsigned char type;
1702         ssize_t n;
1703         int fromhead;
1704         int first;
1705 
1706         ASSERT(mp != NULL);
1707         /*
1708          * We won't handle Multidata message, since it contains
1709          * metadata which this function has no knowledge of; we
1710          * assert on DEBUG, and return failure otherwise.
1711          */
1712         ASSERT(mp->b_datap->db_type != M_MULTIDATA);
1713         if (mp->b_datap->db_type == M_MULTIDATA)
1714                 return (0);
1715 
1716         if (len < 0) {
1717                 fromhead = 0;
1718                 len = -len;
1719         } else {
1720                 fromhead = 1;
1721         }
1722 
1723         if (xmsgsize(mp) < len)
1724                 return (0);
1725 
1726         if (fromhead) {
1727                 first = 1;
1728                 while (len) {
1729                         ASSERT(mp->b_wptr >= mp->b_rptr);
1730                         n = MIN(mp->b_wptr - mp->b_rptr, len);
1731                         mp->b_rptr += n;
1732                         len -= n;
1733 
1734                         /*
1735                          * If this is not the first zero length
1736                          * message remove it
1737                          */
1738                         if (!first && (mp->b_wptr == mp->b_rptr)) {
1739                                 bcont = mp->b_cont;
1740                                 freeb(mp);
1741                                 mp = save_bp->b_cont = bcont;
1742                         } else {
1743                                 save_bp = mp;
1744                                 mp = mp->b_cont;
1745                         }
1746                         first = 0;
1747                 }
1748         } else {
1749                 type = mp->b_datap->db_type;
1750                 while (len) {
1751                         bp = mp;
1752                         save_bp = NULL;
1753 
1754                         /*
1755                          * Find the last message of same type
1756                          */
1757                         while (bp && bp->b_datap->db_type == type) {
1758                                 ASSERT(bp->b_wptr >= bp->b_rptr);
1759                                 prev_bp = save_bp;
1760                                 save_bp = bp;
1761                                 bp = bp->b_cont;
1762                         }
1763                         if (save_bp == NULL)
1764                                 break;
1765                         n = MIN(save_bp->b_wptr - save_bp->b_rptr, len);
1766                         save_bp->b_wptr -= n;
1767                         len -= n;
1768 
1769                         /*
1770                          * If this is not the first message
1771                          * and we have taken away everything
1772                          * from this message, remove it
1773                          */
1774 
1775                         if ((save_bp != mp) &&
1776                             (save_bp->b_wptr == save_bp->b_rptr)) {
1777                                 bcont = save_bp->b_cont;
1778                                 freeb(save_bp);
1779                                 prev_bp->b_cont = bcont;
1780                         }
1781                 }
1782         }
1783         return (1);
1784 }
1785 
1786 /*
1787  * get number of data bytes in message
1788  */
1789 size_t
1790 msgdsize(mblk_t *bp)
1791 {
1792         size_t count = 0;
1793 
1794         for (; bp; bp = bp->b_cont)
1795                 if (bp->b_datap->db_type == M_DATA) {
1796                         ASSERT(bp->b_wptr >= bp->b_rptr);
1797                         count += bp->b_wptr - bp->b_rptr;
1798                 }
1799         return (count);
1800 }
1801 
1802 /*
1803  * Get a message off head of queue
1804  *
1805  * If queue has no buffers then mark queue
1806  * with QWANTR. (queue wants to be read by
1807  * someone when data becomes available)
1808  *
1809  * If there is something to take off then do so.
1810  * If queue falls below hi water mark turn off QFULL
1811  * flag.  Decrement weighted count of queue.
1812  * Also turn off QWANTR because queue is being read.
1813  *
1814  * The queue count is maintained on a per-band basis.
1815  * Priority band 0 (normal messages) uses q_count,
1816  * q_lowat, etc.  Non-zero priority bands use the
1817  * fields in their respective qband structures
1818  * (qb_count, qb_lowat, etc.)  All messages appear
1819  * on the same list, linked via their b_next pointers.
1820  * q_first is the head of the list.  q_count does
1821  * not reflect the size of all the messages on the
1822  * queue.  It only reflects those messages in the
1823  * normal band of flow.  The one exception to this
1824  * deals with high priority messages.  They are in
1825  * their own conceptual "band", but are accounted
1826  * against q_count.
1827  *
1828  * If queue count is below the lo water mark and QWANTW
1829  * is set, enable the closest backq which has a service
1830  * procedure and turn off the QWANTW flag.
1831  *
1832  * getq could be built on top of rmvq, but isn't because
1833  * of performance considerations.
1834  *
1835  * A note on the use of q_count and q_mblkcnt:
1836  *   q_count is the traditional byte count for messages that
1837  *   have been put on a queue.  Documentation tells us that
1838  *   we shouldn't rely on that count, but some drivers/modules
1839  *   do.  What was needed, however, is a mechanism to prevent
1840  *   runaway streams from consuming all of the resources,
1841  *   and particularly be able to flow control zero-length
1842  *   messages.  q_mblkcnt is used for this purpose.  It
1843  *   counts the number of mblk's that are being put on
1844  *   the queue.  The intention here, is that each mblk should
1845  *   contain one byte of data and, for the purpose of
1846  *   flow-control, logically does.  A queue will become
1847  *   full when EITHER of these values (q_count and q_mblkcnt)
1848  *   reach the highwater mark.  It will clear when BOTH
1849  *   of them drop below the highwater mark.  And it will
1850  *   backenable when BOTH of them drop below the lowwater
1851  *   mark.
1852  *   With this algorithm, a driver/module might be able
1853  *   to find a reasonably accurate q_count, and the
1854  *   framework can still try and limit resource usage.
1855  */
1856 mblk_t *
1857 getq(queue_t *q)
1858 {
1859         mblk_t *bp;
1860         uchar_t band = 0;
1861 
1862         bp = getq_noenab(q, 0);
1863         if (bp != NULL)
1864                 band = bp->b_band;
1865 
1866         /*
1867          * Inlined from qbackenable().
1868          * Quick check without holding the lock.
1869          */
1870         if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
1871                 return (bp);
1872 
1873         qbackenable(q, band);
1874         return (bp);
1875 }
1876 
1877 /*
1878  * Calculate number of data bytes in a single data message block taking
1879  * multidata messages into account.
1880  */
1881 
/*
 * Add the number of data bytes in a single message block to `size'.
 * For an M_MULTIDATA block the contribution is the pinned payload
 * reported by mmd_getsize(); otherwise it is the ordinary MBLKL()
 * byte count.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement and cannot misbind (or fail to compile) when used as the
 * body of an unbraced if/else.
 */
#define	ADD_MBLK_SIZE(mp, size)						\
	do {								\
		if (DB_TYPE(mp) != M_MULTIDATA) {			\
			(size) += MBLKL(mp);				\
		} else {						\
			uint_t	pinuse;					\
									\
			mmd_getsize(mmd_getmultidata(mp), NULL,		\
			    &pinuse);					\
			(size) += pinuse;				\
		}							\
	} while (0)
1891 
1892 /*
1893  * Returns the number of bytes in a message (a message is defined as a
1894  * chain of mblks linked by b_cont). If a non-NULL mblkcnt is supplied we
1895  * also return the number of distinct mblks in the message.
1896  */
1897 int
1898 mp_cont_len(mblk_t *bp, int *mblkcnt)
1899 {
1900         mblk_t  *mp;
1901         int     mblks = 0;
1902         int     bytes = 0;
1903 
1904         for (mp = bp; mp != NULL; mp = mp->b_cont) {
1905                 ADD_MBLK_SIZE(mp, bytes);
1906                 mblks++;
1907         }
1908 
1909         if (mblkcnt != NULL)
1910                 *mblkcnt = mblks;
1911 
1912         return (bytes);
1913 }
1914 
1915 /*
1916  * Like getq() but does not backenable.  This is used by the stream
1917  * head when a putback() is likely.  The caller must call qbackenable()
1918  * after it is done with accessing the queue.
 * The rbytes argument to getq_noenab() allows callers to specify the
 * maximum number of bytes to return. If the current amount on the
1921  * queue is less than this then the entire message will be returned.
1922  * A value of 0 returns the entire message and is equivalent to the old
1923  * default behaviour prior to the addition of the rbytes argument.
1924  */
mblk_t *
getq_noenab(queue_t *q, ssize_t rbytes)
{
	mblk_t *bp, *mp1;
	mblk_t *mp2 = NULL;
	qband_t *qbp;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if ((bp = q->q_first) == 0) {
		/* Queue is empty; record that a reader now wants data. */
		q->q_flag |= QWANTR;
	} else {
		/*
		 * If the caller supplied a byte threshold and there is
		 * more than this amount on the queue then break up the
		 * message appropriately.  We can only safely do
		 * this for M_DATA messages.
		 */
		if ((DB_TYPE(bp) == M_DATA) && (rbytes > 0) &&
		    (q->q_count > rbytes)) {
			/*
			 * Inline version of mp_cont_len() which terminates
			 * when we meet or exceed rbytes.
			 */
			for (mp1 = bp; mp1 != NULL; mp1 = mp1->b_cont) {
				mblkcnt++;
				ADD_MBLK_SIZE(mp1, bytecnt);
				if (bytecnt  >= rbytes)
					break;
			}
			/*
			 * We need to account for the following scenarios:
			 *
			 * 1) Too much data in the first message:
			 *	mp1 will be the mblk which puts us over our
			 *	byte limit.
			 * 2) Not enough data in the first message:
			 *	mp1 will be NULL.
			 * 3) Exactly the right amount of data contained within
			 *    whole mblks:
			 *	mp1->b_cont will be where we break the message.
			 */
			if (bytecnt > rbytes) {
				/*
				 * Dup/copy mp1 and put what we don't need
				 * back onto the queue. Adjust the read/write
				 * and continuation pointers appropriately
				 * and decrement the current mblk count to
				 * reflect we are putting an mblk back onto
				 * the queue.
				 * When adjusting the message pointers, it's
				 * OK to use the existing bytecnt and the
				 * requested amount (rbytes) to calculate the
				 * new write offset (b_wptr) of what we
				 * are taking. However, we cannot use these
				 * values when calculating the read offset of
				 * the mblk we are putting back on the queue.
				 * This is because the beginning (b_rptr) of
				 * the mblk represents some arbitrary point
				 * within the message.
				 * It's simplest to do this by advancing b_rptr
				 * by the new length of mp1 as we don't have to
				 * remember any intermediate state.
				 */
				ASSERT(mp1 != NULL);
				mblkcnt--;
				if ((mp2 = dupb(mp1)) == NULL &&
				    (mp2 = copyb(mp1)) == NULL) {
					/*
					 * Couldn't split; fall back to
					 * returning the whole message.
					 */
					bytecnt = mblkcnt = 0;
					goto dup_failed;
				}
				mp2->b_cont = mp1->b_cont;
				mp1->b_wptr -= bytecnt - rbytes;
				mp2->b_rptr += mp1->b_wptr - mp1->b_rptr;
				mp1->b_cont = NULL;
				bytecnt = rbytes;
			} else {
				/*
				 * Either there is not enough data in the first
				 * message or there is no excess data to deal
				 * with. If mp1 is NULL, we are taking the
				 * whole message. No need to do anything.
				 * Otherwise we assign mp1->b_cont to mp2 as
				 * we will be putting this back onto the head of
				 * the queue.
				 */
				if (mp1 != NULL) {
					mp2 = mp1->b_cont;
					mp1->b_cont = NULL;
				}
			}
			/*
			 * If mp2 is not NULL then we have part of the message
			 * to put back onto the queue.
			 */
			if (mp2 != NULL) {
				if ((mp2->b_next = bp->b_next) == NULL)
					q->q_last = mp2;
				else
					bp->b_next->b_prev = mp2;
				q->q_first = mp2;
			} else {
				if ((q->q_first = bp->b_next) == NULL)
					q->q_last = NULL;
				else
					q->q_first->b_prev = NULL;
			}
		} else {
			/*
			 * Either no byte threshold was supplied, there is
			 * not enough on the queue or we failed to
			 * duplicate/copy a data block. In these cases we
			 * just take the entire first message.
			 */
dup_failed:
			bytecnt = mp_cont_len(bp, &mblkcnt);
			if ((q->q_first = bp->b_next) == NULL)
				q->q_last = NULL;
			else
				q->q_first->b_prev = NULL;
		}
		if (bp->b_band == 0) {
			/* Normal-band accounting lives on the queue itself. */
			q->q_count -= bytecnt;
			q->q_mblkcnt -= mblkcnt;
			if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
			    (q->q_mblkcnt < q->q_hiwat))) {
				q->q_flag &= ~QFULL;
			}
		} else {
			int i;

			ASSERT(bp->b_band <= q->q_nband);
			ASSERT(q->q_bandp != NULL);
			ASSERT(MUTEX_HELD(QLOCK(q)));
			/* Walk the list to the qband for this priority. */
			qbp = q->q_bandp;
			i = bp->b_band;
			while (--i > 0)
				qbp = qbp->qb_next;
			if (qbp->qb_first == qbp->qb_last) {
				/* bp was the only message in its band. */
				qbp->qb_first = NULL;
				qbp->qb_last = NULL;
			} else {
				qbp->qb_first = bp->b_next;
			}
			qbp->qb_count -= bytecnt;
			qbp->qb_mblkcnt -= mblkcnt;
			if (qbp->qb_mblkcnt == 0 ||
			    ((qbp->qb_count < qbp->qb_hiwat) &&
			    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
				qbp->qb_flag &= ~QB_FULL;
			}
		}
		/* The queue is being read, so clear the want-reader flag. */
		q->q_flag &= ~QWANTR;
		bp->b_next = NULL;
		bp->b_prev = NULL;
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(bp, q, FTEV_GETQ, NULL);

	return (bp);
}
2096 
2097 /*
2098  * Determine if a backenable is needed after removing a message in the
2099  * specified band.
2100  * NOTE: This routine assumes that something like getq_noenab() has been
2101  * already called.
2102  *
2103  * For the read side it is ok to hold sd_lock across calling this (and the
2104  * stream head often does).
2105  * But for the write side strwakeq might be invoked and it acquires sd_lock.
2106  */
void
qbackenable(queue_t *q, uchar_t band)
{
	int backenab = 0;
	qband_t *qbp;
	kthread_id_t freezer;

	ASSERT(q);
	/* Write side may reach strwakeq() below, which acquires sd_lock. */
	ASSERT((q->q_flag & QREADR) || MUTEX_NOT_HELD(&STREAM(q)->sd_lock));

	/*
	 * Quick check without holding the lock.
	 * OK since after getq() has lowered the q_count these flags
	 * would not change unless either the qbackenable() is done by
	 * another thread (which is ok) or the queue has gotten QFULL
	 * in which case another backenable will take place when the queue
	 * drops below q_lowat.
	 */
	if (band == 0 && (q->q_flag & (QWANTW|QWANTWSYNC)) == 0)
		return;

	/* freezestr should allow its caller to call getq/putq */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	if (band == 0) {
		/*
		 * Backenable when below the low water mark; a zero
		 * q_lowat always permits it (see flushq historical note).
		 */
		if (q->q_lowat == 0 || (q->q_count < q->q_lowat &&
		    q->q_mblkcnt < q->q_lowat)) {
			backenab = q->q_flag & (QWANTW|QWANTWSYNC);
		}
	} else {
		int i;

		ASSERT((unsigned)band <= q->q_nband);
		ASSERT(q->q_bandp != NULL);

		/* Walk the singly linked qband list to this band. */
		qbp = q->q_bandp;
		i = band;
		while (--i > 0)
			qbp = qbp->qb_next;

		if (qbp->qb_lowat == 0 || (qbp->qb_count < qbp->qb_lowat &&
		    qbp->qb_mblkcnt < qbp->qb_lowat)) {
			backenab = qbp->qb_flag & QB_WANTW;
		}
	}

	if (backenab == 0) {
		if (freezer != curthread)
			mutex_exit(QLOCK(q));
		return;
	}

	/* Have to drop the lock across strwakeq and backenable */
	if (backenab & QWANTWSYNC)
		q->q_flag &= ~QWANTWSYNC;
	if (backenab & (QWANTW|QB_WANTW)) {
		if (band != 0)
			qbp->qb_flag &= ~QB_WANTW;
		else {
			q->q_flag &= ~QWANTW;
		}
	}

	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	/* Lock is dropped (unless frozen); now do the actual wakeups. */
	if (backenab & QWANTWSYNC)
		strwakeq(q, QWANTWSYNC);
	if (backenab & (QWANTW|QB_WANTW))
		backenable(q, band);
}
2183 
2184 /*
2185  * Remove a message from a queue.  The queue count and other
2186  * flow control parameters are adjusted and the back queue
2187  * enabled if necessary.
2188  *
2189  * rmvq can be called with the stream frozen, but other utility functions
2190  * holding QLOCK, and by streams modules without any locks/frozen.
2191  */
2192 void
2193 rmvq(queue_t *q, mblk_t *mp)
2194 {
2195         ASSERT(mp != NULL);
2196 
2197         rmvq_noenab(q, mp);
2198         if (curthread != STREAM(q)->sd_freezer && MUTEX_HELD(QLOCK(q))) {
2199                 /*
2200                  * qbackenable can handle a frozen stream but not a "random"
2201                  * qlock being held. Drop lock across qbackenable.
2202                  */
2203                 mutex_exit(QLOCK(q));
2204                 qbackenable(q, mp->b_band);
2205                 mutex_enter(QLOCK(q));
2206         } else {
2207                 qbackenable(q, mp->b_band);
2208         }
2209 }
2210 
2211 /*
2212  * Like rmvq() but without any backenabling.
2213  * This exists to handle SR_CONSOL_DATA in strrput().
2214  */
void
rmvq_noenab(queue_t *q, mblk_t *mp)
{
	int i;
	qband_t *qbp = NULL;
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else if (MUTEX_HELD(QLOCK(q))) {
		/* Don't drop lock on exit */
		freezer = curthread;
	} else
		mutex_enter(QLOCK(q));

	ASSERT(mp->b_band <= q->q_nband);
	if (mp->b_band != 0) {		/* Adjust band pointers */
		ASSERT(q->q_bandp != NULL);
		qbp = q->q_bandp;
		i = mp->b_band;
		while (--i > 0)
			qbp = qbp->qb_next;
		if (mp == qbp->qb_first) {
			/* Advance only if the next mblk is still in-band. */
			if (mp->b_next && mp->b_band == mp->b_next->b_band)
				qbp->qb_first = mp->b_next;
			else
				qbp->qb_first = NULL;
		}
		if (mp == qbp->qb_last) {
			/* Likewise retreat only within the same band. */
			if (mp->b_prev && mp->b_band == mp->b_prev->b_band)
				qbp->qb_last = mp->b_prev;
			else
				qbp->qb_last = NULL;
		}
	}

	/*
	 * Remove the message from the list.
	 */
	if (mp->b_prev)
		mp->b_prev->b_next = mp->b_next;
	else
		q->q_first = mp->b_next;
	if (mp->b_next)
		mp->b_next->b_prev = mp->b_prev;
	else
		q->q_last = mp->b_prev;
	mp->b_next = NULL;
	mp->b_prev = NULL;

	/* Get the size of the message for q_count accounting */
	bytecnt = mp_cont_len(mp, &mblkcnt);

	if (mp->b_band == 0) {		/* Perform q_count accounting */
		q->q_count -= bytecnt;
		q->q_mblkcnt -= mblkcnt;
		if (q->q_mblkcnt == 0 || ((q->q_count < q->q_hiwat) &&
		    (q->q_mblkcnt < q->q_hiwat))) {
			q->q_flag &= ~QFULL;
		}
	} else {			/* Perform qb_count accounting */
		qbp->qb_count -= bytecnt;
		qbp->qb_mblkcnt -= mblkcnt;
		if (qbp->qb_mblkcnt == 0 || ((qbp->qb_count < qbp->qb_hiwat) &&
		    (qbp->qb_mblkcnt < qbp->qb_hiwat))) {
			qbp->qb_flag &= ~QB_FULL;
		}
	}
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	STR_FTEVENT_MSG(mp, q, FTEV_RMVQ, NULL);
}
2291 
2292 /*
2293  * Empty a queue.
2294  * If flag is set, remove all messages.  Otherwise, remove
2295  * only non-control messages.  If queue falls below its low
2296  * water mark, and QWANTW is set, enable the nearest upstream
2297  * service procedure.
2298  *
2299  * Historical note: when merging the M_FLUSH code in strrput with this
2300  * code one difference was discovered. flushq did not have a check
2301  * for q_lowat == 0 in the backenabling test.
2302  *
2303  * pcproto_flag specifies whether or not a M_PCPROTO message should be flushed
2304  * if one exists on the queue.
2305  */
void
flushq_common(queue_t *q, int flag, int pcproto_flag)
{
	mblk_t *mp, *nmp;
	qband_t *qbp;
	int backenab = 0;
	unsigned char bpri;
	unsigned char	qbf[NBAND];	/* band flushing backenable flags */

	if (q->q_first == NULL)
		return;

	mutex_enter(QLOCK(q));
	/*
	 * Detach the whole message list and zero all counts; messages
	 * that survive the flush are requeued via putq() below, which
	 * redoes the accounting.
	 */
	mp = q->q_first;
	q->q_first = NULL;
	q->q_last = NULL;
	q->q_count = 0;
	q->q_mblkcnt = 0;
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		qbp->qb_first = NULL;
		qbp->qb_last = NULL;
		qbp->qb_count = 0;
		qbp->qb_mblkcnt = 0;
		qbp->qb_flag &= ~QB_FULL;
	}
	q->q_flag &= ~QFULL;
	mutex_exit(QLOCK(q));
	while (mp) {
		nmp = mp->b_next;
		mp->b_next = mp->b_prev = NULL;

		STR_FTEVENT_MBLK(mp, q, FTEV_FLUSHQ, NULL);

		/*
		 * Preserve M_PCPROTO when pcproto_flag says so, and
		 * non-data messages when flag is clear; free the rest.
		 */
		if (pcproto_flag && (mp->b_datap->db_type == M_PCPROTO))
			(void) putq(q, mp);
		else if (flag || datamsg(mp->b_datap->db_type))
			freemsg(mp);
		else
			(void) putq(q, mp);
		mp = nmp;
	}
	bpri = 1;
	mutex_enter(QLOCK(q));
	/* Record per band whether a blocked writer may be backenabled. */
	for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
		if ((qbp->qb_flag & QB_WANTW) &&
		    (((qbp->qb_count < qbp->qb_lowat) &&
		    (qbp->qb_mblkcnt < qbp->qb_lowat)) ||
		    qbp->qb_lowat == 0)) {
			qbp->qb_flag &= ~QB_WANTW;
			backenab = 1;
			qbf[bpri] = 1;
		} else
			qbf[bpri] = 0;
		bpri++;
	}
	ASSERT(bpri == (unsigned char)(q->q_nband + 1));
	/* Same check for the normal band (slot 0). */
	if ((q->q_flag & QWANTW) &&
	    (((q->q_count < q->q_lowat) &&
	    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
		q->q_flag &= ~QWANTW;
		backenab = 1;
		qbf[0] = 1;
	} else
		qbf[0] = 0;

	/*
	 * If any band can now be written to, and there is a writer
	 * for that band, then backenable the closest service procedure.
	 */
	if (backenab) {
		mutex_exit(QLOCK(q));
		for (bpri = q->q_nband; bpri != 0; bpri--)
			if (qbf[bpri])
				backenable(q, bpri);
		if (qbf[0])
			backenable(q, 0);
	} else
		mutex_exit(QLOCK(q));
}
2385 
2386 /*
 * The real flushing takes place in flushq_common(), which takes an extra
 * flag specifying whether or not M_PCPROTO messages should be flushed.
 * Currently the only place that uses this flag is the stream head.
2390  */
2391 void
2392 flushq(queue_t *q, int flag)
2393 {
2394         flushq_common(q, flag, 0);
2395 }
2396 
2397 /*
2398  * Flush the queue of messages of the given priority band.
2399  * There is some duplication of code between flushq and flushband.
2400  * This is because we want to optimize the code as much as possible.
2401  * The assumption is that there will be more messages in the normal
2402  * (priority 0) band than in any other.
2403  *
2404  * Historical note: when merging the M_FLUSH code in strrput with this
2405  * code one difference was discovered. flushband had an extra check for
 * (mp->b_datap->db_type < QPCTL) in the band 0
2407  * case. That check does not match the man page for flushband and was not
2408  * in the strrput flush code hence it was removed.
2409  */
void
flushband(queue_t *q, unsigned char pri, int flag)
{
	mblk_t *mp;
	mblk_t *nmp;
	mblk_t *last;
	qband_t *qbp;
	int band;

	ASSERT((flag == FLUSHDATA) || (flag == FLUSHALL));
	if (pri > q->q_nband) {
		/* Band was never allocated, so there is nothing to flush. */
		return;
	}
	mutex_enter(QLOCK(q));
	if (pri == 0) {
		/*
		 * Band 0: detach the entire list and zero all counts;
		 * messages outside band 0 (and preserved control
		 * messages) are requeued via putq() below, which
		 * redoes the accounting.
		 */
		mp = q->q_first;
		q->q_first = NULL;
		q->q_last = NULL;
		q->q_count = 0;
		q->q_mblkcnt = 0;
		for (qbp = q->q_bandp; qbp; qbp = qbp->qb_next) {
			qbp->qb_first = NULL;
			qbp->qb_last = NULL;
			qbp->qb_count = 0;
			qbp->qb_mblkcnt = 0;
			qbp->qb_flag &= ~QB_FULL;
		}
		q->q_flag &= ~QFULL;
		mutex_exit(QLOCK(q));
		while (mp) {
			nmp = mp->b_next;
			mp->b_next = mp->b_prev = NULL;
			if ((mp->b_band == 0) &&
			    ((flag == FLUSHALL) ||
			    datamsg(mp->b_datap->db_type)))
				freemsg(mp);
			else
				(void) putq(q, mp);
			mp = nmp;
		}
		mutex_enter(QLOCK(q));
		/* Backenable if a writer waits and we're below low water. */
		if ((q->q_flag & QWANTW) &&
		    (((q->q_count < q->q_lowat) &&
		    (q->q_mblkcnt < q->q_lowat)) || q->q_lowat == 0)) {
			q->q_flag &= ~QWANTW;
			mutex_exit(QLOCK(q));

			backenable(q, pri);
		} else
			mutex_exit(QLOCK(q));
	} else {	/* pri != 0 */
		boolean_t flushed = B_FALSE;
		band = pri;

		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk the singly linked qband list to this band. */
		qbp = q->q_bandp;
		while (--band > 0)
			qbp = qbp->qb_next;
		mp = qbp->qb_first;
		if (mp == NULL) {
			mutex_exit(QLOCK(q));
			return;
		}
		/* One past the band's tail bounds the traversal below. */
		last = qbp->qb_last->b_next;
		/*
		 * rmvq_noenab() and freemsg() are called for each mblk that
		 * meets the criteria.  The loop is executed until the last
		 * mblk has been processed.
		 */
		while (mp != last) {
			ASSERT(mp->b_band == pri);
			nmp = mp->b_next;
			if (flag == FLUSHALL || datamsg(mp->b_datap->db_type)) {
				rmvq_noenab(q, mp);
				freemsg(mp);
				flushed = B_TRUE;
			}
			mp = nmp;
		}
		mutex_exit(QLOCK(q));

		/*
		 * If any mblk(s) has been freed, we know that qbackenable()
		 * will need to be called.
		 */
		if (flushed)
			qbackenable(q, pri);
	}
}
2499 
2500 /*
2501  * Return 1 if the queue is not full.  If the queue is full, return
2502  * 0 (may not put message) and set QWANTW flag (caller wants to write
2503  * to the queue).
2504  */
int
canput(queue_t *q)
{
	TRACE_1(TR_FAC_STREAMS_FR, TR_CANPUT_IN, "canput:%p", q);

	/* this is for loopback transports, they should not do a canput */
	ASSERT(STRMATED(q->q_stream) || STREAM(q) == STREAM(q->q_nfsrv));

	/* Find next forward module that has a service procedure */
	q = q->q_nfsrv;

	/*
	 * Fast path: peek at QFULL without taking QLOCK.  If the queue
	 * is not full we can report writability immediately; a stale
	 * read here at worst lets one extra message through, which
	 * STREAMS flow control tolerates.
	 */
	if (!(q->q_flag & QFULL)) {
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
		return (1);
	}
	/*
	 * Slow path: re-check QFULL under QLOCK so that setting QWANTW
	 * cannot race with the service procedure draining the queue and
	 * clearing QFULL (which would lose the back-enable request).
	 */
	mutex_enter(QLOCK(q));
	if (q->q_flag & QFULL) {
		q->q_flag |= QWANTW;
		mutex_exit(QLOCK(q));
		TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 0);
		return (0);
	}
	mutex_exit(QLOCK(q));
	TRACE_2(TR_FAC_STREAMS_FR, TR_CANPUT_OUT, "canput:%p %d", q, 1);
	return (1);
}
2531 
2532 /*
2533  * This is the new canput for use with priority bands.  Return 1 if the
2534  * band is not full.  If the band is full, return 0 (may not put message)
2535  * and set QWANTW(QB_WANTW) flag for zero(non-zero) band (caller wants to
2536  * write to the queue).
2537  */
2538 int
2539 bcanput(queue_t *q, unsigned char pri)
2540 {
2541         qband_t *qbp;
2542 
2543         TRACE_2(TR_FAC_STREAMS_FR, TR_BCANPUT_IN, "bcanput:%p %p", q, pri);
2544         if (!q)
2545                 return (0);
2546 
2547         /* Find next forward module that has a service procedure */
2548         q = q->q_nfsrv;
2549 
2550         mutex_enter(QLOCK(q));
2551         if (pri == 0) {
2552                 if (q->q_flag & QFULL) {
2553                         q->q_flag |= QWANTW;
2554                         mutex_exit(QLOCK(q));
2555                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2556                             "bcanput:%p %X %d", q, pri, 0);
2557                         return (0);
2558                 }
2559         } else {        /* pri != 0 */
2560                 if (pri > q->q_nband) {
2561                         /*
2562                          * No band exists yet, so return success.
2563                          */
2564                         mutex_exit(QLOCK(q));
2565                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2566                             "bcanput:%p %X %d", q, pri, 1);
2567                         return (1);
2568                 }
2569                 qbp = q->q_bandp;
2570                 while (--pri)
2571                         qbp = qbp->qb_next;
2572                 if (qbp->qb_flag & QB_FULL) {
2573                         qbp->qb_flag |= QB_WANTW;
2574                         mutex_exit(QLOCK(q));
2575                         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2576                             "bcanput:%p %X %d", q, pri, 0);
2577                         return (0);
2578                 }
2579         }
2580         mutex_exit(QLOCK(q));
2581         TRACE_3(TR_FAC_STREAMS_FR, TR_BCANPUT_OUT,
2582             "bcanput:%p %X %d", q, pri, 1);
2583         return (1);
2584 }
2585 
2586 /*
2587  * Put a message on a queue.
2588  *
2589  * Messages are enqueued on a priority basis.  The priority classes
2590  * are HIGH PRIORITY (type >= QPCTL), PRIORITY (type < QPCTL && band > 0),
2591  * and B_NORMAL (type < QPCTL && band == 0).
2592  *
2593  * Add appropriate weighted data block sizes to queue count.
2594  * If queue hits high water mark then set QFULL flag.
2595  *
2596  * If QNOENAB is not set (putq is allowed to enable the queue),
2597  * enable the queue only if the message is PRIORITY,
2598  * or the QWANTR flag is set (indicating that the service procedure
 * is ready to read the queue).  This implies that a service
2600  * procedure must NEVER put a high priority message back on its own
2601  * queue, as this would result in an infinite loop (!).
2602  */
int
putq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	/*
	 * If the current thread has frozen the stream, QLOCK is already
	 * held and must not be dropped on exit; otherwise acquire it here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		if (bp->b_band > q->q_nband) {

			/*
			 * The qband structure for this priority band is
			 * not on the queue yet, so we have to allocate
			 * one on the fly.  It would be wasteful to
			 * associate the qband structures with every
			 * queue when the queues are allocated.  This is
			 * because most queues will only need the normal
			 * band of flow which can be described entirely
			 * by the queue itself.
			 */
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		ASSERT(MUTEX_HELD(QLOCK(q)));
		/* Walk the band list to the qband for bp->b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty, add the message and initialize the pointers.
	 * Otherwise, adjust message pointers and queue pointers based on
	 * the type of the message and where it belongs on the queue.  Some
	 * code is duplicated to minimize the number of conditionals and
	 * hopefully minimize the amount of time this routine takes.
	 */
	if (!q->q_first) {
		bp->b_next = NULL;
		bp->b_prev = NULL;
		q->q_first = bp;
		q->q_last = bp;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (!qbp) {	/* bp->b_band == 0 */

		/*
		 * If queue class of message is less than or equal to
		 * that of the last one on the queue, tack on to the end.
		 */
		tmp = q->q_last;
		if (mcls <= (int)queclass(tmp)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/*
			 * High priority message: skip past all messages
			 * of equal or higher class to find its slot.
			 */
			tmp = q->q_first;
			while ((int)queclass(tmp) >= mcls)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	} else {		/* bp->b_band != 0 */
		if (qbp->qb_first) {
			tmp = qbp->qb_last;

			/*
			 * Insert bp after the last message in this band.
			 */
			bp->b_next = tmp->b_next;
			if (tmp->b_next)
				tmp->b_next->b_prev = bp;
			else
				q->q_last = bp;
			bp->b_prev = tmp;
			tmp->b_next = bp;
		} else {
			/*
			 * Band is currently empty: find the insertion
			 * point by class and then by descending band.
			 */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band <= tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/* Skip high-priority messages ... */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				/* ... then messages of higher band. */
				while (tmp->b_band >= bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_first = bp;
		}
		qbp->qb_last = bp;
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/*
	 * Charge the message to either the band's counters or the
	 * queue's, and mark full when either count hits the high water
	 * mark (mblk count is also bounded by hiwat).
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTQ, NULL);

	/*
	 * Enable the queue for high priority or banded messages, or
	 * when the service procedure has asked to be woken (QWANTR),
	 * unless enabling is suppressed (see canenable/QNOENB).
	 */
	if ((mcls > QNORM) ||
	    (canenable(q) && (q->q_flag & QWANTR || bp->b_band)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2787 
2788 /*
2789  * Put stuff back at beginning of Q according to priority order.
2790  * See comment on putq above for details.
2791  */
int
putbq(queue_t *q, mblk_t *bp)
{
	mblk_t *tmp;
	qband_t *qbp = NULL;
	int mcls = (int)queclass(bp);
	kthread_id_t freezer;
	int	bytecnt = 0, mblkcnt = 0;

	ASSERT(q && bp);
	ASSERT(bp->b_next == NULL);
	/*
	 * If the current thread has frozen the stream, QLOCK is already
	 * held and must not be dropped on exit; otherwise acquire it here.
	 */
	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));

	/*
	 * Make sanity checks and if qband structure is not yet
	 * allocated, do so.
	 */
	if (mcls == QPCTL) {
		if (bp->b_band != 0)
			bp->b_band = 0;		/* force to be correct */
	} else if (bp->b_band != 0) {
		int i;
		qband_t **qbpp;

		/* Allocate qband structures up to bp->b_band on demand. */
		if (bp->b_band > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (bp->b_band > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					if (freezer != curthread)
						mutex_exit(QLOCK(q));
					return (0);
				}
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk the band list to the qband for bp->b_band. */
		qbp = q->q_bandp;
		i = bp->b_band;
		while (--i)
			qbp = qbp->qb_next;
	}

	/*
	 * If queue is empty or if message is high priority,
	 * place on the front of the queue.
	 */
	tmp = q->q_first;
	if ((!tmp) || (mcls == QPCTL)) {
		bp->b_next = tmp;
		if (tmp)
			tmp->b_prev = bp;
		else
			q->q_last = bp;
		q->q_first = bp;
		bp->b_prev = NULL;
		if (qbp) {
			qbp->qb_first = bp;
			qbp->qb_last = bp;
		}
	} else if (qbp) {	/* bp->b_band != 0 */
		tmp = qbp->qb_first;
		if (tmp) {

			/*
			 * Insert bp before the first message in this band.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		} else {
			/*
			 * Band is currently empty: find the insertion
			 * point by class and then by descending band.
			 * Note the strict '<' comparisons (vs putq's
			 * '<=' / '>='): putting *back* goes in front of
			 * messages of the same band.
			 */
			tmp = q->q_last;
			if ((mcls < (int)queclass(tmp)) ||
			    (bp->b_band < tmp->b_band)) {

				/*
				 * Tack bp on end of queue.
				 */
				bp->b_next = NULL;
				bp->b_prev = tmp;
				tmp->b_next = bp;
				q->q_last = bp;
			} else {
				/* Skip high-priority messages ... */
				tmp = q->q_first;
				while (tmp->b_datap->db_type >= QPCTL)
					tmp = tmp->b_next;
				/* ... then messages of strictly higher band. */
				while (tmp->b_band > bp->b_band)
					tmp = tmp->b_next;

				/*
				 * Insert bp before tmp.
				 */
				bp->b_next = tmp;
				bp->b_prev = tmp->b_prev;
				if (tmp->b_prev)
					tmp->b_prev->b_next = bp;
				else
					q->q_first = bp;
				tmp->b_prev = bp;
			}
			qbp->qb_last = bp;
		}
		qbp->qb_first = bp;
	} else {		/* bp->b_band == 0 && !QPCTL */

		/*
		 * If the queue class or band is less than that of the last
		 * message on the queue, tack bp on the end of the queue.
		 */
		tmp = q->q_last;
		if ((mcls < (int)queclass(tmp)) || (bp->b_band < tmp->b_band)) {
			bp->b_next = NULL;
			bp->b_prev = tmp;
			tmp->b_next = bp;
			q->q_last = bp;
		} else {
			/* Skip high-priority and higher-band messages. */
			tmp = q->q_first;
			while (tmp->b_datap->db_type >= QPCTL)
				tmp = tmp->b_next;
			while (tmp->b_band > bp->b_band)
				tmp = tmp->b_next;

			/*
			 * Insert bp before tmp.
			 */
			bp->b_next = tmp;
			bp->b_prev = tmp->b_prev;
			if (tmp->b_prev)
				tmp->b_prev->b_next = bp;
			else
				q->q_first = bp;
			tmp->b_prev = bp;
		}
	}

	/* Get message byte count for q_count accounting */
	bytecnt = mp_cont_len(bp, &mblkcnt);

	/*
	 * Charge the message to the band's or queue's counters and mark
	 * full when either count reaches the high water mark.
	 */
	if (qbp) {
		qbp->qb_count += bytecnt;
		qbp->qb_mblkcnt += mblkcnt;
		if ((qbp->qb_count >= qbp->qb_hiwat) ||
		    (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
			qbp->qb_flag |= QB_FULL;
		}
	} else {
		q->q_count += bytecnt;
		q->q_mblkcnt += mblkcnt;
		if ((q->q_count >= q->q_hiwat) ||
		    (q->q_mblkcnt >= q->q_hiwat)) {
			q->q_flag |= QFULL;
		}
	}

	STR_FTEVENT_MSG(bp, q, FTEV_PUTBQ, NULL);

	/*
	 * Unlike putq(), a non-zero band alone does not enable the
	 * queue here; only priority messages or an explicit QWANTR do.
	 */
	if ((mcls > QNORM) || (canenable(q) && (q->q_flag & QWANTR)))
		qenable_locked(q);
	ASSERT(MUTEX_HELD(QLOCK(q)));
	if (freezer != curthread)
		mutex_exit(QLOCK(q));

	return (1);
}
2968 
2969 /*
2970  * Insert a message before an existing message on the queue.  If the
2971  * existing message is NULL, the new messages is placed on the end of
2972  * the queue.  The queue class of the new message is ignored.  However,
2973  * the priority band of the new message must adhere to the following
2974  * ordering:
2975  *
2976  *      emp->b_prev->b_band >= mp->b_band >= emp->b_band.
2977  *
2978  * All flow control parameters are updated.
2979  *
2980  * insq can be called with the stream frozen, but other utility functions
2981  * holding QLOCK, and by streams modules without any locks/frozen.
2982  */
2983 int
2984 insq(queue_t *q, mblk_t *emp, mblk_t *mp)
2985 {
2986         mblk_t *tmp;
2987         qband_t *qbp = NULL;
2988         int mcls = (int)queclass(mp);
2989         kthread_id_t freezer;
2990         int     bytecnt = 0, mblkcnt = 0;
2991 
2992         freezer = STREAM(q)->sd_freezer;
2993         if (freezer == curthread) {
2994                 ASSERT(frozenstr(q));
2995                 ASSERT(MUTEX_HELD(QLOCK(q)));
2996         } else if (MUTEX_HELD(QLOCK(q))) {
2997                 /* Don't drop lock on exit */
2998                 freezer = curthread;
2999         } else
3000                 mutex_enter(QLOCK(q));
3001 
3002         if (mcls == QPCTL) {
3003                 if (mp->b_band != 0)
3004                         mp->b_band = 0;              /* force to be correct */
3005                 if (emp && emp->b_prev &&
3006                     (emp->b_prev->b_datap->db_type < QPCTL))
3007                         goto badord;
3008         }
3009         if (emp) {
3010                 if (((mcls == QNORM) && (mp->b_band < emp->b_band)) ||
3011                     (emp->b_prev && (emp->b_prev->b_datap->db_type < QPCTL) &&
3012                     (emp->b_prev->b_band < mp->b_band))) {
3013                         goto badord;
3014                 }
3015         } else {
3016                 tmp = q->q_last;
3017                 if (tmp && (mcls == QNORM) && (mp->b_band > tmp->b_band)) {
3018 badord:
3019                         cmn_err(CE_WARN,
3020                             "insq: attempt to insert message out of order "
3021                             "on q %p", (void *)q);
3022                         if (freezer != curthread)
3023                                 mutex_exit(QLOCK(q));
3024                         return (0);
3025                 }
3026         }
3027 
3028         if (mp->b_band != 0) {
3029                 int i;
3030                 qband_t **qbpp;
3031 
3032                 if (mp->b_band > q->q_nband) {
3033                         qbpp = &q->q_bandp;
3034                         while (*qbpp)
3035                                 qbpp = &(*qbpp)->qb_next;
3036                         while (mp->b_band > q->q_nband) {
3037                                 if ((*qbpp = allocband()) == NULL) {
3038                                         if (freezer != curthread)
3039                                                 mutex_exit(QLOCK(q));
3040                                         return (0);
3041                                 }
3042                                 (*qbpp)->qb_hiwat = q->q_hiwat;
3043                                 (*qbpp)->qb_lowat = q->q_lowat;
3044                                 q->q_nband++;
3045                                 qbpp = &(*qbpp)->qb_next;
3046                         }
3047                 }
3048                 qbp = q->q_bandp;
3049                 i = mp->b_band;
3050                 while (--i)
3051                         qbp = qbp->qb_next;
3052         }
3053 
3054         if ((mp->b_next = emp) != NULL) {
3055                 if ((mp->b_prev = emp->b_prev) != NULL)
3056                         emp->b_prev->b_next = mp;
3057                 else
3058                         q->q_first = mp;
3059                 emp->b_prev = mp;
3060         } else {
3061                 if ((mp->b_prev = q->q_last) != NULL)
3062                         q->q_last->b_next = mp;
3063                 else
3064                         q->q_first = mp;
3065                 q->q_last = mp;
3066         }
3067 
3068         /* Get mblk and byte count for q_count accounting */
3069         bytecnt = mp_cont_len(mp, &mblkcnt);
3070 
3071         if (qbp) {      /* adjust qband pointers and count */
3072                 if (!qbp->qb_first) {
3073                         qbp->qb_first = mp;
3074                         qbp->qb_last = mp;
3075                 } else {
3076                         if (mp->b_prev == NULL || (mp->b_prev != NULL &&
3077                             (mp->b_prev->b_band != mp->b_band)))
3078                                 qbp->qb_first = mp;
3079                         else if (mp->b_next == NULL || (mp->b_next != NULL &&
3080                             (mp->b_next->b_band != mp->b_band)))
3081                                 qbp->qb_last = mp;
3082                 }
3083                 qbp->qb_count += bytecnt;
3084                 qbp->qb_mblkcnt += mblkcnt;
3085                 if ((qbp->qb_count >= qbp->qb_hiwat) ||
3086                     (qbp->qb_mblkcnt >= qbp->qb_hiwat)) {
3087                         qbp->qb_flag |= QB_FULL;
3088                 }
3089         } else {
3090                 q->q_count += bytecnt;
3091                 q->q_mblkcnt += mblkcnt;
3092                 if ((q->q_count >= q->q_hiwat) ||
3093                     (q->q_mblkcnt >= q->q_hiwat)) {
3094                         q->q_flag |= QFULL;
3095                 }
3096         }
3097 
3098         STR_FTEVENT_MSG(mp, q, FTEV_INSQ, NULL);
3099 
3100         if (canenable(q) && (q->q_flag & QWANTR))
3101                 qenable_locked(q);
3102 
3103         ASSERT(MUTEX_HELD(QLOCK(q)));
3104         if (freezer != curthread)
3105                 mutex_exit(QLOCK(q));
3106 
3107         return (1);
3108 }
3109 
3110 /*
3111  * Create and put a control message on queue.
3112  */
3113 int
3114 putctl(queue_t *q, int type)
3115 {
3116         mblk_t *bp;
3117 
3118         if ((datamsg(type) && (type != M_DELAY)) ||
3119             (bp = allocb_tryhard(0)) == NULL)
3120                 return (0);
3121         bp->b_datap->db_type = (unsigned char) type;
3122 
3123         put(q, bp);
3124 
3125         return (1);
3126 }
3127 
3128 /*
3129  * Control message with a single-byte parameter
3130  */
3131 int
3132 putctl1(queue_t *q, int type, int param)
3133 {
3134         mblk_t *bp;
3135 
3136         if ((datamsg(type) && (type != M_DELAY)) ||
3137             (bp = allocb_tryhard(1)) == NULL)
3138                 return (0);
3139         bp->b_datap->db_type = (unsigned char)type;
3140         *bp->b_wptr++ = (unsigned char)param;
3141 
3142         put(q, bp);
3143 
3144         return (1);
3145 }
3146 
3147 int
3148 putnextctl1(queue_t *q, int type, int param)
3149 {
3150         mblk_t *bp;
3151 
3152         if ((datamsg(type) && (type != M_DELAY)) ||
3153             ((bp = allocb_tryhard(1)) == NULL))
3154                 return (0);
3155 
3156         bp->b_datap->db_type = (unsigned char)type;
3157         *bp->b_wptr++ = (unsigned char)param;
3158 
3159         putnext(q, bp);
3160 
3161         return (1);
3162 }
3163 
3164 int
3165 putnextctl(queue_t *q, int type)
3166 {
3167         mblk_t *bp;
3168 
3169         if ((datamsg(type) && (type != M_DELAY)) ||
3170             ((bp = allocb_tryhard(0)) == NULL))
3171                 return (0);
3172         bp->b_datap->db_type = (unsigned char)type;
3173 
3174         putnext(q, bp);
3175 
3176         return (1);
3177 }
3178 
3179 /*
3180  * Return the queue upstream from this one
3181  */
3182 queue_t *
3183 backq(queue_t *q)
3184 {
3185         q = _OTHERQ(q);
3186         if (q->q_next) {
3187                 q = q->q_next;
3188                 return (_OTHERQ(q));
3189         }
3190         return (NULL);
3191 }
3192 
3193 /*
3194  * Send a block back up the queue in reverse from this
3195  * one (e.g. to respond to ioctls)
3196  */
3197 void
3198 qreply(queue_t *q, mblk_t *bp)
3199 {
3200         ASSERT(q && bp);
3201 
3202         putnext(_OTHERQ(q), bp);
3203 }
3204 
3205 /*
3206  * Streams Queue Scheduling
3207  *
3208  * Queues are enabled through qenable() when they have messages to
3209  * process.  They are serviced by queuerun(), which runs each enabled
3210  * queue's service procedure.  The call to queuerun() is processor
3211  * dependent - the general principle is that it be run whenever a queue
3212  * is enabled but before returning to user level.  For system calls,
3213  * the function runqueues() is called if their action causes a queue
3214  * to be enabled.  For device interrupts, queuerun() should be
3215  * called before returning from the last level of interrupt.  Beyond
3216  * this, no timing assumptions should be made about queue scheduling.
3217  */
3218 
3219 /*
3220  * Enable a queue: put it on list of those whose service procedures are
3221  * ready to run and set up the scheduling mechanism.
3222  * The broadcast is done outside the mutex -> to avoid the woken thread
3223  * from contending with the mutex. This is OK 'cos the queue has been
3224  * enqueued on the runlist and flagged safely at this point.
3225  */
void
qenable(queue_t *q)
{
	/* Locked variant does the real work; see comment above. */
	mutex_enter(QLOCK(q));
	qenable_locked(q);
	mutex_exit(QLOCK(q));
}
3233 /*
3234  * Return number of messages on queue
3235  */
3236 int
3237 qsize(queue_t *qp)
3238 {
3239         int count = 0;
3240         mblk_t *mp;
3241 
3242         mutex_enter(QLOCK(qp));
3243         for (mp = qp->q_first; mp; mp = mp->b_next)
3244                 count++;
3245         mutex_exit(QLOCK(qp));
3246         return (count);
3247 }
3248 
3249 /*
3250  * noenable - set queue so that putq() will not enable it.
3251  * enableok - set queue so that putq() can enable it.
3252  */
void
noenable(queue_t *q)
{
	/* QNOENB makes canenable() fail, so putq() won't schedule q. */
	mutex_enter(QLOCK(q));
	q->q_flag |= QNOENB;
	mutex_exit(QLOCK(q));
}
3260 
void
enableok(queue_t *q)
{
	/* Clear QNOENB so putq() may once again enable the queue. */
	mutex_enter(QLOCK(q));
	q->q_flag &= ~QNOENB;
	mutex_exit(QLOCK(q));
}
3268 
3269 /*
3270  * Set queue fields.
3271  */
3272 int
3273 strqset(queue_t *q, qfields_t what, unsigned char pri, intptr_t val)
3274 {
3275         qband_t *qbp = NULL;
3276         queue_t *wrq;
3277         int error = 0;
3278         kthread_id_t freezer;
3279 
3280         freezer = STREAM(q)->sd_freezer;
3281         if (freezer == curthread) {
3282                 ASSERT(frozenstr(q));
3283                 ASSERT(MUTEX_HELD(QLOCK(q)));
3284         } else
3285                 mutex_enter(QLOCK(q));
3286 
3287         if (what >= QBAD) {
3288                 error = EINVAL;
3289                 goto done;
3290         }
3291         if (pri != 0) {
3292                 int i;
3293                 qband_t **qbpp;
3294 
3295                 if (pri > q->q_nband) {
3296                         qbpp = &q->q_bandp;
3297                         while (*qbpp)
3298                                 qbpp = &(*qbpp)->qb_next;
3299                         while (pri > q->q_nband) {
3300                                 if ((*qbpp = allocband()) == NULL) {
3301                                         error = EAGAIN;
3302                                         goto done;
3303                                 }
3304                                 (*qbpp)->qb_hiwat = q->q_hiwat;
3305                                 (*qbpp)->qb_lowat = q->q_lowat;
3306                                 q->q_nband++;
3307                                 qbpp = &(*qbpp)->qb_next;
3308                         }
3309                 }
3310                 qbp = q->q_bandp;
3311                 i = pri;
3312                 while (--i)
3313                         qbp = qbp->qb_next;
3314         }
3315         switch (what) {
3316 
3317         case QHIWAT:
3318                 if (qbp)
3319                         qbp->qb_hiwat = (size_t)val;
3320                 else
3321                         q->q_hiwat = (size_t)val;
3322                 break;
3323 
3324         case QLOWAT:
3325                 if (qbp)
3326                         qbp->qb_lowat = (size_t)val;
3327                 else
3328                         q->q_lowat = (size_t)val;
3329                 break;
3330 
3331         case QMAXPSZ:
3332                 if (qbp)
3333                         error = EINVAL;
3334                 else
3335                         q->q_maxpsz = (ssize_t)val;
3336 
3337                 /*
3338                  * Performance concern, strwrite looks at the module below
3339                  * the stream head for the maxpsz each time it does a write
3340                  * we now cache it at the stream head.  Check to see if this
3341                  * queue is sitting directly below the stream head.
3342                  */
3343                 wrq = STREAM(q)->sd_wrq;
3344                 if (q != wrq->q_next)
3345                         break;
3346 
3347                 /*
3348                  * If the stream is not frozen drop the current QLOCK and
3349                  * acquire the sd_wrq QLOCK which protects sd_qn_*
3350                  */
3351                 if (freezer != curthread) {
3352                         mutex_exit(QLOCK(q));
3353                         mutex_enter(QLOCK(wrq));
3354                 }
3355                 ASSERT(MUTEX_HELD(QLOCK(wrq)));
3356 
3357                 if (strmsgsz != 0) {
3358                         if (val == INFPSZ)
3359                                 val = strmsgsz;
3360                         else  {
3361                                 if (STREAM(q)->sd_vnode->v_type == VFIFO)
3362                                         val = MIN(PIPE_BUF, val);
3363                                 else
3364                                         val = MIN(strmsgsz, val);
3365                         }
3366                 }
3367                 STREAM(q)->sd_qn_maxpsz = val;
3368                 if (freezer != curthread) {
3369                         mutex_exit(QLOCK(wrq));
3370                         mutex_enter(QLOCK(q));
3371                 }
3372                 break;
3373 
3374         case QMINPSZ:
3375                 if (qbp)
3376                         error = EINVAL;
3377                 else
3378                         q->q_minpsz = (ssize_t)val;
3379 
3380                 /*
3381                  * Performance concern, strwrite looks at the module below
3382                  * the stream head for the maxpsz each time it does a write
3383                  * we now cache it at the stream head.  Check to see if this
3384                  * queue is sitting directly below the stream head.
3385                  */
3386                 wrq = STREAM(q)->sd_wrq;
3387                 if (q != wrq->q_next)
3388                         break;
3389 
3390                 /*
3391                  * If the stream is not frozen drop the current QLOCK and
3392                  * acquire the sd_wrq QLOCK which protects sd_qn_*
3393                  */
3394                 if (freezer != curthread) {
3395                         mutex_exit(QLOCK(q));
3396                         mutex_enter(QLOCK(wrq));
3397                 }
3398                 STREAM(q)->sd_qn_minpsz = (ssize_t)val;
3399 
3400                 if (freezer != curthread) {
3401                         mutex_exit(QLOCK(wrq));
3402                         mutex_enter(QLOCK(q));
3403                 }
3404                 break;
3405 
3406         case QSTRUIOT:
3407                 if (qbp)
3408                         error = EINVAL;
3409                 else
3410                         q->q_struiot = (ushort_t)val;
3411                 break;
3412 
3413         case QCOUNT:
3414         case QFIRST:
3415         case QLAST:
3416         case QFLAG:
3417                 error = EPERM;
3418                 break;
3419 
3420         default:
3421                 error = EINVAL;
3422                 break;
3423         }
3424 done:
3425         if (freezer != curthread)
3426                 mutex_exit(QLOCK(q));
3427         return (error);
3428 }
3429 
/*
 * Get queue fields.
 *
 * Read the field selected by `what' (optionally for priority band `pri';
 * pri == 0 means the queue itself) into the caller-supplied `valp'.
 * Returns 0 on success, EINVAL for an unknown field or a field that has
 * no per-band variant, and EAGAIN if band allocation fails.
 *
 * NOTE(review): as a side effect, asking about a band higher than any
 * currently allocated one allocates qband structures up to `pri'
 * (mirroring the allocation logic in strqset()), so even a "get" can
 * modify the queue.
 *
 * Locking: if the stream is frozen by this thread the caller already
 * holds QLOCK(q); otherwise QLOCK(q) is acquired and dropped here.
 */
int
strqget(queue_t *q, qfields_t what, unsigned char pri, void *valp)
{
	qband_t		*qbp = NULL;
	int		error = 0;
	kthread_id_t	freezer;

	freezer = STREAM(q)->sd_freezer;
	if (freezer == curthread) {
		ASSERT(frozenstr(q));
		ASSERT(MUTEX_HELD(QLOCK(q)));
	} else
		mutex_enter(QLOCK(q));
	if (what >= QBAD) {
		error = EINVAL;
		goto done;
	}
	if (pri != 0) {
		int i;
		qband_t **qbpp;

		/* Allocate any missing bands up to the requested priority. */
		if (pri > q->q_nband) {
			qbpp = &q->q_bandp;
			while (*qbpp)
				qbpp = &(*qbpp)->qb_next;
			while (pri > q->q_nband) {
				if ((*qbpp = allocband()) == NULL) {
					error = EAGAIN;
					goto done;
				}
				/* New bands inherit the queue's watermarks. */
				(*qbpp)->qb_hiwat = q->q_hiwat;
				(*qbpp)->qb_lowat = q->q_lowat;
				q->q_nband++;
				qbpp = &(*qbpp)->qb_next;
			}
		}
		/* Walk the band list to the qband for `pri' (1-based). */
		qbp = q->q_bandp;
		i = pri;
		while (--i)
			qbp = qbp->qb_next;
	}
	switch (what) {
	case QHIWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_hiwat;
		else
			*(size_t *)valp = q->q_hiwat;
		break;

	case QLOWAT:
		if (qbp)
			*(size_t *)valp = qbp->qb_lowat;
		else
			*(size_t *)valp = q->q_lowat;
		break;

	case QMAXPSZ:
		/* Packet sizes are per-queue only; no per-band variant. */
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_maxpsz;
		break;

	case QMINPSZ:
		if (qbp)
			error = EINVAL;
		else
			*(ssize_t *)valp = q->q_minpsz;
		break;

	case QCOUNT:
		if (qbp)
			*(size_t *)valp = qbp->qb_count;
		else
			*(size_t *)valp = q->q_count;
		break;

	case QFIRST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_first;
		else
			*(mblk_t **)valp = q->q_first;
		break;

	case QLAST:
		if (qbp)
			*(mblk_t **)valp = qbp->qb_last;
		else
			*(mblk_t **)valp = q->q_last;
		break;

	case QFLAG:
		if (qbp)
			*(uint_t *)valp = qbp->qb_flag;
		else
			*(uint_t *)valp = q->q_flag;
		break;

	case QSTRUIOT:
		/* Synchronous-stream uio type is per-queue only. */
		if (qbp)
			error = EINVAL;
		else
			*(short *)valp = q->q_struiot;
		break;

	default:
		error = EINVAL;
		break;
	}
done:
	if (freezer != curthread)
		mutex_exit(QLOCK(q));
	return (error);
}
3547 
/*
 * Wake up all threads blocked in cv_wait/sigwait/pollwait on one of:
 *	QWANTWSYNC or QWANTR or QWANTW,
 *
 * Note: for QWANTWSYNC and QWANTR, if there is no WSLEEPer or RSLEEPer
 *	 then a deferred wakeup is recorded in sd_wakeq. Also if strpoll()
 *	 is in progress then a deferred pollwakeup will be done.
 */
void
strwakeq(queue_t *q, int flag)
{
	stdata_t	*stp = STREAM(q);
	pollhead_t	*pl;

	mutex_enter(&stp->sd_lock);
	pl = &stp->sd_pollist;
	if (flag & QWANTWSYNC) {
		/* Synchronous-stream write wakeup: write side only. */
		ASSERT(!(q->q_flag & QREADR));
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		} else {
			/* Nobody asleep yet; defer the wakeup. */
			stp->sd_wakeq |= WSLEEP;
		}

		/* sd_lock must be dropped across pollwakeup(). */
		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	} else if (flag & QWANTR) {
		/* Read-side wakeup. */
		if (stp->sd_flag & RSLEEP) {
			stp->sd_flag &= ~RSLEEP;
			cv_broadcast(&_RD(stp->sd_wrq)->q_wait);
		} else {
			/* Nobody asleep yet; defer the wakeup. */
			stp->sd_wakeq |= RSLEEP;
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLIN | POLLRDNORM);
		mutex_enter(&stp->sd_lock);

		{
			int events = stp->sd_sigflags & (S_INPUT | S_RDNORM);

			if (events)
				strsendsig(stp->sd_siglist, events, 0, 0);
		}
	} else {
		/* QWANTW: ordinary write wakeup, no deferral here. */
		if (stp->sd_flag & WSLEEP) {
			stp->sd_flag &= ~WSLEEP;
			cv_broadcast(&stp->sd_wrq->q_wait);
		}

		mutex_exit(&stp->sd_lock);
		pollwakeup(pl, POLLWRNORM);
		mutex_enter(&stp->sd_lock);

		if (stp->sd_sigflags & S_WRNORM)
			strsendsig(stp->sd_siglist, S_WRNORM, 0, 0);
	}
	mutex_exit(&stp->sd_lock);
}
3612 
/*
 * Copy user data (dp->d_uio) into the STRUIO_SPEC-marked region of each
 * mblk in the chain `mp', marking each completed mblk STRUIO_DONE and
 * advancing its db_cksumstuff offset by the number of bytes moved.
 *
 * If `noblock' is set, page faults during the copy are trapped via
 * on_trap() and EWOULDBLOCK is returned instead of blocking.
 *
 * Returns 0 on success or an errno (uiomove failure, EWOULDBLOCK on
 * trapped fault, EIO for an unknown struio type).
 */
int
struioget(queue_t *q, mblk_t *mp, struiod_t *dp, int noblock)
{
	stdata_t *stp = STREAM(q);
	int typ  = STRUIOT_STANDARD;
	uio_t	 *uiop = &dp->d_uio;
	dblk_t	 *dbp;
	ssize_t  uiocnt;
	ssize_t  cnt;
	unsigned char *ptr;
	ssize_t  resid;
	int	 error = 0;
	on_trap_data_t otd;
	queue_t *stwrq;

	/*
	 * Plumbing may change while taking the type so store the
	 * queue in a temporary variable. It doesn't matter even
	 * if the we take the type from the previous plumbing,
	 * that's because if the plumbing has changed when we were
	 * holding the queue in a temporary variable, we can continue
	 * processing the message the way it would have been processed
	 * in the old plumbing, without any side effects but a bit
	 * extra processing for partial ip header checksum.
	 *
	 * This has been done to avoid holding the sd_lock which is
	 * very hot.
	 */

	stwrq = stp->sd_struiowrq;
	if (stwrq)
		typ = stwrq->q_struiot;

	for (; (resid = uiop->uio_resid) > 0 && mp; mp = mp->b_cont) {
		dbp = mp->b_datap;
		/* Region to fill: [db_cksumstuff, db_cksumend) of this mblk. */
		ptr = (uchar_t *)(mp->b_rptr + dbp->db_cksumstuff);
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		cnt = MIN(uiocnt, uiop->uio_resid);
		if (!(dbp->db_struioflag & STRUIO_SPEC) ||
		    (dbp->db_struioflag & STRUIO_DONE) || cnt == 0) {
			/*
			 * Either this mblk has already been processed
			 * or there is no more room in this mblk (?).
			 */
			continue;
		}
		switch (typ) {
		case STRUIOT_STANDARD:
			if (noblock) {
				/* Trap faults so we fail fast, not block. */
				if (on_trap(&otd, OT_DATA_ACCESS)) {
					no_trap();
					error = EWOULDBLOCK;
					goto out;
				}
			}
			/* Intentional assignment inside the condition. */
			if (error = uiomove(ptr, cnt, UIO_WRITE, uiop)) {
				if (noblock)
					no_trap();
				goto out;
			}
			if (noblock)
				no_trap();
			break;

		default:
			error = EIO;
			goto out;
		}
		dbp->db_struioflag |= STRUIO_DONE;
		dbp->db_cksumstuff += cnt;
	}
out:
	if (error == EWOULDBLOCK && (resid -= uiop->uio_resid) > 0) {
		/*
		 * A fault has occurred and some bytes were moved to the
		 * current mblk, the uio_t has already been updated by
		 * the appropriate uio routine, so also update the mblk
		 * (dbp still points at the faulting mblk's dblk) to
		 * reflect this in case this same mblk chain is used
		 * again (after the fault has been handled).
		 */
		uiocnt = dbp->db_cksumend - dbp->db_cksumstuff;
		if (uiocnt >= resid)
			dbp->db_cksumstuff += resid;
	}
	return (error);
}
3699 
3700 /*
3701  * Try to enter queue synchronously. Any attempt to enter a closing queue will
3702  * fails. The qp->q_rwcnt keeps track of the number of successful entries so
3703  * that removeq() will not try to close the queue while a thread is inside the
3704  * queue.
3705  */
3706 static boolean_t
3707 rwnext_enter(queue_t *qp)
3708 {
3709         mutex_enter(QLOCK(qp));
3710         if (qp->q_flag & QWCLOSE) {
3711                 mutex_exit(QLOCK(qp));
3712                 return (B_FALSE);
3713         }
3714         qp->q_rwcnt++;
3715         ASSERT(qp->q_rwcnt != 0);
3716         mutex_exit(QLOCK(qp));
3717         return (B_TRUE);
3718 }
3719 
3720 /*
3721  * Decrease the count of threads running in sync stream queue and wake up any
3722  * threads blocked in removeq().
3723  */
3724 static void
3725 rwnext_exit(queue_t *qp)
3726 {
3727         mutex_enter(QLOCK(qp));
3728         qp->q_rwcnt--;
3729         if (qp->q_flag & QWANTRMQSYNC) {
3730                 qp->q_flag &= ~QWANTRMQSYNC;
3731                 cv_broadcast(&qp->q_wait);
3732         }
3733         mutex_exit(QLOCK(qp));
3734 }
3735 
/*
 * The purpose of rwnext() is to call the rw procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue sq_count is incremented and it does
 * not matter if any regular put entrypoints have been already entered. We
 * can't increment one of the sq_putcounts (instead of sq_count) because
 * qwait_rw won't know which counter to decrement.
 *
 * It would be reasonable to add the lockless FASTPUT logic.
 */
int
rwnext(queue_t *qp, struiod_t *dp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		isread;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until acquiring
	 * SQLOCK. Note that a read-side rwnext from the streamhead will
	 * already have sd_lock acquired. In either case sd_lock is always
	 * released after acquiring SQLOCK.
	 *
	 * The streamhead read-side holding sd_lock when calling rwnext is
	 * required to prevent a race condition where M_DATA mblks flowing
	 * up the read-side of the stream could be bypassed by a rwnext()
	 * down-call. In this case sd_lock acts as the streamhead perimeter.
	 */
	if ((nqp = _WR(qp)) == qp) {
		/* Write side: step to the next (downstream) queue. */
		isread = 0;
		mutex_enter(&stp->sd_lock);
		qp = nqp->q_next;
	} else {
		/* Read side: step to the read queue of the next pair. */
		isread = 1;
		if (nqp != stp->sd_wrq)
			/* Not streamhead */
			mutex_enter(&stp->sd_lock);
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_rwp)) {
		/*
		 * Not a synchronous module or no r/w procedure for this
		 * queue, so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	if (rwnext_enter(qp) == B_FALSE) {
		/* Queue is closing. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}

	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * if this queue is being closed, return.
		 */
		if (qp->q_flag & QWCLOSE) {
			mutex_exit(SQLOCK(sq));
			rwnext_exit(qp);
			return (EINVAL);
		}

		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (isread == 0 && stp->sd_struiowrq == NULL ||
	    isread == 1 && stp->sd_struiordq == NULL) {
		/*
		 * Stream plumbing changed while waiting for inner perimeter
		 * so just return EINVAL and let the caller handle it.
		 */
		mutex_exit(SQLOCK(sq));
		rwnext_exit(qp);
		return (EINVAL);
	}
	if (!(flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	/*
	 * Note: The only message ordering guarantee that rwnext() makes is
	 *	 for the write queue flow-control case. All others (r/w queue
	 *	 with q_count > 0 (or q_first != 0)) are the responsibility of
	 *	 the queue's rw procedure. This could be generalized here by
	 *	 running the queue's service procedure, but that wouldn't be
	 *	 the most efficient for all cases.
	 */
	mutex_exit(SQLOCK(sq));
	if (! isread && (qp->q_flag & QFULL)) {
		/*
		 * Write queue may be flow controlled. If so,
		 * mark the queue for wakeup when it's not.
		 * QFULL is re-checked under QLOCK since it may have
		 * cleared after the unlocked test above.
		 */
		mutex_enter(QLOCK(qp));
		if (qp->q_flag & QFULL) {
			qp->q_flag |= QWANTWSYNC;
			mutex_exit(QLOCK(qp));
			rval = EWOULDBLOCK;
			goto out;
		}
		mutex_exit(QLOCK(qp));
	}

	if (! isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, nqp, FTEV_RWNEXT, dp->d_mp->b_rptr -
		    dp->d_mp->b_datap->db_base);

	rval = (*proc)(qp, dp);

	if (isread && dp->d_mp)
		STR_FTEVENT_MSG(dp->d_mp, _RD(nqp), FTEV_RWNEXT,
		    dp->d_mp->b_rptr - dp->d_mp->b_datap->db_base);
out:
	/*
	 * The queue is protected from being freed by sq_count, so it is
	 * safe to call rwnext_exit and reacquire SQLOCK(sq).
	 */
	rwnext_exit(qp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	if (sq->sq_flags & SQ_WANTWAKEUP) {
		sq->sq_flags &= ~SQ_WANTWAKEUP;
		cv_broadcast(&sq->sq_wait);
	}
	mutex_exit(SQLOCK(sq));
	return (rval);
}
3926 
/*
 * The purpose of infonext() is to call the info procedure of the next
 * (downstream) module's queue.
 *
 * Treated as a put entrypoint for perimeter synchronization.
 *
 * There's no need to grab sq_putlocks here (which only exist for CIPUT
 * sync queues). If it is CIPUT sync queue regular sq_count is incremented and
 * it does not matter if any regular put entrypoints have been already
 * entered.
 */
int
infonext(queue_t *qp, infod_t *idp)
{
	queue_t		*nqp;
	syncq_t		*sq;
	uint16_t	count;
	uint16_t	flags;
	struct qinit	*qi;
	int		(*proc)();
	struct stdata	*stp;
	int		rval;

	stp = STREAM(qp);
	/*
	 * Prevent q_next from changing by holding sd_lock until
	 * acquiring SQLOCK.
	 */
	mutex_enter(&stp->sd_lock);
	if ((nqp = _WR(qp)) == qp) {
		/* Write side: next (downstream) queue. */
		qp = nqp->q_next;
	} else {
		/* Read side: read queue of the next pair. */
		qp = _RD(nqp->q_next);
	}
	qi = qp->q_qinfo;
	if (qp->q_struiot == STRUIOT_NONE || ! (proc = qi->qi_infop)) {
		/* Not a synchronous module or no info procedure. */
		mutex_exit(&stp->sd_lock);
		return (EINVAL);
	}
	sq = qp->q_syncq;
	mutex_enter(SQLOCK(sq));
	mutex_exit(&stp->sd_lock);
	count = sq->sq_count;
	flags = sq->sq_flags;
	ASSERT(sq->sq_ciputctrl == NULL || (flags & SQ_CIPUT));

	while ((flags & SQ_GOAWAY) || (!(flags & SQ_CIPUT) && count != 0)) {
		/*
		 * Wait until we can enter the inner perimeter.
		 */
		sq->sq_flags = flags | SQ_WANTWAKEUP;
		cv_wait(&sq->sq_wait, SQLOCK(sq));
		count = sq->sq_count;
		flags = sq->sq_flags;
	}

	if (! (flags & SQ_CIPUT))
		sq->sq_flags = flags | SQ_EXCL;
	sq->sq_count = count + 1;
	ASSERT(sq->sq_count != 0);		/* Wraparound */
	mutex_exit(SQLOCK(sq));

	rval = (*proc)(qp, idp);

	mutex_enter(SQLOCK(sq));
	flags = sq->sq_flags;
	ASSERT(sq->sq_count != 0);
	sq->sq_count--;
	if (flags & SQ_TAIL) {
		putnext_tail(sq, qp, flags);
		/*
		 * The only purpose of this ASSERT is to preserve calling stack
		 * in DEBUG kernel.
		 */
		ASSERT(flags & SQ_TAIL);
		return (rval);
	}
	ASSERT(flags & (SQ_EXCL|SQ_CIPUT));
/*
 * XXXX
 * I am not certain the next comment is correct here.  I need to consider
 * why the infonext is called, and if dropping SQ_EXCL unless non-CIPUT
 * might cause other problems.  It just might be safer to drop it if
 * !SQ_CIPUT because that is when we set it.
 */
	/*
	 * Safe to always drop SQ_EXCL:
	 *	Not SQ_CIPUT means we set SQ_EXCL above
	 *	For SQ_CIPUT SQ_EXCL will only be set if the put procedure
	 *	did a qwriter(INNER) in which case nobody else
	 *	is in the inner perimeter and we are exiting.
	 *
	 * I would like to make the following assertion:
	 *
	 * ASSERT((flags & (SQ_EXCL|SQ_CIPUT)) != (SQ_EXCL|SQ_CIPUT) ||
	 *	sq->sq_count == 0);
	 *
	 * which indicates that if we are both putshared and exclusive,
	 * we became exclusive while executing the putproc, and the only
	 * claim on the syncq was the one we dropped a few lines above.
	 * But other threads that enter putnext while the syncq is exclusive
	 * need to make a claim as they may need to drop SQLOCK in the
	 * has_writers case to avoid deadlocks.  If these threads are
	 * delayed or preempted, it is possible that the writer thread can
	 * find out that there are other claims making the (sq_count == 0)
	 * test invalid.
	 */

	sq->sq_flags = flags & ~SQ_EXCL;
	mutex_exit(SQLOCK(sq));
	return (rval);
}
4039 
4040 /*
4041  * Return nonzero if the queue is responsible for struio(), else return 0.
4042  */
4043 int
4044 isuioq(queue_t *q)
4045 {
4046         if (q->q_flag & QREADR)
4047                 return (STREAM(q)->sd_struiordq == q);
4048         else
4049                 return (STREAM(q)->sd_struiowrq == q);
4050 }
4051 
/*
 * Tunable: when nonzero, create_putlocks() returns early and no per-CPU
 * sq_putlocks/stream_putlocks are ever created.  Enabled (0) by default
 * only on sparc.
 */
#if defined(__sparc)
int disable_putlocks = 0;
#else
int disable_putlocks = 1;
#endif
4057 
/*
 * called by create_putlock.
 *
 * Allocate the per-CPU ciputctrl structure for the syncq of q and, when
 * the other queue of the pair has its own (QPERQ) syncq, for that syncq
 * too (the loop below runs at most twice).  Only SQ_CIPUT syncqs get
 * putlocks.
 */
static void
create_syncq_putlocks(queue_t *q)
{
	syncq_t *sq = q->q_syncq;
	ciputctrl_t *cip;
	int i;

	ASSERT(sq != NULL);

	ASSERT(disable_putlocks == 0);
	ASSERT(n_ciputctrl >= min_n_ciputctrl);
	ASSERT(ciputctrl_cache != NULL);

	if (!(sq->sq_type & SQ_CIPUT))
		return;

	for (i = 0; i <= 1; i++) {
		if (sq->sq_ciputctrl == NULL) {
			/*
			 * Allocate outside SQLOCK (KM_SLEEP), then recheck
			 * under the lock in case we raced another thread.
			 */
			cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
			SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
			mutex_enter(SQLOCK(sq));
			if (sq->sq_ciputctrl != NULL) {
				/* Lost the race; free our copy. */
				mutex_exit(SQLOCK(sq));
				kmem_cache_free(ciputctrl_cache, cip);
			} else {
				ASSERT(sq->sq_nciputctrl == 0);
				sq->sq_nciputctrl = n_ciputctrl - 1;
				/*
				 * putnext checks sq_ciputctrl without holding
				 * SQLOCK. if it is not NULL putnext assumes
				 * sq_nciputctrl is initialized. membar below
				 * ensures that.
				 */
				membar_producer();
				sq->sq_ciputctrl = cip;
				mutex_exit(SQLOCK(sq));
			}
		}
		ASSERT(sq->sq_nciputctrl == n_ciputctrl - 1);
		if (i == 1)
			break;
		/* Second pass: the other queue of the pair, if QPERQ. */
		q = _OTHERQ(q);
		if (!(q->q_flag & QPERQ)) {
			/* Shared syncq; already handled above. */
			ASSERT(sq == q->q_syncq);
			break;
		}
		ASSERT(q->q_syncq != NULL);
		ASSERT(sq != q->q_syncq);
		sq = q->q_syncq;
		ASSERT(sq->sq_type & SQ_CIPUT);
	}
}
4113 
/*
 * If stream argument is 0 only create per cpu sq_putlocks/sq_putcounts for
 * syncq of q. If stream argument is not 0 create per cpu stream_putlocks for
 * the stream of q and per cpu sq_putlocks/sq_putcounts for all syncq's
 * starting from q and down to the driver.
 *
 * This should be called after the affected queues are part of stream
 * geometry. It should be called from driver/module open routine after
 * qprocson() call. It is also called from nfs syscall where it is known that
 * stream is configured and won't change its geometry during create_putlock
 * call.
 *
 * caller normally uses 0 value for the stream argument to speed up MT putnext
 * into the perimeter of q for example because its perimeter is per module
 * (e.g. IP).
 *
 * caller normally uses non 0 value for the stream argument to hint the system
 * that the stream of q is a very contended global system stream
 * (e.g. NFS/UDP) and the part of the stream from q to the driver is
 * particularly MT hot.
 *
 * Caller ensures stream plumbing won't happen while we are here and therefore
 * q_next can be safely used.
 */

void
create_putlocks(queue_t *q, int stream)
{
	ciputctrl_t	*cip;
	struct stdata	*stp = STREAM(q);

	q = _WR(q);
	ASSERT(stp != NULL);

	/* Putlocks may be disabled globally (see disable_putlocks). */
	if (disable_putlocks != 0)
		return;

	if (n_ciputctrl < min_n_ciputctrl)
		return;

	ASSERT(ciputctrl_cache != NULL);

	if (stream != 0 && stp->sd_ciputctrl == NULL) {
		/*
		 * Allocate outside sd_lock (KM_SLEEP), then recheck under
		 * the lock in case another thread installed one first.
		 */
		cip = kmem_cache_alloc(ciputctrl_cache, KM_SLEEP);
		SUMCHECK_CIPUTCTRL_COUNTS(cip, n_ciputctrl - 1, 0);
		mutex_enter(&stp->sd_lock);
		if (stp->sd_ciputctrl != NULL) {
			/* Lost the race; free our copy. */
			mutex_exit(&stp->sd_lock);
			kmem_cache_free(ciputctrl_cache, cip);
		} else {
			ASSERT(stp->sd_nciputctrl == 0);
			stp->sd_nciputctrl = n_ciputctrl - 1;
			/*
			 * putnext checks sd_ciputctrl without holding
			 * sd_lock. if it is not NULL putnext assumes
			 * sd_nciputctrl is initialized. membar below
			 * ensures that.
			 */
			membar_producer();
			stp->sd_ciputctrl = cip;
			mutex_exit(&stp->sd_lock);
		}
	}

	ASSERT(stream == 0 || stp->sd_nciputctrl == n_ciputctrl - 1);

	/* Walk from q down to (and including) the driver's queue. */
	while (_SAMESTR(q)) {
		create_syncq_putlocks(q);
		if (stream == 0)
			return;
		q = q->q_next;
	}
	ASSERT(q != NULL);
	create_syncq_putlocks(q);
}
4189 
4190 /*
4191  * STREAMS Flow Trace - record STREAMS Flow Trace events as an mblk flows
4192  * through a stream.
4193  *
 * Data currently recorded per event: a timestamp, module/driver name,
 * downstream module/driver name, optional call stack, event type and a
 * per-type datum.  Much of the STREAMS framework is instrumented for automatic
4197  * flow tracing (when enabled).  Events can be defined and used by STREAMS
4198  * modules and drivers.
4199  *
4200  * Global objects:
4201  *
4202  *      str_ftevent() - Add a flow-trace event to a dblk.
4203  *      str_ftfree() - Free flow-trace data
4204  *
4205  * Local objects:
4206  *
4207  *      fthdr_cache - pointer to the kmem cache for trace header.
4208  *      ftblk_cache - pointer to the kmem cache for trace data blocks.
4209  */
4210 
4211 int str_ftnever = 1;    /* Don't do STREAMS flow tracing */
4212 int str_ftstack = 0;    /* Don't record event call stacks */
4213 
4214 void
4215 str_ftevent(fthdr_t *hp, void *p, ushort_t evnt, ushort_t data)
4216 {
4217         ftblk_t *bp = hp->tail;
4218         ftblk_t *nbp;
4219         ftevnt_t *ep;
4220         int ix, nix;
4221 
4222         ASSERT(hp != NULL);
4223 
4224         for (;;) {
4225                 if ((ix = bp->ix) == FTBLK_EVNTS) {
4226                         /*
4227                          * Tail doesn't have room, so need a new tail.
4228                          *
4229                          * To make this MT safe, first, allocate a new
4230                          * ftblk, and initialize it.  To make life a
4231                          * little easier, reserve the first slot (mostly
4232                          * by making ix = 1).  When we are finished with
4233                          * the initialization, CAS this pointer to the
4234                          * tail.  If this succeeds, this is the new
4235                          * "next" block.  Otherwise, another thread
4236                          * got here first, so free the block and start
4237                          * again.
4238                          */
4239                         nbp = kmem_cache_alloc(ftblk_cache, KM_NOSLEEP);
4240                         if (nbp == NULL) {
4241                                 /* no mem, so punt */
4242                                 str_ftnever++;
4243                                 /* free up all flow data? */
4244                                 return;
4245                         }
4246                         nbp->nxt = NULL;
4247                         nbp->ix = 1;
4248                         /*
4249                          * Just in case there is another thread about
4250                          * to get the next index, we need to make sure
4251                          * the value is there for it.
4252                          */
4253                         membar_producer();
4254                         if (atomic_cas_ptr(&hp->tail, bp, nbp) == bp) {
4255                                 /* CAS was successful */
4256                                 bp->nxt = nbp;
4257                                 membar_producer();
4258                                 bp = nbp;
4259                                 ix = 0;
4260                                 goto cas_good;
4261                         } else {
4262                                 kmem_cache_free(ftblk_cache, nbp);
4263                                 bp = hp->tail;
4264                                 continue;
4265                         }
4266                 }
4267                 nix = ix + 1;
4268                 if (atomic_cas_32((uint32_t *)&bp->ix, ix, nix) == ix) {
4269                 cas_good:
4270                         if (curthread != hp->thread) {
4271                                 hp->thread = curthread;
4272                                 evnt |= FTEV_CS;
4273                         }
4274                         if (CPU->cpu_seqid != hp->cpu_seqid) {
4275                                 hp->cpu_seqid = CPU->cpu_seqid;
4276                                 evnt |= FTEV_PS;
4277                         }
4278                         ep = &bp->ev[ix];
4279                         break;
4280                 }
4281         }
4282 
4283         if (evnt & FTEV_QMASK) {
4284                 queue_t *qp = p;
4285 
4286                 if (!(qp->q_flag & QREADR))
4287                         evnt |= FTEV_ISWR;
4288 
4289                 ep->mid = Q2NAME(qp);
4290 
4291                 /*
4292                  * We only record the next queue name for FTEV_PUTNEXT since
4293                  * that's the only time we *really* need it, and the putnext()
4294                  * code ensures that qp->q_next won't vanish.  (We could use
4295                  * claimstr()/releasestr() but at a performance cost.)
4296                  */
4297                 if ((evnt & FTEV_MASK) == FTEV_PUTNEXT && qp->q_next != NULL)
4298                         ep->midnext = Q2NAME(qp->q_next);
4299                 else
4300                         ep->midnext = NULL;
4301         } else {
4302                 ep->mid = p;
4303                 ep->midnext = NULL;
4304         }
4305 
4306         if (ep->stk != NULL)
4307                 ep->stk->fs_depth = getpcstack(ep->stk->fs_stk, FTSTK_DEPTH);
4308 
4309         ep->ts = gethrtime();
4310         ep->evnt = evnt;
4311         ep->data = data;
4312         hp->hash = (hp->hash << 9) + hp->hash;
4313         hp->hash += (evnt << 16) | data;
4314         hp->hash += (uintptr_t)ep->mid;
4315 }
4316 
4317 /*
4318  * Free flow-trace data.
4319  */
4320 void
4321 str_ftfree(dblk_t *dbp)
4322 {
4323         fthdr_t *hp = dbp->db_fthdr;
4324         ftblk_t *bp = &hp->first;
4325         ftblk_t *nbp;
4326 
4327         if (bp != hp->tail || bp->ix != 0) {
4328                 /*
4329                  * Clear out the hash, have the tail point to itself, and free
4330                  * any continuation blocks.
4331                  */
4332                 bp = hp->first.nxt;
4333                 hp->tail = &hp->first;
4334                 hp->hash = 0;
4335                 hp->first.nxt = NULL;
4336                 hp->first.ix = 0;
4337                 while (bp != NULL) {
4338                         nbp = bp->nxt;
4339                         kmem_cache_free(ftblk_cache, bp);
4340                         bp = nbp;
4341                 }
4342         }
4343         kmem_cache_free(fthdr_cache, hp);
4344         dbp->db_fthdr = NULL;
4345 }